startappdev / kafka-connect-mongodb
MongoDB sink connector for Kafka Connect
License: Apache License 2.0
I followed the README, but when I try to register the connector, the following errors appear. What am I doing wrong?
I copied the jar into the project folder and added it to the CLASSPATH. ZooKeeper, Kafka, and even the Schema Registry are actively listening.
echo $CLASSPATH
/Users/inanc/dev/br/integration-test/kafkasinktest/lib/kafka-connect-mongodb-assembly-1.0.jar
# start the connect
connect-distributed.sh ./connect-distributed.properties
# register the connector
# after this command, connect log displays errors as in the below header:
# ERROR MESSAGES WHEN I TRY TO REGISTER
curl -X POST -H "Content-Type: application/json" --data @./mongo_connector_configs.json http://localhost:8083/connectors
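Before registering, it can help to confirm that the worker is reachable and that it actually discovered the connector class; a quick sketch against the Connect REST API (worker URL assumed from the commands above):

```shell
# Worker liveness / version check
curl -s http://localhost:8083/
# Is com.startapp.data.MongoSinkConnector among the loaded plugins?
curl -s http://localhost:8083/connector-plugins
```

If the MongoSinkConnector does not appear in the plugin list, the jar was not on the worker's classpath when it started.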
bootstrap.servers=localhost:9092
group.id=testGroup
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connectoffsets
offset.flush.interval.ms=10000
config.storage.topic=connectconfigs
# I needed to add this, or Connect reports errors:
# I also created the connectstatus topic for this
status.storage.topic=connectstatus
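Depending on broker settings the internal topics may be auto-created, but creating them explicitly, with compaction, is safer; a sketch using the standard CLI (topic names taken from the properties above):

```shell
# Create the three internal Connect topics with log compaction
for t in connectoffsets connectconfigs connectstatus; do
  kafka-topics.sh --create --zookeeper localhost:2181 \
    --topic "$t" --partitions 1 --replication-factor 1 \
    --config cleanup.policy=compact
done
```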
I even tried creating the Mongo testdb with some bogus data.
{
"name":"mongo-connector-testTopic",
"config" :{
"connector.class":"com.startapp.data.MongoSinkConnector",
"tasks.max":"5",
"db.host":"localhost",
"db.port":"27017",
"db.name":"testdb",
"db.collections":"testcollection",
"write.batch.enabled":"true",
"write.batch.size":"200",
"connect.use_schema":"false",
"topics":"testTopic"
}
}
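Independently of the connector error, it is worth ruling out malformed JSON in the config file before POSTing it; a minimal local check (file name taken from the curl command above):

```shell
# Fails with a traceback if the file is not valid JSON
python3 -c 'import json,sys; json.load(open(sys.argv[1])); print("valid JSON")' mongo_connector_configs.json
```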
[2017-05-06 13:51:25,123] INFO Kafka version : 0.10.2.1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-05-06 13:51:25,123] INFO Kafka commitId : e89bffd6b2eff799 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-05-06 13:51:25,123] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = testGroup
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:180)
[2017-05-06 13:51:25,125] WARN The configuration 'config.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,125] WARN The configuration 'status.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,125] WARN The configuration 'internal.key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,125] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,125] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,125] WARN The configuration 'internal.key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,125] WARN The configuration 'internal.value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,125] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,125] WARN The configuration 'internal.value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,125] WARN The configuration 'offset.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,126] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,126] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2017-05-06 13:51:25,126] INFO Kafka version : 0.10.2.1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-05-06 13:51:25,126] INFO Kafka commitId : e89bffd6b2eff799 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-05-06 13:51:25,133] INFO Discovered coordinator 192.168.99.1:9092 (id: 2147483647 rack: null) for group testGroup. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:586)
[2017-05-06 13:51:25,136] INFO Finished reading KafkaBasedLog for topic connectconfigs (org.apache.kafka.connect.util.KafkaBasedLog:146)
[2017-05-06 13:51:25,136] INFO Started KafkaBasedLog for topic connectconfigs (org.apache.kafka.connect.util.KafkaBasedLog:148)
[2017-05-06 13:51:25,136] INFO Started KafkaConfigBackingStore (org.apache.kafka.connect.storage.KafkaConfigBackingStore:248)
[2017-05-06 13:51:25,136] INFO Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:195)
[2017-05-06 13:51:25,138] INFO Discovered coordinator 192.168.99.1:9092 (id: 2147483647 rack: null) for group testGroup. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:586)
[2017-05-06 13:51:25,140] INFO (Re-)joining group testGroup (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
[2017-05-06 13:51:25,148] INFO Successfully joined group testGroup with generation 7 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
[2017-05-06 13:51:25,149] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-e78cdc33-aefd-4572-a42d-e09f63ce02e3', leaderUrl='http://192.168.1.26:8083/', offset=-1, connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1151)
[2017-05-06 13:51:25,149] INFO Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:814)
[2017-05-06 13:51:25,149] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:824)
May 06, 2017 1:51:25 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2017-05-06 13:51:25,414] INFO Started o.e.j.s.ServletContextHandler@3e10dc6{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-05-06 13:51:25,431] INFO Started ServerConnector@320de594{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-05-06 13:51:25,431] INFO Started @1017ms (org.eclipse.jetty.server.Server:379)
[2017-05-06 13:51:25,431] INFO REST server listening at http://192.168.1.26:8083/, advertising URL http://192.168.1.26:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-05-06 13:51:25,431] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)
[2017-05-06 13:51:26,674] INFO Reflections took 1506 ms to scan 63 urls, producing 3991 keys and 24266 values (org.reflections.Reflections:229)
[2017-05-06 13:51:28,967] WARN (org.eclipse.jetty.servlet.ServletHandler:620)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.AbstractMethodError: org.apache.kafka.connect.connector.Connector.config()Lorg/apache/kafka/common/config/ConfigDef;
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:812)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.Server.handle(Server.java:499)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.AbstractMethodError: org.apache.kafka.connect.connector.Connector.config()Lorg/apache/kafka/common/config/ConfigDef;
at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:278)
at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:260)
at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:509)
at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:334)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:305)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1154)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:473)
... 23 more
Caused by: java.lang.AbstractMethodError: org.apache.kafka.connect.connector.Connector.config()Lorg/apache/kafka/common/config/ConfigDef;
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:254)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:508)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:505)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:248)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:198)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
[2017-05-06 13:51:28,969] INFO 0:0:0:0:0:0:0:1 - - [06/May/2017:10:51:28 +0000] "POST /connectors HTTP/1.1" 500 294 72 (org.apache.kafka.connect.runtime.rest.RestServer:60)
[2017-05-06 13:51:28,969] WARN /connectors (org.eclipse.jetty.server.HttpChannel:396)
javax.servlet.ServletException: javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.AbstractMethodError: org.apache.kafka.connect.connector.Connector.config()Lorg/apache/kafka/common/config/ConfigDef;
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:130)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.Server.handle(Server.java:499)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.AbstractMethodError: org.apache.kafka.connect.connector.Connector.config()Lorg/apache/kafka/common/config/ConfigDef;
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:812)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
... 10 more
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.AbstractMethodError: org.apache.kafka.connect.connector.Connector.config()Lorg/apache/kafka/common/config/ConfigDef;
at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:278)
at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:260)
at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:509)
at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:334)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:305)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1154)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:473)
... 23 more
Caused by: java.lang.AbstractMethodError: org.apache.kafka.connect.connector.Connector.config()Lorg/apache/kafka/common/config/ConfigDef;
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:254)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:508)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:505)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:248)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:198)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
[2017-05-06 13:51:28,971] WARN Could not send response error 500: javax.servlet.ServletException: javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.AbstractMethodError: org.apache.kafka.connect.connector.Connector.config()Lorg/apache/kafka/common/config/ConfigDef; (org.eclipse.jetty.server.HttpChannel:482)
We want to set up monitoring for multiple topics. How should we do that?
The steps are not complete, and the information is not clearly provided.
Following your curl command to register the connector:
curl -X POST -H "Content-Type: application/json" --data @/tmp/mongo_connector_configs.json http://ec2-52-62-139-3.ca-central-1.compute.amazonaws.comlocalhost:8083/connectors
I checked on
http://ec2-52-62-139-3.ca-central-1.compute.amazonaws.com:8083
{"version":"0.10.0.2.5.3.0-37","commit":"6449c38a63c093f9"}
but if I check
http://ec2-52-60-119-2.ca-central-1.compute.amazonaws.com:8083/connectors
I get "This site can't be reached".
Can you please tell me what I missed here?
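A hedged first step for "site can't be reached" is to separate worker problems from network problems; note also that the curl URL above fuses the EC2 hostname with "localhost", and the host being curled (52-62-139-3) differs from the one being browsed (52-60-119-2). A sketch:

```shell
# On the EC2 instance itself:
curl -s http://localhost:8083/            # is the worker alive?
curl -s http://localhost:8083/connectors  # any registered connectors?
# From outside, the instance's security group must allow inbound TCP 8083.
```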
Hi team,
I received the following error when starting the sink connector. May I know the reason?
[2018-05-04 18:00:53,498] ERROR WorkerSinkTask{id=mongo-connector-nav-sink-T002-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: nav-json-dev-VW_kafka_Purchase_Invoice
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:467)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
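"Unknown magic byte!" generally means the AvroConverter tried to decode records that were not written with the Confluent Avro serializer, which prefixes each message with a magic byte and a schema id. If the topic actually carries plain JSON (an assumption about your data, not a confirmed fix), one sketch is to override the converters in the connector config:

```json
{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```

Otherwise, re-produce the topic with the Avro serializer so the converter and the data agree.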
Hey everybody,
Is there a way to extract the message key (String deserializer) as a separate field into the mongodb sink?
Thanks
Please, how do I configure the MongoDB username and password when Mongo auth is enabled?
We have configured the connector per the README, using the published JAR. We're using Kafka 0.10.0.1 and Scala 2.11. When we load the config (below), the connector starts, but then errors out with
Feb 23 22:25:36 es2.eng.internal.etca kafka[15286]: [2017-02-23 22:25:36,110] ERROR Task mongo-connector-processed-gen2-ids-4 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.NoSuchMethodError: com.startapp.data.MongoSinkConfig.getBoolean(Ljava/lang/String;)Z
    at com.startapp.data.MongoSinkConfig.<init>(MongoSinkConfig.scala:18)
    at com.startapp.data.MongoSinkConfig$.apply(MongoSinkConfig.scala:54)
    at com.startapp.data.MongoSinkTask.start(MongoSinkTask.scala:33)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:207)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Our config:
{
  "name":"mongo-connector-processed-gen2-ids",
  "config" :{
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connector.class":"com.startapp.data.MongoSinkConnector",
    "tasks.max":"5",
    "db.host":"mongo-m4xl.test.eng.internal.etca",
    "db.port":"27017",
    "db.name":"ids",
    "db.collections":"gen2",
    "write.batch.enabled":"true",
    "write.batch.size":"200",
    "connect.use_schema":"false",
    "topics":"processed-gen2-ids"
  }
}
We've tried quoting and unquoting the booleans, with no change. We've also left them out entirely and tested, but it still errors on that line.
Any thoughts on solutions?
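A NoSuchMethodError on MongoSinkConfig.getBoolean usually indicates a binary mismatch: the assembly was compiled against a different Kafka connect-api than the 0.10.0.1 worker is running. A hedged way to inspect this (paths below are examples, not your actual layout):

```shell
# Which Kafka jars does the worker load?
ls "$KAFKA_HOME/libs" | grep -E 'connect-api|kafka-clients'
# Does the assembly bundle its own org.apache.kafka classes that could shadow them?
unzip -l kafka-connect-mongodb-assembly-1.0.jar | grep 'org/apache/kafka' | head
```

Rebuilding the connector against the exact Kafka version running on the worker is the usual remedy.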
Hi Team,
We are using Confluent Kafka 4.1, which is built on Apache Kafka 1.1.0. Can we use this connector there?
Here is the setup:
Maxwell is reading from MySQL binary logs and sending them to Kafka.
Can you please suggest what to check? Let me know if you need more logs.
more mongo_connector_configs.json.bkp
{
"name":"mongo-connector-",
"config" :{
"connector.class":"com.startapp.data.MongoSinkConnector",
"tasks.max":"5",
"db.host":"",
"db.port":"27017",
"db.name":"*",
"db.collections":"mysqlcdc",
"write.batch.enabled":"true",
"write.batch.size":"200",
"connect.use_schema":"false",
"topics":"maxwell"
}
}
more /tmp/connect-distributed.properties|grep -v '#'
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=mongo-connect-offsets
offset.flush.interval.ms=10000
config.storage.topic=mongo-connect-configs
bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
maxwell
mongo-connect-configs
mongo-connect-offs - marked for deletion
mongo-connect-offsets
test - marked for deletion
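Before looking at the sink, it can help to confirm Maxwell is actually producing to the topic; a standard console-consumer check (broker address taken from the properties above):

```shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic maxwell --from-beginning --max-messages 1
```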
Thanks
Dk
Hi, I followed all the steps as suggested, but when I try to send a POST request to the connector port, it gives the following error:
{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches com.startapp.data.MongoSinkConnector, available connectors are: org.apache.kafka.connect.tools.MockConnector, org.apache.kafka.connect.tools.VerifiableSourceConnector, org.apache.kafka.connect.file.FileStreamSinkConnector, org.apache.kafka.connect.tools.VerifiableSinkConnector, org.apache.kafka.connect.sink.SinkConnector, org.apache.kafka.connect.tools.MockSourceConnector, org.apache.kafka.connect.source.SourceConnector, org.apache.kafka.connect.file.FileStreamSourceConnector, org.apache.kafka.connect.tools.MockSinkConnector, org.apache.kafka.connect.tools.SchemaSourceConnector"}
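"Failed to find any class that implements Connector" means the worker JVM never loaded the connector jar: only Kafka's built-in connectors are listed. For Kafka versions without plugin.path, the jar must be on the CLASSPATH of the shell that starts the worker (the jar location below is an example):

```shell
# Export in the same shell that launches the worker, then restart it
export CLASSPATH=/opt/connectors/kafka-connect-mongodb-assembly-1.0.jar
connect-distributed.sh ./connect-distributed.properties
# Newer Kafka (0.11+) can use plugin.path in connect-distributed.properties instead.
```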
This is on master, latest commit is 9f7dd59.
$ sbt clean assembly
[info] Loading global plugins from /home/mm/.sbt/0.13/plugins
[info] Loading project definition from /mnt/vshared/mh/opensrccode/kafka-connect-mongodb.startappdev/project
[warn] module not found: com.artima.supersafe#sbtplugin;1.1.0
[warn] ==== typesafe-ivy-releases: tried
[warn] https://repo.typesafe.com/typesafe/ivy-releases/com.artima.supersafe/sbtplugin/scala_2.10/sbt_0.13/1.1.0/ivys/ivy.xml
[warn] ==== sbt-plugin-releases: tried
[warn] https://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/com.artima.supersafe/sbtplugin/scala_2.10/sbt_0.13/1.1.0/ivys/ivy.xml
[warn] ==== local: tried
[warn] /home/mm/.ivy2/local/com.artima.supersafe/sbtplugin/scala_2.10/sbt_0.13/1.1.0/ivys/ivy.xml
[warn] ==== public: tried
[warn] https://repo1.maven.org/maven2/com/artima/supersafe/sbtplugin_2.10_0.13/1.1.0/sbtplugin-1.1.0.pom
[warn] ==== local-preloaded-ivy: tried
[warn] /home/mm/.sbt/preloaded/com.artima.supersafe/sbtplugin/1.1.0/ivys/ivy.xml
[warn] ==== local-preloaded: tried
[warn] file:////home/mm/.sbt/preloaded/com/artima/supersafe/sbtplugin_2.10_0.13/1.1.0/sbtplugin-1.1.0.pom
[warn] ==== bintray-sbt-plugins: tried
[warn] http://dl.bintray.com/sbt/sbt-plugin-releases/com.artima.supersafe/sbtplugin/scala_2.10/sbt_0.13/1.1.0/ivys/ivy.xml
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: com.artima.supersafe#sbtplugin;1.1.0: not found
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn]
[warn] Note: Some unresolved dependencies have extra attributes. Check that these dependencies exist with the requested attributes.
[warn] com.artima.supersafe:sbtplugin:1.1.0 (scalaVersion=2.10, sbtVersion=0.13)
[warn]
[warn] Note: Unresolved dependencies path:
[warn] com.artima.supersafe:sbtplugin:1.1.0 (scalaVersion=2.10, sbtVersion=0.13) (/mnt/vshared/mh/opensrccode/kafka-connect-mongodb.startappdev/project/plugins.sbt#L2-3)
[warn] +- default:kafka-connect-mongodb-startappdev-build:0.1-SNAPSHOT (scalaVersion=2.10, sbtVersion=0.13)
sbt.ResolveException: unresolved dependency: com.artima.supersafe#sbtplugin;1.1.0: not found
at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:313)
at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:191)
at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:168)
at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:156)
at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:133)
at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:57)
at sbt.IvySbt$$anon$4.call(Ivy.scala:65)
at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:95)
at xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:80)
at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:99)
at xsbt.boot.Using$.withResource(Using.scala:10)
at xsbt.boot.Using$.apply(Using.scala:9)
at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:60)
at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:50)
at xsbt.boot.Locks$.apply0(Locks.scala:31)
at xsbt.boot.Locks$.apply(Locks.scala:28)
at sbt.IvySbt.withDefaultLogger(Ivy.scala:65)
at sbt.IvySbt.withIvy(Ivy.scala:128)
at sbt.IvySbt.withIvy(Ivy.scala:125)
at sbt.IvySbt$Module.withModule(Ivy.scala:156)
at sbt.IvyActions$.updateEither(IvyActions.scala:168)
at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1481)
at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1477)
at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$121.apply(Defaults.scala:1512)
at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$121.apply(Defaults.scala:1510)
at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:37)
at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1515)
at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1509)
at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:60)
at sbt.Classpaths$.cachedUpdate(Defaults.scala:1532)
at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1459)
at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1411)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
at sbt.std.Transform$$anon$4.work(System.scala:63)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.Execute.work(Execute.scala:237)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[error] (*:update) sbt.ResolveException: unresolved dependency: com.artima.supersafe#sbtplugin;1.1.0: not found
Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore? q
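The unresolved com.artima.supersafe#sbtplugin is the SuperSafe compiler plugin used with ScalaTest; it is published to Artima's own repository, which is not in sbt's default resolver chain. A sketch of the usual fix, added to project/plugins.sbt (alternatively, comment out the addSbtPlugin line if the plugin is not needed for your build):

```scala
// project/plugins.sbt
resolvers += "Artima Maven Repository" at "http://repo.artima.com/releases"
```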