After updating the Kafka stack to version 5.4.1 and building the latest Kafka Backup commit on the master
branch, we still run into issues with the backup sink that are similar to those with the restore source (#39).
We finally figured out that the backup sink fails because our records have headers.
We adapted your system test framework to add a single static header to each generated message (see this fork commit). As a result, the simple test fails.
$ curl http://localhost:8083/connectors/backup-sink/status
...
{"name":"backup-sink","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"..."}],"type":"sink"}
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: backup-test-1partition error: Not a byte array! false
at de.azapps.kafkabackup.common.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:19)
at de.azapps.kafkabackup.common.record.RecordSerde.write(RecordSerde.java:121)
at de.azapps.kafkabackup.common.segment.SegmentWriter.append(SegmentWriter.java:78)
at de.azapps.kafkabackup.common.partition.PartitionWriter.append(PartitionWriter.java:57)
at de.azapps.kafkabackup.sink.BackupSinkTask.put(BackupSinkTask.java:68)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
... 10 more
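For what it's worth, the failing check can be reproduced in isolation. The following is only a sketch of the behaviour we infer from the trace: the sink seems to expect keys, values and (apparently) headers to already be raw byte arrays, while Connect's default header converter hands the header value to the task as a parsed Boolean (`false`) rather than as bytes, which matches the `Not a byte array! false` message. Class and method names below are ours for illustration, not the actual Kafka Backup source:

```java
public class NotAByteArrayDemo {
    // Mirrors the check we believe AlreadyBytesConverter performs:
    // anything that is not already a byte[] is rejected with an error
    // message of the form "Not a byte array! <value>".
    static byte[] requireBytes(Object value) {
        if (!(value instanceof byte[])) {
            throw new IllegalStateException("Not a byte array! " + value);
        }
        return (byte[]) value;
    }

    public static void main(String[] args) {
        // Keys and values arrive as byte[] and pass through untouched ...
        byte[] ok = requireBytes(new byte[] {1, 2, 3});
        System.out.println("value passed through: " + ok.length + " bytes");

        // ... but a header value that was deserialized to Boolean false
        // is rejected, reproducing the message from the stack trace above.
        try {
            requireBytes(Boolean.FALSE);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, the fix would presumably be to serialize headers with a byte-preserving header converter (or handle them separately) rather than routing them through the byte pass-through check.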
$ coyote -c 01_simple_roundtrip_test.yaml
2020/04/14 17:04:49 Starting coyote-tester
2020/04/14 17:04:49 Starting processing group: [ Setup Cluster to Backup ]
2020/04/14 17:04:52 Success, command 'docker-compose up -d', test 'Docker Compose Up'. Stdout: ""
2020/04/14 17:05:26 Success, command 'bash -c '
echo "Trying to reach Kafka Connect. Try "
for ((i=0;i<60;i++)); do
docker-compose exec -T to-backup-connect curl "http://localhost:8083/connectors" && docker-compose exec -T restore-to-connect curl "http://localhost:8083/connectors" && break;
echo "$i/60"
sleep 10;
done'
', test 'Wait for Connect to get up'. Stdout: "Trying to reach Kafka Connect. Try \n0/60\n1/60\n2/60\n[][]"
2020/04/14 17:05:26 Starting processing group: [ Create Topic for tests ]
2020/04/14 17:05:29 Success, command 'docker-compose exec -T to-backup-kafka runutil create_topic backup-test-1partition 1', test ''. Stdout: ""
2020/04/14 17:05:29 Starting processing group: [ Produce Messages ]
2020/04/14 17:05:31 Success, command 'docker-compose exec -T to-backup-kafka runutil produce_messages backup-test-1partition 0 0 300', test 'Produce 300 messages'. Stdout: ""
2020/04/14 17:05:31 Starting processing group: [ Consume messages ]
2020/04/14 17:05:37 Success, command 'docker-compose exec -T to-backup-kafka runutil consume_messages backup-test-1partition cg-100 100', test 'Consume 100 messages with cg-100'. Stdout: "Consumed 100 messages\n"
2020/04/14 17:05:43 Success, command 'docker-compose exec -T to-backup-kafka runutil consume_messages backup-test-1partition cg-200 200', test 'Consume 200 messages with cg-200'. Stdout: "Consumed 200 messages\n"
2020/04/14 17:05:49 Success, command 'docker-compose exec -T to-backup-kafka runutil consume_messages backup-test-1partition cg-300 300', test 'Consume 300 messages with cg-300'. Stdout: "Consumed 300 messages\n"
2020/04/14 17:05:49 Starting processing group: [ Check Consumer Group Offsets ]
2020/04/14 17:05:52 Success, command 'docker-compose exec -T to-backup-kafka runutil kafka_group_describe cg-100', test 'Check Consumer Group cg-100'. Stdout: "\nGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID\ncg-100 backup-test-1partition 0 100 300 200 - - -\n"
2020/04/14 17:05:54 Success, command 'docker-compose exec -T to-backup-kafka runutil kafka_group_describe cg-200', test 'Check Consumer Group cg-200'. Stdout: "\nGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID\ncg-200 backup-test-1partition 0 200 300 100 - - -\n"
2020/04/14 17:05:57 Success, command 'docker-compose exec -T to-backup-kafka runutil kafka_group_describe cg-300', test 'Check Consumer Group cg-200'. Stdout: "\nGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID\ncg-300 backup-test-1partition 0 300 300 0 - - -\n"
2020/04/14 17:05:57 Starting processing group: [ Start Kafka Backup ]
2020/04/14 17:05:58 Success, command 'docker-compose exec -T to-backup-kafka runutil rm -rf "/kafka-backup/001_simple_1partition_test/"', test 'Clean previous data'. Stdout: ""
2020/04/14 17:05:58 Success, command 'docker-compose exec -T to-backup-connect
curl -vs --stderr - -X POST -H "Content-Type: application/json"
--data @-
"http://localhost:8083/connectors"
', test 'Create an Kafka Backup Connector'. Stdout: "* Hostname was NOT found in DNS cache\n* Trying 127.0.0.1...\n* Connected to localhost (127.0.0.1) port 8083 (#0)\n> POST /connectors HTTP/1.1\r\n> User-Agent: curl/7.38.0\r\n> Host: localhost:8083\r\n> Accept: */*\r\n> Content-Type: application/json\r\n> Content-Length: 477\r\n> \r\n} [data not shown]\n* upload completely sent off: 477 out of 477 bytes\n< HTTP/1.1 201 Created\r\n< Date: Tue, 14 Apr 2020 15:05:58 GMT\r\n< Location: http://localhost:8083/connectors/backup-sink\r\n< Content-Type: application/json\r\n< Content-Length: 477\r\n* Server Jetty(9.4.20.v20190813) is not blacklisted\n< Server: Jetty(9.4.20.v20190813)\r\n< \r\n{ [data not shown]\n{\"name\":\"backup-sink\",\"config\":{\"connector.class\":\"de.azapps.kafkabackup.sink.BackupSinkConnector\",\"tasks.max\":\"1\",\"topics.regex\":\"backup-test.*\",\"key.converter\":\"de.azapps.kafkabackup.common.AlreadyBytesConverter\",\"value.converter\":\"de.azapps.kafkabackup.common.AlreadyBytesConverter\",\"target.dir\":\"/kafka-backup/001_simple_1partition_test/\",\"max.segment.size.bytes\":\"10485760\",\"cluster.bootstrap.servers\":\"to-backup-kafka:9092\",\"name\":\"backup-sink\"},\"tasks\":[],\"type\":\"sink\"}* Connection #0 to host localhost left intact\n"
2020/04/14 17:06:28 Success, command 'sleep 30', test ''. Stdout: ""
2020/04/14 17:06:28 Starting processing group: [ Stop Cluster that was backed up ]
2020/04/14 17:06:32 Success, command 'docker-compose stop to-backup-kafka', test 'Docker Compose Down'. Stdout: ""
2020/04/14 17:06:32 Starting processing group: [ Restore ]
2020/04/14 17:06:35 Success, command 'docker-compose exec -T restore-to-kafka runutil create_topic backup-test-1partition 1', test 'Create Topic'. Stdout: ""
2020/04/14 17:06:36 Success, command 'docker-compose exec -T restore-to-connect
curl -vs --stderr - -X POST -H "Content-Type: application/json"
--data @-
"http://localhost:8083/connectors"
', test 'Create an Kafka Backup Restore Connector'. Stdout: "* Hostname was NOT found in DNS cache\n* Trying 127.0.0.1...\n* Connected to localhost (127.0.0.1) port 8083 (#0)\n> POST /connectors HTTP/1.1\r\n> User-Agent: curl/7.38.0\r\n> Host: localhost:8083\r\n> Accept: */*\r\n> Content-Type: application/json\r\n> Content-Length: 661\r\n> \r\n} [data not shown]\n* upload completely sent off: 661 out of 661 bytes\n< HTTP/1.1 201 Created\r\n< Date: Tue, 14 Apr 2020 15:06:36 GMT\r\n< Location: http://localhost:8083/connectors/backup-source\r\n< Content-Type: application/json\r\n< Content-Length: 655\r\n* Server Jetty(9.4.20.v20190813) is not blacklisted\n< Server: Jetty(9.4.20.v20190813)\r\n< \r\n{ [data not shown]\n{\"name\":\"backup-source\",\"config\":{\"connector.class\":\"de.azapps.kafkabackup.source.BackupSourceConnector\",\"tasks.max\":\"1\",\"topics\":\"backup-test-1partition\",\"key.converter\":\"de.azapps.kafkabackup.common.AlreadyBytesConverter\",\"value.converter\":\"de.azapps.kafkabackup.common.AlreadyBytesConverter\",\"source.dir\":\"/kafka-backup/001_simple_1partition_test/\",\"batch.size\":\"1000\",\"cluster.bootstrap.servers\":\"restore-to-kafka:9092\",\"cluster.key.deserializer\":\"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\"cluster.value.deserializer\":\"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\"name\":\"backup-source\"},\"tasks\":[],\"type\":\"source\"}* Connection #0 to host localhost left intact\n"
2020/04/14 17:07:06 Success, command 'sleep 30', test ''. Stdout: ""
2020/04/14 17:07:06 Starting processing group: [ Verify Backup ]
The last step blocks indefinitely (in practice, we waited 15 minutes before giving up).
We think this is a bug in your implementation that we would love to see fixed.