
streamx's Introduction

THIS REPO IS ARCHIVED Based on Security issues SEC ticket #SEC-2988

StreamX: Kafka Connect for S3

Forked from the awesome kafka-connect-hdfs

StreamX is a kafka-connect based connector to copy data from Kafka to object stores like Amazon S3, Google Cloud Storage and Azure Blob Store. It focuses on reliable and scalable data copying. It can write the data out in different formats (like Parquet, so that it can readily be used by analytical tools) and with different partitioning schemes.

##Features :

StreamX inherits a rich set of features from kafka-connect-hdfs.

  • Support for writing data in Avro and Parquet formats.
  • Provides Hive integration: the connector creates a partitioned Hive table and periodically adds partitions as it writes new partitions to S3
  • Pluggable partitioner :
      • default partitioner : Each Kafka partition has its data copied under a partition-specific directory
      • time based partitioner : Ability to write data on an hourly basis
      • field based partitioner : Ability to use a field in the record as a custom partitioner
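
The three partitioners can be pictured as functions from a record to a directory name. The following is a minimal Python sketch of that idea, not the connector's Java code. The `partition=` layout matches the output path shown later in this README; the hourly `year=/month=/day=/hour=` layout and the `field=value` layout are assumptions modeled on kafka-connect-hdfs conventions, and all function names are hypothetical.

```python
from datetime import datetime, timezone


def default_partition(kafka_partition: int) -> str:
    # Default partitioner: one directory per Kafka partition.
    return f"partition={kafka_partition}"


def hourly_partition(when: datetime) -> str:
    # Time based partitioner: one directory per UTC hour.
    # The year=/month=/day=/hour= layout is an assumption.
    when = when.astimezone(timezone.utc)
    return f"year={when:%Y}/month={when:%m}/day={when:%d}/hour={when:%H}"


def field_partition(record: dict, field: str) -> str:
    # Field based partitioner: directory named after the value of one field.
    return f"{field}={record[field]}"


if __name__ == "__main__":
    print(default_partition(3))
    print(hourly_partition(datetime(2017, 12, 19, 8, 2, tzinfo=timezone.utc)))
    print(field_partition({"country": "IN", "clicks": 7}, "country"))
```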

In addition to these, we have made the following changes so that it works efficiently with S3:

  • Exactly-once guarantee using WAL
  • Support for storing Hive tables in Qubole's hive metastore (coming soon)

##Getting Started:

Pre-req : StreamX is based on the Kafka Connect framework, which is part of the Kafka project. Kafka Connect was added in Kafka 0.9, so StreamX can only be used with Kafka version >= 0.9. To download Kafka binaries, check here.

Clone : git clone https://github.com/qubole/streamx.git

Branch : For Kafka 0.9, use 2.x branch. For Kafka 0.10 and above, use master branch.

Build : mvn -DskipTests package

Once the build succeeds, StreamX packages all required jars under target/streamx-0.1.0-SNAPSHOT-development/share/java/streamx/* in the StreamX repo. This directory needs to be on the classpath.

Add Connector to Kafka Connect Classpath:

export CLASSPATH=$CLASSPATH:`pwd`/target/streamx-0.1.0-SNAPSHOT-development/share/java/streamx/*

Start Kafka Connect

In Kafka, change the following in config/connect-distributed.properties or config/connect-standalone.properties, depending on which mode you want to use.

bootstrap.servers=set Kafka end-point (ex: localhost:9092)
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter

Use ByteArrayConverter to copy data from Kafka as-is, without any changes (for example, to copy JSON or CSV).

Run Kafka Connect in Standalone mode

Set s3.url and hadoop-conf in StreamX config/quickstart-s3.properties. StreamX packages hadoop-conf directory at config/hadoop-conf for ease-of-use. Set s3 access and secret keys in config/hadoop-conf/hdfs-site.xml.
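
As an illustration of what the credentials file looks like, here is a small Python sketch that renders an hdfs-site.xml from a dictionary. It is only a sketch: the property names (`fs.s3.impl`, `fs.s3.awsAccessKeyId`, `fs.s3.awsSecretAccessKey`) are the ones users posted in the issues on this page, and the names your build expects may differ, so check the packaged config/hadoop-conf/hdfs-site.xml first. The function name `render_hdfs_site` and the placeholder values are hypothetical.

```python
import xml.etree.ElementTree as ET


def render_hdfs_site(props: dict) -> str:
    # Build <configuration><property><name/><value/></property>...</configuration>.
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    if hasattr(ET, "indent"):  # pretty-print where available (Python 3.9+)
        ET.indent(root)
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    # Placeholder credentials; property names are assumptions (see text above).
    print(render_hdfs_site({
        "fs.s3.impl": "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
        "fs.s3.awsAccessKeyId": "YOUR_ACCESS_KEY",
        "fs.s3.awsSecretAccessKey": "YOUR_SECRET_KEY",
    }))
```

Generating the file this way guarantees well-formed XML, which avoids unclosed or mistyped tags in hand-edited configuration.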

In Kafka, run

bin/connect-standalone etc/kafka/connect-standalone.properties \
  /path/to/streamx/config/quickstart-s3.properties

You are done. Check s3 for ingested data!

Run Kafka Connect in distributed mode
bin/connect-distributed.sh config/connect-distributed.properties

We have started the Kafka Connect framework and the S3 Connector is added to classpath. Kafka Connect framework starts a REST server (rest.port property in connect-distributed.properties) listening for Connect Job requests. The copy job can be submitted by hitting the REST end-point using curl or any REST clients.

For example, to submit a copy job from Kafka to S3

curl -i -X POST \
   -H "Accept:application/json" \
   -H "Content-Type:application/json" \
   -d \
'{"name":"clickstream",
 "config":
{
"name":"clickstream",
"connector.class":"com.qubole.streamx.s3.S3SinkConnector",
"format.class":"com.qubole.streamx.SourceFormat",
"tasks.max":"1",
"topics":"adclicks",
"flush.size":"2",
"s3.url":"s3://streamx/demo",
"hadoop.conf.dir":"/Users/pseluka/src/streamx/hadoop-conf"
}}' \
 'http://localhost:8083/connectors'
  • Uses S3SinkConnector
  • Uses SourceFormat, which copies the data as-is (Note that this needs to be used with ByteArrayConverter)
  • tasks.max is the number of tasks that copy the data
  • a new file is written after flush.size number of messages
  • S3 configuration: StreamX uses the Hadoop file system implementation (s3a/s3n) to write to S3. The connect job has a setting called hadoop.conf.dir, which must point to the directory where hdfs-site.xml and the other Hadoop configuration files reside. StreamX packages the Hadoop dependencies, so you do not need the Hadoop project or its jars on the classpath. Create a directory containing Hadoop config files such as core-site.xml and hdfs-site.xml, and provide its location in hadoop.conf.dir when submitting the copy job. (StreamX provides a default hadoop-conf directory under config/hadoop-conf. Set your S3 access key and secret key there and provide the full path in hadoop.conf.dir.)
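
The curl request above can also be issued from a script. The sketch below builds the same JSON payload in Python and posts it with the standard library. The helper names `build_sink_job` and `submit` are hypothetical, the default endpoint `http://localhost:8083` is taken from the curl example, and the configuration keys are exactly those shown in that example.

```python
import json
import urllib.request


def build_sink_job(name, topics, s3_url, hadoop_conf_dir, flush_size=2, tasks_max=1):
    # Mirror the config keys used in the curl example; all values are strings.
    config = {
        "name": name,
        "connector.class": "com.qubole.streamx.s3.S3SinkConnector",
        "format.class": "com.qubole.streamx.SourceFormat",
        "tasks.max": str(tasks_max),
        "topics": ",".join(topics),  # Kafka Connect takes a comma-separated list
        "flush.size": str(flush_size),
        "s3.url": s3_url,
        "hadoop.conf.dir": hadoop_conf_dir,
    }
    return {"name": name, "config": config}


def submit(job, base_url="http://localhost:8083"):
    # POST the job to the Kafka Connect REST endpoint and return (status, body).
    req = urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(job).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status, json.loads(resp.read().decode("utf-8"))


if __name__ == "__main__":
    job = build_sink_job("clickstream", ["adclicks"], "s3://streamx/demo",
                         "/path/to/streamx/config/hadoop-conf")
    print(json.dumps(job, indent=2))
    # submit(job)  # uncomment when a Connect worker is listening on port 8083
```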

You have submitted the job; check S3 for output files. The copy job above creates s3://streamx/demo/topics/adclicks/partition=x/files.xyz

Note that a single copy job can consume from multiple topics and writes each topic's data to a topic-specific directory.
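
To know where to look in S3, the directory layout described above can be written down as a small function. This is a sketch under assumptions: it uses the default partitioner layout from the example path, and it assumes the directory under s3.url is named `topics`, as in that path. The function name `output_prefix` is hypothetical.

```python
def output_prefix(s3_url: str, topic: str, kafka_partition: int, topics_dir: str = "topics") -> str:
    # <s3.url>/<topics dir>/<topic>/partition=<n>/ as in the example path above.
    return f"{s3_url.rstrip('/')}/{topics_dir}/{topic}/partition={kafka_partition}/"


if __name__ == "__main__":
    # A job consuming several topics writes each one under its own directory.
    for topic in ["adclicks", "pageviews"]:
        print(output_prefix("s3://streamx/demo", topic, 0))
```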

To delete a Connect job,

curl -i -X DELETE \
   -H "Accept:application/json" \
   -H "Content-Type:application/json" \
 'http://localhost:8083/connectors/clickstream'

To list all Connect jobs,

curl -i -X GET \
   -H "Accept:application/json" \
   -H "Content-Type:application/json" \
 'http://localhost:8083/connectors'
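
The delete and list calls above translate directly to Python's standard library. The sketch below only constructs the requests, so they can be inspected before being sent; `list_request`, `delete_request` and `send` are hypothetical helper names, and the base URL is the one used in the curl examples.

```python
import urllib.parse
import urllib.request

HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}


def list_request(base_url="http://localhost:8083"):
    # GET /connectors lists all Connect jobs.
    return urllib.request.Request(f"{base_url}/connectors", headers=HEADERS, method="GET")


def delete_request(name, base_url="http://localhost:8083"):
    # DELETE /connectors/<name> removes one Connect job.
    quoted = urllib.parse.quote(name, safe="")
    return urllib.request.Request(f"{base_url}/connectors/{quoted}", headers=HEADERS, method="DELETE")


def send(req):
    # Perform the request and return (HTTP status, response body as text).
    with urllib.request.urlopen(req) as resp:
        return resp.status, resp.read().decode("utf-8")


if __name__ == "__main__":
    req = delete_request("clickstream")
    print(req.get_method(), req.full_url)
    # print(send(list_request()))  # requires a running Connect worker
```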

Restarting Connect jobs : All Connect jobs are stored in a Kafka topic, so restarting Kafka Connect restarts all the connectors submitted to it.

Docker

StreamX supports Docker, but only in distributed mode. To build your image,

docker build -t qubole/streamx .

When you run your container, you can override any property in the connect-distributed.properties file with environment variables. The convention is to prefix each variable with CONNECT_; for example, CONNECT_BOOTSTRAP_SERVERS corresponds to bootstrap.servers. Example of how to run your container,

docker run -d -p 8083:8083 --env CONNECT_BOOTSTRAP_SERVERS=public_dns:9092 --env CONNECT_AWS_ACCESS_KEY=youraccesskey --env CONNECT_AWS_SECRET_KEY=yoursecretkey qubole/streamx

You can also use Avro/Parquet format. Example:

docker run -d -p 8083:8083 --env CONNECT_BOOTSTRAP_SERVERS=public_dns:9092 --env CONNECT_AWS_ACCESS_KEY=youraccesskey --env CONNECT_AWS_SECRET_KEY=yoursecretkey --env CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter --env CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter --env CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://your.schema.registry.com:8081 --env CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://your.schema.registry.com:8081 qubole/streamx
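
The naming convention for the environment variables can be sketched as a simple mapping. This is an inference from the single example given (CONNECT_BOOTSTRAP_SERVERS corresponding to bootstrap.servers), not a description of the image's actual entrypoint script: it assumes the prefix is stripped, the rest is lowercased, and underscores become dots. The AWS key variables may well be handled differently by the image. The function name `env_to_properties` is hypothetical.

```python
def env_to_properties(env: dict, prefix: str = "CONNECT_") -> dict:
    # Keep only prefixed variables and turn FOO_BAR into foo.bar (assumed rule).
    props = {}
    for key, value in env.items():
        if key.startswith(prefix):
            props[key[len(prefix):].lower().replace("_", ".")] = value
    return props


if __name__ == "__main__":
    example = {
        "CONNECT_BOOTSTRAP_SERVERS": "public_dns:9092",
        "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL": "http://your.schema.registry.com:8081",
        "PATH": "/usr/bin",
    }
    for name, value in env_to_properties(example).items():
        print(f"{name}={value}")
```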

##Roadmap

  • Support other object stores like Google Cloud Storage and Azure Blob Store
  • Currently, data can be written in avro/parquet format. This project will add support for more formats
  • Deal with features related to s3, like small-file consolidation

streamx's People

Contributors

13h3r, aalkilani, alexlod, ashishsachdeva, cjmatta, coughman, ewencp, gwenshap, ijuma, indit-qubole, ishiihara, jingw, jocelyndrean, kkonstantine, miguno, norwood, praveenseluka, rnpridgeon, shikhar, skyahead, xianzhen

streamx's Issues

Error while running copy job in standalone mode

I am running the streamx master branch on an Apache Kafka 0.10 server and trying to copy Kafka topic messages to S3. I get the error below when starting the connector. I suspect the S3 filesystem bucket and key names are not being passed properly to the underlying APIs. Please let me know if any Hadoop configuration property needs to be added (hdfs-site.xml is included below).

Command to start--
./connect-standalone.sh /app/kafka_2.11-0.10.2.1/config/connect-standalone.properties /app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/connector1.properties

Connector Properties- connector1.properties
name=s3-sink
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=com.qubole.streamx.SourceFormat
tasks.max=1
topics=test
topics.dir=test
logs.dir=logs
flush.size=3
s3.url=s3://platform.com/data/rawdata
hadoop.conf.dir=/app/streamx/streamx-0.1.0-SNAPSHOT-package/etc/streamx/hadoop-conf
partitioner.class=io.confluent.connect.hdfs.partitioner.DailyPartitioner

Error-
Caused by: java.io.IOException: / doesn't exist
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy41.retrieveINode(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.mkdir(S3FileSystem.java:165)
at org.apache.hadoop.fs.s3.S3FileSystem.mkdirs(S3FileSystem.java:154)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
at com.qubole.streamx.s3.S3Storage.mkdirs(S3Storage.java:68)
at io.confluent.connect.hdfs.DataWriter.createDir(DataWriter.java:374)
at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:174)

Worker Configuration-
connect-standalone.properties
bootstrap.servers=localhost:9092
Kafka
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/kafkadata/kafka/connect.offsets

offset.flush.interval.ms=10000

hdfs-site.xml
<configuration>
<property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
</property>
<property>
        <name>fs.s3.awsAccessKeyId</name>
        <value>secret</value>
</property>
<property>
        <name>fs.s3.awsSecretAccessKey</name>
        <value>secret</value>
</property>
</configuration>

NullPointerException when tasks.max > 1 and using s3a

Loaded kafka-connect distributed on 2 machines (using confluent kafka connect docker image). Configured it to work with streamx, created a job with tasks.max=1 while using s3a (configured in the hdfs-site.xml) - everything works fine.

Whenever I raise the number of tasks to be anything other than 1 I get the following error:

java.lang.NullPointerException
	at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:299)
 	at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:110)
 	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:302)
 	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:435)
 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:147)
 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 	at java.lang.Thread.run(Thread.java:745)

I tried messing around with all the configurations, including kafka timeouts, heartbeat, etc. To no avail. After reading this issue: #30 - I tried using s3n instead and it works without exceptions now!

Thanks

Not a valid partitioner class: io.confluent.connect.hdfs.partitioner.DefaultPartitioner

Hi

I am getting the error below while submitting a job. Could you please help me with this issue?

{"error_code":400,"message":"Connector configuration is invalid and contains the following 5 error(s):\nNot a valid partitioner class: io.confluent.connect.hdfs.partitioner.DefaultPartitioner\nNot a valid partitioner class: io.confluent.connect.hdfs.partitioner.DefaultPartitioner\nNot a valid partitioner class: io.confluent.connect.hdfs.partitioner.DefaultPartitioner\nNot a valid partitioner class: io.confluent.connect.hdfs.partitioner.DefaultPartitioner\nNot a valid partitioner class: io.confluent.connect.hdfs.partitioner.DefaultPartitioner\nYou can also find the above list of errors at the endpoint /{connectorType}/config/validate"}

My properties look like this:

{
"name": "s3-streamx-test",
"config": {
"connector.class": "com.qubole.streamx.s3.S3SinkConnector",
"tasks.max": "4",
"topics": "event",
"s3.url": "s3://bucket",
"flush.size": "2",
"format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
"hadoop.conf.dir": "/opt/streamx/hadoop-conf/",
"name": "s3-streamx-test"
}
}

and hdfs-site.xml
<configuration>
<property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
<property>
        <name>fs.s3.awsAccessKeyId</name>
        <value>mykey</value>
</property>
<property>
        <name>fs.s3.awsSecretAccessKey</name>
        <value>mysecretkey</value>
</property>
</configuration>

Regards
Anand.

saving json data , partition by specific field (timestamp)

I have a question. The data in Kafka is in JSON format, and each event has a field called "eventTimestamp", a long number that represents the event time. I want to save the data in S3 in hourly buckets based on that timestamp, not on the time the event was added to Kafka.

my settings when I used Kafka s3 connect are :

connector.class=io.confluent.connect.s3.S3SinkConnector
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=RecordField
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
timestamp.field=eventTimestamp
partition.duration.ms=10
locale=en_IN
timezone=UTC

I see that streamx supports TimeBasedPartitioner, but if I understand correctly it can only extract a RecordField from Parquet or Avro, not from JSON.

Is it possible to do it with json ?
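
To make the request concrete, the bucketing being asked for can be sketched as follows. This illustrates the desired behavior only and is not StreamX code. It assumes `eventTimestamp` holds epoch milliseconds (the post only says it is a long), and it mirrors the `path.format` from the settings above in UTC. The function name `hourly_bucket` is hypothetical.

```python
import json
from datetime import datetime, timezone


def hourly_bucket(raw_event: bytes, field: str = "eventTimestamp") -> str:
    # Parse the JSON event and read the event time (assumed epoch milliseconds).
    event = json.loads(raw_event)
    when = datetime.fromtimestamp(event[field] / 1000, tz=timezone.utc)
    # Same layout as path.format: 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
    return f"year={when:%Y}/month={when:%m}/day={when:%d}/hour={when:%H}"


if __name__ == "__main__":
    print(hourly_bucket(b'{"eventTimestamp": 0, "user": "a"}'))
```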

org.apache.kafka.connect.errors.ConnectException: java.io.FileNotFoundException: File /logs/test-streamx/0/log does not exist

I was following the quickstart and when I tried starting the connector for the first time, I got this exception:

[2017-01-24 00:48:41,877] ERROR Recovery failed at state WAL_APPLIED (io.confluent.connect.hdfs.TopicPartitionWriter:219)
org.apache.kafka.connect.errors.ConnectException: java.io.FileNotFoundException: File /logs/test-streamx/0/log does not exist
	at io.confluent.connect.hdfs.utils.MemoryWAL.truncate(MemoryWAL.java:87)
	at io.confluent.connect.hdfs.TopicPartitionWriter.truncateWAL(TopicPartitionWriter.java:457)
	at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:205)
	at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:232)
	at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:237)
	at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:92)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File /logs/test-streamx/0/log does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:609)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:822)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:599)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:337)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289)
	at org.apache.hadoop.fs.RawLocalFileSystem.rename(RawLocalFileSystem.java:372)
	at org.apache.hadoop.fs.ChecksumFileSystem.rename(ChecksumFileSystem.java:513)
	at com.qubole.streamx.s3.S3Storage.renameFile(S3Storage.java:140)
	at com.qubole.streamx.s3.S3Storage.commit(S3Storage.java:96)
	at io.confluent.connect.hdfs.utils.MemoryWAL.truncate(MemoryWAL.java:84)
	... 16 more

I found this statement in the HDFS connector doc file:

note: You need to make sure the connector user have write access to the directories
specified in topics.dir and logs.dir. The default value of topics.dir is
/topics and the default value of logs.dir is /logs, if you don't specify the two
configurations, make sure that the connector user has write access to /topics and /logs.
You may need to create /topics and /logs before running the connector as the connector
usually don't have write access to /.

I did this, however, the error still persists. I then created the file manually and it seems to have resolved the problem. Possibly a bug? I'm using the NativeS3FileSystem, but other than that I just followed the quick start.

from kafka(avro format) to S3 (Parquet format)

I am trying to export the avro records from kafka and load them into s3 in parquet format.
I got an exception as shown below.

ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.NullPointerException
at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:576)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-08-07 18:45:49,558] ERROR WorkerSinkTask{id=s3-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
^C[2018-08-07 18:55:39,021] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)

quickstart-s3.properties

name=s3-sink
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
tasks.max=1
topics=abc
flush.size=1

s3.url=https://s3.console.aws.amazon.com/s3/buckets/bucketname/topics/?region=us-west-2&tab=overview
hadoop.conf.dir=pathtostreamx/streamx/config/hadoop-conf

connect-standalone.properties

bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=share/java

Thank you for any corrections or suggestions

AWS Athena connection

I am interested in writing Parquet files to S3 that will be used by AWS Athena.
Athena can use an external catalog exported by Glue.

How can I set up this connection?

Strange problem of Parquet files in S3

I use streamx to sink Kafka data to S3 as Parquet files. Everything seems fine: the logs show messages saying that the Parquet files are generated as expected, as below.

Dec 19, 2017 8:02:35 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 29B for [accessuri] BINARY: 1 values, 6B raw, 8B comp, 1 pages, encodings: [
[2017-12-19 08:02:35,933] INFO Committed s3.test/topics/colin-forecast/year=2017/month=12/day=19/colin-forecast+0+0000045248+0000045248.parquet for colin-forecast-0 (io.confluent.connect.hdfs.TopicPartitionWriter:638)
[2017-12-19 08:02:35,947] INFO Got brand-new compressor [.snappy] (org.apache.hadoop.io.compress.CodecPool:153)
[2017-12-19 08:02:35,948] INFO Starting commit and rotation for topic partition colin-forecast-0 with start offsets {year=2017/month=12/day=19=45249} and end offsets {year=2017/month=12/day=19=45249} (io.confluent.connect.hdfs.TopicPartitionWriter:302)
[2017-12-19 08:02:35,949] INFO Committed s3.test/topics/colin-forecast/year=2017/month=12/day=19/colin-forecast+0+0000045249+0000045249.parquet for colin-forecast-0 (io.confluent.connect.hdfs.TopicPartitionWriter:638)
[2017-12-19 08:02:35,961] INFO Got brand-new compressor [.snappy] (org.apache.hadoop.io.compress.CodecPool:153)
[2017-12-19 08:02:35,962] INFO Starting commit and rotation for topic partition colin-forecast-0 with start offsets {year=2017/month=12/day=19=45250} and end offsets {year=2017/month=12/day=19=45250} (io.confluent.connect.hdfs.TopicPartitionWriter:302)
[2017-12-19 08:02:35,963] INFO Committed s3.test/topics/colin-forecast/year=2017/month=12/day=19/colin-forecast+0+0000045250+0000045250.parquet for colin-forecast-0 (io.confluent.connect.hdfs.TopicPartitionWriter:638)

But I cannot find the Parquet files in S3; there is nothing there. Why? Do I need some configuration on the S3 side? Thanks in advance.

InvocationTargetException when trying to run a job

Hi,

I'm trying to submit a job -

{"name":"test",
"config":{"name":"test",
"connector.class":"com.qubole.streamx.s3.S3SinkConnector",
"tasks.max":"1",
"flush.size":"100",
"s3.url":"s3a://test",
"wal.class":"com.qubole.streamx.s3.wal.DBWAL",
"hadoop.conf.dir":"/opt/hadoop/etc/hadoop/",
"topics":"test"}}

But I get an InvocationTargetException and then the job is killed.
My core-site.xml is -


<configuration>
<property>
        <name>fs.AbstractFileSystem.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3A</value>
</property>
<property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
        <name>fs.s3a.access.key</name>
        <value>xxxx</value>
</property>
<property>
        <name>fs.s3a.secret.key</name>
        <value>xxxxx</value>
</property>
<property>
        <name>fs.s3a.endpoint</name>
        <value>s3.eu-central-1.amazonaws.com</value>
</property>
</configuration>

Hadoop fs -ls s3a://test works fine.
What am I doing wrong here?

Thanks.

S3 Source Connector

Could streamx also provide an S3 Source connector, for reading data from S3 back onto Kafka? This would help in a backup/restore use case.

`NoSuchMethodError` on Kafka 0.11.0.0

I have the following configuration:

OS: OS X Sierra
Java: 1.8.0_131
Kafka: 0.11.0.0 installed via Brew

I've compiled streamx off of master. I'm trying to run the connector in standalone and I see the following error:

$ ./bin/connect-standalone.sh config/connect-standalone.properties /Users/<username>/dev/noodle/streamx/config/quickstart-s3.properties 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/<username>/dev/noodle/streamx/target/streamx-0.1.0-SNAPSHOT-development/share/java/streamx/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/Cellar/kafka/0.11.0.0/libexec/libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.collect.Sets$SetView.iterator()Lcom/google/common/collect/UnmodifiableIterator;
	at org.reflections.Reflections.expandSuperTypes(Reflections.java:380)
	at org.reflections.Reflections.<init>(Reflections.java:126)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:221)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:159)
	at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
	at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:68)

Any thoughts?

org.apache.hadoop.fs.s3a.S3AFileSystem.append not supported

I am trying out this connector and am running into the following issue. I start the connector and then I send data to the topic it is reading from. I then stop the connector, start it again, and then continue to send data to the same topic it was reading from before. It appears that it is trying to append to the same log file as before and is throwing an exception because append is not a supported operation in S3.

I am wondering if I am using this connector wrong and if this is the intended behavior. Perhaps a potential fix could be to check if that log file already exists and if it does then rename it and create a new one with the old name? However, this approach might break the exactly once guarantee feature, but I am not sure.

Thank you for any insight and guidance

s3 bucket name redacted below:

[2016-06-28 00:18:01,367] INFO Getting path status for s3a://s3 bucket/streamx/logs/streamx/0/log (streamx/logs/streamx/0/log) (org.apache.hadoop.fs.s3a.S3AFileSystem:684)
[2016-06-28 00:18:01,842] INFO Getting path status for s3a://s3 bucket/streamx/logs/streamx/0/log (streamx/logs/streamx/0/log) (org.apache.hadoop.fs.s3a.S3AFileSystem:684)
[2016-06-28 00:18:01,930] INFO Getting path status for s3a://s3 bucket/streamx/logs/streamx/0/log (streamx/logs/streamx/0/log) (org.apache.hadoop.fs.s3a.S3AFileSystem:684)
[2016-06-28 00:18:02,015] INFO Opening 's3a://s3 bucket/streamx/logs/streamx/0/log' for reading (org.apache.hadoop.fs.s3a.S3AFileSystem:221)
[2016-06-28 00:18:02,016] INFO Getting path status for s3a://s3 bucket/streamx/logs/streamx/0/log (streamx/logs/streamx/0/log) (org.apache.hadoop.fs.s3a.S3AFileSystem:684)
[2016-06-28 00:18:02,101] INFO Actually opening file streamx/logs/streamx/0/log at pos 0 (org.apache.hadoop.fs.s3a.S3AFileSystem:84)
[2016-06-28 00:18:02,190] ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs.TopicPartitionWriter:214)
org.apache.kafka.connect.errors.ConnectException: Error creating writer for log file s3a://s3 bucket/streamx/logs/streamx/0/log
at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:91)
at io.confluent.connect.hdfs.wal.FSWAL.apply(FSWAL.java:105)
at io.confluent.connect.hdfs.TopicPartitionWriter.applyWAL(TopicPartitionWriter.java:441)
at io.confluent.connect.hdfs.TopicPartitionWriter.recover(TopicPartitionWriter.java:197)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:227)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:280)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: java.io.IOException: Not supported
at org.apache.hadoop.fs.s3a.S3AFileSystem.append(S3AFileSystem.java:268)
at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1173)
at io.confluent.connect.hdfs.wal.WALFile$Writer.<init>(WALFile.java:221)
at io.confluent.connect.hdfs.wal.WALFile.createWriter(WALFile.java:67)
at io.confluent.connect.hdfs.wal.FSWAL.acquireLease(FSWAL.java:73)
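
The rename-and-recreate idea proposed in this report can be sketched as below. It is an illustration on the local file system, not connector code, and the function name `open_fresh_log` is hypothetical. It does not address the exactly-once concern raised in the report: on S3, the Hadoop file systems implement rename as a copy followed by a delete, so the step is not atomic there.

```python
import os
import time


def open_fresh_log(path: str):
    # If a log from a previous run exists, move it aside instead of appending to it.
    if os.path.exists(path):
        rotated = f"{path}.{time.time_ns()}"
        os.rename(path, rotated)
    # Always start a new, empty log under the original name.
    return open(path, "w", encoding="utf-8")


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as workdir:
        log_path = os.path.join(workdir, "log")
        for run in ("first run", "second run"):
            with open_fresh_log(log_path) as log:
                log.write(run)
        print(sorted(os.listdir(workdir)))
```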

JSON records to Parquet on S3 won't work

Hi

I would like to write Parquet directly to S3, but my events are only JSON strings.
Do you know if what I'm trying to do can work?

In fact I've already tried with this config

{
"name": "ParquetS3",
"config": {
"name": "ParquetS3",
"connector.class": "com.qubole.streamx.s3.S3SinkConnector",
"format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.hdfs.partitioner.HourlyPartitioner",
"locale": "en",
"timezone": "UTC",
"tasks.max": 11,
"topics": "XXX",
"flush.size": 50000,
"s3.url": "s3n://XXXXX",
"hadoop.conf.dir": "/etc/kafka-connect-s3/hadoop-conf"
}
}

but it doesn't actually work.

The only error I can see in the log is

Task ParquetS3-56 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
java.lang.NullPointerException
at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:299)
at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:480)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Is there any other log output for the StreamX connector?

Thanks

Do I have to set up HDFS in order to use streamX?

I noticed that I have to configure Hadoop config files such as core-site.xml and hdfs-site.xml to configure S3, and I could not find the mentioned config/hadoop-conf in my installation (Kafka 0.10.2.0). So do I have to use HDFS in order to use StreamX?

What I am trying to do is transform some messages from JSON to Parquet and then store them in S3.

Spark could do this, but it would require a long-running cluster, or I could use checkpoints to run a basic ETL job once per day.

Bug in S3SinkConnectorConfig

Hello,

I pulled the latest commit and tried to run it. It resulted in the following error:

[2016-07-07 22:44:42,095] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
java.lang.NoSuchFieldError: config
at com.qubole.streamx.s3.S3SinkConnectorConfig.(S3SinkConnectorConfig.java:64)
at com.qubole.streamx.s3.S3SinkConnector.start(S3SinkConnector.java:51)
at org.apache.kafka.connect.runtime.Worker.addConnector(Worker.java:186)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:668)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:640)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:598)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:184)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)

Perhaps the following line needs to be added to line 62 of S3SinkConnectorConfig.java?

protected static ConfigDef config = new ConfigDef();

Thanks
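
The suggestion above can be illustrated without Kafka on the classpath. The sketch below uses two hypothetical classes, `BaseConfig` and `S3Config` (stand-ins, not the real `HdfsSinkConnectorConfig` and `S3SinkConnectorConfig`), and a plain `Map` in place of `ConfigDef`. It shows the pattern the report implies: a subclass adds its own keys to a `protected static` field owned by its parent. If the parent class found at runtime lacks that field, the same access would fail with `NoSuchFieldError`; whether that is what happened here is an assumption, not something confirmed in the report.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the parent connector config class.
class BaseConfig {
    // Must be visible to subclasses; the subclass below relies on it.
    protected static final Map<String, String> config = new LinkedHashMap<>();

    static {
        config.put("flush.size", "Number of records written before a file is committed");
    }
}

// Hypothetical stand-in for the S3-specific subclass.
class S3Config extends BaseConfig {
    static {
        // Extends the parent's definition with an S3-only key.
        config.put("s3.url", "Destination bucket URL");
    }

    // Declared here so that calling it initializes S3Config (and BaseConfig first).
    static Map<String, String> definition() {
        return config;
    }
}

class ProtectedStaticDemo {
    public static void main(String[] args) {
        System.out.println(S3Config.definition().keySet()); // [flush.size, s3.url]
    }
}
```

The subclass compiles against whatever parent is on the build classpath, so a mismatch only surfaces at runtime when a different parent jar is loaded.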

Rename project

The current name is completely un-SEOable. Why not kafka-connect-object-stores or kafka-connect-cloud-blobs?

OutOfMemory error from S3 transfer manager

Hi, we've been seeing issues where the number of threads opened by the plugin seems to grow without bound, and the connector workers all eventually die with stack traces pointing to a failure to create threads in the S3 transfer manager. We do see data being uploaded to S3, but I'm wondering if this might be a case where we are consuming faster than we can upload. Has anyone else observed this in the past, and is there a known workaround? Thanks.

[2017-01-16 18:30:15,554] ERROR Task pixel_events_s3-3 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:713)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:949)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1360)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
at com.amazonaws.services.s3.transfer.internal.UploadMonitor.(UploadMonitor.java:129)
at com.amazonaws.services.s3.transfer.TransferManager.upload(TransferManager.java:449)
at com.amazonaws.services.s3.transfer.TransferManager.upload(TransferManager.java:382)
at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:127)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:376)
at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.close(AvroRecordWriterProvider.java:69)
at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:514)
at io.confluent.connect.hdfs.TopicPartitionWriter.close(TopicPartitionWriter.java:319)
at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:299)
at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:110)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:432)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
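
The trace shows `TransferManager.upload` failing to start a thread while a file is being closed. Whether the connector is really consuming faster than it uploads is not established in the report. As a general illustration of the back-pressure idea (plain JDK, not StreamX or AWS SDK code; the `BoundedUploads` name, pool size and queue length are made up for the example), a fixed-size pool with a bounded queue and `CallerRunsPolicy` caps the thread count and slows the submitter down instead of creating more threads:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedUploads {
    // Runs `tasks` fake uploads on at most `maxThreads` pool threads and
    // returns the largest pool size that was ever reached.
    static int run(int tasks, int maxThreads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(8),
                // When the queue is full, the submitting thread runs the task itself,
                // which throttles submission instead of spawning threads.
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                sleepQuietly(5); // stands in for one upload
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("uploads did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (completed.get() != tasks) {
            throw new IllegalStateException("some uploads were lost");
        }
        return pool.getLargestPoolSize();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("largest pool size: " + run(100, 4));
    }
}
```

The trade-off is that a slow upload now stalls the caller, which is usually preferable to exhausting native threads.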

S3 partition file per hourly batch

Hi

I'd like to know if there is an option to write one file per partition, meaning one file per hour.
For example, if I have 5 workers with 5 tasks and I run an hourly batch, would this plugin know to aggregate the data into one file per batch run?

Thanks
D.
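
The question is not answered on this page. For orientation, the hourly directory layout that appears in a log line elsewhere on this page (`year=2017/month=02/day=01/hour=17`) can be derived from a record timestamp as sketched below. The `HourlyPath` class is hypothetical and only mimics that layout; it is not the connector's partitioner. Committed file names quoted elsewhere on this page embed a Kafka partition and an offset range (`avrotest+0+0000000000+0000000002.avro`), which suggests files are produced per Kafka partition rather than merged per hour, though that is an inference.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class HourlyPath {
    // Quoted parts are literals; the letters are date fields.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("'year='yyyy/'month='MM/'day='dd/'hour='HH");

    // Maps a timestamp to the hourly directory it would fall into in the given time zone.
    static String partitionFor(long epochMillis, String zone) {
        return FORMAT.withZone(ZoneId.of(zone)).format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2017-02-01T17:30:12Z").toEpochMilli();
        System.out.println(partitionFor(ts, "UTC")); // year=2017/month=02/day=01/hour=17
    }
}
```

The time zone matters: the same record lands in a different directory if the configured zone is not UTC.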

NullPointer exception using DBWAL and s3n

I'm hoping someone might be able to shed some light on this particular issue, as we have been having intermittent stability issues pushing Avro files using s3n and the DBWAL, and have seen this particular problem several times. After running a connector with 8 tasks for the last 4-5 days, all of the tasks started to fail at around the same time with the issue below. When we hit this issue, the only way to restart the job seems to be to drop all the tables that the DBWAL is using and recreate the job. Is this a known issue? Is there a known fix or workaround? Thanks.

[2017-02-21 01:55:32,511] INFO Offset from WAL 304796 for topic partition id 46 (com.qubole.streamx.s3.wal.DBWAL:287)
[2017-02-21 01:55:32,512] INFO truncating table delete from pixel_events_s3_organic_events_wh_13 where id < 2865 (com.qubole.streamx.s3.wal.DBWAL:250)
[2017-02-21 01:55:32,512] INFO Finished recovery for topic partition organic_events_wh-46 (io.confluent.connect.hdfs.TopicPartitionWriter:213)
[2017-02-21 01:55:32,512] INFO truncating table delete from pixel_events_s3_organic_events_wh_38 where id < 2870 (com.qubole.streamx.s3.wal.DBWAL:250)
[2017-02-21 01:55:32,512] INFO Reading wal select * from pixel_events_s3_organic_events_wh_69 order by id desc limit 1 (com.qubole.streamx.s3.wal.DBWAL:211)
[2017-02-21 01:55:32,513] INFO Offset from WAL 298197 for topic partition id 13 (com.qubole.streamx.s3.wal.DBWAL:287)
[2017-02-21 01:55:32,513] INFO Reading wal select * from pixel_events_s3_organic_events_wh_79 order by id desc limit 1 (com.qubole.streamx.s3.wal.DBWAL:211)
[2017-02-21 01:55:32,513] INFO Offset from WAL 311797 for topic partition id 38 (com.qubole.streamx.s3.wal.DBWAL:287)
[2017-02-21 01:55:32,513] INFO Finished recovery for topic partition organic_events_wh-13 (io.confluent.connect.hdfs.TopicPartitionWriter:213)
[2017-02-21 01:55:32,514] INFO Finished recovery for topic partition organic_events_wh-38 (io.confluent.connect.hdfs.TopicPartitionWriter:213)
[2017-02-21 01:55:32,515] INFO Started recovery for topic partition paid_events_wh-23 (io.confluent.connect.hdfs.TopicPartitionWriter:198)
[2017-02-21 01:55:32,517] INFO Started recovery for topic partition organic_events_wh-29 (io.confluent.connect.hdfs.TopicPartitionWriter:198)
[2017-02-21 01:55:32,517] INFO Started recovery for topic partition organic_events_wh-15 (io.confluent.connect.hdfs.TopicPartitionWriter:198)
[2017-02-21 01:55:32,518] INFO Reading wal select * from pixel_events_s3_paid_events_wh_23 order by id desc limit 1 (com.qubole.streamx.s3.wal.DBWAL:211)
[2017-02-21 01:55:32,521] INFO Reading wal select * from pixel_events_s3_organic_events_wh_29 order by id desc limit 1 (com.qubole.streamx.s3.wal.DBWAL:211)
[2017-02-21 01:55:32,522] INFO Recovering file (com.qubole.streamx.s3.wal.DBWAL:223)
[2017-02-21 01:55:32,522] INFO Reading wal select * from pixel_events_s3_organic_events_wh_15 order by id desc limit 1 (com.qubole.streamx.s3.wal.DBWAL:211)
[2017-02-21 01:55:32,524] INFO truncating table delete from pixel_events_s3_organic_events_wh_29 where id < 2857 (com.qubole.streamx.s3.wal.DBWAL:250)
[2017-02-21 01:55:32,530] ERROR Task pixel_events_s3-7 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.NullPointerException
at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:299)
at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:110)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:432)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-02-21 01:55:32,531] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)

Parquet and Hive integration - Example job configuration

Hello guys,
Thank you so much for this amazing project! Really a good job :)
I'm really sorry for this stupid question, but does anyone have a configuration example for:

  • a job that uses Hive integration (where the connector creates a partitioned Hive table and periodically adds partitions once it writes a new partition to S3)
  • a job that writes data in Parquet (or Avro) format

Thanks so much for your help :)
Have a nice day,
Jocelyn

java.lang.NoSuchFieldError: INSTANCE exception, caused by http client version mismatch

I'm trying to get the s3 connector working but I keep running into this exception when I use the S3AFileSystem (I would use NativeS3FileSystem but for reasons I need S3a).

[2017-01-24 01:50:41,622] INFO Couldn't start HdfsSinkConnector: (io.confluent.connect.hdfs.HdfsSinkTask:73)
org.apache.kafka.connect.errors.ConnectException: java.lang.reflect.InvocationTargetException
	at io.confluent.connect.hdfs.storage.StorageFactory.createStorage(StorageFactory.java:40)
	at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:171)
	at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:65)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:221)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at io.confluent.connect.hdfs.storage.StorageFactory.createStorage(StorageFactory.java:33)
	... 11 more
Caused by: java.lang.NoSuchFieldError: INSTANCE
	at org.apache.http.conn.ssl.SSLConnectionSocketFactory.<clinit>(SSLConnectionSocketFactory.java:144)
	at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.getPreferredSocketFactory(ApacheConnectionManagerFactory.java:87)
	at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.create(ApacheConnectionManagerFactory.java:65)
	at com.amazonaws.http.apache.client.impl.ApacheConnectionManagerFactory.create(ApacheConnectionManagerFactory.java:58)
	at com.amazonaws.http.apache.client.impl.ApacheHttpClientFactory.create(ApacheHttpClientFactory.java:51)
	at com.amazonaws.http.apache.client.impl.ApacheHttpClientFactory.create(ApacheHttpClientFactory.java:39)
	at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:319)
	at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:303)
	at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:164)
	at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:564)
	at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:544)
	at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:526)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
	at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2675)
	at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:418)
	at com.qubole.streamx.s3.S3Storage.<init>(S3Storage.java:49)
	... 16 more

Some cursory googling indicates that this is related to a version conflict in the Apache HTTP core library. In /home/ec2-user/streamx/target/streamx-0.1.0-SNAPSHOT-development/share/java/streamx/* there is the jar httpcore-4.2.4.jar, while /usr/bin/../share/java/kafka, which is also on the classpath, contains httpcore-4.4.3.jar. I can take a stab at fixing this, but I figured I'd file an issue in case it is a known problem or there is an established workaround.
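
When two versions of the same library are on the classpath, it helps to confirm which jar a class is actually loaded from. The sketch below is a plain JDK diagnostic, not part of StreamX; the `WhichJar` name is made up, and the default class name is simply the one from the trace above. It loads the class without initializing it, so the failing static initializer is not triggered again.

```java
import java.security.CodeSource;

class WhichJar {
    // Returns the jar or directory a class is loaded from, or "bootstrap/JDK"
    // when the class has no code source (for example java.lang.String).
    static String locate(String className) {
        try {
            // initialize=false: resolve the class without running its static initializer.
            Class<?> c = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource source = c.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "bootstrap/JDK";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "org.apache.http.conn.ssl.SSLConnectionSocketFactory";
        System.out.println(name + " -> " + locate(name));
    }
}
```

To be meaningful, it has to run with the same classpath as the Connect worker; running it standalone only reports what the standalone classpath contains.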

NullPointerException after recreating Kafka connector

I have been using the S3 connector with solid results. Unfortunately I tried recreating a Kafka connector and am seeing the following exception being thrown across all the tasks. Do you know what might cause this and any potential workarounds? Thanks.

[2017-01-28 04:13:45,313] ERROR Task pixel_events_s3-3 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.NullPointerException
at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:299)
at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:110)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:432)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
[2017-01-28 04:13:45,313] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)

Tests failing?

I am trying to run the tests locally.
I just cloned the project, ran mvn test, and got 35 errors. All of them have to do with Unknown configuration 's3.url'. Here is the stack trace for one of those errors:

testAlterSchema(io.confluent.connect.hdfs.avro.AvroHiveUtilTest)  Time elapsed: 101.004 sec  <<< ERROR!
org.apache.kafka.common.config.ConfigException: Unknown configuration 's3.url'
	at org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:74)
	at org.apache.kafka.common.config.AbstractConfig.getString(AbstractConfig.java:109)
	at io.confluent.connect.hdfs.hive.HiveUtil.<init>(HiveUtil.java:33)
	at io.confluent.connect.hdfs.avro.AvroHiveUtil.<init>(AvroHiveUtil.java:44)
	at io.confluent.connect.hdfs.avro.AvroHiveUtilTest.setUp(AvroHiveUtilTest.java:48)

I have tried adding the s3.url in quickstart-hdfs.properties with no luck.
Is this a known issue maybe? Am I missing something obvious?

UPDATE: It seems that the tests don't pick up the files under config/ folder and they fail because the specific config has no default. Any ideas?

Any help would be greatly appreciated!

Writing Avro data

We needed to switch our key and value converter from io.confluent.connect.avro.AvroConverter to com.qubole.streamx.ByteArrayConverter to support writing CSV/JSON data to S3. Is there a supported way to write Avro files using this configuration? I tried scheduling the job with "format.class=io.confluent.connect.hdfs.avro.AvroFormat" however this does not work for me.

java.lang.IllegalAccessError

Hi,
I'm facing the error below while running the connector. Is there any quick guidance on this?

java.lang.IllegalAccessError: tried to access field io.confluent.connect.hdfs.HdfsSinkConnectorConfig.config from class com.qubole.streamx.s3.S3SinkConnectorConfig

-- error log
java.lang.IllegalAccessError: tried to access field io.confluent.connect.hdfs.HdfsSinkConnectorConfig.config from class com.qubole.streamx.s3.S3SinkConnectorConfig
at com.qubole.streamx.s3.S3SinkConnectorConfig.(S3SinkConnectorConfig.java:64)
at com.qubole.streamx.s3.S3SinkConnector.start(S3SinkConnector.java:51)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:101)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:126)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:183)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:178)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:789)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:755)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:715)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:206)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:176)

at java.lang.Thread.run(Thread.java:745)

Thanks,
Mayank

can we store the data as txt/json format in s3?

I want the messages published to Kafka topics to be stored in an S3 bucket as text/JSON. Can I use StreamX as an alternative to an AWS Kinesis Firehose delivery stream? The delivery stream pushes messages whenever the buffer size or time threshold is reached.

S3 to Kafka

Is there any documented approach for getting the dumps from S3 back onto a fresh Kafka cluster?

JSON messages not legible in S3 saved files

Hi,

I saw in an earlier issue that there is now support for JSON format. I have followed the README instructions, using
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
and
"format.class":"com.qubole.streamx.SourceFormat"

Files are being saved to S3 successfully; however, their contents are not legible strings, e.g.
^A^@^@^@^A^»¯,Ú^OB^V^@^@^@^@^@^@^@x

It seems in the file SourceRecordWriterProvider.java, it is directly writing bytes. Is it converted somewhere to string?

Thanks!

Problem with reassignment of tasks when running in a distributed mode.

Since data is copied from local storage to S3, how should partition reassignments be handled when running in distributed mode? The sink task for a particular topic partition can run on a different machine after a partition reassignment, and the recovery that follows will fail because the WAL append cannot find the file on the new machine's local storage.

A question

In the connect-distributed properties:
bootstrap.servers= can I provide a list of ZooKeeper servers, or does this have to be a Kafka server?

thanks,

Wrong FS error

I got this Wrong Filesystem error while running the maven tests. I have built the latest snapshot versions of kafka packages.

2016-05-25 16:30:43,710 WARN [org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager$Monitor@4163f1cd] blockmanagement.DecommissionManager (DecommissionManager.java:run(78)) - Monitor interrupted: java.lang.InterruptedException: sleep interrupted
2016-05-25 16:30:43,718 INFO [main] namenode.FSNamesystem (FSNamesystem.java:stopActiveServices(1246)) - Stopping services started for active state
2016-05-25 16:30:43,718 INFO [main] namenode.FSNamesystem (FSNamesystem.java:stopStandbyServices(1334)) - Stopping services started for standby state
2016-05-25 16:30:43,720 INFO [main] mortbay.log (Slf4jLog.java:info(67)) - Stopped HttpServer2$SelectChannelConnectorWithSafeStartup@localhost:0
2016-05-25 16:30:43,721 INFO [main] impl.MetricsSystemImpl (MetricsSystemImpl.java:stop(210)) - Stopping DataNode metrics system...
2016-05-25 16:30:43,722 INFO [main] impl.MetricsSystemImpl (MetricsSystemImpl.java:stop(216)) - DataNode metrics system stopped.
2016-05-25 16:30:43,722 INFO [main] impl.MetricsSystemImpl (MetricsSystemImpl.java:shutdown(605)) - DataNode metrics system shutdown complete.
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.307 sec <<< FAILURE! - in io.confluent.connect.hdfs.wal.WALTest
testWALMultiClient(io.confluent.connect.hdfs.wal.WALTest) Time elapsed: 3.302 sec <<< ERROR!
java.lang.IllegalArgumentException: Wrong FS: file:/tmp/topics/topic/12/5a3b1baf-f9a5-443b-b043-50ea80c7e15d_tmp.avro, expected: hdfs://localhost:9001
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:193)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
at org.apache.hadoop.fs.FileSystem.createNewFile(FileSystem.java:1148)
at io.confluent.connect.hdfs.wal.WALTest.testWALMultiClient(WALTest.java:54)

Results :

Failed tests:
HiveIntegrationAvroTest.testHiveIntegrationFieldPartitionerAvro:205 expected:<[hdfs://127.0.0.1:9001/topics/topic/int=16, hdfs://127.0.0.1:9001/topics/topic/int=17, hdfs://127.0.0.1:9001/topics/topic/int=18]> but was:<[]>
TopicPartitionWriterTest.testWriteRecordFieldPartitioner:148->verify:263 expected:<3> but was:<1>
TopicPartitionWriterTest.testWriteRecordTimeBasedPartition:189->verify:263 expected:<3> but was:<1>
HiveIntegrationParquetTest.testHiveIntegrationParquet:149 expected:<[hdfs://127.0.0.1:9001/topics/topic/partition=12]> but was:<[]>
HiveIntegrationParquetTest.testHiveIntegrationTimeBasedPartitionerParquet:300 expected:<9> but was:<3>
ParquetHiveUtilTest.testAlterSchema:132 expected:<6> but was:<1>
ParquetHiveUtilTest.testCreateTable:92 expected:<6> but was:<3>
Tests in error:
HdfsSinkTaskTest.testSinkTaskStartWithRecovery:98 » IllegalArgument Wrong FS: ...
AvroHiveUtilTest.testAlterSchema:92->prepareData:146 » IllegalArgument Wrong F...
AvroHiveUtilTest.testCreateTable:53->prepareData:146 » IllegalArgument Wrong F...
DataWriterAvroTest.testGetPreviousOffsets:212 » IllegalArgument Wrong FS: file...
DataWriterAvroTest.testProjectBackWard:401 » FileNotFound File does not exist:...
DataWriterAvroTest.testProjectNone:459 » FileNotFound File does not exist: hdf...
DataWriterAvroTest.testRebalance:298 » IllegalArgument Wrong FS: file:/tmp/top...
DataWriterAvroTest.testRecovery:117 » IllegalArgument Wrong FS: file:/tmp/topi...
DataWriterAvroTest.testWriteRecord:73 » IllegalArgument Wrong FS: file:/tmp/to...
DataWriterAvroTest.testWriteRecordMultiplePartitions:176 » IllegalArgument Wro...
DataWriterAvroTest.testWriteRecordNonZeroInitailOffset:248 » IllegalArgument W...
HiveIntegrationAvroTest.testHiveIntegrationAvro:126 » IllegalArgument Wrong FS...
HiveIntegrationAvroTest.testSyncWithHiveAvro:70 » IllegalArgument Wrong FS: fi...
DataWriterParquetTest.testWriteRecord:81 » FileNotFound File hdfs://127.0.0.1:...
WALTest.testWALMultiClient:54 » IllegalArgument Wrong FS: file:/tmp/topics/top...

Tests run: 45, Failures: 7, Errors: 15, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 03:39 min
[INFO] Finished at: 2016-05-25T16:30:43-07:00
[INFO] Final Memory: 43M/437M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (default-test) on project kafka-connect-hdfs: There are test failures.
[ERROR]
[ERROR] Please refer to /Users/kushumsharma/backup/github/streamx/target/surefire-reports for the individual test results.
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
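
The "Wrong FS" message says that a `file:/tmp/...` path was handed to a file system bound to `hdfs://localhost:9001`. Why the test produced a local path is not identified in the report. The sketch below shows, with plain `java.net.URI`, the kind of scheme and authority comparison that leads to such a rejection. It is a simplified illustration under that assumption, not Hadoop's implementation, and `WrongFsCheck` is a made-up name.

```java
import java.net.URI;

class WrongFsCheck {
    // Simplified check: a path is accepted if it has no scheme, or if its scheme
    // matches the file system's and its authority is absent or identical.
    static boolean belongsTo(URI fileSystem, URI path) {
        if (path.getScheme() == null) {
            return true; // relative to whatever file system is in use
        }
        if (!path.getScheme().equalsIgnoreCase(fileSystem.getScheme())) {
            return false; // e.g. file: versus hdfs:
        }
        String authority = path.getAuthority();
        return authority == null || authority.equalsIgnoreCase(fileSystem.getAuthority());
    }

    public static void main(String[] args) {
        URI hdfs = URI.create("hdfs://localhost:9001");
        System.out.println(belongsTo(hdfs, URI.create("file:/tmp/topics/topic/12/x_tmp.avro")));      // false
        System.out.println(belongsTo(hdfs, URI.create("hdfs://localhost:9001/topics/topic/12/x.avro"))); // true
    }
}
```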

publish jars to Maven

Hello! Thanks for your work.

Could you publish this to Maven? It would help many of your users.

Cheers!

Folders as files with $ dollar sign in their name when using s3n

When using the s3n protocol, many "folder" files (0 bytes in size) with $ in their names, like "all_$folder$", are generated alongside their respective folders.

Some of the folders themselves aren't even created and only these dollar named files are, like "+tmp_$folder$". Even though this is printed in the logs:

[2017-02-01 17:30:12,308] INFO OutputStream for key 'topics/+tmp/all/year=2017/month=02/day=01/hour=17/0d60b9d1-7dcc-468a-b0c4-682609280877_tmp.parquet' writing to tempfile '/tmp/hadoop-root/s3/output-1823693314448547785.tmp' (org.apache.hadoop.fs.s3native.NativeS3FileSystem)

No +tmp directory created :-\ Only the 0 byte file.

Is there an elegant way of stopping the generation of these files? In s3a this doesn't occur but using that seems buggy at the moment (I need to open another issue for this) so I've resorted to s3n.

Thanks

Question: Do you regularly merge kafka-connect-hdfs updates to your fork?

I see there is lots of development that happens with kafka-connect-hdfs. I have been building a solution with your S3 connector as we are on S3 and not HDFS anymore. As I continue development before deploying to production (I am working on my own custom partitioner), I would love to have the latest updates from kafka-connect-hdfs as well as the latest updates to your own S3 connector. What are your own reasons for not merging the latest kafka-connect-hdfs changes to your fork?

Use c3p0 for JDBC connection pooling

StreamX DBWAL uses JDBC to communicate with MySQL. If StreamX runs for a long time without receiving any messages, the connection gets closed (after wait_timeout) and DBWAL fails later. To avoid this, we can use connection pooling; the pool will renew the connection periodically.
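
The sketch below is not c3p0. It shows the simpler validate-before-use idea that a pool automates, using only `java.sql.Connection.isValid` from the JDK. `ValidatedConnection` and the connection factory are hypothetical; in real use the factory would wrap something like `DriverManager.getConnection(...)`. The `main` method uses a fake connection that always reports itself as dead, purely to exercise the reopen path.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class ValidatedConnection {
    private final Supplier<Connection> factory; // hypothetical: opens a new connection
    private Connection current;

    ValidatedConnection(Supplier<Connection> factory) {
        this.factory = factory;
    }

    // Returns a live connection, reopening it if the server dropped the old one.
    synchronized Connection get() {
        boolean alive;
        try {
            alive = current != null && current.isValid(2); // 2-second validation timeout
        } catch (SQLException e) {
            alive = false;
        }
        if (!alive) {
            closeQuietly(current);
            current = factory.get();
        }
        return current;
    }

    private static void closeQuietly(Connection c) {
        if (c != null) {
            try {
                c.close();
            } catch (SQLException ignored) {
                // The connection is being discarded anyway.
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger opened = new AtomicInteger();
        Supplier<Connection> fake = () -> {
            opened.incrementAndGet();
            return (Connection) Proxy.newProxyInstance(
                    ValidatedConnection.class.getClassLoader(),
                    new Class<?>[] {Connection.class},
                    (proxy, method, margs) ->
                            "isValid".equals(method.getName()) ? Boolean.FALSE : null);
        };
        ValidatedConnection holder = new ValidatedConnection(fake);
        holder.get();
        holder.get();
        System.out.println("connections opened: " + opened.get()); // connections opened: 2
    }
}
```

A real pool adds idle testing and eviction on top of this, which is what the proposal above is after.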

Why is only the last append taken from RDS when applying a WAL?

I was going through the code of RDSWal and have a question. Why is only the last record taken from the RDS table for a topicPartition when making a commit? Since a single topicPartition can write data to multiple files (tempFiles) depending on the partitioner logic in use, I guess there is a chance that not all the temp files are committed when the WAL is applied. Any clarification on this is highly appreciated.

Tmp File Double slash

Hi,
My tmp file path looks wrong; it contains a double slash, like:
"s3://airy-poc-ingestion/topics//+tmp/avrotest/partition=0/ad91f606-8ebd-4aab-8598-335d0c566e31_tmp.avro"

Committed files look just fine: "s3://airy-poc-ingestion/topics/avrotest/partition=0/avrotest+0+0000000000+0000000002.avro"
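
The report does not identify the cause. One plausible explanation, stated here as an assumption, is that an empty component or a trailing slash in a configured directory is concatenated directly into the temp path. A defensive join that tolerates such input could look like the sketch below; `PathJoin` is a made-up helper, not StreamX code.

```java
class PathJoin {
    // Joins a base URL and path segments with exactly one '/' between them,
    // skipping empty segments and stripping stray leading or trailing slashes.
    static String join(String base, String... segments) {
        StringBuilder out = new StringBuilder(base.replaceAll("/+$", ""));
        for (String segment : segments) {
            String trimmed = segment.replaceAll("^/+|/+$", "");
            if (!trimmed.isEmpty()) {
                out.append('/').append(trimmed);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Prints s3://airy-poc-ingestion/topics/+tmp/avrotest/partition=0
        System.out.println(join("s3://airy-poc-ingestion", "topics/", "", "/+tmp", "avrotest", "partition=0"));
    }
}
```

Only trailing slashes are stripped from the base, so the `//` after the scheme is preserved.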

Support OpenStack Swift object store

OpenStack Swift is also a widely used object store, offered by many vendors such as Rackspace and Oracle. It would be nice to support OpenStack Swift as well.
