Giter Club home page Giter Club logo

kafka-connect-mqtt's Introduction

Mqtt to Apache Kafka Connect Build Status Download

Prerequisites

  • Apache Kafka (0.10.x version) is publish-subscribe messaging rethought as a distributed commit log.

Usage

For development:

  • run check (checkstyle, findbugs, test):
    ./gradlew clean check

  • run project:
    connect-standalone.sh /usr/local/etc/kafka/connect-standalone.properties config/mqtt.properties

    • libs needs to be added to CLASSPATH:
      • kafka-connect-mqtt-{project.version}.jar
      • org.eclipse.paho.client.mqttv3-1.0.2.jar
      • if used with ssl there are more.. (./gradlew copyRuntimeLibs copies all runtime libs to ./build/output/lib)

For production:

  • build project: ./gradlew clean jar - output ./build/libs

  • generate API documentation: ./gradlew javadoc - output ./build/docs/javadoc

License

See LICENSE file for License

kafka-connect-mqtt's People

Contributors

fulkman avatar moperacz avatar totojack avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka-connect-mqtt's Issues

Thread safety when using LinkedList as Queue

Hello, I'm new to kafka and I'm trying to use kafka with mqtt as source.

I found that in this code, you're using java.util.LinkedList as the queue implementation. The queue will be accessed by multiple threads, one of them is mqtt thread. java.util.LinkedList is not thread safe.

I'm suggesting to use java.util.concurrent.LinkedBlockingQueue as it is thread safe and it supports take method that could wait the thread when the queue is empty. It eliminates the need to loop while queue is empty.

I could make a pull request if you'd like me to.

Connection lost due to NullPointerException immediately on receipt of first message

Connector starts up up fine with Confluent Platform 3.0 but I get a java.lang.NullPointerException when I publish the first MQTT message using mosquitto broker and mosquito_pub.

ERROR MQTT connection lost! (com.evokly.kafka.connect.mqtt.MqttSourceConnector:188)
MqttException (0) - java.lang.NullPointerException
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:176)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at com.evokly.kafka.connect.mqtt.util.Utils.getConfiguredInstance(Utils.java:13)
at com.evokly.kafka.connect.mqtt.MqttSourceTask.messageArrived(MqttSourceTask.java:216)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:354)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:162)
... 1 more

Kafka- mqtt multiple topics

Hi, is there any way to publish data from multiple Mqtt topics to multiple Kafka topics with same name as of Mqtt , i.e test1 in Mqtt -> test1 in kafka so on.

INFO Stopped http_8083@4da602fc{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:343)

image
mqtt.properties

Basic

name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1

#Settings

Where to put processed messages - default to mqtt

#kafka.topic=mqtt

What client id to use - defaults to null which means random client_id

#mqtt.client_id=null

Use clean session in connection? - default true

#mqtt.clean_session=true

What mqtt connection timeout to use - defaults to 30 seconds

#mqtt.connection_timeout=30

What mqtt connection keep alive to use - defaults to 60 seconds

#mqtt.keep_alive_interval=60

Mqtt broker address to use - defaults to tcp://localhost:1883

if using TLS then certs can be used

mqtt.server_uris=tcp://47.105.193.55:1883

Mqtt topic to listen to - defaults to # (wildcard - all)

#mqtt.topic=#

Mqtt QoS to use - defaults to 1

#mqtt.qos=1

CA cert to use to connect to mqtt broker, can be used when connecting to TLS secured brokers - default null

#mqtt.ssl.ca_cert=null

Client cert to use to connect to mqtt broker, can be used when connecting to TLS secured brokers - default null

#mqtt.ssl.cert=null

Client key to use to connect to mqtt broker, can be used when connecting to TLS secured brokers - default null

#mqtt.ssl.key=null

Message processor class to use to process mqtt messages before puting them on kafka - defaults to samle DumbProcessor

#message_processor_class=com.evokly.kafka.connect.mqtt.sample.DumbProcessor

Gradle Build Unsuccessful

After trying to build this project using java 8, the gradle build is unsuccessful. The maven repo url link https://repo.eclipse.org/content/repositories/paho-releases/ seems to be broken. It is unable to get the resources from the maven url because I assume the directory have changed. This is my error output.
Could not resolve all dependencies for configuration ':checkstyle'.

Could not resolve com.puppycrawl.tools:checkstyle:6.12.1.
Required by:
com.evokly:kafka-connect-mqtt:1.1-SNAPSHOT
Could not resolve com.puppycrawl.tools:checkstyle:6.12.1.
> Could not get resource 'https://repo1.maven.org/maven2/com/puppycrawl/tools/checkstyle/6.12.1/checkstyle-6.12.1.pom'.
> Could not GET 'https://repo1.maven.org/maven2/com/puppycrawl/tools/checkstyle/6.12.1/checkstyle-6.12.1.pom'.
> java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
Could not resolve com.puppycrawl.tools:checkstyle:6.12.1.
> Could not get resource 'https://repo.eclipse.org/content/repositories/paho-releases/com/puppycrawl/tools/checkstyle/6.12.1/checkstyle-6.12.1.pom'.
> Could not GET 'https://repo.eclipse.org/content/repositories/paho-releases/com/puppycrawl/tools/checkstyle/6.12.1/checkstyle-6.12.1.pom'.
> java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty
Could not resolve com.puppycrawl.tools:checkstyle:6.12.1.
> Could not get resource 'https://repo1.maven.org/maven2/com/puppycrawl/tools/checkstyle/6.12.1/checkstyle-6.12.1.pom'.
> Could not GET 'https://repo1.maven.org/maven2/com/puppycrawl/tools/checkstyle/6.12.1/checkstyle-6.12.1.pom'.
> java.lang.RuntimeException: Unexpected error: java.security.InvalidAlgorithmParameterException: the trustAnchors parameter must be non-empty

ERROR Stopping after connector error

Starting the apache kafka connect mqtt ended up with following error.

[2016-08-18 23:47:27,819] INFO Creating connector mqtt of type com.evokly.kafka.connect.mqtt.MqttSourceConnector (org.apache.kafka.connect.runtime.Worker:170)
[2016-08-18 23:47:27,820] INFO Instantiated connector mqtt with version 1.0-SNAPSHOT of type com.evokly.kafka.connect.mqtt.MqttSourceConnector (org.apache.kafka.connect.runtime.Worker:183)
[2016-08-18 23:47:27,821] INFO Start a MqttSourceConnector (com.evokly.kafka.connect.mqtt.MqttSourceConnector:50)
[2016-08-18 23:47:27,822] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:91)
java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$Recommender
at com.evokly.kafka.connect.mqtt.MqttSourceConnector.start(MqttSourceConnector.java:52)
at org.apache.kafka.connect.runtime.Worker.addConnector(Worker.java:186)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:197)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:145)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:85)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$Recommender
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 5 more
[2016-08-18 23:47:27,826] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
[2016-08-18 23:47:27,830] INFO Stopped ServerConnector@3f22168b{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2016-08-18 23:47:27,835] INFO Stopped o.e.j.s.ServletContextHandler@23fb172e{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2016-08-18 23:47:27,836] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:62)
[2016-08-18 23:47:27,837] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:77)
[2016-08-18 23:47:27,837] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:115)
[2016-08-18 23:47:27,837] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:61)
[2016-08-18 23:47:27,837] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:155)
[2016-08-18 23:47:27,837] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:74)

Please help

support for Kafka 0.9

Hi there,
we have Kafka 0.9 in use and can't switch now to 0.10.
Is it possible somehow to use this project in a 0.9 env?

create samples and "some" documentation

  • connection to local MQTT - without auth
  • connection to local MQTT - with auth
  • connection to amazon IoT - TLS
  • some message processing
  • how message processing works
  • some "image" to show how it works

how multiple topics are supported with this kafka-connect-mqtt?

This plugin works very well if mqtt publishes messages on a single topic and kafka consumes those messages on the same topic. Since we use mqtt.properties file and the topics are defined uniquely inside it, is there an option to make kafka-connect-mqtt to listen on multiple topics?

kafka port remain Busy

used this mqtt-kafka-connector and come at the point where the ports which I used for kafka remains busy even though I killed the standalone process,seems to be there is a problem in your code.

configuring SSL

Hey there,
How can I implement SSL certificates for this connector ? I've been trying to pass as single string but it throws an error.
image
Thanks in Advance.

Data getting stored in the same partition of the topic

Hi,

I was testing the MQTT connector. When I checked the logs directory of Kafka, I saw all the data getting subscribed by the connector is getting stored in the same partition of the Kafka topic.

Is this an issue? Or any mistake in the configuration?

Getting error while running "bin\connect-standalone.bat config\connect-standalone.properties config\mqtt.properties" to setup my mqtt broker and Kafka.

I am following this link to setup my mqtt broker and Kafka connectivity.
https://howtoprogram.xyz/2016/07/30/apache-kafka-connect-mqtt-source-tutorial

I wanted to test the flow by publishing some messages to my mqtt topic and subscribe it on kafka topic. While running this command "bin\connect-standalone.bat config\connect-standalone.properties config\mqtt.properties" I am getting below error .

WARN could not get type for name org.osgi.framework.BundleListener from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.osgi.framework.BundleListener
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.osgi.framework.BundleListener
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:14,730] WARN could not get type for name org.apache.kafka.common.utils.MockTime from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.apache.kafka.common.utils.MockTime
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.MockTime
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:14,752] WARN could not get type for name com.google.gson.JsonDeserializer from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.google.gson.JsonDeserializer
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: com.google.gson.JsonDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:14,761] WARN could not get type for name org.scalatest.junit.JUnitSuite from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.scalatest.junit.JUnitSuite
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.scalatest.junit.JUnitSuite
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:14,845] WARN could not get type for name org.easymock.IAnswer from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.easymock.IAnswer
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.easymock.IAnswer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:14,860] WARN could not get type for name org.jboss.netty.channel.MessageEvent from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.jboss.netty.channel.MessageEvent
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.jboss.netty.channel.MessageEvent
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:14,914] WARN could not get type for name org.jboss.netty.channel.SimpleChannelHandler from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.jboss.netty.channel.SimpleChannelHandler
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.jboss.netty.channel.SimpleChannelHandler
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:15,018] WARN could not get type for name javax.mail.Authenticator from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name javax.mail.Authenticator
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: javax.mail.Authenticator
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:15,121] WARN could not get type for name org.apache.kafka.test.MockMetricsReporter from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.apache.kafka.test.MockMetricsReporter
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.test.MockMetricsReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:15,206] WARN could not get type for name javax.jms.MessageListener from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name javax.jms.MessageListener
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: javax.jms.MessageListener
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:15,245] WARN could not get type for name org.eclipse.jetty.jmx.ObjectMBean from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.eclipse.jetty.jmx.ObjectMBean
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.eclipse.jetty.jmx.ObjectMBean
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:15,275] WARN could not get type for name com.google.gson.JsonSerializer from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name com.google.gson.JsonSerializer
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: com.google.gson.JsonSerializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:15,368] WARN could not get type for name org.osgi.framework.BundleActivator from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.osgi.framework.BundleActivator
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.osgi.framework.BundleActivator
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:15,388] WARN could not get type for name org.apache.kafka.clients.admin.MockAdminClient from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.apache.kafka.clients.admin.MockAdminClient
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.admin.MockAdminClient
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:15,428] WARN could not get type for name org.scalatest.Assertions from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.scalatest.Assertions
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.scalatest.Assertions
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:15,493] WARN could not get type for name org.osgi.framework.SynchronousBundleListener from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.osgi.framework.SynchronousBundleListener
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: org.osgi.framework.SynchronousBundleListener
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:15,550] WARN could not get type for name jline.Completor from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name jline.Completor
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.(Reflections.java:126)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.(DelegatingClassLoader.java:365)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:277)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:164)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:157)
at org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:56)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
Caused by: java.lang.ClassNotFoundException: jline.Completor
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 9 more
[2018-06-25 10:20:15,741] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-25 10:20:15,747] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-25 10:20:15,748] WARN The configuration 'internal.key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-25 10:20:15,750] WARN The configuration 'offset.storage.file.filename' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-25 10:20:15,753] WARN The configuration 'internal.value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-25 10:20:15,754] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-25 10:20:15,757] WARN The configuration 'internal.value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-25 10:20:15,758] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-25 10:20:15,760] WARN The configuration 'internal.key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2018-06-25 10:20:15,760] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
Jun 25, 2018 10:20:17 AM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2018-06-25 10:20:17,916] ERROR WorkerSourceTask{id=mqtt-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
at org.eclipse.paho.client.mqttv3.MqttConnectOptions.setServerURIs(MqttConnectOptions.java:443)
at com.evokly.kafka.connect.mqtt.MqttSourceTask.start(MqttSourceTask.java:105)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:164)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-06-25 10:20:17,929] ERROR WorkerSourceTask{id=mqtt-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

Configuration to format a message - BYTES / STRING schema ?

@moperacz , It would be a good feature , if we can control the formatting of messages inside your DumbProcessor class.

    @Override
    public SourceRecord[] getRecords(String kafkaTopic) {
        return new SourceRecord[]{new SourceRecord(null, null, kafkaTopic, null,
                Schema.STRING_SCHEMA, mTopic,
                Schema.BYTES_SCHEMA, mMessage.getPayload())};
    }

Schema.BYTES_SCHEMA or Schema.STRING_SCHEMA for a message is based on a configuration.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.