Giter Club home page Giter Club logo

s3-connector-for-apache-kafka's Issues

Extend timestamp variable parameters in File name format

Scenario Overview

We use s3 filename template prefix/'%Y_%m_%d__%H_%M_%S_%f to sort filenames alphabetically.
The next new file is guaranteed to receive the following name in alphabetical order.

In kafka, we have several partitions of one topic, each of them must be written with the same prefix (prefix=topic_name) in order.
It's possible to ensure the files order with this template by running no more than 1 connector task.


Timestamp variable have next parameters:

unit parameter values:
yyyy - year, e.g. 2020 (please note that YYYY is deprecated and is interpreted as yyyy)
MM - month, e.g. 03
dd - day, e.g. 01
HH - hour, e.g. 24


With these parameters, files recorded within 1 hour will not differ in name.
Adding the partition number and offset to the file name in the template can solve this problem, but it makes working with the root prefix more difficult.
Uniqueness can be ensured by adding minutes, seconds, milliseconds to the timestamp variable.


Looks like it's enough to extend the following functionality :

    private static final Map<String, DateTimeFormatter> TIMESTAMP_FORMATTERS =
                    "yyyy", DateTimeFormatter.ofPattern("yyyy"),
                    "MM", DateTimeFormatter.ofPattern("MM"),
                    "dd", DateTimeFormatter.ofPattern("dd"),
                    "HH", DateTimeFormatter.ofPattern("HH")

with next parameters:

"%M" - Minutes in two-digit format.
"%S" - Seconds in two-digit format.
"%f" - Microseconds.

Out-of-Memory errors when sinking large topics

Scenario Overview

We have several topics, each of them already containing gigabytes of data (~1-10 millions of records). We need to export the data to S3.


Using the Aiven S3 Connector we run into Out-of-memory errors indicating that the Kafka Connect JVM process does not have enough heap space.


The S3 connector runs into errors.
The entire Kafka Connect cluster is lagging.
The Aiven CLI stops working and returns an 503 error.


Looking at the logs it looks like the connector is permanently ingesting messages from the topic and storing them in memory.
(log messages come from here)

It looks like the connector is not fast enough in writing to S3 and thus the memory is not freed in time.

We managed to get rid of the Out-of-memory errors by scaling up the Kafka Connect cluster. However, this is not a suitable long-term solution as we would need to setup multiple such connectors in parallel in the future.

We would like to have something that gives us some control over the memory consumption of the connector, e.g., a configuration for the maximum size of the input records buffer.

PS: Trying out the Confluent S3 connector provided by Aiven (version 5.0.0) does not run into Out-of-memory errors and utilizes a lot less memory but it's not an option for us.

Random UnknownHostException resolving S3 hostname


We are currently using this S3 Sink connector to dump content of Kafka topic into a AWS S3.
However, we notice in our monitoring that there is some random failure with S3 DNS resolving.

The connector seems to retry/success record processing few milliseconds after but we are wondering if it is a normal behavior, why it happened and if there is a way to avoid it.

Here is the trace :

... 34 more
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(
at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(
at org.apache.http.impl.client.CloseableHttpClient.execute(
at org.apache.http.impl.client.CloseableHttpClient.execute(
at org.apache.http.impl.client.InternalHttpClient.doExecute(
at org.apache.http.impl.execchain.ProtocolExec.execute(
at org.apache.http.impl.execchain.MainClientExec.execute(
at org.apache.http.impl.execchain.MainClientExec.establishRoute(
at com.amazonaws.http.conn.$Proxy48.connect(Unknown Source)
at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(
at java.base/java.lang.reflect.Method.invoke(
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(
at jdk.internal.reflect.GeneratedMethodAccessor134.invoke(Unknown Source)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(
at com.amazonaws.http.DelegatingDnsResolver.resolve(
at com.amazonaws.SystemDefaultDnsResolver.resolve(
at java.base/
at java.base/
at java.base/
at java.base/$CachedAddresses.get(
Caused by:
at java.base/
at java.base/java.util.concurrent.ThreadPoolExecutor$
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(
at java.base/
at java.base/java.util.concurrent.Executors$
at org.apache.kafka.connect.runtime.WorkerTask.doRun(
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(
at org.apache.kafka.connect.sink.SinkTask.preCommit(
at io.aiven.kafka.connect.s3.S3SinkTask.flush(
at java.base/java.util.Collections$UnmodifiableMap.forEach(
at java.base/java.util.HashMap.forEach(
at io.aiven.kafka.connect.s3.S3SinkTask.flushFile(
at io.aiven.kafka.connect.common.output.parquet.ParquetOutputWriter.writeRecords(
at org.apache.parquet.hadoop.ParquetWriter$
at org.apache.parquet.hadoop.ParquetWriter.<init>(
at org.apache.parquet.hadoop.ParquetFileWriter.start(
at java.base/
at io.aiven.kafka.connect.common.output.parquet.ParquetPositionOutputStream.write(
at io.aiven.kafka.connect.s3.S3OutputStream.write(
at io.aiven.kafka.connect.s3.S3OutputStream.newMultipartUpload(
at com.amazonaws.http.AmazonHttpClient.execute(
at com.amazonaws.http.AmazonHttpClient.execute(
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(
com.amazonaws.SdkClientException: Unable to execute HTTP request:
[2022-07-26 07:43:08,220] ERROR [lcdp-prod-audit-sink|task-0] WorkerSinkTask{id=lcdp-prod-audit-sink-0} Commit of offsets threw an unexpected exception for sequence number 37048: null (org.apache.kafka.connect.runtime.WorkerSinkTask:269)

Yours faithfully,

does csv output require the input to be ByteArrayConverter ?

we have a KSQL stream outputting avro to a topic.

we would like to add an additional s3 sink connector that uses the same topic, where the output is a csv file.

is this a supported use case for this connector ?

i am guessing not based on this documentation:
The value.converter property must be set to org.apache.kafka.connect.converters.ByteArrayConverter for this data format.

i know i can create a new stream that outputs delimited data, but just want to make sure i am not missing a feature of this connector.

many thanks,
scott doesn't seem to work

I'm using both and together, if that matters. Seems like the prefix is completely ignored. I can see that the previous deprecated settings are looked at in the sink code, but can't find any references to the new prefix property.

file in s3 without return line for each event

I have an issue with some files in s3 where all the content file is in one line (without return line), most of the files are formatted correctly (one event by line) but in some cases (with same connector setup) there are some files misformatted

do you know if there is any parameter to fix this issue?



I'm trying to run this connector to sync data from a compacted kafka topic to an s3 location by key. After running the "gradle build" and adding it the the connect node packages. We're seeing a strange issue when starting a connector.

WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:787)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/aiven/kafka/connect/common/config/AivenCommonConfig
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(
at org.glassfish.jersey.servlet.WebComponent.service(
at org.glassfish.jersey.servlet.ServletContainer.service(
at org.glassfish.jersey.servlet.ServletContainer.service(
at org.glassfish.jersey.servlet.ServletContainer.service(
at org.eclipse.jetty.servlet.ServletHolder.handle(
at org.eclipse.jetty.servlet.ServletHandler.doHandle(
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(
at org.eclipse.jetty.server.session.SessionHandler.doHandle(
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(
at org.eclipse.jetty.servlet.ServletHandler.doScope(
at org.eclipse.jetty.server.session.SessionHandler.doScope(
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(
at org.eclipse.jetty.server.handler.ContextHandler.doScope(
at org.eclipse.jetty.server.handler.ScopedHandler.handle(
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(
at org.eclipse.jetty.server.Server.handle(
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(
at org.eclipse.jetty.server.HttpChannel.dispatch(
at org.eclipse.jetty.server.HttpChannel.handle(
at org.eclipse.jetty.server.HttpConnection.onFillable(
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(
at org.eclipse.jetty.util.thread.QueuedThreadPool$
at java.base/
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/aiven/kafka/connect/common/config/AivenCommonConfig
at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(
at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(
at org.glassfish.jersey.server.ServerRuntime$Responder.process(
at org.glassfish.jersey.server.ServerRuntime$
at org.glassfish.jersey.internal.Errors$
at org.glassfish.jersey.internal.Errors$
at org.glassfish.jersey.internal.Errors.process(
at org.glassfish.jersey.internal.Errors.process(
at org.glassfish.jersey.internal.Errors.process(
at org.glassfish.jersey.process.internal.RequestScope.runInScope(
at org.glassfish.jersey.server.ServerRuntime.process(
at org.glassfish.jersey.server.ApplicationHandler.handle(
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(
... 39 more

I first checked this class in the source code. io/aiven/kafka/connect/common/config/AivenCommonConfig - but the only definition i see is an import on line 34 in /src/main/java/o/aiven/kafka/connect/s3/config. The file path for the missing class doesn't exist and i can't find anywhere that is defining it or packaging it as a calss. Does this class exist? Or am I overlooking something obvious?

Connector config is -
"name": "Avien_S3_sink_conncector",
"connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
"tasks.max": "8",
"key.converter": "",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"format.output.type" : "json",
"" : "hs-topic-bucket",
"aws.s3.region": "us-east-1",
"format.output.fields" : "key,value",
"" : "{{topic}}-{{partition}}-{{key}}",
"aws.sts.role.arn" : "arn:aws:iam::**:role//KAFKACONNECT",
"s3.credentials.provider.class": "io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider",
"" : "avienconnect"

Delivery semantic

Hello, what is the delivery semantic of this connector , since the confluent one is exactly-once in some cases or at-least-once.


Add support for templates aws_s3_prefix field

Adding support for templates to the aws_s3_prefix field would be really helpful. For example, it would allow us to store the files in subfolders by date.

aws_s3_prefix: /bucket/folder/${date}

AWS ECS Task or (likely) EKS pod with pre-mounted IAM credentials unsupported

AWS ECS Task or (likely) EKS pod IAM credentials provided via container env variables that allow S3 access fails.


// IAM Role: arn:aws:iam::XXXX:role/XXX-ecs-instance-role

// Trust 
    "Version": "2012-10-17",
    "Statement": [
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
            "Action": "sts:AssumeRole"

// Permission
    "Statement": [
            "Action": [
            "Effect": "Allow",
            "Resource": [
    "Version": "2012-10-17"

Should this support mounted IAM/STS credentials vs having to define explicitly long-term or short-term:

ECS Task IAM Ref:
EKS Service Account Ref:

Current behavior requires providing either long-term or short-term combo in the configs.
Running without them yields following error:

// Error
org.apache.kafka.common.config.ConfigException: Either {, aws.secret.access.key} or {aws.sts.role.arn,} should be set

Attempting to assume a role using an assumed role does not work either (and even if it did doesn't feel elegant)

// config sample:

// Error User: arn:aws:sts::XXX:assumed-role/XXX-ecs-instance-role/i-0eXXXXXXXX  is not authorized to perform: sts:AssumeRole on resource: arn:aws:iam::XXX:role/XXX-ecs-instance-role (Service: AWSSecurityTokenService; Status Code: 403; Error Code: AccessDenied; Request ID: <>; Proxy: null)

Or am I missing something? Thanks!

Aiven s3 KafkaConnect not consuming from SSL/SASL cluster

Hello there,

we are trying to use Aiven s3 KafkaConnect, but it isn't consuming any messages after changing to a SSL and authenticated Kafka cluster.

With a cluster without SSL or authentication it's working perfectly fine with the same configuration except for the sasl/ssl settings.

I think probably is a small config error, but I've been struggling to fix it, so your help will be very much appreciated.

No errors appear on logs, and it seems to connect properly since if I intentionally change connection parameters to wrong values (usr/pass/truststore or IPs) I see errors.

In order to check the SSL configuration I've done and with the very same .properties and hosts successfully sending/receiving messages.

In order to try to isolate the problem from the KafkaConnect Connector, I developed my own simple Connnector that just outputs calls to the Console, but the behavior is the same: runs OK when connecting to NON-SSL brokers while does not receive messages or prints errors when connecting to a SSL enabled Broker.

The plugin I was trying to use is aiven-kakfa-s3-connector.

Thank you very much.

Javier Arias

# Kafka broker IP addresses to connect to
sasl.mechanism=SCRAM-SHA-256 required \
    username="usr" \

# Path to directory containing the connector jar and dependencies
# Converters to use to convert keys and values

# The internal converters Kafka Connect uses for storing offset and configuration data

# Kafka broker IP addresses to connect to

# Path to directory containing the connector jar and dependencies
# Converters to use to convert keys and values

# The internal converters Kafka Connect uses for storing offset and configuration data

There is an issue handling avro schema with default value defined as String

Having an avro schema with a field defined like this:

            "name": "osType",
            "type": [
            "default": "UNKNOWN"

Results in failure to process it with this kind of a stacktrace:

org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
	at io.confluent.connect.avro.AvroData.toConnectSchema(
	at io.confluent.connect.avro.AvroData.toConnectSchema(
	at io.confluent.connect.avro.AvroData.toConnectSchema(
	at io.confluent.connect.avro.AvroData.toConnectData(
	at io.confluent.connect.avro.AvroConverter.toConnectData(
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(
	at java.base/java.util.concurrent.Executors$
	at java.base/
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(
	at java.base/java.util.concurrent.ThreadPoolExecutor$
	at java.base/
Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRUCT: class java.lang.String for field: "null"
	... 23 more

The issue seems to be caused by using too old versions of org.apache.kafka:connect-api:1.1.0 & io.confluent:kafka-avro-serializer:4.1.4 => I suggest to update the dependencies to resolve the issue.

Consumer offsets not committed when using KeyRecordGrouper

When using the S3 connector and grouping records by key we have encountered this error:

consumer` poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing or by reducing the maximum size of batches returned in poll() with max.poll.records

Increasing the interval doesn't fix this issue. Looking at the consumer offsets I can see that it's failing to commit any offset, however it is still writing the records to S3 and it's writing more than one batch. Unfortunately the connector appears to be then retry uploading the fetched batches until the connector is killed.

The topic in question has millions of records on it, and if I set the offset manually to skip most of the messages it appears to work OK. When not grouping by key the same topic is ingested to S3 without any issues. In short it appears to be a load issue specific to grouping by key.

With the default max.poll.records of 500 and of 300000, I would have thought the consumer offsets would have been written every ~500 records. It's certainly able to process more than 500 records in that interval. Can you advise on what triggers the offsets being committed?

Query: Tombstone records support

Hi all,

I'm not an expert in java so it is difficult to me to know if the tombstone records are supported. If had seen that the null key values are managed properly, but I did not see any mention for the tombstone (key with value but null payload) case.

My plan is to use this connector to write down, as a log, all the different messages that are publish into some topics and there a few tombstone cases.

Thank you very much,

Refactor configuration definition

Connector configuration class AivenKafkaConnectS3Config must be brought closer to GcsSinkConfig in style. It can be used as the reference.

Some points:

master incompatible with latest cp-kafka-connect Docker image

I've been fighting for a few hours trying to get current HEAD running in Confluents latest (5.5.1) Kafka Connect Docker image. After not finding anything wrong with my build I went back to tag 2.7.0 with JDK 1.8 and it works.

The connector simply doesn't load on startup, the loader gets picked up but the plugin doesn't get added. This is what it should look like, on current head the second line isn't showing up:

[2020-08-24 04:37:36,763] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/aiven-kafka-connect-s3/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2020-08-24 04:37:36,763] INFO Added plugin 'io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

Their image runs on OpenJDK 1.8, this might be the reason but I don't know how to verify this.
I tried building master with 1.8 but CompressionType from commons is incompatible (class file has wrong version 55.0, should be 52.0).

Dockerfile for reference:

FROM gradle:6.6.0-jdk11 AS aiven-kafka-connect-s3-builder

RUN git clone && \
    cd aiven-kafka-connect-s3 && \
    gradle installDist && \
    cd build/install/aiven-kafka-connect-s3

FROM confluentinc/cp-kafka-connect-base:5.5.1
COPY --from=aiven-kafka-connect-s3-builder /home/gradle/aiven-kafka-connect-s3/build/install/ /usr/share/java/

S3 Source Connector


Is there an S3 Source connector that can read the data uploaded by this Sink connector?

Issue on "Date Format" using the S3 Sink Connector from Avro to Parquet

Hi, when using the connector to share event-data from Posgresql Debezium to S3 via Kafka in Parquet, we have an issue to get a "Date Format"

In Kafka, the payload is :
"created_date": 1643631507020,

The schema created by the Debezium is this one

      "name": "created_date",
      "type": {
        "type": "long",
        "connect.version": 1,
        "": "",
        "logicalType": "timestamp-millis"

Using a S3 connector to share this data as "Parquet file", we can configure a smt transformation to transform as string
"transforms.TsCreatedDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.TsCreatedDate.field": "created_date", "transforms.TsCreatedDate.format": "yyyy-MM-dd'T'HH:mm:ssZ", "": "string",

But the expected date format in Parquet is date.
We still get the "long format" or "string" with SMT transformation.
required int64 created_date;


How can we resolve this ?

Rename classes

AivenKafkaConnect prefix is unnecessary and can be removed.
However, it must be kept in AivenKafkaConnectS3SinkConnector (this cass name is a part of configuration).

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.