
kafka-backup's Introduction

Kafka Backup

Hi all, I am currently not able to maintain this project on my own. If you are interested in supporting me, please let me know, for example by opening an issue.

Kafka Backup is a tool to back up and restore your Kafka data, including all (configurable) topic data and, in particular, consumer group offsets. To the best of our knowledge, Kafka Backup is the only viable solution for taking a cold backup of your Kafka data and restoring it correctly.

It is designed as two connectors for Kafka Connect: a sink connector (backing data up) and a source connector (restoring data).

Currently, kafka-backup supports backup and restore to/from the local file system.

Features

  • Backup and restore topic data
  • Backup and restore consumer-group offsets
  • Currently supports only backup/restore to/from local file system
  • Released as a jar file or packaged as a Docker image

Getting Started

Option A) Download binary

Download the latest release from GitHub and unzip it.

Option B) Use Docker image

Pull the latest Docker image from Docker Hub.

DO NOT USE THE latest TAG IN PRODUCTION. latest is an automatic build of the master branch. Be careful!
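
For example, to pin an explicit release tag (the image name mirrors the docker run examples below; adjust it to the actual Docker Hub path if it includes a namespace):

docker pull kafka-backup:[LATEST_TAG]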

Option C) Build from source

Just run ./gradlew shadowJar in the root directory of Kafka Backup. You will find the CLI tools in the bin directory.
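
For example, assuming a standard Git and JDK setup:

git clone https://github.com/itadventurer/kafka-backup.git
cd kafka-backup
./gradlew shadowJar
ls bin/   # backup-standalone.sh, restore-standalone.sh, ...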

Start Kafka Backup

backup-standalone.sh --bootstrap-server localhost:9092 \
    --target-dir /path/to/backup/dir --topics 'topic1,topic2'

In Docker:

docker run -d -v /path/to/backup-dir/:/kafka-backup/ --rm \
    kafka-backup:[LATEST_TAG] \
    backup-standalone.sh --bootstrap-server kafka:9092 \
    --target-dir /kafka-backup/ --topics 'topic1,topic2'

You can pass options via CLI arguments or using environment variables:

  • --bootstrap-server / BOOTSTRAP_SERVER: [REQUIRED] The Kafka server to connect to
  • --target-dir / TARGET_DIR: [REQUIRED] Directory where the backup files should be stored
  • --topics / TOPICS: <T1,T2,…> List of topics to be backed up. You must provide either --topics or --topics-regex, not both
  • --topics-regex / TOPICS_REGEX: Regex of topics to be backed up. You must provide either --topics or --topics-regex, not both
  • --max-segment-size / MAX_SEGMENT_SIZE: Size of the backup segments in bytes. Default: 1 GiB
  • --command-config / COMMAND_CONFIG: Property file containing configs to be passed to the Admin Client. Only useful if you have additional connection options
  • --debug / DEBUG=y: Print debug information
  • --help: Prints this message
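
For example, the Docker invocation above can be rewritten with environment variables instead of CLI flags (a sketch; it assumes the image entrypoint picks up the variables listed above):

docker run -d -v /path/to/backup-dir/:/kafka-backup/ --rm \
    -e BOOTSTRAP_SERVER=kafka:9092 \
    -e TARGET_DIR=/kafka-backup/ \
    -e TOPICS='topic1,topic2' \
    kafka-backup:[LATEST_TAG] \
    backup-standalone.sh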

Kafka Backup does not stop! The backup process is a continuous background job that runs forever, as Kafka models data as a stream without end. See Issue 52: Support point-in-time snapshots for more information.

Restore data

restore-standalone.sh --bootstrap-server localhost:9092 \
    --target-dir /path/to/backup/dir --topics 'topic1,topic2'

In Docker:

docker run -v /path/to/backup/dir:/kafka-backup/ --rm \
    kafka-backup:[LATEST_TAG] \
    restore-standalone.sh --bootstrap-server kafka:9092 \
    --source-dir /kafka-backup/ --topics 'topic1,topic2'

You can pass options via CLI arguments or using environment variables:

  • --bootstrap-server / BOOTSTRAP_SERVER: [REQUIRED] The Kafka server to connect to
  • --source-dir / SOURCE_DIR: [REQUIRED] Directory where the backup files are found
  • --topics / TOPICS: [REQUIRED] List of topics to restore
  • --batch-size / BATCH_SIZE: Batch size (default: 1 MiB)
  • --offset-file / OFFSET_FILE: File where restore offsets are stored. THIS FILE IS CRUCIAL FOR A CORRECT RESTORATION PROCESS. IF YOU LOSE IT, YOU NEED TO START THE RESTORE FROM SCRATCH; OTHERWISE YOU WILL END UP WITH DUPLICATE DATA. Default: [source-dir]/restore.offsets
  • --command-config / COMMAND_CONFIG: Property file containing configs to be passed to the Admin Client. Only useful if you have additional connection options
  • --help / HELP: Prints this message
  • --debug / DEBUG: Print debug information (if using the environment variable, set it to 'y')
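
Because the offset file is so important, it can make sense to set it explicitly so that it lives next to the backup itself (a sketch using only the flags documented above; the path shown is the same as the default):

restore-standalone.sh --bootstrap-server localhost:9092 \
    --source-dir /path/to/backup/dir \
    --offset-file /path/to/backup/dir/restore.offsets \
    --topics 'topic1,topic2'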

More Documentation

License

This project is licensed under the Apache License Version 2.0 (see LICENSE).

kafka-backup's People

Contributors

cschellenbach, gstuder-ona, itadventurer, jay7x, lbmaster001, loffek, nickcharles, wesselvs


kafka-backup's Issues

Errors during the backup process

[2020-04-17 16:34:18,557] ERROR WorkerSinkTask{id=backup-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
org.apache.kafka.connect.errors.DataException: Topic1 error: Not a byte array! �q�0��
	at de.azapps.kafkabackup.common.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:19)
	at de.azapps.kafkabackup.common.record.RecordSerde.write(RecordSerde.java:121)
	at de.azapps.kafkabackup.common.segment.SegmentWriter.append(SegmentWriter.java:78)
	at de.azapps.kafkabackup.common.partition.PartitionWriter.append(PartitionWriter.java:57)
	at de.azapps.kafkabackup.sink.BackupSinkTask.put(BackupSinkTask.java:68)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-04-17 16:34:18,567] ERROR WorkerSinkTask{id=backup-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Topic1 error: Not a byte array! �q�0��
	at de.azapps.kafkabackup.common.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:19)
	at de.azapps.kafkabackup.common.record.RecordSerde.write(RecordSerde.java:121)
	at de.azapps.kafkabackup.common.segment.SegmentWriter.append(SegmentWriter.java:78)
	at de.azapps.kafkabackup.common.partition.PartitionWriter.append(PartitionWriter.java:57)
	at de.azapps.kafkabackup.sink.BackupSinkTask.put(BackupSinkTask.java:68)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
	... 10 more
[2020-04-17 16:34:18,568] ERROR WorkerSinkTask{id=backup-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

Unable to connect to kafka cluster via SSL (solved)

It's not clear to me how to connect to a Kafka cluster which requires SSL.

For now I'm playing with a simple case: I'm starting the kafka-backup sink as a standalone connector right on a Kafka cluster node. kafka-connect starts fine and the AdminClient connects well, but the backup sink cannot connect. The consumer config still shows security.protocol = PLAINTEXT during the initialisation stage.

So far I tried this in my connect-backup-sink.properties:

cluster.bootstrap.servers=kafka5.tld:9093
cluster.security.protocol=SSL
cluster.ssl.truststore.type=PKCS12
cluster.ssl.truststore.location=/opt/kafka/config/ssl/truststore.pkcs12
cluster.ssl.truststore.password=[CENSORED]
#
bootstrap.servers=kafka5.tld:9093
security.protocol=SSL
ssl.truststore.type=PKCS12
ssl.truststore.location=/opt/kafka/config/ssl/truststore.pkcs12
ssl.truststore.password=[CENSORED]
#
consumer.bootstrap.servers=kafka5.tld:9093
consumer.security.protocol=SSL
consumer.ssl.truststore.type=PKCS12
consumer.ssl.truststore.location=/opt/kafka/config/ssl/truststore.pkcs12
consumer.ssl.truststore.password=[CENSORED]
#
cluster.consumer.bootstrap.servers=kafka5.tld:9093
cluster.consumer.security.protocol=SSL
cluster.consumer.ssl.truststore.type=PKCS12
cluster.consumer.ssl.truststore.location=/opt/kafka/config/ssl/truststore.pkcs12
cluster.consumer.ssl.truststore.password=[CENSORED]

Am I missing something?

Though I still see this in logs:

[2020-01-17 09:36:38,028] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [kafka5.tld:9093]
        check.crcs = true
        client.dns.lookup = default
        client.id = connector-consumer-chrono-qa-backup-sink-0
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = connect-chrono-qa-backup-sink
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:347)

AdminClient is fine though:

[2020-01-17 09:36:38,114] INFO AdminClientConfig values:
        bootstrap.servers = [kafka5.tld:9093]
        client.dns.lookup = default
        client.id =
        connections.max.idle.ms = 300000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 120000
        retries = 5
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = SSL
        security.providers = null
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = /opt/kafka/config/ssl/truststore.pkcs12
        ssl.truststore.password = [hidden]
        ssl.truststore.type = PKCS12
 (org.apache.kafka.clients.admin.AdminClientConfig:347)
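
For reference, since this issue is marked as solved: in Kafka Connect the sink task's consumer is configured by the worker, not by the connector properties, so a common fix is to put the consumer.-prefixed SSL settings (plus the worker's own SSL settings) into the worker configuration, e.g. connect-standalone.properties, instead of connect-backup-sink.properties. A sketch, reusing the truststore from above:

security.protocol=SSL
ssl.truststore.type=PKCS12
ssl.truststore.location=/opt/kafka/config/ssl/truststore.pkcs12
ssl.truststore.password=[CENSORED]
consumer.security.protocol=SSL
consumer.ssl.truststore.type=PKCS12
consumer.ssl.truststore.location=/opt/kafka/config/ssl/truststore.pkcs12
consumer.ssl.truststore.password=[CENSORED]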

Document how to clean up old segments when doing a backup

A usual setup is to have Kafka Backup running continuously in the background, writing data to the local disk.
In parallel, a CronJob moves the data to the target backup storage. Afterwards it is possible to delete old segments and indexes (NOT the current ones).
We should document how to do that gracefully (maybe even provide a script for it).
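
A rough sketch of such a CronJob step is below. It does not assume anything about the file naming; it simply treats files that have not been modified for a while as closed (the segment and index currently being written are assumed to be touched regularly). Verify this against your actual backup layout before using it, and never delete the files Kafka Backup is still writing to:

#!/bin/sh
# Sketch: upload closed backup files to remote storage, then delete the local copy.
# Files modified within the last 7 days are left alone so that the current segment
# and index are never touched. Adjust paths, threshold and upload tool to your setup.
BACKUP_DIR=/path/to/backup/dir
REMOTE=remote:kafka-backup        # e.g. an rclone remote
find "$BACKUP_DIR" -type f -mtime +7 | while read -r f; do
    rel=${f#"$BACKUP_DIR"/}
    rclone copyto "$f" "$REMOTE/$rel" && rm "$f"
done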

Consumer offset

@azapps First of all thanks for this wonderful open source project.

I am writing a blog post on backup and restore of Kafka Topics in a Kubernetes environment with another open source project OpenEBS providing the underlying persistent container attached storage.

For now I settled on using Spredfast's S3 connector but my friend Arash Kaffamanesh pointed me to your work. I had a couple of questions.

At the time of restore, how do I let the consumer know where to start consuming from?
Can you please share additional differences with Spredfast's connector?

My Kafka environment runs in Kubernetes. Ideally I want a backup/restore storage location outside my cluster so that I can get it back in the event of a failure.

The backup location is determined by target.dir; it becomes difficult to manage a path on a node if the environment is Kubernetes.

Adding a restore to a different destination topic

Hello,

I didn't see in the documentation whether it's possible to restore to a different destination topic, such as mybackupedtopic-restored. It would help with testing restore procedures without disturbing the existing topic, among other things.

Thanks,

Chris

Support Backup Compaction

Currently there is no support for compaction, but it would probably be nice for Kafka Backup to support it.

How to split the restore process in chunks

The easiest way to restore data is to do it all at once. Sometimes, however, there is more data than fits on a disk. We should document how to chunk the restore process (by topics, by batches of segments, etc.).
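
For example, restoring topic by topic with the standalone CLI keeps the working set small (a sketch using the documented flags; it assumes restore-standalone.sh exits once a topic has been fully restored, and chunking by batches of segments would need extra tooling):

for topic in topic1 topic2 topic3; do
    restore-standalone.sh --bootstrap-server localhost:9092 \
        --source-dir /path/to/backup/dir --topics "$topic"
done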

Missing Configuration Variable: cluster.key.deserializer

Hello,
Could you please help with this?
I use Kafka 2.4.0 (http://apache.volia.net/kafka/2.4.0/kafka_2.12-2.4.0.tgz); connectors are running in standalone mode.
The Backup Sink Connector is working fine for me and backups are saved in the /backup dir.
For restore I use two configs:

  1. connect-standalone-restore.properties
bootstrap.servers=kafka-test-2:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
rest.port=8084
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
  2. connect-backup-source.properties
name=backup-source
connector.class=de.azapps.kafkabackup.source.BackupSourceConnector
tasks.max=1
topics=test5
cluster.key.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
cluster.value.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
source.dir=/backup
batch.size=500
cluster.bootstrap.servers=kafka-test-2:9092

Run the restore:

./bin/connect-standalone.sh restore-config/connect-standalone-restore.properties restore-config/connect-backup-source.properties

Error:

[2020-03-08 09:18:50,991] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-03-08 09:18:50,991] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-03-08 09:18:50,991] INFO Kafka startTimeMs: 1583659130991 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-03-08 09:18:50,996] INFO Created connector backup-source (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2020-03-08 09:18:50,996] INFO BackupSourceConfig values:
        batch.size = 500
        source.dir = /backup
        topics = test5
 (de.azapps.kafkabackup.source.BackupSourceConfig:347)
[2020-03-08 09:18:50,996] INFO WorkerSourceTask{id=backup-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-03-08 09:18:50,996] INFO WorkerSourceTask{id=backup-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-03-08 09:18:50,997] ERROR WorkerSourceTask{id=backup-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
java.lang.RuntimeException: Missing Configuration Variable: cluster.key.deserializer
        at de.azapps.kafkabackup.source.BackupSourceConfig.<init>(BackupSourceConfig.java:41)
        at de.azapps.kafkabackup.source.BackupSourceTask.start(BackupSourceTask.java:41)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-03-08 09:18:50,998] ERROR WorkerSourceTask{id=backup-source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-03-08 09:18:50,998] INFO Stopped BackupSourceTask (de.azapps.kafkabackup.source.BackupSourceTask:159)
[2020-03-08 09:18:50,998] INFO [Producer clientId=connector-producer-backup-source-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)

On-disk files compression

It'd be nice to store files on disk in compressed form to save some disk space/cost. I guess compression support is already there, as Kafka supports a few compression methods on topics, so maybe we just need to expose a few config options (or maybe it's already there, just undocumented).

Sink connector not appending to backup file

Steps done:

  1. Configured the sink connector
  2. Checked the size of the backup file
  3. Manually added data to the Kafka topic

The sink task then fails with:
"tasks": [{
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: de.azapps.kafkabackup.common.segment.SegmentWriter$SegmentException: Trying to override a written record. There is something terribly wrong in your setup! Please check whether you are trying to override an existing backup
       at de.azapps.kafkabackup.sink.BackupSinkTask.put(BackupSinkTask.java:77)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
       ... 10 more
Caused by: de.azapps.kafkabackup.common.segment.SegmentWriter$SegmentException: Trying to override a written record. There is something terribly wrong in your setup! Please check whether you are trying to override an existing backup
       at de.azapps.kafkabackup.common.segment.SegmentWriter.append(SegmentWriter.java:72)
       at de.azapps.kafkabackup.common.partition.PartitionWriter.append(PartitionWriter.java:57)
       at de.azapps.kafkabackup.sink.BackupSinkTask.put(BackupSinkTask.java:68)
       ... 11 more
",
    "id": 0,
    "worker_id": "kafka-connect:8083"
  }],

Optional.isEmpty() error when using Java8 included in cp-ansible v5.4 kafka connect

I noticed that the connector was failing with Kafka deployed using the standard deployment of cp-ansible (https://github.com/confluentinc/cp-ansible) at version 5.4; it seems like the provisioned Java version (8) is lower than expected here.

Is the simple fix to use !isPresent() in SegmentReader.java? From the docs here: https://docs.confluent.io/current/installation/versions-interoperability.html it seems like Java 8 is a supported JVM for the 5.4 series, so it may be most compatible to match that?

"Failed to construct kafka consumer" error while using latest cp-kafka-connect 5.3.1

I keep getting the above error when I try to use the connector with the latest Kafka Connect from Confluent. Looking through the log output, it says the error is: Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.internals.AbstractCoordinator

I tried compiling the connector with the latest Kafka Connect jars, kafka-clients-2.3.1 and connect-api-2.3.1, instead of the 2.3.0-SNAPSHOT this project uses. This did not help, since I could not compile the project anymore because of a "superclass method not implemented" error.

kafka-connect works fine if I don't give it this connector's jar, so I don't think the problem is with Kafka Connect.

Here's a snippet of the log output just before the crash:

[2019-11-29 13:37:27,206] INFO Kafka version: 2.3.0-SNAPSHOT (org.apache.kafka.common.utils.AppInfoParser)
[2019-11-29 13:37:27,206] INFO Kafka commitId: 8f2d073863edfa87 (org.apache.kafka.common.utils.AppInfoParser)
[2019-11-29 13:37:27,206] INFO Kafka startTimeMs: 1575034647205 (org.apache.kafka.common.utils.AppInfoParser)

Here's the same section of the log output when I don't supply this connector's jar file:

[2019-11-29 13:48:28,815] INFO Kafka version: 5.3.1-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2019-11-29 13:48:28,815] INFO Kafka commitId: 03799faf9878a999 (org.apache.kafka.common.utils.AppInfoParser)
[2019-11-29 13:48:28,816] INFO Kafka startTimeMs: 1575035308815 (org.apache.kafka.common.utils.AppInfoParser)
[2019-11-29 13:48:28,889] INFO [Producer clientId=producer-2] Cluster ID: XDNIN6YHT9m3FIeakkdlvQ (org.apache.kafka.clients.Metadata)

I downgraded Kafka Connect to version 5.2.0, from about 6 months ago. This time I get this error instead: java.lang.NoSuchMethodError: org.apache.kafka.common.utils.AppInfoParser.registerAppInfo

Restoring multi-partition topic

I am not sure if this is an issue; maybe I missed something in connect-backup-source.properties.
When I restore a topic with 24 partitions, it creates a topic with one partition and the restore fails.
The restore is successful if I create the 24-partition topic before restoring.

Have I missed something, or is multi-partition topic creation not supported so far?

NPE in RecordSerde.java::write() when records have null values

Noticed this when doing (probably terrible) things to some streams in my kafka instance - I haven't dug in too deeply, could be wrong, but it seems like null is a placeholder value for deletion? The ::write() function crashes but ::read() seems happy enough with null values.

An easy fix is to guard .value() with a null check and write a zero length, though I don't have great test coverage on restore yet to ensure this doesn't break things in a way I didn't expect.

Fallacy: Offset Restore is coupled to the restore-records step

Under normal circumstances this would never cause an issue, but imagine the following situation:

  • Cluster A: Produce 100 Messages
  • Cluster A: Do a Backup
  • Cluster B: Restore Backup from Cluster A
  • Cluster A: Produce 10 Messages (does not matter)
  • Cluster A: Consume 50 Messages with CG1
  • Cluster A: Consume 105 Messages with CG2
  • Cluster B: Restore incremental Backup from Cluster A

What happens:

  • Cluster B has 150 messages (so far so good)
  • Cluster B has a Consumer Group CG2 with a commit to 105 messages
  • Cluster B has no Consumer Group CG1!

Reason:

Consumer Group offsets are restored during restoration of records:

offsetSource.syncGroupForOffset(topicPartition, sourceOffset, targetOffset);

This means an offset can only be committed at the moment the record with the same offset is written. Thus, when we restore partial backups and a consumer group offset should be committed that is smaller than the offset of the currently written record, this fails silently.

Possible solutions

Ignore the problem

I think that in 99.9% of cases this is a non-issue. But it is
definitely unexpected behaviour. I think we should fix that (maybe not
in the first release)

Write source offsets into headers

Confluent Replicator writes source offsets to the target record as a
header entry. We could use the same approach to be able to restore the
data. The downside here is that we would need to do linear scans to
find the correct offset for our offset commits

Understand how MirrorMaker 2 approaches this

See
https://github.com/apache/kafka/blob/trunk/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java#L90

I currently do not know how this works but I think it is worth investigating.

Add Magic Byte to files

Currently we do not know whether the file format will change, so we need to prepare for future evolution of this tool. The simplest way would be to reserve the first byte(s) for a number (the version of the file format) that helps Kafka Backup identify the version.

Standalone `kafka-backup` tool

It would be great to have a wrapper around kafka-connect-standalone to do backups and restores.
The tricky part here is to be able to stop the tool as soon as the restore is done (related: #46 )

Improve Test Coverage of Integration Tests

  • Support and differentiate between null and empty messages #14
  • Restore can fail when at least one partition was empty #17
  • Starting the Backup Sink multiple times #41
  • Delete old segments manually when doing a backup → should still work

Restore found no records when using same name in Backup-Source.properties

I recently had some headache finding out why kafka-backup refused to write anything to my cluster when restoring. I always wiped the whole cluster by executing this script:

#!/bin/sh
echo "Stopping Kafka Server..."
systemctl stop kafka
echo "Stopping Zookeeper Server..."
systemctl stop kafka-zookeeper
echo "Deleting Kafka Storage..."
rm -rf /opt/kafka/data-logs/kafka/*
echo "Deleting Zookeeper Storage..."
rm -rf /opt/kafka/data-logs/zookeeper/version-2/*
echo "Finished shutdown and cleanup"
echo "Zookeeper Status: "
systemctl status kafka-zookeeper | grep "Active"
echo "Kafka Status: "
systemctl status kafka | grep "Active"

It only worked in some rare cases, or when I tried to restore a topic I had never restored before.
Now I found out that changing the name=backup-source in config/connect-backup-source.properties makes it possible to restore the topic again. Is this intended behavior, and why is there no doc for such a thing? I tried to find the location where this information could be stored, but had no luck yet. I could imagine that, when adding messages after doing the restore, only these will be written. But why does kafka-backup not realize when I wipe the whole cluster or just delete the topic? Is there no possibility to notice such changes?
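
For anyone hitting the same behaviour: in standalone mode Kafka Connect remembers how far a source connector has read in the file configured as offset.storage.file.filename (often /tmp/connect.offsets), keyed by the connector name. Reusing the same name therefore makes the restore resume where the previous run stopped, even against a freshly wiped cluster. A sketch of the extra cleanup step, assuming that default path:

rm -f /tmp/connect.offsets   # path = offset.storage.file.filename in your worker config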

Error "Plugin class loader for connector was not found" during the backup

I see multiple error log entries during the start of the process:

[2020-04-17 16:56:14,068] ERROR Plugin class loader for connector: 'de.azapps.kafkabackup.sink.BackupSinkConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@5b068087 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)

On-disk files encryption

It'd be great to store files on disk in encrypted form. This would allow easily uploading data nightly into public cloud storage for long-term retention; no additional encryption would be required then. Even good symmetric encryption should be enough for a first version.
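
As a stop-gap until this exists, closed backup files can be encrypted before upload with standard tooling. A sketch using symmetric GPG; the file pattern and passphrase handling are assumptions and must be adapted to your layout:

for f in /path/to/backup/dir/*/segment_*; do   # segment_* naming is an assumption
    gpg --batch --yes --symmetric --cipher-algo AES256 \
        --passphrase-file /etc/kafka-backup.pass -o "$f.gpg" "$f"
done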

Pluggable storage architecture

It seems like it would be useful to support various backup formats and target systems:

Backup formats:

  • binary data (currently implemented): small size. Fast. But hard to recover if tools are lost or there are serious bugs in the serialization format
  • JSON (see the implementation by getdreams): potentially large overhead, but easier for humans to understand. Hard to represent binary data gracefully; Base64 is possible but huge. Maybe a good option for string or JSON topics.
  • Encryption (See #30)
  • Compression (See #29)

Target storage systems:

  • local disk (currently implemented)
  • Cloud storage (S3, Azure Blobs, ...) #26
  • On premises object storage (glusterfs, ...)

I am not sure which part should be covered by Kafka Backup and which should be handled by the underlying system. But this gets interesting as soon as we want to stream data from and to object storage, where we do not have an easy way to manipulate data locally.

In terms of implementation, I imagine a pluggable storage layer consisting of storage sinks that are agnostic to the data format and accept a stream of data to be written or read. On top of it sits a layer of transformation middleware that can apply transformations to the data (e.g. compression, encryption, etc.).

But as this is a backup tool, I would like to be very cautious about adding too many features, to reduce the probability of serious bugs, and I would like to keep it as simple as possible.

Feel free to share your comments!

Build failed with 'Received status code 501 from server: HTTPS Required' message (Maven Central repo requires HTTPS now)

Today the kafka-backup build fails with the following errors.

After some quick googling I found that the Maven Central repo requires HTTPS now:
https://support.sonatype.com/hc/en-us/articles/360041287334

Effective January 15, 2020, The Central Repository no longer supports insecure communication over plain HTTP and requires that all requests to the repository are encrypted over HTTPS.

Could you please update build settings accordingly?

Build error output:

+ git clone https://github.com/itadventurer/kafka-backup.git
Cloning into 'kafka-backup'...
+ cd kafka-backup
+ ./gradlew shadowJar
Downloading https://services.gradle.org/distributions/gradle-5.1.1-all.zip
............................................................................................................................
Unzipping /root/.gradle/wrapper/dists/gradle-5.1.1-all/97z1ksx6lirer3kbvdnh7jtjg/gradle-5.1.1-all.zip to /root/.gradle/wrapper/dists/gradle-5.1.1-all/97z1ksx6lirer3kbvdnh7jtjg
Set executable permissions for: /root/.gradle/wrapper/dists/gradle-5.1.1-all/97z1ksx6lirer3kbvdnh7jtjg/gradle-5.1.1/bin/gradle

Welcome to Gradle 5.1.1!

Here are the highlights of this release:
 - Control which dependencies can be retrieved from which repositories
 - Production-ready configuration avoidance APIs

For more details see https://docs.gradle.org/5.1.1/release-notes.html

Starting a Gradle Daemon (subsequent builds will be faster)

FAILURE: Build failed with an exception.

* What went wrong:
Could not determine the dependencies of task ':shadowJar'.
> Could not resolve all dependencies for configuration ':runtimeClasspath'.
   > Could not resolve org.slf4j:slf4j-api:1.7.26.
     Required by:
         project :
      > Could not resolve org.slf4j:slf4j-api:1.7.26.
         > Could not get resource 'http://repo.maven.apache.org/maven2/org/slf4j/slf4j-api/1.7.26/slf4j-api-1.7.26.pom'.
            > Could not GET 'http://repo.maven.apache.org/maven2/org/slf4j/slf4j-api/1.7.26/slf4j-api-1.7.26.pom'. Received status code 501 from server: HTTPS Required
   > Could not resolve com.fasterxml.jackson.core:jackson-core:2.9.9.
     Required by:
         project :
      > Skipped due to earlier error
   > Could not resolve com.fasterxml.jackson.core:jackson-databind:2.9.9.
     Required by:
         project :
      > Could not resolve com.fasterxml.jackson.core:jackson-databind:2.9.9.
         > Could not get resource 'http://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.9.9/jackson-databind-2.9.9.pom'.
            > Could not GET 'http://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.9.9/jackson-databind-2.9.9.pom'. Received status code 501 from server: HTTPS Required
   > Could not resolve net.sf.jopt-simple:jopt-simple:6.0-alpha-3.
     Required by:
         project :
      > Skipped due to earlier error

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

BUILD FAILED in 39s
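
Until the build script is updated, a local workaround is to switch the repository URL to HTTPS before building; the sed below assumes the plain-HTTP URL appears literally in build.gradle (alternatively, point the repositories block at mavenCentral(), which uses HTTPS in current Gradle versions):

sed -i 's|http://repo.maven.apache.org/maven2|https://repo.maven.apache.org/maven2|g' build.gradle
./gradlew shadowJar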

Decrease verbosity of Restore CLI

Currently it is very verbose, as it logs all Kafka Connect INFO messages. It should show only WARN and ERROR messages plus information regarding Kafka Backup; everything else should be written to the log file but not to stdout.
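
One way to approximate this today is via the Connect worker's Log4j configuration (a sketch; the path and logger names follow the standard Kafka distribution and may differ in your setup):

sed -i 's/^log4j.rootLogger=INFO/log4j.rootLogger=WARN/' config/connect-log4j.properties
echo 'log4j.logger.de.azapps.kafkabackup=INFO' >> config/connect-log4j.properties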

Integration Tests

We need an automatic test suite that runs for PRs with good coverage.

We should also test already fixed issues:

  • NPE in RecordSerde.java::write() when records have null values #14
  • Restore can fail when at least one partition was empty #17

Previous Discussion:

@itadventurer Do you have a good idea how to do the system tests (what to use for?) – You can find some tests in system_tests – but they need to be executed and checked manually :( – In the long run this is obviously not a good idea

@gstuder-ona @itadventurer yeah, it's a little tricky. Off the top of my head TravisCI + Docker + https://github.com/wurstmeister/kafka-docker could do what we want here? I've set up something similar in the past. To get confidence to use this in a real setup we may need to do something equivalent anyway...

@itadventurer Yeah, I also thought about something like this ;) How did you do test automation afterwards in the past? Using Bash scripts?

@gstuder-ona Nope - python - the closest equivalent in Java world probably is groovy? Though the bash isn't really an issue for me - probably better in a "real" language but all my deployments are linux-y, so...
Maybe getting TravisCI or something running with a docker-compose'd kafka would work basically with what's there right now, aside from some "docker exec" prefixes? Then you can refactor the bash utils into whatever format you like (JUnit? or whatever the cool kids are using nowadays - I've been out of the JVM for a few years).
(Using "you" in the generic sense here :-) happy to help if I manage the time - got an open testing ticket for this on my side, so I'll see...)

kafka-backup sink task is failing

I'm playing with kafka-backup using SSL and it seems to fail at the very beginning of the actual backup.
This happens after:

...
[2020-01-17 10:05:31,879] INFO Resetting offset for tenancity.chrono-billable-datasink-0 based upon existing consumer group offsets or, if there are none, the consumer's 'auto.offset.reset' value. (de.azapps.kafkabackup.sink.BackupSinkTask:93)
[2020-01-17 10:05:31,879] INFO [Consumer clientId=connector-consumer-chrono-qa-backup-sink-0, groupId=connect-chrono-qa-backup-sink] Seeking to offset 266640 for partition greenie.raw-data-6 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
[2020-01-17 10:05:31,880] INFO [Consumer clientId=connector-consumer-chrono-qa-backup-sink-0, groupId=connect-chrono-qa-backup-sink] Seeking to offset 251914 for partition greenie.raw-data-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
[2020-01-17 10:05:31,880] INFO [Consumer clientId=connector-consumer-chrono-qa-backup-sink-0, groupId=connect-chrono-qa-backup-sink] Seeking to offset 34697 for partition tenancity.invalid-post-validation-4 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
[2020-01-17 10:05:31,880] INFO [Consumer clientId=connector-consumer-chrono-qa-backup-sink-0, groupId=connect-chrono-qa-backup-sink] Seeking to offset 486093 for partition tenancity.billable-meter-data-3 (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
[2020-01-17 10:05:37,709] ERROR WorkerSinkTask{id=chrono-qa-backup-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
java.lang.NullPointerException
        at de.azapps.kafkabackup.common.record.RecordSerde.write(RecordSerde.java:98)
        at de.azapps.kafkabackup.common.segment.SegmentWriter.append(SegmentWriter.java:72)
        at de.azapps.kafkabackup.common.partition.PartitionWriter.append(PartitionWriter.java:57)
        at de.azapps.kafkabackup.sink.BackupSinkTask.put(BackupSinkTask.java:60)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
[2020-01-17 10:05:37,750] INFO WorkerSinkTask{id=chrono-qa-backup-sink-0} Committing offsets synchronously using sequence number 1: [...LARGE OUTPUT HERE...]
[2020-01-17 10:05:37,788] ERROR WorkerSinkTask{id=chrono-qa-backup-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
        at de.azapps.kafkabackup.common.record.RecordSerde.write(RecordSerde.java:98)
        at de.azapps.kafkabackup.common.segment.SegmentWriter.append(SegmentWriter.java:72)
        at de.azapps.kafkabackup.common.partition.PartitionWriter.append(PartitionWriter.java:57)
        at de.azapps.kafkabackup.sink.BackupSinkTask.put(BackupSinkTask.java:60)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        ... 10 more
[2020-01-17 10:05:37,789] ERROR WorkerSinkTask{id=chrono-qa-backup-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-01-17 10:05:37,794] INFO Stopped BackupSinkTask (de.azapps.kafkabackup.sink.BackupSinkTask:128)
[2020-01-17 10:05:37,804] INFO [Consumer clientId=connector-consumer-chrono-qa-backup-sink-0, groupId=connect-chrono-qa-backup-sink] Revoke previously assigned partitions [... LARGE OUTPUT HERE ...]
[2020-01-17 10:05:37,805] INFO [Consumer clientId=connector-consumer-chrono-qa-backup-sink-0, groupId=connect-chrono-qa-backup-sink] Member connector-consumer-chrono-qa-backup-sink-0-13ce20de-a2c0-4ce8-afcf-b08b11dae1fd sending LeaveGroup request to coordinator spdachronokafkaq1.node.spda:9093 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:916)

Then the process still persists but seems to do nothing...
I'm using a Kafka 2.4 build with Scala 2.12. The connection to the Kafka cluster is made via SSL, but locally on one of the cluster nodes. Kafka and kafka-backup are both in Docker containers on the host network. Both containers are based on adoptopenjdk/openjdk11:ubuntu-jre.

NullPointerException when Record Timestamp is null

First of all thanks for this project!
When performing a Kafka backup, it fails with a NullPointerException because the first record of a topic has a "null" timestamp:

Record{topic: kafka.backup.status, partition: 3, offset: 82, key: byte[31], value: byte[79], timestampType: CreateTime, timestamp: 1585135162346, headers: ConnectHeaders(headers=)}
Record{topic: kafka.backup.status, partition: 3, offset: 83, key: byte[31], value: byte[82], timestampType: CreateTime, timestamp: 1585135165458, headers: ConnectHeaders(headers=)}
Record{topic: kafka.backup.status, partition: 3, offset: 84, key: byte[31], value: byte[79], timestampType: CreateTime, timestamp: 1585135732945, headers: ConnectHeaders(headers=)}
Record{topic: kafka.backup.status, partition: 3, offset: 85, key: byte[31], value: byte[1537], timestampType: CreateTime, timestamp: 1585135737340, headers: ConnectHeaders(headers=)}
Record{topic: kafka.backup.status, partition: 3, offset: 86, key: byte[31], value: byte[79], timestampType: CreateTime, timestamp: 1585136057716, headers: ConnectHeaders(headers=)}
Record{topic: cdc.minio.autoportal, partition: 0, offset: 0, key: byte[42], value: byte[1166], timestampType: CreateTime, timestamp: null, headers: ConnectHeaders(headers=)}
[2020-03-25 11:34:22,187] ERROR WorkerSinkTask{id=kafka-backup-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
        at de.azapps.kafkabackup.common.record.RecordSerde.write(RecordSerde.java:102)
        at de.azapps.kafkabackup.common.segment.SegmentWriter.append(SegmentWriter.java:75)
        at de.azapps.kafkabackup.common.partition.PartitionWriter.append(PartitionWriter.java:57)
        at de.azapps.kafkabackup.sink.BackupSinkTask.put(BackupSinkTask.java:68)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I fixed the problem by changing de.azapps.kafkabackup.common.record.RecordSerde.java:

public static void write(OutputStream outputStream, Record record) throws IOException {
    Converter converter = new AlreadyBytesConverter();
    DataOutputStream dataStream = new DataOutputStream(outputStream);
    dataStream.writeLong(record.kafkaOffset());
    dataStream.writeInt(record.timestampType().id);
    if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
        if (record.timestamp() != null) {
            dataStream.writeLong(record.timestamp());
        } else {
            dataStream.writeLong(0);
        }
    }
...

Actually I am not sure if I messed up my records or if it is a bug in the code.

Greetings

PartitionWriter Initialisation against kafka start offset

In the PartitionWriter constructor I can see the following:
currentSegment = new SegmentWriter(topic, partition, 0, topicDir);
// do not forget to add the current segment to the partition index. Even if it is empty
partitionIndex.appendSegment(currentSegment.filePrefix(), 0);

You hard-code startOffset to 0; ideally, for topics that have a TTL and therefore no longer start at offset 0, we would like to see the start offset being another number.

What would, according to you, be the best way to handle this?

Also, a set of files may have been created with 0 as the starting point when the earliest offset is, for example, 1234. Can the partition CLI restore task use the content of the file's first index rather than the name regex of the file?

Support point-in-time backups

This is a one-off tool (meaning it does not need to run in the background after the backup is done), so the reliance on a background daemon process is funny. There is no need to run kafka-connect as a daemon at all.

Please publish binary releases

This is intended as a tool, so please publish binary releases; it is not fun trying to compile the code every time I would like to use it.

Unit Tests

  • Data Structures
  • Index data structures
  • (De)Serializers

Exception when creating backup of records with headers

Dear Anatoly,

after updating the Kafka stack to version 5.4.1 and building the latest Kafka Backup commit on the master branch, we still run into issues with the backup sink that are similar to the issues with the restore source (#39).

We finally figured out that the backup sink fails because our records have headers.
We adapted your system test framework to add a single static header to each generated message (see this fork commit). As a result, the simple test fails.

More specifically, the backup sink fails:

$ curl http://localhost:8083/connectors/backup-sink/status
...
{"name":"backup-sink","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"..."}],"type":"sink"}

Here is the wrapped trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: backup-test-1partition error: Not a byte array! false
at de.azapps.kafkabackup.common.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:19)
at de.azapps.kafkabackup.common.record.RecordSerde.write(RecordSerde.java:121)
at de.azapps.kafkabackup.common.segment.SegmentWriter.append(SegmentWriter.java:78)
at de.azapps.kafkabackup.common.partition.PartitionWriter.append(PartitionWriter.java:57)
at de.azapps.kafkabackup.sink.BackupSinkTask.put(BackupSinkTask.java:68)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
... 10 more

The inner DataException refers to the value false, which is the value of the static header we've added. Whatever header(s) we define, this exception is always thrown for the first header's value.

As a result, no data is written to the test topic on to-resume-kafka that could be consumed by the system test to verify restored records.

Here is the full output of the coyote run:

$ coyote -c 01_simple_roundtrip_test.yaml 
2020/04/14 17:04:49 Starting coyote-tester
2020/04/14 17:04:49 Starting processing group: [ Setup Cluster to Backup ]
2020/04/14 17:04:52 Success, command 'docker-compose up -d', test 'Docker Compose Up'. Stdout: ""
2020/04/14 17:05:26 Success, command 'bash -c '
  echo "Trying to reach Kafka Connect. Try "
  for ((i=0;i<60;i++)); do
    docker-compose exec -T to-backup-connect curl "http://localhost:8083/connectors" && docker-compose exec -T restore-to-connect curl "http://localhost:8083/connectors" && break;
    echo "$i/60"
    sleep 10;
  done'
', test 'Wait for Connect to get up'. Stdout: "Trying to reach Kafka Connect. Try \n0/60\n1/60\n2/60\n[][]"
2020/04/14 17:05:26 Starting processing group: [ Create  Topic for tests ]
2020/04/14 17:05:29 Success, command 'docker-compose exec -T to-backup-kafka runutil create_topic backup-test-1partition 1', test ''. Stdout: ""
2020/04/14 17:05:29 Starting processing group: [ Produce Messages ]
2020/04/14 17:05:31 Success, command 'docker-compose exec -T to-backup-kafka runutil produce_messages backup-test-1partition 0 0 300', test 'Produce 300 messages'. Stdout: ""
2020/04/14 17:05:31 Starting processing group: [ Consume messages ]
2020/04/14 17:05:37 Success, command 'docker-compose exec -T to-backup-kafka runutil consume_messages backup-test-1partition cg-100 100', test 'Consume 100 messages with cg-100'. Stdout: "Consumed 100 messages\n"
2020/04/14 17:05:43 Success, command 'docker-compose exec -T to-backup-kafka runutil consume_messages backup-test-1partition cg-200 200', test 'Consume 200 messages with cg-200'. Stdout: "Consumed 200 messages\n"
2020/04/14 17:05:49 Success, command 'docker-compose exec -T to-backup-kafka runutil consume_messages backup-test-1partition cg-300 300', test 'Consume 300 messages with cg-300'. Stdout: "Consumed 300 messages\n"
2020/04/14 17:05:49 Starting processing group: [ Check Consumer Group Offsets ]
2020/04/14 17:05:52 Success, command 'docker-compose exec -T to-backup-kafka runutil kafka_group_describe cg-100', test 'Check Consumer Group cg-100'. Stdout: "\nGROUP           TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID\ncg-100          backup-test-1partition 0          100             300             200             -               -               -\n"
2020/04/14 17:05:54 Success, command 'docker-compose exec -T to-backup-kafka runutil kafka_group_describe cg-200', test 'Check Consumer Group cg-200'. Stdout: "\nGROUP           TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID\ncg-200          backup-test-1partition 0          200             300             100             -               -               -\n"
2020/04/14 17:05:57 Success, command 'docker-compose exec -T to-backup-kafka runutil kafka_group_describe cg-300', test 'Check Consumer Group cg-200'. Stdout: "\nGROUP           TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID\ncg-300          backup-test-1partition 0          300             300             0               -               -               -\n"
2020/04/14 17:05:57 Starting processing group: [ Start Kafka Backup ]
2020/04/14 17:05:58 Success, command 'docker-compose exec -T to-backup-kafka runutil rm -rf "/kafka-backup/001_simple_1partition_test/"', test 'Clean previous data'. Stdout: ""
2020/04/14 17:05:58 Success, command 'docker-compose exec -T to-backup-connect
  curl -vs --stderr - -X POST -H "Content-Type: application/json"
       --data @-
       "http://localhost:8083/connectors"
', test 'Create an Kafka Backup Connector'. Stdout: "* Hostname was NOT found in DNS cache\n*   Trying 127.0.0.1...\n* Connected to localhost (127.0.0.1) port 8083 (#0)\n> POST /connectors HTTP/1.1\r\n> User-Agent: curl/7.38.0\r\n> Host: localhost:8083\r\n> Accept: */*\r\n> Content-Type: application/json\r\n> Content-Length: 477\r\n> \r\n} [data not shown]\n* upload completely sent off: 477 out of 477 bytes\n< HTTP/1.1 201 Created\r\n< Date: Tue, 14 Apr 2020 15:05:58 GMT\r\n< Location: http://localhost:8083/connectors/backup-sink\r\n< Content-Type: application/json\r\n< Content-Length: 477\r\n* Server Jetty(9.4.20.v20190813) is not blacklisted\n< Server: Jetty(9.4.20.v20190813)\r\n< \r\n{ [data not shown]\n{\"name\":\"backup-sink\",\"config\":{\"connector.class\":\"de.azapps.kafkabackup.sink.BackupSinkConnector\",\"tasks.max\":\"1\",\"topics.regex\":\"backup-test.*\",\"key.converter\":\"de.azapps.kafkabackup.common.AlreadyBytesConverter\",\"value.converter\":\"de.azapps.kafkabackup.common.AlreadyBytesConverter\",\"target.dir\":\"/kafka-backup/001_simple_1partition_test/\",\"max.segment.size.bytes\":\"10485760\",\"cluster.bootstrap.servers\":\"to-backup-kafka:9092\",\"name\":\"backup-sink\"},\"tasks\":[],\"type\":\"sink\"}* Connection #0 to host localhost left intact\n"
2020/04/14 17:06:28 Success, command 'sleep 30', test ''. Stdout: ""
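For readability, this is the backup-sink connector configuration returned by the connector-creation step above (pretty-printed; content unchanged from the escaped JSON in the log):

    {
      "name": "backup-sink",
      "config": {
        "connector.class": "de.azapps.kafkabackup.sink.BackupSinkConnector",
        "tasks.max": "1",
        "topics.regex": "backup-test.*",
        "key.converter": "de.azapps.kafkabackup.common.AlreadyBytesConverter",
        "value.converter": "de.azapps.kafkabackup.common.AlreadyBytesConverter",
        "target.dir": "/kafka-backup/001_simple_1partition_test/",
        "max.segment.size.bytes": "10485760",
        "cluster.bootstrap.servers": "to-backup-kafka:9092",
        "name": "backup-sink"
      },
      "tasks": [],
      "type": "sink"
    }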
2020/04/14 17:06:28 Starting processing group: [ Stop Cluster that was backed up ]
2020/04/14 17:06:32 Success, command 'docker-compose stop to-backup-kafka', test 'Docker Compose Down'. Stdout: ""
2020/04/14 17:06:32 Starting processing group: [ Restore ]
2020/04/14 17:06:35 Success, command 'docker-compose exec -T restore-to-kafka runutil create_topic backup-test-1partition 1', test 'Create Topic'. Stdout: ""
2020/04/14 17:06:36 Success, command 'docker-compose exec -T restore-to-connect
  curl -vs --stderr - -X POST -H "Content-Type: application/json"
       --data @-
       "http://localhost:8083/connectors"
', test 'Create an Kafka Backup Restore Connector'. Stdout: "* Hostname was NOT found in DNS cache\n*   Trying 127.0.0.1...\n* Connected to localhost (127.0.0.1) port 8083 (#0)\n> POST /connectors HTTP/1.1\r\n> User-Agent: curl/7.38.0\r\n> Host: localhost:8083\r\n> Accept: */*\r\n> Content-Type: application/json\r\n> Content-Length: 661\r\n> \r\n} [data not shown]\n* upload completely sent off: 661 out of 661 bytes\n< HTTP/1.1 201 Created\r\n< Date: Tue, 14 Apr 2020 15:06:36 GMT\r\n< Location: http://localhost:8083/connectors/backup-source\r\n< Content-Type: application/json\r\n< Content-Length: 655\r\n* Server Jetty(9.4.20.v20190813) is not blacklisted\n< Server: Jetty(9.4.20.v20190813)\r\n< \r\n{ [data not shown]\n{\"name\":\"backup-source\",\"config\":{\"connector.class\":\"de.azapps.kafkabackup.source.BackupSourceConnector\",\"tasks.max\":\"1\",\"topics\":\"backup-test-1partition\",\"key.converter\":\"de.azapps.kafkabackup.common.AlreadyBytesConverter\",\"value.converter\":\"de.azapps.kafkabackup.common.AlreadyBytesConverter\",\"source.dir\":\"/kafka-backup/001_simple_1partition_test/\",\"batch.size\":\"1000\",\"cluster.bootstrap.servers\":\"restore-to-kafka:9092\",\"cluster.key.deserializer\":\"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\"cluster.value.deserializer\":\"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\"name\":\"backup-source\"},\"tasks\":[],\"type\":\"source\"}* Connection #0 to host localhost left intact\n"
2020/04/14 17:07:06 Success, command 'sleep 30', test ''. Stdout: ""
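Likewise, this is the backup-source (restore) connector configuration from the step above (pretty-printed; content unchanged from the escaped JSON in the log):

    {
      "name": "backup-source",
      "config": {
        "connector.class": "de.azapps.kafkabackup.source.BackupSourceConnector",
        "tasks.max": "1",
        "topics": "backup-test-1partition",
        "key.converter": "de.azapps.kafkabackup.common.AlreadyBytesConverter",
        "value.converter": "de.azapps.kafkabackup.common.AlreadyBytesConverter",
        "source.dir": "/kafka-backup/001_simple_1partition_test/",
        "batch.size": "1000",
        "cluster.bootstrap.servers": "restore-to-kafka:9092",
        "cluster.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
        "cluster.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
        "name": "backup-source"
      },
      "tasks": [],
      "type": "source"
    }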
2020/04/14 17:07:06 Starting processing group: [ Verify Backup ]

The last step blocks indefinitely (we waited 15 minutes).

I think this is a bug in your implementation that we would love to see fixed.

Thank you for your project and best regards
Sebastian

Option to exit process after the backup is completed?

I would love to run this as a CLI tool. I finally figured out the configuration to run it with connect-standalone, but it hangs around after the backup is done, so I have to interactively Ctrl-C it to stop the process, which results in:

[2020-04-08 09:35:45,458] ERROR Plugin class loader for connector: 'de.azapps.kafkabackup.sink.BackupSinkConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@62e20a76 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
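For readers trying to reproduce this, here is a hedged sketch of what such a connect-standalone setup might look like. The file names and paths are illustrative, the worker settings are the standard Kafka Connect standalone ones, and the connector keys are taken from the backup-sink configuration shown in the test log above; whether this matches the reporter's exact setup is unknown.

    # worker.properties - standard Kafka Connect standalone worker settings (illustrative)
    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    offset.storage.file.filename=/tmp/connect.offsets
    # directory containing the kafka-backup jar
    plugin.path=/opt/kafka-backup

    # backup-sink.properties - connector settings, keys as in the sink config above
    name=backup-sink
    connector.class=de.azapps.kafkabackup.sink.BackupSinkConnector
    tasks.max=1
    topics.regex=backup-test.*
    key.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
    value.converter=de.azapps.kafkabackup.common.AlreadyBytesConverter
    target.dir=/kafka-backup/
    max.segment.size.bytes=10485760
    cluster.bootstrap.servers=localhost:9092

    # Start the standalone worker; it keeps running until interrupted (e.g. Ctrl-C),
    # because Connect treats the backup sink as a never-ending stream.
    connect-standalone.sh worker.properties backup-sink.properties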

Yet another NPE

I just hit the NPE below yesterday (using commit 3c95089). I tried again today with the latest commit from master (f30b9ad), but it is still there. The output below is from the latest version.

What changed: I migrated to eCryptfs. I stopped kafka-backup, renamed the target directory, emptied the backup sink config and chattr +i'd it (to prevent kafka-backup from being started by Puppet again). Then I deployed the eCryptfs changes, rsynced the data back, removed the chattr +i flag and reapplied Puppet.

Now the main question: should we try to debug this at all, or should I just wipe it and take another fresh backup? This is QA, so we have some time if needed.

[2020-03-17 02:23:47,321] INFO [Consumer clientId=connector-consumer-chrono_qa-backup-sink-0, groupId=connect-chrono_qa-backup-sink] Setting offset for partition [redacted].chrono-billable-datasink-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=kafka5.node:9093 (id: 5 rack: null), epoch=187}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:762)
[2020-03-17 02:23:47,697] ERROR WorkerSinkTask{id=chrono_qa-backup-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
java.lang.NullPointerException
        at de.azapps.kafkabackup.sink.BackupSinkTask.close(BackupSinkTask.java:122)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:397)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:591)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
[2020-03-17 02:23:47,705] ERROR WorkerSinkTask{id=chrono_qa-backup-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-03-17 02:23:47,705] INFO Stopped BackupSinkTask (de.azapps.kafkabackup.sink.BackupSinkTask:139)
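For what it is worth, the trace points at BackupSinkTask.close(), which Kafka Connect calls with the partitions being closed during shutdown or rebalance (see WorkerSinkTask.closePartitions above). Below is a minimal, hypothetical sketch of the kind of defensive handling that would avoid an NPE at that point; the writers map and its contents are illustrative and not the actual BackupSinkTask internals.

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative only: tolerate partitions for which no per-partition state was
    // ever created, instead of dereferencing null and killing the task.
    public class DefensiveCloseSketch {
        private final Map<TopicPartition, AutoCloseable> writers = new HashMap<>(); // hypothetical state

        public void close(Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                AutoCloseable writer = writers.remove(tp);
                if (writer == null) {
                    // No writer was opened for this partition during the task's
                    // lifetime: skip it rather than throw a NullPointerException.
                    continue;
                }
                try {
                    writer.close();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to close writer for " + tp, e);
                }
            }
        }
    }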

Restore can fail when at least one partition was empty

If you have a topic backup where the number of records is smaller than the number of partitions, you will get an IndexOutOfBoundsException when you try to restore the topic. The error message is: Could not find offset 0 in topic test01, segment segment_partition_005_from_offset_0000000000. A possible fix would be to add a flag when storing a partition that marks it as empty, or to change how empty partitions are handled during restore (e.g. by printing a warning message to the log).
Full Stacktrace:

java.lang.IndexOutOfBoundsException: Could not find offset 0 in topic test01, segment segment_partition_005_from_offset_0000000000
        at de.azapps.kafkabackup.common.segment.SegmentReader.seek(SegmentReader.java:50)
        at de.azapps.kafkabackup.common.partition.PartitionReader.seek(PartitionReader.java:49)
        at de.azapps.kafkabackup.common.partition.PartitionReader.<init>(PartitionReader.java:34)
        at de.azapps.kafkabackup.source.BackupSourceTask.lambda$findPartitions$0(BackupSourceTask.java:85)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
        at de.azapps.kafkabackup.source.BackupSourceTask.findPartitions(BackupSourceTask.java:79)
        at de.azapps.kafkabackup.source.BackupSourceTask.start(BackupSourceTask.java:46)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:202)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
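As an illustration of the second suggestion (handling empty partitions gracefully during restore), here is a self-contained, hypothetical sketch of the pattern; only the exception type and message shape are taken from the stack trace above, everything else is illustrative and not the actual kafka-backup code.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative pattern only: when opening a backed-up partition for restore,
    // treat "no record at offset 0" as an empty partition and skip it with a
    // warning instead of failing the whole source task.
    public class SkipEmptyPartitionsSketch {

        // Stand-in for a partition reader that seeks to offset 0 on creation and
        // fails the same way the stack trace above shows when no records exist.
        static final class Reader {
            Reader(String topic, int partition, long recordCount) {
                if (recordCount == 0) {
                    throw new IndexOutOfBoundsException(
                            "Could not find offset 0 in topic " + topic + ", partition " + partition);
                }
            }
        }

        public static void main(String[] args) {
            long[] recordsPerPartition = {3, 0, 7}; // partition 1 was empty when backed up
            List<Reader> readers = new ArrayList<>();
            for (int p = 0; p < recordsPerPartition.length; p++) {
                try {
                    readers.add(new Reader("test01", p, recordsPerPartition[p]));
                } catch (IndexOutOfBoundsException e) {
                    System.out.println("WARN: skipping empty partition " + p + ": " + e.getMessage());
                }
            }
            System.out.println("Opened " + readers.size() + " partition readers"); // prints 2
        }
    }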

build/compile errors

./gradlew shadowJar results in the error below for me 😞

I would appreciate it if you could provide some pointers on fixing this issue. Also, is there a pre-built Docker image for kafka-backup? Thanks 🙏

./gradlew shadowJar

> Task :compileJava FAILED
/Users/prasadzone/kafka-backup/src/main/java/de/azapps/kafkabackup/common/segment/SegmentReader.java:49: error: cannot find symbol
        if (optionalPosition.isEmpty()) {
                            ^
  symbol:   method isEmpty()
  location: variable optionalPosition of type Optional<Long>
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 error

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':compileJava'.
> Compilation failed; see the compiler error output for details.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

BUILD FAILED in 1s
1 actionable task: 1 executed
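The root cause is the Java version used for the build: Optional.isEmpty() was only added in Java 11, so compiling SegmentReader.java with a JDK 8 toolchain fails with exactly this "cannot find symbol" error. Building with JDK 11 or newer avoids it; alternatively, as a hedged local workaround, the check can be rewritten in a Java 8-compatible form, as in this small demo:

    import java.util.Optional;

    // Demonstrates the Java 8-compatible form of the check that fails to compile in
    // SegmentReader.java:49 on JDK 8: !isPresent() is available since Java 8,
    // whereas isEmpty() only exists on java.util.Optional since Java 11.
    public class OptionalCheckDemo {
        public static void main(String[] args) {
            Optional<Long> optionalPosition = Optional.empty();
            if (!optionalPosition.isPresent()) { // Java 8 equivalent of optionalPosition.isEmpty()
                System.out.println("no position found for the requested offset");
            }
        }
    }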

How to calculate Heap and RAM requirements for Kafka Backup

It seems -Xmx1024M (or a Docker memory_limit of 1152M) is not enough for this cluster :( Any ideas on how to calculate the heap/RAM size for kafka-backup?

I currently have no idea how to calculate this; let us discuss how to approximate the heap and memory usage.
