mravi / kafka-connect-hbase
Kafka Connect to HBase
License: Apache License 2.0
I have an HBase table with multiple column families. From the given example, I am not sure how to create my config.
My table looks like this:
column family: main, column name: value
column family: main, column name: timestamp
column family: meta, column name: datatype
The Kafka topic's value schema has the following fields (see the Avro sketch after this list):
- key: Corresponds to the hbase key
- value: Corresponds to the "main:value"
- timestamp: Corresponds to "main:timestamp"
- datatype: Corresponds to "meta:datatype"
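For concreteness, a minimal Avro value schema matching that description might look like the following. The record name and the field types here are assumptions based on the bullets above, not the poster's actual schema:

{
  "type": "record",
  "name": "Telemetry",
  "fields": [
    {"name": "key", "type": "string"},
    {"name": "value", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "datatype", "type": "string"}
  ]
}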
Is something like the config below valid?
tasks.max=1
topics=telemetry
zookeeper.quorum=localhost:2181
event.parser.class=io.svectors.hbase.parser.AvroEventParser
hbase.telemetry.rowkey.columns=value, timestamp, datatype
hbase.telemetry.rowkey.delimiter=:
hbase.telemetry.family=main, meta
Hi, I have tried integrating HBase with Confluent and used the MySQL connector for a pipeline from MySQL to HBase, which throws an error. @mravi
ERROR Thread WorkerSinkTask-kafka-cdc-hbase-0 exiting with uncaught exception: (org.apache.kafka.connect.util.ShutdownableThread:84)
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
at org.apache.kafka.connect.runtime.WorkerSinkTask.joinConsumerGroupAndStart(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:54)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Exception in thread "WorkerSinkTask-kafka-cdc-hbase-0" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
at org.apache.kafka.connect.runtime.WorkerSinkTask.joinConsumerGroupAndStart(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:54)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
[2016-08-25 03:59:00,230] INFO Source task Thread[WorkerSourceTask-kafka-mysql-jdbc-0,5,main] finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:342)
Used HBase 1.2.4 and Confluent 3.0.0 (OSS).
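A plausible reading of this error (an assumption, not a confirmed diagnosis): Confluent 3.0.0 ships Kafka 0.10.0.x, where KafkaConsumer.subscribe(List, ConsumerRebalanceListener) was replaced by subscribe(Collection, ConsumerRebalanceListener), so classes compiled against Kafka 0.9.0.0 (the version pinned in this project's pom, shown further down this page) fail at runtime with exactly this NoSuchMethodError. One hedged starting point is rebuilding the connector against the Kafka version that matches the runtime:

<!-- Assumption: the runtime is Confluent 3.0.0 (Kafka 0.10.0.x) while the
     connector was built with kafka.version 0.9.0.0; aligning the build
     with the runtime is one way to clear the signature mismatch. -->
<properties>
  <kafka.version>0.10.0.0</kafka.version>
</properties>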
When starting an HBase sink, I encountered the following error:
[2017-01-04 17:58:31,841] INFO HBaseSinkConfig values:
event.parser.class = io.svectors.hbase.parser.AvroEventParser
zookeeper.quorum = localhost:2181
(io.svectors.hbase.config.HBaseSinkConfig:165)
[2017-01-04 17:58:32,016] INFO Sink task org.apache.kafka.connect.runtime.WorkerSinkTask@670b160d finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:155)
[2017-01-04 17:58:32,476] ERROR Task kafka-cdc-hbase-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:302)
[2017-01-04 17:58:32,476] ERROR Task is being killed and will not recover until manually restarted: (org.apache.kafka.connect.runtime.WorkerSinkTask:303)
java.lang.NullPointerException
at org.apache.hadoop.hbase.client.Put.<init>(Put.java:68)
at org.apache.hadoop.hbase.client.Put.<init>(Put.java:58)
at io.svectors.hbase.util.ToPutFunction.apply(ToPutFunction.java:67)
at io.svectors.hbase.sink.HBaseSinkTask.lambda$null$0(HBaseSinkTask.java:77)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at io.svectors.hbase.sink.HBaseSinkTask.lambda$put$1(HBaseSinkTask.java:77)
at java.util.stream.Collectors.lambda$toMap$215(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1683)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at io.svectors.hbase.sink.HBaseSinkTask.put(HBaseSinkTask.java:76)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:280)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
[2017-01-04 17:58:32,479] ERROR Thread WorkerSinkTask-kafka-cdc-hbase-0 exiting with uncaught exception: (org.apache.kafka.connect.util.ShutdownableThread:84)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:304)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Exception in thread "WorkerSinkTask-kafka-cdc-hbase-0" org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:304)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
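A hedged interpretation of this trace: HBase's Put constructor dereferences the row buffer, so a null rowkey coming out of ToPutFunction (for example, when the configured rowkey.columns name a field that is absent or null in the record) would surface as exactly this NullPointerException inside Put's constructor. A minimal defensive sketch, where the rowkey argument stands in for however the connector serializes the configured rowkey columns into bytes:

import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;

public final class PutGuard {
    // Sketch only, not the connector's actual code: fail with a readable
    // message instead of letting a null rowkey blow up deep inside HBase.
    static Put toPut(SinkRecord record, byte[] rowkey) {
        if (rowkey == null || rowkey.length == 0) {
            throw new DataException("Null/empty rowkey for record at offset "
                + record.kafkaOffset() + "; check hbase.<table>.rowkey.columns");
        }
        return new Put(rowkey);
    }
}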
For kafka-connect-hbase to work, what is the expected message specification? And can it support deletes to HBase? Since the source may delete a source record, we need to react to that.
Thanks,
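On the delete question: nothing on this page says whether the connector supports it, but a common Kafka Connect convention is to treat a record with a null value as a tombstone and translate it into an HBase Delete. A minimal sketch of that convention (not this connector's confirmed behavior):

import org.apache.hadoop.hbase.client.Delete;
import org.apache.kafka.connect.sink.SinkRecord;

public final class TombstoneToDelete {
    // Sketch only: map a tombstone (null-valued) record to a whole-row
    // Delete; rowkey extraction is left to the caller as a placeholder.
    static Delete toDelete(SinkRecord record, byte[] rowkey) {
        if (record.value() != null) {
            throw new IllegalArgumentException("Not a tombstone record");
        }
        return new Delete(rowkey); // removes all columns/versions for the row
    }
}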
Hello~
When I run "kafka-avro-console-producer" and send a message, an "Error serializing Avro message" error occurs.
I used your original source; I only changed the Hadoop version from 2.6.0 to 2.7.0.
Please check this error message.
Error message:
[root@server002 confluent]# bin/kafka-avro-console-producer --broker-list localhost:9092 --topic connectTest --property value.schema='{"type":"record","name":"record","fields":[{"name":"id","type":"int"}, {"name":"name", "type": "string"}]}'
/bin/bash: warning: setlocale: LC_ALL: cannot change locale (ko_KR.UTF-8)
/bin/sh: warning: setlocale: LC_ALL: cannot change locale (ko_KR.UTF-8)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/sepDir/ossFileDownload/confluent-2.0.1/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/sepDir/ossFileDownload/confluent-2.0.1/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/sepDir/ossFileDownload/confluent-2.0.1/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{"id": 1, "name": "foo"}
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@359df09a; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:447)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2485)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2650)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:155)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:174)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:217)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:212)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:57)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:50)
at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:157)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:56)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
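A hedged reading of this failure: the JsonParseException (unexpected '<' at line 1, column 2) means the Avro serializer received an HTML page rather than JSON when registering the schema, which typically indicates that schema.registry.url points at the wrong service or that the Schema Registry is unreachable. Assuming the registry runs on its Confluent default port (8081), passing the URL explicitly is one thing to try:

bin/kafka-avro-console-producer --broker-list localhost:9092 --topic connectTest \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"record","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}'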
Version information in pom.xml:
<hbase.version>1.2.0</hbase.version>
<hadoop.version>2.7.0</hadoop.version>
<kafka.version>0.9.0.0</kafka.version>
<junit.version>4.12</junit.version>
<hadoop.mvn.version>2.7.1</hadoop.mvn.version>
<guava.version>16.0</guava.version>
<confluent.version>2.0.0</confluent.version>
hbase-sink.properties
name=kafka-cdc-hbase
connector.class=io.svectors.hbase.sink.HBaseSinkConnector
tasks.max=1
topics=connectTest
zookeeper.quorum=localhost:2181
event.parser.class=io.svectors.hbase.parser.AvroEventParser
hbase.connectTest.rowkey.columns=id
hbase.connectTest.rowkey.delimiter=|
hbase.connectTest.family=f1
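Judging from the hbase.connectTest.* naming in this config, the connector writes to an HBase table named after the topic. Assuming the table must exist with the configured column family before the sink starts (suggested by the property layout, though not stated on this page), it can be created in the HBase shell:

hbase> create 'connectTest', 'f1'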