aiven-open / s3-connector-for-apache-kafka
Aiven's S3 Sink Connector for Apache Kafka®
License: Apache License 2.0
If it is not already possible (I wasn't able to find any mention of it in the docs), it would be great to be able to configure aws.access.key.id and aws.secret.access.key from environment variables.
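As a possible workaround (my own suggestion, not something from this connector's docs), Kafka Connect's pluggable config providers can resolve placeholders at runtime; since Kafka 3.5 an EnvVarConfigProvider ships with Connect. A sketch of the worker and connector configuration:

```properties
# Worker configuration: register the environment-variable config provider (Kafka 3.5+)
config.providers=env
config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider
```

```json
{
  "aws.access.key.id": "${env:AWS_ACCESS_KEY_ID}",
  "aws.secret.access.key": "${env:AWS_SECRET_ACCESS_KEY}"
}
```

Placeholders are resolved by the worker before the config reaches the connector, so the secrets never have to appear in the connector JSON.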
We use the S3 filename template prefix/%Y_%m_%d__%H_%M_%S_%f so that filenames sort alphabetically: the next new file is guaranteed to receive the next name in alphabetical order.
In Kafka we have several partitions of one topic, each of which must be written in order under the same prefix (prefix=topic_name).
With this template, file order can only be ensured by running no more than one connector task.
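Concretely, the ordering constraint above amounts to pinning the task count in the connector config:

```json
{
  "tasks.max": "1"
}
```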
The timestamp variable has the following unit parameter values:
yyyy - year, e.g. 2020 (please note that YYYY is deprecated and is interpreted as yyyy)
MM - month, e.g. 03
dd - day, e.g. 01
HH - hour of day, e.g. 23
With these parameters, files recorded within 1 hour will not differ in name.
Adding the partition number and offset to the file name in the template can solve this problem, but it makes working with the root prefix more difficult.
Uniqueness can be ensured by adding minutes, seconds, milliseconds to the timestamp variable.
It looks like it's enough to extend the following functionality:
private static final Map<String, DateTimeFormatter> TIMESTAMP_FORMATTERS =
    Map.of(
        "yyyy", DateTimeFormatter.ofPattern("yyyy"),
        "MM", DateTimeFormatter.ofPattern("MM"),
        "dd", DateTimeFormatter.ofPattern("dd"),
        "HH", DateTimeFormatter.ofPattern("HH")
    );
with the following additional parameters:
"%M" - minutes in two-digit format.
"%S" - seconds in two-digit format.
"%f" - microseconds.
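A minimal sketch of that extension. The unit names "mm", "ss" and "SSSSSS" are my assumption: since the existing map uses Java DateTimeFormatter pattern letters as keys, the new units are spelled that way here rather than in the strftime style ("%M"/"%S"/"%f") used above.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

public class TimestampFormatters {
    // Existing formatters plus the proposed minute/second/fraction units
    // (the added unit names are hypothetical, not from the connector source).
    static final Map<String, DateTimeFormatter> TIMESTAMP_FORMATTERS = Map.of(
        "yyyy", DateTimeFormatter.ofPattern("yyyy"),
        "MM", DateTimeFormatter.ofPattern("MM"),
        "dd", DateTimeFormatter.ofPattern("dd"),
        "HH", DateTimeFormatter.ofPattern("HH"),
        "mm", DateTimeFormatter.ofPattern("mm"),        // minutes, two digits
        "ss", DateTimeFormatter.ofPattern("ss"),        // seconds, two digits
        "SSSSSS", DateTimeFormatter.ofPattern("SSSSSS") // fraction of second, microsecond precision
    );

    public static String format(String unit, LocalDateTime ts) {
        return TIMESTAMP_FORMATTERS.get(unit).format(ts);
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2020, 3, 1, 23, 59, 7, 123_456_000);
        System.out.println(format("mm", ts));     // 59
        System.out.println(format("ss", ts));     // 07
        System.out.println(format("SSSSSS", ts)); // 123456
    }
}
```

With seconds and microseconds in the name, two files flushed within the same hour no longer collide.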
We have several topics, each already containing gigabytes of data (~1-10 million records). We need to export the data to S3.
Using the Aiven S3 connector we run into out-of-memory errors indicating that the Kafka Connect JVM process does not have enough heap space.
The S3 connector runs into errors.
The entire Kafka Connect cluster is lagging.
The Aiven CLI stops working and returns a 503 error.
Looking at the logs it looks like the connector is permanently ingesting messages from the topic and storing them in memory.
(log messages come from here)
It looks like the connector is not fast enough in writing to S3 and thus the memory is not freed in time.
We managed to get rid of the out-of-memory errors by scaling up the Kafka Connect cluster. However, this is not a suitable long-term solution, as we would need to set up multiple such connectors in parallel in the future.
We would like to have something that gives us some control over the memory consumption of the connector, e.g., a configuration for the maximum size of the input records buffer.
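Until such a buffer limit exists, one lever worth trying (my suggestion, not an official recommendation) is capping how much the sink's consumer fetches per poll via Kafka Connect's per-connector client overrides, which requires connector.client.config.override.policy=All on the worker:

```json
{
  "consumer.override.max.poll.records": "500",
  "consumer.override.max.partition.fetch.bytes": "1048576"
}
```

This only limits how fast records arrive, not how many the connector buffers before flushing, so it mitigates rather than solves the problem.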
PS: The Confluent S3 connector provided by Aiven (version 5.0.0) does not run into out-of-memory errors and uses a lot less memory, but it's not an option for us.
AWS credentials:
https://github.com/aiven/aiven-kafka-connect-s3/blob/master/build.gradle#L45
It should be documented in the README that the connector targets Java 11.
Hi,
We are currently using this S3 sink connector to dump the content of a Kafka topic into AWS S3.
However, we notice in our monitoring that there are some random failures with S3 DNS resolution.
The connector seems to retry and succeed a few milliseconds later, but we are wondering whether this is normal behavior, why it happened, and whether there is a way to avoid it.
Here is the trace:
[2022-07-26 07:43:08,220] ERROR [lcdp-prod-audit-sink|task-0] WorkerSinkTask{id=lcdp-prod-audit-sink-0} Commit of offsets threw an unexpected exception for sequence number 37048: null (org.apache.kafka.connect.runtime.WorkerSinkTask:269)
com.amazonaws.SdkClientException: Unable to execute HTTP request: lcdp-prod-audit-database.s3.eu-west-3.amazonaws.com
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1189)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1135)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:784)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:752)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5052)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4998)
    at com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
    at io.aiven.kafka.connect.s3.S3OutputStream.newMultipartUpload(S3OutputStream.java:96)
    at io.aiven.kafka.connect.s3.S3OutputStream.write(S3OutputStream.java:79)
    at io.aiven.kafka.connect.common.output.parquet.ParquetPositionOutputStream.write(ParquetPositionOutputStream.java:47)
    at java.base/java.io.OutputStream.write(OutputStream.java:122)
    at org.apache.parquet.hadoop.ParquetFileWriter.start(ParquetFileWriter.java:339)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:284)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
    at io.aiven.kafka.connect.common.output.parquet.ParquetOutputWriter.writeRecords(ParquetOutputWriter.java:70)
    at io.aiven.kafka.connect.s3.S3SinkTask.flushFile(S3SinkTask.java:140)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1505)
    at io.aiven.kafka.connect.s3.S3SinkTask.flush(S3SinkTask.java:122)
    at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:374)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.UnknownHostException: lcdp-prod-audit-database.s3.eu-west-3.amazonaws.com
    at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1368)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1302)
    at com.amazonaws.SystemDefaultDnsResolver.resolve(SystemDefaultDnsResolver.java:27)
    at com.amazonaws.http.DelegatingDnsResolver.resolve(DelegatingDnsResolver.java:38)
    at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:112)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
    at jdk.internal.reflect.GeneratedMethodAccessor134.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
    at com.amazonaws.http.conn.$Proxy48.connect(Unknown Source)
    at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1311)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1127)
    ... 34 more
Yours faithfully,
LCDP
We have a KSQL stream outputting Avro to a topic.
We would like to add an additional S3 sink connector that uses the same topic, where the output is a CSV file.
Is this a supported use case for this connector?
I am guessing not, based on this documentation:
The value.converter property must be set to org.apache.kafka.connect.converters.ByteArrayConverter for this data format.
I know I can create a new stream that outputs delimited data, but I just want to make sure I am not missing a feature of this connector.
Many thanks,
scott
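For what it's worth, a hedged sketch of what a CSV-output sink config could look like (the topic name is a placeholder, and the use of ByteArrayConverter follows the documentation quoted above; whether this fits Avro input is exactly the open question):

```json
{
  "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
  "topics": "my-topic",
  "format.output.type": "csv",
  "format.output.fields": "key,value",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
```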
Is there a way to have the connector set the first line of each CSV file to a comma-separated list of the headers?
I'm using both file.name.prefix and file.name.template together, if that matters. The prefix seems to be completely ignored. I can see that the previous deprecated settings are looked at in the sink code, but I can't find any references to the new prefix property.
The connector task must be refactored to be close to https://github.com/aiven/aiven-kafka-connect-gcs/blob/01c537109439e38f2c12d434c598a4dfd53bbf0d/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java.
Some important aspects:
- OutputWriter.
- Record grouping (TopicPartitionRecordGrouper). Please note that key grouping is not supported by the S3 connector, so it shouldn't be transferred.
https://github.com/aiven/aiven-kafka-connect-transforms can be used as an example.
I have an issue with some files in S3 where all the file content is on one line (without line returns). Most of the files are formatted correctly (one event per line), but in some cases (with the same connector setup) some files are misformatted.
Do you know if there is any parameter to fix this issue?
How can we configure the connector to keep only the latest 10 records for each key, or only the data from the last 10 minutes? Or to partition by key and time period?
Hello,
I'm trying to run this connector to sync data from a compacted Kafka topic to an S3 location by key. After running "gradle build" and adding it to the Connect node packages, we're seeing a strange issue when starting a connector.
WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:787)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/aiven/kafka/connect/common/config/AivenCommonConfig
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:554)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:516)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
at org.eclipse.jetty.io.ssl.SslConnection$DecryptedEndPoint.onFillable(SslConnection.java:555)
at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:410)
at org.eclipse.jetty.io.ssl.SslConnection$2.succeeded(SslConnection.java:164)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/aiven/kafka/connect/common/config/AivenCommonConfig
at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:255)
at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:237)
at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:438)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:263)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
... 39 more
I first checked this class in the source code: io/aiven/kafka/connect/common/config/AivenCommonConfig. The only definition I see is an import on line 34 in /src/main/java/io/aiven/kafka/connect/s3/config. The file path for the missing class doesn't exist, and I can't find anywhere that defines it or packages it as a class. Does this class exist? Or am I overlooking something obvious?
Connector config is -
{
  "name": "Avien_S3_sink_conncector",
  "config": {
    "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
    "tasks.max": "8",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "TP.TOPIC.NAME.LATEST",
    "value.converter.schemas.enable": "false",
    "format.output.type": "json",
    "aws.s3.bucket.name": "hs-topic-bucket",
    "aws.s3.region": "us-east-1",
    "format.output.fields": "key,value",
    "file.name.template": "{{topic}}-{{partition}}-{{key}}",
    "aws.sts.role.arn": "arn:aws:iam::**:role//KAFKACONNECT",
    "s3.credentials.provider.class": "io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider",
    "aws.sts.role.session.name": "avienconnect"
  }
}
Hello, what are the delivery semantics of this connector? The Confluent one is exactly-once in some cases, otherwise at-least-once.
Thanks
Adding support for templates to the aws_s3_prefix field would be really helpful. For example, it would allow us to store the files in subfolders by date.
Example:
aws_s3_prefix: /bucket/folder/${date}
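In newer connector versions, a similar effect may be achievable with timestamp variables in file.name.template instead of the prefix (a sketch; whether the available units cover a given layout depends on the connector version):

```json
{
  "file.name.template": "folder/{{timestamp:unit=yyyy}}-{{timestamp:unit=MM}}-{{timestamp:unit=dd}}/{{topic}}-{{partition}}-{{start_offset}}"
}
```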
AWS ECS task (and likely EKS pod) IAM credentials provided via container environment variables fail, even though they allow S3 access.
IAM:
// IAM Role: arn:aws:iam::XXXX:role/XXX-ecs-instance-role
// Trust
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "ec2.amazonaws.com",
          "ecs-tasks.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
// Permission
{
  "Statement": [
    {
      "Action": [
        "s3:*"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:s3:::mybucket",
        "arn:aws:s3:::mybucket/*"
      ]
    }
  ],
  "Version": "2012-10-17"
}
Should this support mounted IAM/STS credentials, instead of having to explicitly define long-term or short-term ones: https://github.com/aiven/s3-connector-for-apache-kafka#credentials
ECS Task IAM Ref: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
EKS Service Account Ref: https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html
Current behavior requires providing either the long-term or the short-term credential combo in the configs.
Running without them yields the following error:
// Error
org.apache.kafka.common.config.ConfigException: Either {aws.access.key.id, aws.secret.access.key} or {aws.sts.role.arn, aws.sts.role.session.name} should be set
Attempting to assume a role using an assumed role does not work either (and even if it did, it wouldn't feel elegant).
// config sample:
{
  "aws.sts.role.arn": "arn:aws:iam::XXXX:role/XXX-ecs-instance-role",
  "aws.sts.role.session.name": "aiven-connect-s3"
}
// Error
com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException: User: arn:aws:sts::XXX:assumed-role/XXX-ecs-instance-role/i-0eXXXXXXXX is not authorized to perform: sts:AssumeRole on resource: arn:aws:iam::XXX:role/XXX-ecs-instance-role (Service: AWSSecurityTokenService; Status Code: 403; Error Code: AccessDenied; Request ID: <>; Proxy: null)
Or am I missing something? Thanks!
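If the connector exposed a pluggable credentials provider, the AWS SDK's DefaultAWSCredentialsProviderChain would already pick up ECS task / EKS service-account credentials from the container environment without any keys in the config. A hypothetical sketch (the aws.credentials.provider property name is my assumption, not a documented option of this connector):

```json
{
  "aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
}
```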
Hello there,
we are trying to use the Aiven S3 Kafka Connect connector, but it isn't consuming any messages after changing to an SSL-enabled, authenticated Kafka cluster.
With a cluster without SSL or authentication it works perfectly fine with the same configuration, except for the SASL/SSL settings.
I think it's probably a small config error, but I've been struggling to fix it, so your help will be very much appreciated.
No errors appear in the logs, and it seems to connect properly, since if I intentionally change connection parameters to wrong values (user/pass/truststore or IPs) I see errors.
To check the SSL configuration, I ran kafka-console-producer.sh and kafka-console-consumer.sh with the very same .properties and hosts, successfully sending and receiving messages.
To isolate the problem from the Kafka Connect connector, I developed my own simple connector that just outputs calls to the console, but the behavior is the same: it runs OK when connecting to non-SSL brokers, while it neither receives messages nor prints errors when connecting to an SSL-enabled broker.
The plugin I was trying to use is aiven-kafka-s3-connector.
Thank you very much.
Best,
Javier Arias
# Kafka broker IP addresses to connect to
bootstrap.servers=sslbrokers:9092
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.location=truststore.jks
ssl.truststore.password=truststorepass
ssl.protocol=TLS
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="usr" \
password="pass";
group.id=connect
config.storage.topic=connect-config
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# Path to directory containing the connector jar and dependencies
plugin.path=/plugins
# Converters to use to convert keys and values
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084
# Kafka broker IP addresses to connect to
bootstrap.servers=unsecuredbrokers:9092
group.id=connect-local
config.storage.topic=connect-config
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=1
offset.storage.replication.factor=1
offset.storage.partitions=1
status.storage.replication.factor=1
status.storage.partitions=1
# Path to directory containing the connector jar and dependencies
plugin.path=/plugins
# Converters to use to convert keys and values
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084
In Kafka 2.6 (https://github.com/apache/kafka/pull/2604/files#diff-cf2293dfed7e0171bde14764be09117fR84), a connector is expected to be a descendant of SinkConnector or SourceConnector, not the generic Connector. Otherwise it can't be initialized.
Currently, the partitioning capabilities in the Aiven connector are very limited and don't enable dynamic partitioning like AWS Firehose supports: https://aws.amazon.com/blogs/big-data/kinesis-data-firehose-now-supports-dynamic-partitioning-to-amazon-s3/.
I guess it could also be supported as a third implementation, similar to https://github.com/canelmas/kafka-connect-field-and-time-partitioner.
Having an Avro schema with a field defined like this:
{
  "name": "osType",
  "type": [
    "string"
  ],
  "default": "UNKNOWN"
},
results in a failure to process it, with this kind of stacktrace:
org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1564)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1445)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1323)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1047)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:92)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRUCT: class java.lang.String for field: "null"
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:241)
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 23 more
The issue seems to be caused by using too-old versions of org.apache.kafka:connect-api:1.1.0 and io.confluent:kafka-avro-serializer:4.1.4.
=> I suggest updating the dependencies to resolve the issue.
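A sketch of what the bump could look like in build.gradle (the version numbers here are illustrative placeholders, not tested against this codebase):

```groovy
dependencies {
    // Newer Connect API and Avro converter; exact versions to be validated
    implementation "org.apache.kafka:connect-api:2.8.2"
    implementation "io.confluent:kafka-avro-serializer:6.2.1"
}
```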
The S3 file name only contains the start offset. How can the end offset be obtained?
Migrate the S3 connector build from Maven to Gradle.
When using the S3 connector and grouping records by key, we have encountered this error:
consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records
Increasing the interval doesn't fix this issue. Looking at the consumer offsets, I can see that it's failing to commit any offset; however, it is still writing the records to S3, and it's writing more than one batch. Unfortunately, the connector then appears to retry uploading the fetched batches until the connector is killed.
The topic in question has millions of records on it, and if I set the offset manually to skip most of the messages it appears to work OK. When not grouping by key the same topic is ingested to S3 without any issues. In short it appears to be a load issue specific to grouping by key.
With the default max.poll.records of 500 and max.poll.interval.ms of 300000, I would have thought the consumer offsets would be written every ~500 records. It's certainly able to process more than 500 records in that interval. Can you advise on what triggers the offsets being committed?
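For context (general Kafka Connect behavior, not specific to this connector): sink offsets are committed on a timer controlled by the worker's offset.flush.interval.ms, not per record count, and a sink connector can hold offsets back in preCommit() until its files are actually flushed to S3. The relevant worker knobs, as a sketch:

```properties
# How often Connect attempts to commit sink task offsets (default 60000 ms)
offset.flush.interval.ms=60000
# How long a commit attempt may take before it is cancelled (default 5000 ms)
offset.flush.timeout.ms=5000
```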
Hi all,
I'm not an expert in Java, so it is difficult for me to know whether tombstone records are supported. I have seen that null key values are handled properly, but I did not see any mention of the tombstone case (a record with a key but a null value).
My plan is to use this connector to write down, as a log, all the different messages that are published into some topics, and there are a few tombstone cases.
Thank you very much,
Connector configuration class AivenKafkaConnectS3Config must be brought closer to GcsSinkConfig in style; the latter can be used as the reference.
Some points:
- Rename it to S3SinkConfig.
- Move AivenKafkaConnectS3Constants into the config class.
- Use configDef.define with the group parameter.
parameter;I've been fighting for a few hours trying to get current HEAD running in Confluents latest (5.5.1) Kafka Connect Docker image. After not finding anything wrong with my build I went back to tag 2.7.0 with JDK 1.8 and it works.
The connector simply doesn't load on startup, the loader gets picked up but the plugin doesn't get added. This is what it should look like, on current head the second line isn't showing up:
[2020-08-24 04:37:36,763] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/aiven-kafka-connect-s3/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2020-08-24 04:37:36,763] INFO Added plugin 'io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
Their image runs on OpenJDK 1.8; this might be the reason, but I don't know how to verify it.
I tried building master with 1.8, but CompressionType from commons is incompatible (class file has wrong version 55.0, should be 52.0).
Dockerfile for reference:
FROM gradle:6.6.0-jdk11 AS aiven-kafka-connect-s3-builder
ENV RELEASE_VERSION="2.6.0"
RUN git clone https://github.com/aiven/aiven-kafka-connect-s3.git && \
cd aiven-kafka-connect-s3 && \
gradle installDist && \
cd build/install/aiven-kafka-connect-s3
FROM confluentinc/cp-kafka-connect-base:5.5.1
COPY --from=aiven-kafka-connect-s3-builder /home/gradle/aiven-kafka-connect-s3/build/install/ /usr/share/java/
See https://github.com/aiven/aiven-kafka-connect-gcs/blob/master/.travis.yml as the reference
Hello,
Is there an S3 Source connector that can read the data uploaded by this Sink connector?
Hi, when using the connector to ship event data from PostgreSQL via Debezium to S3 through Kafka in Parquet format, we have an issue getting a date format.
In Kafka, the payload is:
"created_date": 1643631507020,
The schema created by Debezium is this one:
{
  "name": "created_date",
  "type": {
    "type": "long",
    "connect.version": 1,
    "connect.name": "org.apache.kafka.connect.data.Timestamp",
    "logicalType": "timestamp-millis"
  }
},
Using the S3 connector to share this data as a Parquet file, we can configure an SMT to transform it into a string:
"transforms.TsCreatedDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TsCreatedDate.field": "created_date",
"transforms.TsCreatedDate.format": "yyyy-MM-dd'T'HH:mm:ssZ",
"transforms.TsCreatedDate.target.type": "string",
But the expected date format in Parquet is a date type.
We still get the long format, or a string with the SMT transformation.
required int64 created_date;
The expected format would be: DATE, TIMESTAMP_MILLIS, or TIMESTAMP_MICROS.
How can we resolve this?
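One thing worth trying (a guess on my side; whether this connector's Parquet writer maps Connect's logical Timestamp type to TIMESTAMP_MILLIS is not confirmed here): keep the field a logical timestamp instead of a string by setting the SMT's target.type to Timestamp, which is a documented value of TimestampConverter:

```json
{
  "transforms": "TsCreatedDate",
  "transforms.TsCreatedDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.TsCreatedDate.field": "created_date",
  "transforms.TsCreatedDate.target.type": "Timestamp"
}
```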
The AivenKafkaConnect prefix is unnecessary and can be removed. However, it must be kept in AivenKafkaConnectS3SinkConnector (this class name is a part of the configuration).
Are there artifacts (preferably an uber jar) available somewhere that we can download to install this connector in our self-hosted Connect cluster?