Aiven's S3 Sink Connector for Apache Kafka

This is a sink Apache Kafka Connect connector that stores Apache Kafka messages in an AWS S3 bucket.

How it works

The connector subscribes to the specified Kafka topics, collects the messages arriving on them, and periodically writes the collected data to the specified bucket in AWS S3.

Requirements

The connector requires Java 11 or newer for development and production.

Authorization

The connector needs the following permissions to the specified bucket:

  • s3:GetObject
  • s3:PutObject
  • s3:AbortMultipartUpload
  • s3:ListMultipartUploadParts
  • s3:ListBucketMultipartUploads

In case of Access Denied error, see https://aws.amazon.com/premiumsupport/knowledge-center/s3-troubleshoot-403/

Authentication

To make the connector work, a user has to specify AWS credentials that allow writing to S3. There are three ways to specify AWS credentials in this connector:

  1. Long term credentials.

    It requires both aws.access.key.id and aws.secret.access.key to be specified.

  2. Short term credentials.

    The connector will request a temporary token from the AWS STS service and assume a role from another AWS account. It requires both aws.sts.role.arn and aws.sts.role.session.name to be specified.

  3. Use default provider chain or custom provider

    If you prefer to use the AWS default provider chain, leave both {aws.access.key.id, aws.secret.access.key} and {aws.sts.role.arn, aws.sts.role.session.name} blank. If you prefer to build your own custom provider, pass the custom provider class in aws.credential.provider.

Do not use options 1 and 2 simultaneously. With option 2, it is recommended to specify the S3 bucket region in aws.s3.region and the corresponding AWS STS endpoint in aws.sts.config.endpoint; specify both or neither. It is also important to specify aws.sts.role.external.id for security reasons (see some details here).
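
The rules above can be sketched as a small check over the configuration map. This is only an illustration of the documented behaviour, not the connector's actual validation code: the class name CredentialModeSketch, the Mode enum and the resolve method are hypothetical, and only the property names come from this README.

```java
import java.util.Map;

// Hypothetical helper, not part of the connector: decides which of the three
// documented credential options a configuration map selects.
final class CredentialModeSketch {
    enum Mode { LONG_TERM, STS_ASSUME_ROLE, DEFAULT_OR_CUSTOM_PROVIDER }

    static Mode resolve(Map<String, String> config) {
        boolean longTerm = isSet(config, "aws.access.key.id")
                && isSet(config, "aws.secret.access.key");
        boolean sts = isSet(config, "aws.sts.role.arn")
                && isSet(config, "aws.sts.role.session.name");
        if (longTerm && sts) {
            // Options 1 and 2 must not be combined.
            throw new IllegalArgumentException(
                    "Set either long term or short term credentials, not both");
        }
        if (longTerm) {
            return Mode.LONG_TERM;
        }
        if (sts) {
            return Mode.STS_ASSUME_ROLE;
        }
        // Both groups left blank: default provider chain or aws.credential.provider.
        return Mode.DEFAULT_OR_CUSTOM_PROVIDER;
    }

    private static boolean isSet(Map<String, String> config, String key) {
        String value = config.get(key);
        return value != null && !value.isBlank();
    }
}
```

For example, a map that contains only aws.sts.role.arn and aws.sts.role.session.name resolves to STS_ASSUME_ROLE, and an empty map falls through to the default or custom provider.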

File name format

File name format is tightly related to Record Grouping

The connector uses the following format for output files (blobs): <prefix><filename>.

<prefix> is the optional prefix that can be used, for example, for subdirectories in the bucket. <filename> is the file name. The connector has a configurable template for file names.

The template is set with the configuration property `file.name.template`. If it is not set, the default template is used: `{{topic}}-{{partition}}-{{start_offset}}`

It supports placeholders with variable names: {{ variable_name }}. Currently, supported variables are:

  • topic - the Kafka topic;
  • partition:padding=true|false - the Kafka partition; if padding is set to true, the partition number is padded with leading zeroes; the default value is false;
  • start_offset:padding=true|false - the Kafka offset of the first record in the file; if padding is set to true, the offset is padded with leading zeroes; the default value is false;
  • timestamp:unit=yyyy|MM|dd|HH - the timestamp of when the Kafka record has been processed by the connector.
    • unit parameter values:
      • yyyy - year, e.g. 2020 (please note that YYYY is deprecated and is interpreted as yyyy)
      • MM - month, e.g. 03
      • dd - day, e.g. 01
      • HH - hour, e.g. 23
  • key - the Kafka key.

To add zero padding to Kafka offsets, add the padding parameter to the start_offset variable. Its value can be true or false (the default). For example: {{topic}}-{{partition}}-{{start_offset:padding=true}}.gz will produce file names like mytopic-1-00000000000000000001.gz.

To add zero padding to the partition number, add the padding parameter to the partition variable. Its value can be true or false (the default). For example: {{topic}}-{{partition:padding=true}}-{{start_offset}}.gz will produce file names like mytopic-0000000001-1.gz.

To add formatted timestamps, use timestamp variable.
For example: {{topic}}-{{partition}}-{{start_offset}}-{{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}.gz will produce file names like mytopic-2-1-20200301.gz.
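
The placeholder expansion described above can be sketched in a few lines of Java. This is a minimal illustration, not the connector's template engine: the class FileNameTemplateSketch and its render method are hypothetical, the padding widths (10 digits for the partition, 20 for the offset) are inferred from the example file names above, and the key variable is left out.

```java
import java.time.ZonedDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of template rendering; not the connector's own classes.
final class FileNameTemplateSketch {
    // Matches {{ name }} and {{ name:param=value }}.
    private static final Pattern VARIABLE =
            Pattern.compile("\\{\\{\\s*(\\w+)(?::(\\w+)=(\\w+))?\\s*\\}\\}");
    private static final Set<String> UNITS = Set.of("yyyy", "MM", "dd", "HH");

    static String render(String template, String topic, int partition,
                         long startOffset, ZonedDateTime processedAt) {
        Matcher m = VARIABLE.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String name = m.group(1);
            String param = m.group(2);      // null when no parameter is given
            String paramValue = m.group(3); // null when no parameter is given
            boolean padded = "padding".equals(param) && "true".equals(paramValue);
            String value;
            switch (name) {
                case "topic":
                    value = topic;
                    break;
                case "partition":
                    // Width 10 is an assumption taken from mytopic-0000000001-1.gz.
                    value = padded ? String.format("%010d", partition)
                                   : Integer.toString(partition);
                    break;
                case "start_offset":
                    // Width 20 is an assumption taken from the padded offset example.
                    value = padded ? String.format("%020d", startOffset)
                                   : Long.toString(startOffset);
                    break;
                case "timestamp":
                    if (!"unit".equals(param) || !UNITS.contains(paramValue)) {
                        throw new IllegalArgumentException("Unsupported unit: " + paramValue);
                    }
                    value = processedAt.format(DateTimeFormatter.ofPattern(paramValue));
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported variable: " + name);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Calling render with the timestamp template from the example above, topic mytopic, partition 2, offset 1 and a processing time on 1 March 2020 gives mytopic-2-1-20200301.gz.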

To configure the time zone for the timestamp variable, use file.name.timestamp.timezone property. Please see the description of properties in the "Configuration" section.

Only certain combinations of variables and parameters are allowed in the file name template (however, variables in a template can be in any order). Each combination determines the mode of record grouping the connector will use. Currently, supported combinations of variables and the corresponding record grouping modes are:

  • topic, partition, start_offset, and timestamp - grouping by the topic, partition, and timestamp;
  • key - grouping by the key.
  • key, topic, partition - grouping by the topic, partition, and key.

See record grouping in the next section for more details.

If the file name template is not specified, the default value is {{topic}}-{{partition}}-{{start_offset}} (+ .gz when compression is enabled).

Record grouping

Incoming records are grouped until they are flushed. Once per offset.flush.interval.ms, the connector flushes the grouped records to a file for each partition that has received new messages during this period. The setting defaults to 60 seconds.

Record grouping, similar to Kafka topics, has 2 modes:

  • Changelog: Connector groups all records in the order received from a Kafka topic, and stores all of them in a file.
  • Compact: Connector groups all records by an identity (e.g. key) and only keeps the latest value stored in a file.

Modes are defined implicitly by the variables used in the file name template.

Grouping by the topic and partition

Mode: Changelog

In this mode, the connector groups records by the topic and partition. When a file is written, an offset of the first record in it is added to its name.

For example, let's say the template is {{topic}}-part{{partition}}-off{{start_offset}}. If the connector receives records like

topic:topicB partition:0 offset:0
topic:topicA partition:0 offset:0
topic:topicA partition:0 offset:1
topic:topicB partition:0 offset:1
flush

there will be two files topicA-part0-off0 and topicB-part0-off0 with two records in each.

Each flush produces a new set of files. For example:

topic:topicA partition:0 offset:0
topic:topicA partition:0 offset:1
flush
topic:topicA partition:0 offset:2
topic:topicA partition:0 offset:3
flush

In this case, there will be two files topicA-part0-off0 and topicA-part0-off2 with two records in each.
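
A rough model of this mode, assuming the template {{topic}}-part{{partition}}-off{{start_offset}} from the examples above. The class ChangelogGroupingSketch is hypothetical and only tracks offsets; it is not how the connector buffers records internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of grouping by topic and partition.
final class ChangelogGroupingSketch {
    // File name -> offsets buffered for that file since the last flush.
    private final Map<String, List<Long>> files = new LinkedHashMap<>();
    // Topic and partition -> file name opened for it since the last flush.
    private final Map<String, String> currentFile = new HashMap<>();

    void put(String topic, int partition, long offset) {
        String topicPartition = topic + "-part" + partition;
        // The first record seen after a flush fixes the start offset in the name.
        String fileName = currentFile.computeIfAbsent(
                topicPartition, tp -> tp + "-off" + offset);
        files.computeIfAbsent(fileName, name -> new ArrayList<>()).add(offset);
    }

    // Returns the files "written" by this flush and starts a new set of files.
    Map<String, List<Long>> flush() {
        Map<String, List<Long>> written = new LinkedHashMap<>(files);
        files.clear();
        currentFile.clear();
        return written;
    }
}
```

Feeding it the second example (offsets 0 and 1, flush, offsets 2 and 3, flush) yields topicA-part0-off0 from the first flush and topicA-part0-off2 from the second, each holding two records.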

Grouping by the key

Mode: Compact

In this mode, the connector groups records by the Kafka key. It always puts one record in a file, the latest record that arrived before a flush for each key. Also, it overwrites files if later new records with the same keys arrive.

This mode is good for maintaining the latest values per key as files on S3.

Let's say the template is k{{key}}. For example, when the following records arrive

key:0 value:0
key:1 value:1
key:0 value:2
key:1 value:3
flush

there will be two files k0 (containing value 2) and k1 (containing value 3).

After a flush, previously written files might be overwritten:

key:0 value:0
key:1 value:1
key:0 value:2
key:1 value:3
flush
key:0 value:4
flush

In this case, there will be two files k0 (containing value 4) and k1 (containing value 3).
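
The overwrite behaviour can be modelled with two maps, assuming the template k{{key}} from the examples above. KeyGroupingSketch is a hypothetical illustration in which the stored map stands in for the bucket contents; it is not the connector's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of grouping by key (compact mode).
final class KeyGroupingSketch {
    // File name -> latest value received since the last flush.
    private final Map<String, String> pending = new LinkedHashMap<>();
    // File name -> content; stands in for the objects in the bucket.
    private final Map<String, String> stored = new LinkedHashMap<>();

    void put(String key, String value) {
        // A later record with the same key replaces the earlier one.
        pending.put("k" + key, value);
    }

    void flush() {
        // Writing a file whose name already exists overwrites it.
        stored.putAll(pending);
        pending.clear();
    }

    Map<String, String> stored() {
        return stored;
    }
}
```

After the second example above, stored() holds k0 with value 4 and k1 with value 3.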

The string representation of a key

The connector in this mode uses the following algorithm to create the string representation of a key:

  1. If key is null, the string value is "null" (i.e., string literal null).
  2. If key schema type is STRING, it's used directly.
  3. Otherwise, Java .toString() is applied.

If the keys of your records are strings, you may want to use org.apache.kafka.connect.storage.StringConverter as key.converter.

Warning: Single key in different partitions

The group by key mode primarily targets scenarios where each key appears in one partition only. If the same key appears in multiple partitions, the result may be unexpected.

For example:

topic:topicA partition:0 key:x value:aaa
topic:topicA partition:1 key:x value:bbb
flush

file kx may contain aaa or bbb, i.e. the behavior is non-deterministic.

Data Format

Connector class name, in this case: io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector.

S3 Object Names

The S3 connector stores a series of files in the specified bucket. Each object is named using the pattern [<aws.s3.prefix>]<topic>-<partition>-<startoffset>[.gz] (see the File name format section for more patterns). The .gz extension is used if gzip compression is used, see file.compression.type below.

Data File Format

Except for the Parquet format, output files are text files that contain one record per line (i.e., records are separated by \n).

There are four types of data format available:

  • [Default] Flat structure, where field values are separated by comma (csv)

    Configuration: format.output.type=csv. Also, this is the default if the property is not present in the configuration.

  • Complex structure, where the file is in JSON Lines format: it contains one record per line and each line is a valid JSON object (jsonl)

    Configuration: format.output.type=jsonl.

  • Complex structure, where file is a valid JSON array of record objects.

    Configuration: format.output.type=json.

  • Complex structure, where file is in Apache Parquet file format.

    Configuration: format.output.type=parquet.

The connector can output the following fields from records: the key, the value, the timestamp, the offset and headers. The set of these output fields is configurable. In the CSV format, the field values are separated by commas.

CSV Format example

The key and the value—if they're output—are stored as binaries encoded in Base64.

For example, if we output key,value,offset,timestamp, a record line might look like:

a2V5,TG9yZW0gaXBzdW0gZG9sb3Igc2l0IGFtZXQ=,1232155,1554210895

It is possible to control the encoding of the value field by setting format.output.fields.value.encoding to base64 or none.

If the key, the value or the timestamp is null, an empty string will be output instead:

,,,1554210895
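
How such a line is assembled can be sketched as follows. This is an illustration only, assuming the field list key,value,offset,timestamp and Base64 encoding for both the key and the value; CsvLineSketch and csvLine are hypothetical names, not connector classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.StringJoiner;

// Hypothetical sketch of building one CSV record line.
final class CsvLineSketch {
    static String csvLine(byte[] key, byte[] value, Long offset, Long timestamp) {
        StringJoiner line = new StringJoiner(",");
        line.add(base64OrEmpty(key));
        line.add(base64OrEmpty(value));
        // A null field is written as an empty string.
        line.add(offset == null ? "" : offset.toString());
        line.add(timestamp == null ? "" : timestamp.toString());
        return line.toString();
    }

    private static String base64OrEmpty(byte[] bytes) {
        return bytes == null ? "" : Base64.getEncoder().encodeToString(bytes);
    }
}
```

With the key bytes of "key" and the value bytes of "Lorem ipsum dolor sit amet" (both UTF-8), offset 1232155 and timestamp 1554210895, this produces the first example line above.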

The output fields are set with format.output.fields, a comma-separated list of fields to include in the output. Supported values are: key, offset, timestamp, headers, and value. Defaults to value.

NB!

  • The key.converter property must be set to org.apache.kafka.connect.converters.ByteArrayConverter or org.apache.kafka.connect.storage.StringConverter for this data format.

  • The value.converter property must be set to org.apache.kafka.connect.converters.ByteArrayConverter for this data format.

JSONL Format example

For example, if we output key,value,offset,timestamp, a record line might look like:

 { "key": "k1", "value": "v0", "offset": 1232155, "timestamp":"2020-01-01T00:00:01Z" }

OR

  { "key": "user1", "value": {"name": "John", "address": {"city": "London"}}, "offset": 1232155, "timestamp":"2020-01-01T00:00:01Z" }

It is recommended to use

  • org.apache.kafka.connect.storage.StringConverter or
  • org.apache.kafka.connect.json.JsonConverter or
  • io.confluent.connect.avro.AvroConverter.

as key.converter and/or value.converter to make output files human-readable.

NB!

  • The value of the format.output.fields.value.encoding property is ignored for this data format.
  • The key and value schemas are not included in the output file, even if the value.converter.schemas.enable property is true. It is still important to set this property correctly so that the connector can read the records correctly.

JSON Format example

For example, if we output key,value,offset,timestamp, an output file might look like:

[
  { "key": "k1", "value": "v0", "offset": 1232155, "timestamp":"2020-01-01T00:00:01Z" }, 
  { "key": "k2", "value": "v1", "offset": 1232156, "timestamp":"2020-01-01T00:00:05Z" }
]

OR

[
  { "key": "user1", "value": {"name": "John", "address": {"city": "London"}}, "offset": 1232155, "timestamp":"2020-01-01T00:00:01Z" }
]

It is recommended to use

  • org.apache.kafka.connect.storage.StringConverter,
  • org.apache.kafka.connect.json.JsonConverter, or
  • io.confluent.connect.avro.AvroConverter.

as key.converter and/or value.converter to make output files human-readable.

NB!

  • The value of the format.output.fields.value.encoding property is ignored for this data format.
  • The key and value schemas are not included in the output file, even if the value.converter.schemas.enable property is true. It is still important to set this property correctly so that the connector can read the records correctly.

NB!

For both JSON and JSONL, another example is single-field output, e.g. value only. A record line might look like:

{ "value": "v0" }

OR

{ "value": {"name": "John", "address": {"city": "London"}} }

In this case it sometimes makes sense to get rid of the additional JSON object wrapping the actual value by using format.output.envelope. Setting format.output.envelope=false can produce the following output:

"v0"

OR

{"name": "John", "address": {"city": "London"}}

Parquet format example

For example, if we output key,offset,timestamp,headers,value, an output Parquet schema might look like this:

{
    "type": "record", "fields": [
      {"name": "key", "type": "RecordKeySchema"},
      {"name": "offset", "type": "long"},
      {"name": "timestamp", "type": "long"},
      {"name": "headers", "type": "map"},
      {"name": "value", "type": "RecordValueSchema"}
  ]
}

where RecordKeySchema is the key schema and RecordValueSchema is the record value schema. This means that if you have a key schema and a record schema like:

Key schema:

{
  "type": "string"
}

Record schema:

{
    "type": "record", "fields": [
      {"name": "foo", "type": "string"},
      {"name": "bar", "type": "long"}
  ]
}

the final Avro schema for Parquet is:

{
    "type": "record", "fields": [
      {"name": "key", "type": "string"},
      {"name": "offset", "type": "long"},
      {"name": "timestamp", "type": "long"},
      {"name": "headers", "type": "map", "values": "long"},
      { "name": "value", 
        "type": "record", 
        "fields": [
          {"name": "foo", "type": "string"},
          {"name": "bar", "type": "long"}
        ]
      }
  ]
}

For a single-field output e.g. value, a record line might look like:

{ "value": {"name": "John", "address": {"city": "London"}} }

In this case it sometimes makes sense to get rid of the additional JSON object wrapping the actual value by using format.output.envelope. Setting format.output.envelope=false can produce the following output:

{"name": "John", "address": {"city": "London"}}

NB!

  • The value of the format.output.fields.value.encoding property is ignored for this data format.
  • Due to an Avro limitation, all message header values must have the same data type
  • If you use org.apache.kafka.connect.json.JsonConverter, make sure that your messages contain a schema. E.g. a possible JSON message:
    {
      "schema": {
        "type": "struct", 
        "fields": [
          {"type":"string", "field": "name"}
        ]
      }, "payload": {"name":  "foo"}
    }
  • Connector works just fine with and without Schema Registry
  • format.output.envelope=false is ignored if the value is not of type org.apache.avro.Schema.Type.RECORD or org.apache.avro.Schema.Type.MAP.

Usage

Connector Configuration

Important note: since version 2.6, all existing configuration parameters are deprecated and will be replaced with new ones during a transition period (within 2-3 releases).

List of deprecated configuration parameters:

  • aws_access_key_id - AWS Access Key ID for accessing S3 bucket. Mandatory.
  • aws_secret_access_key - AWS S3 Secret Access Key. Mandatory.
  • aws_s3_endpoint - The endpoint configuration (service endpoint & signing region) to be used for requests.
  • aws_s3_region - Name of the region for the bucket used for storing the records. Defaults to us-east-1.
  • aws_s3_bucket - Name of an existing bucket for storing the records. Mandatory.
  • aws_s3_prefix - The prefix that will be added to the file name in the bucket. Can be used for putting output files into a subdirectory.
  • output_compression - Compression type for output files. Supported algorithms are gzip, snappy, zstd and none. Defaults to gzip.
  • output_fields - A comma separated list of fields to include in output. Supported values are: key, offset, timestamp and value. Defaults to value.

List of new configuration parameters:

  • aws.access.key.id - AWS Access Key ID for accessing S3 bucket.
  • aws.secret.access.key - AWS S3 Secret Access Key.
  • aws.s3.bucket.name - Name of an existing bucket for storing the records. Mandatory. See bucket name rules: https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
  • aws.s3.endpoint - The endpoint configuration (service endpoint & signing region) to be used for requests.
  • aws.s3.prefix - [Deprecated] Use file.name.prefix and file.name.template instead. The prefix that will be added to the file name in the bucket. Can be used for putting output files into a subdirectory.
  • aws.s3.region - Name of the region for the bucket used for storing the records. Defaults to us-east-1.
  • aws.sts.role.arn - AWS role ARN, for cross-account access role instead of aws.access.key.id and aws.secret.access.key
  • aws.sts.role.external.id - AWS ExternalId for cross-account access role
  • aws.sts.role.session.name - AWS session name for cross-account access role
  • aws.sts.role.session.duration - Session duration for cross-account access role in Seconds. Minimum value - 900.
  • aws.sts.config.endpoint - AWS STS endpoint for cross-account access role.
  • file.name.template - The file name. The connector has the configurable template for file names. Constant string prefix could be added to the file name to put output files into a subdirectory.
  • file.compression.type - Compression type for output files. Supported algorithms are gzip, snappy, zstd and none. Defaults to gzip.
  • format.output.fields - A comma separated list of fields to include in output. Supported values are: key, offset, timestamp, value and headers. Defaults to value.
  • format.output.fields.value.encoding - Controls encoding of value field. Possible values are: base64 and none. Defaults: base64
  • timestamp.timezone - The time zone in which timestamps are represented. Accepts short and long standard names like: UTC, PST, ECT, Europe/Berlin, Europe/Helsinki, or America/New_York. For more information please refer to https://docs.oracle.com/javase/tutorial/datetime/iso/timezones.html. The default is UTC.
  • timestamp.source - The source of timestamps. Supports only wallclock which is the default value.

Configuration

For the Connect worker configuration and the connector configuration in general, see the Apache Kafka documentation (https://kafka.apache.org/documentation/#connect_configuring).

Here is an example connector configuration with descriptions:

### Standard connector configuration

## Fill in your values in these:

## These must have exactly these values:

# The Java class for the connector
connector.class=io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector

# The key converter for this connector
key.converter=org.apache.kafka.connect.storage.StringConverter

# The value converter for this connector
value.converter=org.apache.kafka.connect.json.JsonConverter

# Identify, if value contains a schema.
# Required value converter is `org.apache.kafka.connect.json.JsonConverter`.
value.converter.schemas.enable=false

# The type of data format used to write data to the S3 output files.
# The supported values are: `csv`, `json`, `jsonl` and `parquet`.
# Optional, the default is `csv`.
format.output.type=jsonl

# A comma-separated list of topics to use as input for this connector
# Also a regular expression version `topics.regex` is supported.
# See https://kafka.apache.org/documentation/#connect_configuring
topics=topic1,topic2

### Connector-specific configuration
### Fill in your values
# AWS Access Key ID
aws.access.key.id=YOUR_AWS_KEY_ID

# AWS Access Secret Key
aws.secret.access.key=YOUR_AWS_SECRET_ACCESS_KEY

#AWS Region
aws.s3.region=us-east-1

#File name template
file.name.template=dir1/dir2/{{topic}}-{{partition:padding=true}}-{{start_offset:padding=true}}.gz

#The name of the S3 bucket to use
#Required.
aws.s3.bucket.name=my-bucket

# The set of the fields that are to be output, comma separated.
# Supported values are: `key`, `value`, `offset`, `timestamp` and `headers`.
# Optional, the default is `value`.
format.output.fields=key,value,offset,timestamp

# The option to enable/disable wrapping of plain values into additional JSON object(aka envelope)
# Optional, the default value is `true`.
format.output.envelope=true

# The compression type used for files put on S3.
# The supported values are: `gzip`, `snappy`, `zstd`, `none`.
# Optional, the default is `none`.
file.compression.type=gzip

# The time zone in which timestamps are represented.
# Accepts short and long standard names like: `UTC`, `PST`, `ECT`,
# `Europe/Berlin`, `Europe/Helsinki`, or `America/New_York`. 
# For more information please refer to https://docs.oracle.com/javase/tutorial/datetime/iso/timezones.html.
# The default is `UTC`.
timestamp.timezone=Europe/Berlin

# The source of timestamps.
# Supports only `wallclock` which is the default value.
timestamp.source=wallclock

S3 multi-part uploads

To configure S3 multi-part uploads buffer size change:

  • aws.s3.part.size.bytes - The Part Size in S3 Multi-part Uploads in bytes. Maximum is 2GB and default is 5MB.

Retry strategy configuration

There are four configuration properties for the retry strategy.

Apache Kafka connect retry strategy configuration property

  • kafka.retry.backoff.ms - The retry backoff in milliseconds. This config is used to notify Apache Kafka Connect to retry delivering a message batch or performing recovery in case of transient exceptions. Maximum value is 24 hours.

AWS S3 retry strategy configuration properties

  • aws.s3.backoff.delay.ms - S3 default base sleep time for non-throttled exceptions in milliseconds. Default is 100 ms.
  • aws.s3.backoff.max.delay.ms - S3 maximum back-off time before retrying a request in milliseconds. Default is 20 000 ms.
  • aws.s3.backoff.max.retries - Maximum retry limit (if the value is greater than 30, there can be integer overflow issues during delay calculation). Default is 3.
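
To see how the three S3 settings interact, here is a generic capped exponential backoff. It is a sketch under assumptions, not the AWS SDK's or the connector's code: the real strategy may add random jitter, and the class BackoffSketch and its delayMs method are hypothetical. It also shows why a retry count above 30 is risky when the doubling is done in 32-bit integer arithmetic.

```java
// Hypothetical sketch of capped exponential backoff.
final class BackoffSketch {
    static long delayMs(long baseDelayMs, long maxDelayMs, int retriesAttempted) {
        // Cap the exponent: with int arithmetic, 1 << 31 is already negative.
        int exponent = Math.min(retriesAttempted, 30);
        long uncapped = baseDelayMs * (1L << exponent);
        // Never wait longer than the configured maximum.
        return Math.min(maxDelayMs, uncapped);
    }
}
```

With the defaults listed above (base 100 ms, maximum 20 000 ms), the delays before successive retries would be 100, 200, 400 and 800 ms, and the 20 000 ms cap is reached from the eighth doubling onwards.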

AWS S3 server side encryption properties

  • aws.s3.sse.algorithm - The name of the Server-side encryption algorithm to use for uploads. If unset the default SSE-S3 is used.
    • To use SSE-S3 set to AES256 or leave empty
    • To use SSE-KMS set to aws:kms
    • To use DSSE-KMS set to aws:kms:dsse

Development

Developing together with Commons library

This project depends on Common Module for Apache Kafka Connect. Normally, an artifact of it published to a globally accessible repository is used. However, if you need to change both this connector and the Commons library at the same time, you can shorten the development loop by using locally published artifacts. Please follow these steps:

  1. Checkout the main HEAD of Commons.
  2. Ensure the version here has the -SNAPSHOT suffix.
  3. Make changes to Commons.
  4. Publish it locally with ./gradlew publishToMavenLocal.
  5. Change the version in the connector's build.gradle (ext.aivenConnectCommonsVersion) to match the published snapshot version of Commons.

After that, the latest changes you've done to Commons will be used.

When you finish developing the feature and are sure Commons won't need to change:

  1. Make a proper release of Commons.
  2. Publish the artifact to the currently used globally accessible repository.
  3. Change the version of Commons in the connector to the published one.

Integration testing

Integration tests are implemented using JUnit, Gradle and Docker.

To run them, you need:

  • Docker installed.

Integration testing doesn't require valid AWS credentials.

To simulate AWS S3 behaviour, tests use LocalStack.

In order to run the integration tests, execute from the project root directory:

./gradlew clean integrationTest

License

This project is licensed under the Apache License, Version 2.0.

Trademarks

Apache Kafka and Apache Kafka Connect are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. AWS S3 is a trademark and property of its respective owners. All product and service names used in this website are for identification purposes only and do not imply endorsement.

s3-connector-for-apache-kafka's Issues

Delivery semantic

Hello, what are the delivery semantics of this connector? The Confluent one is exactly-once in some cases or at-least-once.

Thanks

S3 Source Connector

Hello,

Is there an S3 Source connector that can read the data uploaded by this Sink connector?

Rename classes

AivenKafkaConnect prefix is unnecessary and can be removed.
However, it must be kept in AivenKafkaConnectS3SinkConnector (this class name is a part of configuration).

Extend timestamp variable parameters in File name format

Scenario Overview

We use s3 filename template prefix/'%Y_%m_%d__%H_%M_%S_%f to sort filenames alphabetically.
The next new file is guaranteed to receive the following name in alphabetical order.

In kafka, we have several partitions of one topic, each of them must be written with the same prefix (prefix=topic_name) in order.
It's possible to ensure the files order with this template by running no more than 1 connector task.

Issue:

The timestamp variable has the following parameters:

unit parameter values:
yyyy - year, e.g. 2020 (please note that YYYY is deprecated and is interpreted as yyyy)
MM - month, e.g. 03
dd - day, e.g. 01
HH - hour, e.g. 24

Consequences:

With these parameters, files recorded within 1 hour will not differ in name.
Adding the partition number and offset to the file name in the template can solve this problem, but it makes working with the root prefix more difficult.
Uniqueness can be ensured by adding minutes, seconds, milliseconds to the timestamp variable.

Details:

Looks like it's enough to extend the following functionality :

    private static final Map<String, DateTimeFormatter> TIMESTAMP_FORMATTERS =
            Map.of(
                    "yyyy", DateTimeFormatter.ofPattern("yyyy"),
                    "MM", DateTimeFormatter.ofPattern("MM"),
                    "dd", DateTimeFormatter.ofPattern("dd"),
                    "HH", DateTimeFormatter.ofPattern("HH")
            );

with the following parameters:

"%M" - Minutes in two-digit format.
"%S" - Seconds in two-digit format.
"%f" - Microseconds.

AWS ECS Task or (likely) EKS pod with pre-mounted IAM credentials unsupported

Using AWS ECS Task or (likely) EKS pod IAM credentials that allow S3 access, provided via container env variables, fails.

IAM:

// IAM Role: arn:aws:iam::XXXX:role/XXX-ecs-instance-role

// Trust 
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "ec2.amazonaws.com",
                    "ecs-tasks.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

// Permission
{
    "Statement": [
        {
            "Action": [
                "s3:*"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::mybucket",
                "arn:aws:s3:::mybucket/*"
            ]
        }
    ],
    "Version": "2012-10-17"
}

Should this support mounted IAM/STS credentials vs having to define explicitly long-term or short-term: https://github.com/aiven/s3-connector-for-apache-kafka#credentials

ECS Task IAM Ref: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
EKS Service Account Ref: https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html


Current behavior requires providing either long-term or short-term combo in the configs.
Running without them yields following error:

// Error
org.apache.kafka.common.config.ConfigException: Either {aws.access.key.id, aws.secret.access.key} or {aws.sts.role.arn, aws.sts.role.session.name} should be set

Attempting to assume a role using an assumed role does not work either (and even if it did doesn't feel elegant)

// config sample:
{
"aws.sts.role.arn":"arn:aws:iam::XXXX:role/XXX-ecs-instance-role",
"aws.sts.role.session.name":"aiven-connect-s3"
}

// Error

com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException: User: arn:aws:sts::XXX:assumed-role/XXX-ecs-instance-role/i-0eXXXXXXXX  is not authorized to perform: sts:AssumeRole on resource: arn:aws:iam::XXX:role/XXX-ecs-instance-role (Service: AWSSecurityTokenService; Status Code: 403; Error Code: AccessDenied; Request ID: <>; Proxy: null)

Or am I missing something? Thanks!

file.name.prefix doesn't seem to work

I'm using both file.name.prefix and file.name.template together, if that matters. Seems like the prefix is completely ignored. I can see that the previous deprecated settings are looked at in the sink code, but can't find any references to the new prefix property.

master incompatible with latest cp-kafka-connect Docker image

I've been fighting for a few hours trying to get current HEAD running in Confluent's latest (5.5.1) Kafka Connect Docker image. After not finding anything wrong with my build I went back to tag 2.7.0 with JDK 1.8 and it works.

The connector simply doesn't load on startup, the loader gets picked up but the plugin doesn't get added. This is what it should look like, on current head the second line isn't showing up:

[2020-08-24 04:37:36,763] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/aiven-kafka-connect-s3/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
[2020-08-24 04:37:36,763] INFO Added plugin 'io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

Their image runs on OpenJDK 1.8, this might be the reason but I don't know how to verify this.
I tried building master with 1.8 but CompressionType from commons is incompatible (class file has wrong version 55.0, should be 52.0).

Dockerfile for reference:

FROM gradle:6.6.0-jdk11 AS aiven-kafka-connect-s3-builder

ENV RELEASE_VERSION="2.6.0"
RUN git clone https://github.com/aiven/aiven-kafka-connect-s3.git && \
    cd aiven-kafka-connect-s3 && \
    gradle installDist && \
    cd build/install/aiven-kafka-connect-s3

FROM confluentinc/cp-kafka-connect-base:5.5.1
COPY --from=aiven-kafka-connect-s3-builder /home/gradle/aiven-kafka-connect-s3/build/install/ /usr/share/java/
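
The "class file has wrong version 55.0, should be 52.0" message quoted above refers to the class-file major version: 52 corresponds to Java 8 and 55 to Java 11. One way to check which Java release a jar's classes target is to read the first eight bytes of a .class file. The sketch below does that; the function names are hypothetical, and the sample header is constructed by hand rather than taken from the connector.

```python
import struct


def java_release_for_major(major):
    """Map a class-file major version to a Java release (holds for Java 5+)."""
    return major - 44


def read_class_major_version(data):
    """Parse the 8-byte class-file header and return the major version.

    Layout: u4 magic (0xCAFEBABE), u2 minor version, u2 major version,
    all big-endian.
    """
    magic, _minor, major = struct.unpack(">IHH", data[:8])
    if magic != 0xCAFEBABE:
        raise ValueError("not a Java class file")
    return major


if __name__ == "__main__":
    # Hand-built header: magic, minor 0, major 0x37 (55).
    header = bytes.fromhex("cafebabe00000037")
    major = read_class_major_version(header)
    print(major, "-> Java", java_release_for_major(major))
```

A class compiled for Java 11 cannot be loaded by a Java 8 runtime, which would be consistent with the plugin silently failing to register in an OpenJDK 1.8 image.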

Refactor configuration definition

The connector configuration class AivenKafkaConnectS3Config should be brought closer in style to GcsSinkConfig, which can be used as the reference.

Some points:

Query: Tombstone records support

Hi all,

I'm not an expert in Java, so it is difficult for me to tell whether tombstone records are supported. I have seen that null keys are handled properly, but I did not see any mention of the tombstone case (a record with a key but a null value).

My plan is to use this connector to write down, as a log, all the different messages that are published to some topics, and there are a few tombstone cases.

Thank you very much,
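
To make the distinction in the question concrete, the sketch below classifies records as tombstones (key present, value null) versus records with a null key. It only illustrates the definitions; it is not the connector's code, and the function name and sample records are hypothetical.

```python
def is_tombstone(key, value):
    """A tombstone carries a key but a null value."""
    return key is not None and value is None


if __name__ == "__main__":
    # Hypothetical (key, value) pairs.
    records = [
        ("user-1", b'{"name": "a"}'),  # regular record
        (None, b'{"name": "b"}'),      # null key, regular value
        ("user-1", None),              # tombstone for user-1
    ]
    for key, value in records:
        print(key, "tombstone" if is_tombstone(key, value) else "regular")
```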

Out-of-Memory errors when sinking large topics

Scenario Overview

We have several topics, each already containing gigabytes of data (roughly 1-10 million records). We need to export the data to S3.

Issue:

Using the Aiven S3 Connector we run into Out-of-memory errors indicating that the Kafka Connect JVM process does not have enough heap space.

Consequences:

The S3 connector runs into errors.
The entire Kafka Connect cluster is lagging.
The Aiven CLI stops working and returns a 503 error.

Details:

Looking at the logs it looks like the connector is permanently ingesting messages from the topic and storing them in memory.
(log messages come from here)

It looks like the connector is not writing to S3 fast enough, so the memory is not freed in time.

We managed to get rid of the Out-of-memory errors by scaling up the Kafka Connect cluster. However, this is not a suitable long-term solution, as we will need to set up multiple such connectors in parallel in the future.

We would like to have something that gives us some control over the memory consumption of the connector, e.g., a configuration for the maximum size of the input records buffer.

PS: The Confluent S3 connector provided by Aiven (version 5.0.0) does not run into Out-of-memory errors and uses a lot less memory, but it's not an option for us.
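
The kind of control requested above can be sketched as a buffer that drains itself once a byte limit is reached. This is only an illustration of the idea, not a feature or configuration option of the connector; the class name, the max_bytes parameter, and the flush callback are all hypothetical.

```python
class BoundedRecordBuffer:
    """Collects records and hands them to a flush callback once the
    buffered payload reaches max_bytes."""

    def __init__(self, max_bytes, flush):
        self.max_bytes = max_bytes
        self.flush = flush
        self._records = []
        self.buffered_bytes = 0

    def add(self, record):
        """Buffer one record (bytes) and drain if the limit is reached."""
        self._records.append(record)
        self.buffered_bytes += len(record)
        if self.buffered_bytes >= self.max_bytes:
            self.drain()

    def drain(self):
        """Pass buffered records to the callback and release them."""
        if self._records:
            self.flush(self._records)
        self._records = []
        self.buffered_bytes = 0


if __name__ == "__main__":
    uploaded = []
    buf = BoundedRecordBuffer(max_bytes=10, flush=uploaded.append)
    for payload in (b"12345", b"67890", b"x"):
        buf.add(payload)
    buf.drain()  # flush whatever is left at the end
    print(len(uploaded), "batches")
```

With a limit like this, peak memory is bounded by roughly max_bytes plus one record, at the cost of producing more, smaller uploads.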

There is an issue handling avro schema with default value defined as String

Having an avro schema with a field defined like this:

      {
            "name": "osType",
            "type": [
                "string"
            ],
            "default": "UNKNOWN"
        },

Results in failure to process it with this kind of a stacktrace:

org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
	at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1564)
	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1445)
	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1323)
	at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1047)
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:92)
	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRUCT: class java.lang.String for field: "null"
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:241)
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
	at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
	... 23 more

The issue seems to be caused by outdated versions of org.apache.kafka:connect-api:1.1.0 and io.confluent:kafka-avro-serializer:4.1.4. I suggest updating these dependencies to resolve the issue.
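
Independently of the dependency upgrade, a possible schema-side workaround is to declare the field as a plain "string" instead of the one-element union ["string"]. This assumes the single-branch union is what leads the converter to treat the field as a STRUCT, which the stack trace suggests but does not prove. The sketch below rewrites such unions in a schema held as a Python dict; the function name and the record name Device are hypothetical.

```python
import copy


def unwrap_single_branch_unions(schema):
    """Return a copy of an Avro record schema in which every field typed
    as a one-element union is retyped as that union's only branch."""
    result = copy.deepcopy(schema)
    for field in result.get("fields", []):
        field_type = field.get("type")
        if isinstance(field_type, list) and len(field_type) == 1:
            field["type"] = field_type[0]
    return result


if __name__ == "__main__":
    schema = {
        "type": "record",
        "name": "Device",  # hypothetical record name
        "fields": [
            {"name": "osType", "type": ["string"], "default": "UNKNOWN"},
        ],
    }
    print(unwrap_single_branch_unions(schema)["fields"][0])
```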

Files in S3 without a line break between events

I have an issue with some files in S3 where the whole file content is on one line (without line breaks). Most of the files are formatted correctly (one event per line), but in some cases, with the same connector setup, some files are misformatted.

Do you know if there is any parameter to fix this issue?

java.lang.NoClassDefFoundError

Hello,

I'm trying to run this connector to sync data from a compacted Kafka topic to an S3 location by key. After running "gradle build" and adding the result to the Connect node packages, we're seeing a strange issue when starting a connector.

WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:787)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/aiven/kafka/connect/common/config/AivenCommonConfig
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:554)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:516)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
at org.eclipse.jetty.io.ssl.SslConnection$DecryptedEndPoint.onFillable(SslConnection.java:555)
at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:410)
at org.eclipse.jetty.io.ssl.SslConnection$2.succeeded(SslConnection.java:164)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: io/aiven/kafka/connect/common/config/AivenCommonConfig
at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:255)
at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:237)
at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:438)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:263)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
... 39 more

I first checked for this class in the source code: io/aiven/kafka/connect/common/config/AivenCommonConfig. The only reference I see is an import on line 34 in /src/main/java/o/aiven/kafka/connect/s3/config. The file path for the missing class doesn't exist, and I can't find anything that defines it or packages it as a class. Does this class exist? Or am I overlooking something obvious?

Connector config is -
{
  "name": "Avien_S3_sink_conncector",
  "config": {
    "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
    "tasks.max": "8",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "TP.TOPIC.NAME.LATEST",
    "value.converter.schemas.enable": "false",
    "format.output.type": "json",
    "aws.s3.bucket.name": "hs-topic-bucket",
    "aws.s3.region": "us-east-1",
    "format.output.fields": "key,value",
    "file.name.template": "{{topic}}-{{partition}}-{{key}}",
    "aws.sts.role.arn": "arn:aws:iam::**:role//KAFKACONNECT",
    "s3.credentials.provider.class": "io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider",
    "aws.sts.role.session.name": "avienconnect"
  }
}

Issue on "Date Format" using the S3 Sink Connector from Avro to Parquet

Hi, when using the connector to write event data from PostgreSQL (captured by Debezium) to S3 via Kafka in Parquet format, we cannot get a date type for timestamp fields.

In Kafka, the payload is:
"created_date": 1643631507020,

The schema created by Debezium is:

 {
      "name": "created_date",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    },

Using an S3 connector to write this data as Parquet files, we can configure an SMT to convert the field to a string:
"transforms.TsCreatedDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TsCreatedDate.field": "created_date",
"transforms.TsCreatedDate.format": "yyyy-MM-dd'T'HH:mm:ssZ",
"transforms.TsCreatedDate.target.type": "string",

But the type we expect in Parquet is a date. We still get either a long or, with the SMT, a string:
required int64 created_date;

The expected type should be one of: DATE, TIMESTAMP_MILLIS, TIMESTAMP_MICROS

How can we resolve this?
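
To make the difference between the representations concrete, the sketch below converts the payload value from this report into (a) the string form that a pattern like yyyy-MM-dd'T'HH:mm:ssZ yields for UTC and (b) the day count that Parquet's DATE logical type stores (an int32 number of days since the Unix epoch). It only illustrates the arithmetic and is not part of the connector or of the SMT; the function names are hypothetical.

```python
from datetime import datetime, timezone

MILLIS_PER_DAY = 86_400_000


def millis_to_iso(ms):
    """Format epoch milliseconds as a UTC timestamp string with offset."""
    moment = datetime.fromtimestamp(ms // 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%dT%H:%M:%S%z")


def millis_to_epoch_days(ms):
    """Whole days since 1970-01-01, the unit used by Parquet DATE."""
    return ms // MILLIS_PER_DAY


if __name__ == "__main__":
    created_date = 1643631507020  # value from the payload above
    print(millis_to_iso(created_date))         # 2022-01-31T12:18:27+0000
    print(millis_to_epoch_days(created_date))  # 19023
```

The raw long already matches what TIMESTAMP_MILLIS stores physically (int64 milliseconds since the epoch); what is missing in the output shown above is the logical type annotation on the Parquet column.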

Aiven s3 KafkaConnect not consuming from SSL/SASL cluster

Hello there,

we are trying to use the Aiven S3 Kafka Connect connector, but it isn't consuming any messages after we switched to an SSL-enabled, authenticated Kafka cluster.

With a cluster without SSL or authentication, it works perfectly fine with the same configuration, except for the SASL/SSL settings.

It is probably a small config error, but I've been struggling to fix it, so your help would be very much appreciated.

No errors appear in the logs, and it seems to connect properly: if I intentionally change the connection parameters to wrong values (user/password/truststore or IPs), I do see errors.

To check the SSL configuration, I ran kafka-console-producer.sh and kafka-console-consumer.sh with the very same .properties and hosts, and they sent and received messages successfully.

To isolate the problem from the connector, I developed my own simple connector that just logs its calls to the console, but the behavior is the same: it runs OK when connecting to non-SSL brokers, but neither receives messages nor prints errors when connecting to an SSL-enabled broker.

The plugin I was trying to use is aiven-kafka-s3-connector.

Thank you very much.

Best,
Javier Arias

connect-sslbroker.properties

# Kafka broker IP addresses to connect to
bootstrap.servers=sslbrokers:9092
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.location=truststore.jks
ssl.truststore.password=truststorepass
ssl.protocol=TLS
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="usr" \
    password="pass";

group.id=connect
config.storage.topic=connect-config
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

# Path to directory containing the connector jar and dependencies
plugin.path=/plugins
# Converters to use to convert keys and values
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter


# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084

connect-localhost.properties

# Kafka broker IP addresses to connect to
bootstrap.servers=unsecuredbrokers:9092

group.id=connect-local
config.storage.topic=connect-config
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=1
offset.storage.replication.factor=1
offset.storage.partitions=1
status.storage.replication.factor=1
status.storage.partitions=1

# Path to directory containing the connector jar and dependencies
plugin.path=/plugins
# Converters to use to convert keys and values
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter


# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084
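
In Kafka Connect, the top-level security.*, ssl.* and sasl.* settings in the worker file configure the worker's own clients (used for the config, offset and status topics). The consumers that feed sink tasks read their settings from properties prefixed with consumer. instead. A worker file that lacks consumer.-prefixed security settings is a commonly reported cause of the symptom described above, although this report does not confirm it. The sketch below derives the prefixed entries from a dict of worker properties; the function name and the prefix tuple are hypothetical.

```python
SECURITY_PREFIXES = ("security.", "ssl.", "sasl.")


def with_consumer_overrides(worker_props):
    """Return a copy of worker_props with every security-related setting
    duplicated under the consumer. prefix used by sink task consumers."""
    result = dict(worker_props)
    for name, value in worker_props.items():
        if name.startswith(SECURITY_PREFIXES):
            result["consumer." + name] = value
    return result


if __name__ == "__main__":
    worker = {
        "bootstrap.servers": "sslbrokers:9092",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-256",
        "ssl.truststore.location": "truststore.jks",
    }
    for name, value in sorted(with_consumer_overrides(worker).items()):
        print(f"{name}={value}")
```

Source connectors would need the same settings under the producer. prefix.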

Add support for templates in the aws_s3_prefix field

Adding support for templates to the aws_s3_prefix field would be really helpful. For example, it would allow us to store the files in subfolders by date.

Example:
aws_s3_prefix: /bucket/folder/${date}
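
The requested behavior can be sketched as a simple placeholder substitution at write time. This illustrates the feature request only and is not how the connector currently handles the prefix; the function name render_prefix is hypothetical, and the ${date} placeholder is taken from the example above.

```python
from datetime import date
from string import Template


def render_prefix(template, day):
    """Replace the ${date} placeholder with the given day in ISO format."""
    return Template(template).substitute(date=day.isoformat())


if __name__ == "__main__":
    print(render_prefix("/bucket/folder/${date}", date(2022, 1, 31)))
    # /bucket/folder/2022-01-31
```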

Consumer offsets not committed when using KeyRecordGrouper

When using the S3 connector and grouping records by key we have encountered this error:

consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records

Increasing the interval doesn't fix this issue. Looking at the consumer offsets, I can see that it's failing to commit any offset; however, it is still writing the records to S3, and it's writing more than one batch. Unfortunately, the connector then appears to retry uploading the fetched batches until it is killed.

The topic in question has millions of records on it, and if I set the offset manually to skip most of the messages it appears to work OK. When not grouping by key the same topic is ingested to S3 without any issues. In short it appears to be a load issue specific to grouping by key.

With the default max.poll.records of 500 and max.poll.interval.ms of 300000, I would have thought the consumer offsets would have been written every ~500 records. It's certainly able to process more than 500 records in that interval. Can you advise on what triggers the offsets being committed?

Random UnknownHostException resolving S3 hostname

Hi,

We are currently using this S3 sink connector to dump the content of a Kafka topic into an AWS S3 bucket.
However, our monitoring shows occasional random failures when resolving the S3 hostname.

The connector seems to retry and successfully process the records a few milliseconds later, but we are wondering whether this is normal behavior, why it happens, and whether there is a way to avoid it.

Here is the trace:

... 34 more
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1127)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1311)
at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
at com.amazonaws.http.conn.$Proxy48.connect(Unknown Source)
at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at jdk.internal.reflect.GeneratedMethodAccessor134.invoke(Unknown Source)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:112)
at com.amazonaws.http.DelegatingDnsResolver.resolve(DelegatingDnsResolver.java:38)
at com.amazonaws.SystemDefaultDnsResolver.resolve(SystemDefaultDnsResolver.java:27)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1302)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1368)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509)
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
Caused by: java.net.UnknownHostException: lcdp-prod-audit-database.s3.eu-west-3.amazonaws.com
at java.base/java.lang.Thread.run(Thread.java:829)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:374)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
at io.aiven.kafka.connect.s3.S3SinkTask.flush(S3SinkTask.java:122)
at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1505)
at java.base/java.util.HashMap.forEach(HashMap.java:1337)
at io.aiven.kafka.connect.s3.S3SinkTask.flushFile(S3SinkTask.java:140)
at io.aiven.kafka.connect.common.output.parquet.ParquetOutputWriter.writeRecords(ParquetOutputWriter.java:70)
at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:284)
at org.apache.parquet.hadoop.ParquetFileWriter.start(ParquetFileWriter.java:339)
at java.base/java.io.OutputStream.write(OutputStream.java:122)
at io.aiven.kafka.connect.common.output.parquet.ParquetPositionOutputStream.write(ParquetPositionOutputStream.java:47)
at io.aiven.kafka.connect.s3.S3OutputStream.write(S3OutputStream.java:79)
at io.aiven.kafka.connect.s3.S3OutputStream.newMultipartUpload(S3OutputStream.java:96)
at com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4998)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5052)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:752)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:784)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1135)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1189)
com.amazonaws.SdkClientException: Unable to execute HTTP request: lcdp-prod-audit-database.s3.eu-west-3.amazonaws.com
[2022-07-26 07:43:08,220] ERROR [lcdp-prod-audit-sink|task-0] WorkerSinkTask{id=lcdp-prod-audit-sink-0} Commit of offsets threw an unexpected exception for sequence number 37048: null (org.apache.kafka.connect.runtime.WorkerSinkTask:269)

Yours faithfully,
LCDP

does csv output require the input to be ByteArrayConverter ?

we have a KSQL stream outputting avro to a topic.

we would like to add an additional s3 sink connector that uses the same topic, where the output is a csv file.

is this a supported use case for this connector ?

i am guessing not based on this documentation:
The value.converter property must be set to org.apache.kafka.connect.converters.ByteArrayConverter for this data format.

i know i can create a new stream that outputs delimited data, but just want to make sure i am not missing a feature of this connector.

many thanks,
scott
