opentsdb / opentsdb-rpc-kafka
A set of OpenTSDB plugins for consuming from Apache Kafka
License: Apache License 2.0
Hello, do you have any examples for reference? I want to use this plugin to synchronize data to OpenTSDB.
Could you add some documentation on which JDK and Maven versions this project is intended to be built with? The latest versions seem to be unsupported (test issues with PowerMock).
Update: JDK version 8 👍
Hi,
We are losing a lot of data points while using this plugin.
I don't see any errors in the region server logs or the HDFS logs.
I have enabled more logging in the plugin code, and I think the plugin is not able to read all messages when the throughput is high.
The TSDB plugin configuration is given below.
KafkaRpcPlugin.groups=tsdbpublisher
KafkaRpcPlugin.tsdbpublisher.topics=xxxxx.data.10m.tsdb
KafkaRpcPlugin.tsdbpublisher.consumerType = raw
KafkaRpcPlugin.tsdbpublisher.deserializer = net.opentsdb.data.deserializers.JSONDeserializer
KafkaRpcPlugin.tsdbpublisher.rate = 10000
KafkaRpcPlugin.tsdbpublisher.threads = 16
Any suggestions for tuning?
Thanks,
-Gk
Hi,
I'm trying to use this plugin but get the following error:
Unable to deserialize data java.lang.TypeNotPresentException: Type [unknown] not present
I'm sending JSON strings like this:
{"type":"Metrics","metric":"vmMemoryGB","timestamp":1500822865231,"value":7.0,"tags":{"host":"host-1"}}
Can you please specify the message format the plugin expects from the Kafka stream?
Are there any benchmarks to measure if, and under what conditions, this plugin might be dropping data that it reads from Kafka or writes to TSDB?
This is somewhat related to #2 as well.
I've had some trouble getting this plugin to run and was facing a NullPointerException when it reads the first message.
2018-12-05 10:30:16,185 ERROR [histograms_0_314c7ab81c1d] KafkaRpcPluginThread: Exception in kafkaReader or Tsdb Writer
java.lang.NullPointerException: null
at net.opentsdb.tsd.KafkaRpcPluginThread.run(KafkaRpcPluginThread.java:284) ~[opentsdb-rpc-kafka-0.1.0-all.jar:]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_191]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
When a new consumer group is created, that group instantiates the specified deserialiser and creates the plugin thread. In its constructor, the thread then goes and gets the deserialiser from the consumer group.
However, since the consumer group first starts the thread and only then instantiates the deserialiser, the thread's reference to the deserialiser is null. This reference is never updated, which eventually causes a NullPointerException when the first message comes in.
I think the creation of the threads should happen after the deserialiser is initialised.
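The ordering problem described in this report can be reproduced in a few lines. This is a minimal sketch under stated assumptions, not the plugin's actual code: Group, Worker and InitOrderDemo are hypothetical stand-ins for the consumer group, the plugin thread and a small driver, and the deserialiser is modelled as a plain function. The worker copies the group's deserialiser reference in its constructor, so it keeps whatever the field held at that moment.

```java
import java.util.function.Function;

/** Driver that reports whether the first message triggers a NullPointerException. */
class InitOrderDemo {
    static boolean failsOnFirstMessage(boolean deserializerFirst) {
        try {
            new Group(deserializerFirst).worker.handle(" msg ");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker first -> NPE: " + failsOnFirstMessage(false));
        System.out.println("deserialiser first -> NPE: " + failsOnFirstMessage(true));
    }
}

/** Hypothetical stand-in for the consumer group. */
class Group {
    Function<String, String> deserializer;
    final Worker worker;

    Group(boolean deserializerFirst) {
        if (deserializerFirst) {
            // Proposed order: create the deserialiser, then the worker.
            this.deserializer = String::trim;
            this.worker = new Worker(this);
        } else {
            // Order described in the report: worker first, deserialiser afterwards.
            this.worker = new Worker(this);
            this.deserializer = String::trim;
        }
    }
}

/** Hypothetical stand-in for the plugin thread. */
class Worker {
    final Function<String, String> deserializer;

    Worker(Group group) {
        // Snapshot of the group's field at construction time; never refreshed later.
        this.deserializer = group.deserializer;
    }

    String handle(String message) {
        // Throws NullPointerException if the snapshot was taken before the field was set.
        return deserializer.apply(message);
    }
}
```

In this sketch only the worker-first order fails, which matches the suggestion to create the threads after the deserialiser has been initialised.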
Hello, I see that kafka_0.8 is used in the pom.xml file of your code, but my Kafka version is 0.11. Will there be an error when using your plugin? I hope you can answer.
Just wondering, can I use telnet-style messages with this plugin? I want to get rid of bursting tcollectors.
I'm facing this error when trying to restart OpenTSDB with the plugin:
[main] PluginLoader: Unable to locate any plugins of the type: net.opentsdb.tsd.HttpSerializer
Hello, I am trying to compile it from source on my system and getting the following error. Any leads are greatly appreciated:
# mvn package -Pshaded
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.google.inject.internal.cglib.core.$ReflectUtils$1 (file:/usr/share/maven/lib/guice.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of com.google.inject.internal.cglib.core.$ReflectUtils$1
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[INFO] Scanning for projects...
[INFO] Inspecting build with total of 1 modules...
[INFO] Installing Nexus Staging features:
[INFO] ... total of 1 executions of maven-deploy-plugin replaced with nexus-staging-maven-plugin
[INFO]
[INFO] ------------------< net.opentsdb:opentsdb-rpc-kafka >-------------------
[INFO] Building opentsdb-rpc-kafka 2.4.1-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-enforcer-plugin:1.2:enforce (enforce-maven) @ opentsdb-rpc-kafka ---
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ opentsdb-rpc-kafka ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 4 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ opentsdb-rpc-kafka ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 14 source files to /root/downloads/opentsdb-rpc-kafka/target/classes
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] Source option 5 is no longer supported. Use 6 or later.
[ERROR] Target option 1.5 is no longer supported. Use 1.6 or later.
[INFO] 2 errors
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.408 s
[INFO] Finished at: 2021-06-22T00:52:00Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project opentsdb-rpc-kafka: Compilation failure: Compilation failure:
[ERROR] Source option 5 is no longer supported. Use 6 or later.
[ERROR] Target option 1.5 is no longer supported. Use 1.6 or later.
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
What needs to be done to use the Kafka plugin? Do I need to create topics, or are there other reference examples? I followed the instructions to download the code, compile it, and add the corresponding parameters to the OpenTSDB configuration file. When starting the TSD, the following error is reported:
[root@hadoop5 bin]# ./tsdb tsd
Exception in thread "main" java.lang.RuntimeException: Initialization failed
at net.opentsdb.tools.TSDMain.main(TSDMain.java:237)
Caused by: java.lang.RuntimeException: Failed to initialize class net.opentsdb.tsd.KafkaRpcPlugin
at net.opentsdb.tsd.RpcManager.createAndInitialize(RpcManager.java:429)
at net.opentsdb.tsd.RpcManager.initializeRpcPlugins(RpcManager.java:402)
at net.opentsdb.tsd.RpcManager.instance(RpcManager.java:142)
at net.opentsdb.tools.TSDMain.main(TSDMain.java:202)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at net.opentsdb.tsd.RpcManager.createAndInitialize(RpcManager.java:421)
... 3 more
Caused by: java.lang.NoClassDefFoundError: joptsimple/internal/Strings
at net.opentsdb.tsd.KafkaRpcPluginGroup.<init>(KafkaRpcPluginGroup.java:136)
at net.opentsdb.tsd.KafkaRpcPlugin.createConsumerGroups(KafkaRpcPlugin.java:189)
at net.opentsdb.tsd.KafkaRpcPlugin.initialize(KafkaRpcPlugin.java:91)
... 8 more
Caused by: java.lang.ClassNotFoundException: joptsimple.internal.Strings
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 11 more
If I don't add the plugin, the TSD starts normally. I don't know why. Do I need to modify Linux to install plugins, or something similar? For us beginners, please provide a complete reference sample. Thank you!
Supplement:
A sentence like this is printed in the log:
10:16:32.439 INFO [TSDB.initializePlugins] - Successfully initialized storage exception handler plugin [net.opentsdb.tsd.KafkaStorageExceptionHandler] version: 2.4.0
My OpenTSDB version is 2.3.1; the plugin version is 2.3.x.
Unfortunately, the code cannot be compiled for OpenTSDB 2.3.0, so when I try to run it on 2.3.0, Java throws an exception:
10:02:13.106 INFO [JSONDeserializer.deserialize] - We got a message!>>
10:02:13.589 ERROR [JSONDeserializer.deserialize] - Unable to deserialize data
java.lang.TypeNotPresentException: Type [unknown] not present
at sun.reflect.annotation.TypeNotPresentExceptionProxy.generateException(TypeNotPresentExceptionProxy.java:46) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationInvocationHandler.invoke(AnnotationInvocationHandler.java:84) ~[na:1.7.0_181]
at com.sun.proxy.$Proxy4.value(Unknown Source) ~[na:na]
at com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.findSubtypes(JacksonAnnotationIntrospector.java:561) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.jsontype.impl.StdSubtypeResolver._collectAndResolveByTypeId(StdSubtypeResolver.java:270) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.jsontype.impl.StdSubtypeResolver.collectAndResolveSubtypesByTypeId(StdSubtypeResolver.java:190) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.deser.BasicDeserializerFactory.findTypeDeserializer(BasicDeserializerFactory.java:1567) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:483) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:4178) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3997) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3079) ~[jackson-databind-2.9.5.jar:2.9.5]
at net.opentsdb.utils.JSON.parseToObject(JSON.java:136) ~[tsdb-2.3.1.jar:]
at net.opentsdb.data.deserializers.JSONDeserializer.deserialize(JSONDeserializer.java:74) ~[opentsdb-rpc-kafka-2.3.2.jar:na]
at net.opentsdb.tsd.KafkaRpcPluginThread.run(KafkaRpcPluginThread.java:284) [opentsdb-rpc-kafka-2.3.2.jar:]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1152) [na:1.7.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) [na:1.7.0_181]
at java.lang.Thread.run(Thread.java:748) [na:1.7.0_181]
Caused by: java.lang.NoClassDefFoundError: net/opentsdb/rollup/RollUpDataPoint
at java.lang.ClassLoader.defineClass1(Native Method) ~[na:1.7.0_181]
at java.lang.ClassLoader.defineClass(ClassLoader.java:808) ~[na:1.7.0_181]
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[na:1.7.0_181]
at java.net.URLClassLoader.defineClass(URLClassLoader.java:442) ~[na:1.7.0_181]
at java.net.URLClassLoader.access$100(URLClassLoader.java:64) ~[na:1.7.0_181]
at java.net.URLClassLoader$1.run(URLClassLoader.java:354) ~[na:1.7.0_181]
at java.net.URLClassLoader$1.run(URLClassLoader.java:348) ~[na:1.7.0_181]
at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_181]
at java.net.URLClassLoader.findClass(URLClassLoader.java:347) ~[na:1.7.0_181]
at java.lang.ClassLoader.loadClass(ClassLoader.java:430) ~[na:1.7.0_181]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:323) ~[na:1.7.0_181]
at java.lang.ClassLoader.loadClass(ClassLoader.java:363) ~[na:1.7.0_181]
at java.lang.Class.forName0(Native Method) ~[na:1.7.0_181]
at java.lang.Class.forName(Class.java:278) ~[na:1.7.0_181]
at sun.reflect.generics.factory.CoreReflectionFactory.makeNamedType(CoreReflectionFactory.java:114) ~[na:1.7.0_181]
at sun.reflect.generics.visitor.Reifier.visitClassTypeSignature(Reifier.java:125) ~[na:1.7.0_181]
at sun.reflect.generics.tree.ClassTypeSignature.accept(ClassTypeSignature.java:49) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationParser.parseSig(AnnotationParser.java:432) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationParser.parseClassValue(AnnotationParser.java:413) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationParser.parseMemberValue(AnnotationParser.java:342) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationParser.parseAnnotation2(AnnotationParser.java:283) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationParser.parseAnnotation(AnnotationParser.java:223) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationParser.parseAnnotationArray(AnnotationParser.java:756) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationParser.parseArray(AnnotationParser.java:528) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationParser.parseMemberValue(AnnotationParser.java:348) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationParser.parseAnnotation2(AnnotationParser.java:283) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationParser.parseAnnotations2(AnnotationParser.java:117) ~[na:1.7.0_181]
at sun.reflect.annotation.AnnotationParser.parseAnnotations(AnnotationParser.java:70) ~[na:1.7.0_181]
at java.lang.Class.initAnnotationsIfNecessary(Class.java:3281) ~[na:1.7.0_181]
at java.lang.Class.getDeclaredAnnotations(Class.java:3258) ~[na:1.7.0_181]
at com.fasterxml.jackson.databind.util.ClassUtil.findClassAnnotations(ClassUtil.java:1072) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver.resolveClassAnnotations(AnnotatedClassResolver.java:154) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver.resolveFully(AnnotatedClassResolver.java:118) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver.resolve(AnnotatedClassResolver.java:69) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.introspect.BasicClassIntrospector._resolveAnnotatedClass(BasicClassIntrospector.java:277) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.introspect.BasicClassIntrospector.collectProperties(BasicClassIntrospector.java:187) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.introspect.BasicClassIntrospector.forDeserialization(BasicClassIntrospector.java:107) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.introspect.BasicClassIntrospector.forDeserialization(BasicClassIntrospector.java:16) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.DeserializationConfig.introspect(DeserializationConfig.java:731) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:324) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:264) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:477) ~[jackson-databind-2.9.5.jar:2.9.5]
... 9 common frames omitted
Caused by: java.lang.ClassNotFoundException: net.opentsdb.rollup.RollUpDataPoint
at java.net.URLClassLoader$1.run(URLClassLoader.java:359) ~[na:1.7.0_181]
at java.net.URLClassLoader$1.run(URLClassLoader.java:348) ~[na:1.7.0_181]
at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_181]
at java.net.URLClassLoader.findClass(URLClassLoader.java:347) ~[na:1.7.0_181]
at java.lang.ClassLoader.loadClass(ClassLoader.java:430) ~[na:1.7.0_181]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:323) ~[na:1.7.0_181]
at java.lang.ClassLoader.loadClass(ClassLoader.java:363) ~[na:1.7.0_181]
... 53 common frames omitted
I tried changing the OpenTSDB version in pom.xml:
<dependency>
  <groupId>net.opentsdb</groupId>
  <artifactId>opentsdb</artifactId>
  <version>2.3.0</version>
</dependency>
But the build fails with this error message:
[INFO] -------------------------------------------------------------
[ERROR] /Users/cerberus/devel/kafka_rpc/src/main/java/net/opentsdb/data/Histogram.java:[25,25] cannot find symbol
symbol: class HistogramPojo
location: package net.opentsdb.core
[ERROR] /Users/cerberus/devel/kafka_rpc/src/main/java/net/opentsdb/data/Histogram.java:[31,32] cannot find symbol
symbol: class HistogramPojo
[ERROR] /Users/cerberus/devel/kafka_rpc/src/main/java/net/opentsdb/data/Aggregate.java:[31,32] cannot find symbol
symbol: class RollUpDataPoint
[ERROR] /Users/cerberus/devel/kafka_rpc/src/main/java/net/opentsdb/data/Histogram.java:[55,77] cannot find symbol
symbol: variable metric
location: class net.opentsdb.data.Histogram
....
Any advice on how to compile the plugin for OpenTSDB 2.3.0?
tsd.rpc.plugins = net.opentsdb.tsd.KafkaRpcPlugin
KafkaRpcPlugin.kafka.zookeeper.connect = slave02:2181,slave03:2181,slave04:2181
KafkaRpcPlugin.groups = connect-cluster
KafkaRpcPlugin.connect-cluster.topics = tagValue
KafkaRpcPlugin.connect-cluster.consumerType = raw
KafkaRpcPlugin.connect-cluster.Deserializer = net.opentsdb.data.deserializers.JSONDeserializer
KafkaRpcPlugin.connect-cluster.rate = 250000
KafkaRpcPlugin.connect-cluster.threads = 4
tsd.http.rpc.plugins = net.opentsdb.tsd.KafkaHttpRpcPlugin
tsd.core.storage_exception_handler.enable = true
tsd.core.storage_exception_handler.plugin = net.opentsdb.tsd.KafkaStorageExceptionHandler
KafkaRpcPlugin.kafka.metadata.broker.list = slave02:9092,slave03:9092,slave04:9092
KafkaRpcPlugin.seh.topic.default = tagValue
With this configuration, the log reports "Successfully initialized storage exception handler plugin [net.opentsdb.tsd.KafkaStorageExceptionHandler] version: 2.4.0", but startup then fails with "Failed to initialize class net.opentsdb.tsd.KafkaRpcPlugin".
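One detail in this configuration may be worth ruling out, offered as an assumption rather than a confirmed diagnosis: the key here is spelled KafkaRpcPlugin.connect-cluster.Deserializer with a capital D, while the other configuration on this page uses a lowercase deserializer. Keys in java.util.Properties are case-sensitive. Whether the plugin looks the key up in a case-sensitive way is not confirmed here, and ConfigKeyCheck is a hypothetical helper used only for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class ConfigKeyCheck {
    /** Returns the value stored under exactly this key, or null when spelling or case differs. */
    static String lookup(String config, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(config));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String config =
            "KafkaRpcPlugin.connect-cluster.Deserializer = net.opentsdb.data.deserializers.JSONDeserializer\n";
        // Lowercase spelling: no match, prints null.
        System.out.println(lookup(config, "KafkaRpcPlugin.connect-cluster.deserializer"));
        // Exact spelling: prints the configured class name.
        System.out.println(lookup(config, "KafkaRpcPlugin.connect-cluster.Deserializer"));
    }
}
```

If the lookup is indeed case-sensitive, a key with different capitalisation would behave as if it were not set at all.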
We tried using logstash to format the message into the desired format and push it to Kafka, but logstash always created "tags" as an array rather than an object.
It turns out that logstash has an internal (unchangeable) identifier called "tags", which is always an array and never an object (https://discuss.elastic.co/t/add-tag/61837):
"add_tag always adds the string to an array in the field named tags. The name of the tags field isn't configurable."
Even if we don't use the add_tag function, logstash creates an array from the data provided. This is unfortunate.
Is it possible to make the name of the field "tags" configurable so that it could be renamed on the providing / the Kafka side?
I have not understood this part:
It includes a Storage Exception Handler plugin that will post messages back to a Kafka queue if writing to storage fails
The config specifies
KafkaRpcPlugin.groups = TsdbConsumer,TsdbRequeueConsumer
KafkaRpcPlugin.seh.topic.default = TSDB_Requeue
So does this mean that the second group is created to post the failed messages back to the requeue topic?
Do I also need to specify this:
KafkaRpcPlugin.TsdbRequeueConsumer.topics = TSDB_Requeue