
Mirus

A tool for distributed, high-volume replication between Apache Kafka clusters based on Kafka Connect. Designed for easy operation in a high-throughput, multi-cluster environment.

Features

  • Dynamic Configuration: Uses the Kafka Connect REST API for dynamic API-driven configuration
  • Precise Replication: Supports a regex whitelist and an explicit topic whitelist
  • Simple Management for Multiple Kafka Clusters: Supports multiple source clusters with one Worker process
  • Continuous Ingestion: Continues consuming from the source cluster while flushing and committing offsets
  • Built for Dynamic Kafka Clusters: Able to handle topics and partitions being created and deleted in source and destination clusters
  • Scalable: Creates a configurable set of worker tasks that are distributed across a Kafka Connect cluster for high performance, even when pushing data over the internet
  • Fault tolerant: Includes a monitor thread that looks for task failures and optionally auto-restarts
  • Monitoring: Includes custom JMX metrics for production-ready monitoring and alerting

Overview

Mirus is built around Apache Kafka Connect, providing SourceConnector and SourceTask implementations optimized for reading data from Kafka source clusters. The MirusSourceConnector runs a KafkaMonitor thread, which monitors the source and destination Apache Kafka cluster partition allocations, looking for changes and applying a configurable topic whitelist. Each task is responsible for a subset of the matching partitions, and runs an independent KafkaConsumer and KafkaProducer client pair to do the work of replicating those partitions.

Tasks can be restarted independently without otherwise affecting a running cluster, are monitored continuously for failure, and are optionally automatically restarted.

To understand how Mirus distributes work across a cluster of machines, please read the Kafka Connect documentation.

Installation

To build a package containing the Mirus jar file and all dependencies, run mvn package -P all:

  • target/mirus-${project.version}-all.zip

This package can be unzipped for use (see Quick Start).

Maven also builds the following artifacts when you run mvn package. These are useful if you need customized packaging for your own environment:

  • target/mirus-${project.version}.jar: Primary Mirus jar (dependencies not included)
  • target/mirus-${project.version}-run.zip: A package containing the Mirus run control scripts

Usage

These instructions assume you have expanded the mirus-${project.version}-all.zip package.

Mirus Worker Instance

A single Mirus Worker can be started using this helper script.

> bin/mirus-worker-start.sh [worker-properties-file]

worker-properties-file: Path to the Mirus worker properties file, which configures the Kafka Connect framework. See quickstart-worker.properties for an example.

Options:

--override property=value: optional command-line override for any item in the Mirus worker properties file. Multiple override options are supported (similar to the equivalent flag in Kafka).
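For example, to run a second worker from the same properties file on a different REST port (rest.port is a standard Kafka Connect worker property; 8084 is an arbitrary illustrative value):

```
> bin/mirus-worker-start.sh config/quickstart-worker.properties \
      --override rest.port=8084
```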

Mirus Offset Tool

Mirus includes a simple tool for reading and writing offsets. This can be useful for migration from other replication tools, for debugging, and for offset monitoring in production. The tool supports CSV and JSON input and output.

For detailed usage:

> bin/mirus-offset-tool.sh --help
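The exact column layout is documented in the tool's help output; as a hypothetical illustration based on the CSV rows used in the project's tests (connector, topic, partition, offset), a row might look like:

```
mirus-quickstart-source,test,0,42
```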

Quick Start

To run the Quick Start example you will need running Kafka and ZooKeeper clusters to work with. We will assume you have a standard Apache Kafka Quickstart test cluster running on localhost; follow the Kafka Quick Start instructions.

For this tutorial we will set up a Mirus worker instance to mirror the topic test in loop-back mode to another topic in the same cluster. To avoid a conflict the destination topic name will be set to test.mirror using the destination.topic.name.suffix configuration option.

  1. Build the full Mirus project using Maven

    > mvn package -P all
    
  2. Unpack the Mirus "all" package

    > mkdir quickstart; cd quickstart; unzip ../target/mirus-*-all.zip
    
  3. Start the quickstart worker using the sample worker properties file

    > bin/mirus-worker-start.sh config/quickstart-worker.properties
    
    
  4. In another terminal, confirm the Mirus Kafka Connect REST API is running

    > curl localhost:8083
    
    {"version":"1.1.0","commit":"fdcf75ea326b8e07","kafka_cluster_id":"xdxNfx84TU-ennOs7EznZQ"}
    
  5. Submit a new MirusSourceConnector configuration to the REST API with the name mirus-quickstart-source

    > curl localhost:8083/connectors/mirus-quickstart-source/config \
          -X PUT \
          -H 'Content-Type: application/json' \
          -d '{
               "name": "mirus-quickstart-source",
               "connector.class": "com.salesforce.mirus.MirusSourceConnector",
               "tasks.max": "5",
               "topics.whitelist": "test",
               "destination.topic.name.suffix": ".mirror",
               "destination.consumer.bootstrap.servers": "localhost:9092",
               "consumer.bootstrap.servers": "localhost:9092",
               "consumer.client.id": "mirus-quickstart",
               "consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
               "consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
           }'
    
  6. Confirm the new connector is running

    > curl localhost:8083/connectors
    
    ["mirus-quickstart-source"]
    
    > curl localhost:8083/connectors/mirus-quickstart-source/status
    
    {"name":"mirus-quickstart-source","connector":{"state":"RUNNING","worker_id":"1.2.3.4:8083"},"tasks":[],"type":"source"}
    
  7. Create source and destination topics test and test.mirror in your Kafka cluster

    > cd ${KAFKA_HOME}
    
    > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic 'test' --partitions 1 --replication-factor 1
    Created topic "test".
    
    > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic 'test.mirror' --partitions 1 --replication-factor 1
    Created topic "test.mirror".
    
  8. Mirus should detect that the new source and destination topics are available and create a new Mirus Source Task:

    > curl localhost:8083/connectors/mirus-quickstart-source/status
    
    {"name":"mirus-quickstart-source","connector":{"state":"RUNNING","worker_id":"10.126.22.44:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"10.126.22.44:8083"}],"type":"source"}
    

Any message you write to the topic test will now be mirrored to test.mirror.
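You can verify this end to end with the standard Kafka console clients (run from ${KAFKA_HOME}; the flags below match the Kafka 1.x quickstart). Messages typed into the producer should appear on test.mirror after a short delay:

```
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test.mirror --from-beginning
```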

REST API

See the documentation for Kafka Connect REST API.

Configuration

Kafka Connect Configuration

Mirus shares most Worker and Source configuration with the Kafka Connect framework. For general information on configuring the framework see:

Mirus Specific Configuration

Mirus-specific configuration properties are documented in these files:

  • Mirus Source Properties: These can be added to the JSON config object posted to the REST API /config endpoint to configure a new MirusSourceConnector instance. In addition, the KafkaConsumer instances created by Mirus Tasks can be configured using a consumer. prefix on the standard Kafka Consumer properties. The equivalent KafkaProducer options are configured in the Mirus Worker Properties file (see below). The destination.consumer. prefix can be used to override the properties of the KafkaConsumer that connects to the destination Kafka cluster.

  • Mirus Worker Properties: These are Mirus extensions to the Kafka Connect configuration, and should be applied to the Worker Properties file provided at startup. The KafkaProducer instances created by Kafka Connect can also be configured using a producer. prefix on the standard Kafka Producer properties.
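As an illustrative sketch (property values here are examples, not recommendations), a worker properties fragment using the producer. prefix might look like:

```
# Applied to the KafkaProducer instances created by Kafka Connect
producer.compression.type=gzip
producer.linger.ms=100
```

The consumer. and destination.consumer. prefixes, by contrast, belong in the connector JSON posted to the REST API, e.g. "consumer.max.poll.records": "500".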

Destination Topic Checking

By default, Mirus checks that the destination topic exists in the destination Kafka cluster before starting to replicate data to it. This feature can be disabled by setting the enable.destination.topic.checking config option to false.

As of version 0.2.0, destination topic checking can also support topic re-routing performed by the RegexRouter Single-Message Transformation. No other Router Transformations are supported, so destination topic checking must be disabled in order to use them.
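For example, a connector config fragment using RegexRouter (transform name and replacement pattern are illustrative) remains compatible with destination topic checking:

```
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "(.*)",
"transforms.route.replacement": "$1.mirror"
```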

Metrics

Mirus produces some custom metrics in addition to the standard Kafka Connect metrics.

The JMX queries are as follows:

Latency (MirrorJmxReporter)

objectName="mirus:type=MirusSourceConnector,topic=*" attribute="replication-latency-ms-max"
objectName="mirus:type=MirusSourceConnector,topic=*" attribute="replication-latency-ms-min"
objectName="mirus:type=MirusSourceConnector,topic=*" attribute="replication-latency-ms-avg"
objectName="mirus:type=MirusSourceConnector,topic=*" attribute="replication-latency-ms-count"

Destination Information (MissingPartitionsJmxReporter)

objectName="mirus:type=mirus" attribute="missing-dest-partitions-count"

Connector Metrics (ConnectorJmxReporter)

objectName="mirus:type=connector-metrics,connector=*" attribute="task-failed-restart-attempts-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="connector-failed-restart-attempts-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="failed-task-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="paused-task-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="destroyed-task-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="running-task-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="unassigned-task-count"

Task Metrics (TaskJmxReporter)

objectName="mirus:type=connector-task-metrics,connector=*" attribute="task-failed-restart-attempts-count"
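One way to spot-check these beans from the command line is Kafka's bundled JmxTool; this sketch assumes the worker JVM was started with remote JMX enabled on port 9999 (the port and connector name are examples):

```
> bin/kafka-run-class.sh kafka.tools.JmxTool \
      --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
      --object-name "mirus:type=connector-metrics,connector=mirus-quickstart-source"
```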

Developer Info

To perform a release, run mvn release:prepare release:perform. GPG keys may need to be passed to Maven with -Darguments='-Dgpg.passphrase= -Dgpg.keyname=55Z32RD1'

Discussion

Questions or comments can be posted on the Mirus GitHub issues page.

Maintainers

Paul Davidson and contributors.

Code Style

This project uses the Google Java Format.


mirus's Issues

Deprecate Prefix & Suffix settings in favor of RegexRouter

AFAICT, what the prefix and suffix features do can be achieved with an SMT.

For example

transforms=TopicRename

transforms.TopicRename.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.TopicRename.regex=(.*)
transforms.TopicRename.replacement=mirrored.$1.some-suffix

Excessive logging (Exception thrown while calling task.commitRecord())

Hi,

We have an issue with Mirus version 0.6.9: we are seeing an excessive number of log messages with the Exception thrown while calling task.commitRecord() error.

Exception thrown while calling task.commitRecord() (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.lang.NullPointerException
at com.salesforce.mirus.metrics.MirrorJmxReporter.recordMirrorLatency(MirrorJmxReporter.java:162)
at com.salesforce.mirus.MirusSourceTask.commitRecord(MirusSourceTask.java:241)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:470)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$sendRecords$4(WorkerSourceTask.java:387)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:228)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:684)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:655)
at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:574)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:561)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:561)
at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$3(Sender.java:785)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
at java.base/java.lang.Thread.run(Thread.java:829)

From what I can see, the actual issue is that the topic initialized in MirrorJmxReporter and the topic passed to the recordMirrorLatency method are not the same.
The topics for MirrorJmxReporter are initialized here:

this.mirrorJmxReporter.addTopics(topicPartitionList);

and this will be the source topic.

But the topic passed to recordMirrorLatency

mirrorJmxReporter.recordMirrorLatency(sourceRecord.topic(), latency);

is taken from the sourceRecord.

When the SourceRecord is built, the topic is prefixed/suffixed, so it becomes the destination topic:

String topic = destinationTopicNamePrefix + consumerRecord.topic() + destinationTopicNameSuffix;

So when the record is finally passed to MirrorJmxReporter, it tries to look up metrics for that (destination) topic and breaks.

After recompiling Mirus with the topic passed to recordMirrorLatency changed to something like
mirrorJmxReporter.recordMirrorLatency(sourceRecord.sourcePartition().get("topic").toString(), latency)
the problem goes away (this would not be the correct way to solve the problem, but I tried it just to see if it helps).

From what I can see, the problem was introduced in Mirus by PR #89.

This should be fixed, since there is no config option or similar to disable this behavior.

BR,
Hrvoje

SSL configuration

Hello,
I am evaluating different tools for Kafka data replication.
I've read awesome comments about the Mirus tool, so I've decided to give it a try.
Playing with the tool, my scenario would be replicating data from k1 (no SSL) --> k2 (SSL enabled).
What properties should I apply in the worker to get those connections working?
I've got this worker config file:

# Kafka broker bootstrap server - this is the source cluster
bootstrap.servers=k1Ips:9092
group.id=mirus
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
config.storage.topic=mirus-config
status.storage.topic=mirus-status
offset.storage.topic=mirus-offsets

producer.security.protocol=SSL
producer.ssl.endpoint.identification.algorithm=""
producer.ssl.truststore.location=trustore.jks
producer.ssl.truststore.password=*************
producer.ssl.keystore.location=keystore.jks
producer.ssl.keystore.password=**********
producer.ssl.key.password=************

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

My connector looks like this:

{
"name": "mirus-source",
"connector.class": "com.salesforce.mirus.MirusSourceConnector",
"tasks.max": "5",
"topics.regex.list": "^(.pattern).",
"destination.topic.name.suffix": "",
"enable.destination.topic.checking": "false",
"destination.consumer.bootstrap.servers": "k2Ips:9092",
"consumer.bootstrap.servers": "k1IPs",
"consumer.client.id": "mirus-ot2",
"consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}

Am I missing something?

Many thanks beforehand.

Can we filter or mask any of the config parameters (e.g. password configuration) during the GET call?

While doing a GET to display task status, I'm seeing passwords in plain text. Is there any way to edit/mask the config parameters (like the passwords in my case) in the GET response?

Below is the sample example:

curl localhost:8083/connectors/source-2-destination/tasks | jq

[
  {
    "id": {
      "connector": "source-2-destination",
      "task": 0
    },
    "config": {
      "partitions": "[{\"topic\":\"topicname\",\"partition\":0}]",
      "consumer.ssl.truststore.password": "<plaintext password>",
      "consumer.ssl.endpoint.identification.algorithm": "",
      "consumer.group.id": "mirus-source-2-destination-grp",
      "consumer.ssl.truststore.location": "/path/.jks",
      "consumer.bootstrap.servers": "destination-kafka-cluster:9093",
      "task.class": "com.salesforce.mirus.MirusSourceTask",
      "consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "consumer.security.protocol": "SSL",
      "consumer.ssl.keystore.location": "/path/keystore.jks",
      "consumer.ssl.keystore.password": "<plaintext password>",
      "consumer.client.id": "source-2-destinationsource-2-destination-0",
      "consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    }
  },

Mirus failing to start when brokers are deployed to new ip's

Hi Team,

I had a Mirus pipeline working from a source Kafka cluster to a destination cluster, and I used a DNS-based URL in the connection string.

I spun up the destination Kafka cluster on new servers to upgrade Kafka from 1.1 to 2.1.

Since then, Mirus is failing to start with the below exception:

[2019-03-27 09:21:21,458] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'mirus-offsets'
	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
	at org.apache.kafka.connect.runtime.Worker.start(Worker.java:174)
	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:215)
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.util.concurrent.FutureTask.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
	... 11 more
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
[2019-03-27 09:21:21,480] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session)

Mirus stops after writing the above error to the logs.

I also verified that the mirus-* topics have the required ACLs added to them.

Please advise on how to fix this.

Configuring the Mirus quickstart to replicate from one system to another

I'm new to Kafka and Mirus and have been experimenting with the quickstarts for both of them. It was pretty easy to get the default configuration going (hats off to the Mirus team) on a CentOS VM, but now I'd like to set up replication from one instance of Kafka to another on a separate VM. Changing destination.bootstrap.servers to the IP of the second VM had no effect. Assuming this is possible with the Mirus quickstart, can someone advise on my next steps? Thank you.

Getting similar function as "measuring lag" (mirrormaker) in mirus

Dear Mirus team,

We would like to monitor how far a topic replica lags from one cluster to another using Mirus. With MirrorMaker we use the following to monitor lag:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mirroring-data --describe

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

Where LAG is the log-end-offset (high watermark) minus the current-offset.

How can we achieve a similar function using Mirus? We have access to the Mirus JMX values.

Thanks,
Pablo.

In Kafka Connect we can access the latest offsets in the offset storage topic:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --topic mirus-offsets

There's a work-in-progress KIP in Kafka, KIP-196, to add metrics to the Kafka Connect framework.

Plans to migrate topic metadata beyond partition count

I had a look around, and I believe the tool is able to identify whether the partition count is the same between the source and destination topics.

Are there any plans to replicate other topic metadata, like retention and max message size?

Thanks.

Mirus deployed in k8s pod is failing with "java.lang.OutOfMemoryError"

Our Mirus service running in a K8s pod is failing with "java.lang.OutOfMemoryError". Below is the error we are seeing in our Mirus logs.

java.lang.OutOfMemoryError: Java heap space
Dumping heap to /tmp/heapdump.bin ...
Heap dump file created [2116271685 bytes in 3.475 secs]

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Thread-2"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-33"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-14"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-13"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-1"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "server-timer"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-3"
*** java.lang.instrument ASSERTION FAILED ***: "!errorOutstanding" with message can't create name string at JPLISAgent.c line: 807

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-2"
*** java.lang.instrument ASSERTION FAILED ***: "!errorOutstanding" with message can't create name string at JPLISAgent.c line: 807

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-24"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-23"

We tried increasing the max heap size from 2 GB to 5 GB, but the issue is still not fixed. When we checked the heap dumps, we found that topic message data occupies most of the heap space. We also tried configuring the producer with the configs below:

    producer.buffer.memory=10000000
    producer.batch.size=1000
    producer.acks=1

Please suggest how to overcome this issue.

Can Mirus run as a Kafka Connect connector?

We already have a Kafka Connect cluster, so it would be ideal to run Mirus as yet another connector in our cluster instead of running it as a separate server. Is this possible?

Thanks.

Enabling SSL connectivity

Hi Mirus team,

I'm able to mirror data from source to destination for a topic with open (*) permissions on port 9092. But, I'm not able to setup/mirror data over SSL connection. Can you help with the steps/document (if any) to set SSL connectivity.

Regards,
Hari

OffsetSerDeTest fails on Windows due to line separator

Currently both cvsList and jsonList end their Strings with \n:

"connector-id,topic,1,123\n",

This causes the tests to fail on Windows.

The tests should either not include the line separator or should take System.lineSeparator() into account.

Default consumer offset reset policy - earliest

By default the Kafka consumer has the auto.offset.reset policy set to latest, but it looks like the implementation in this connector is reversed: unless configured otherwise, it will start from earliest.

String offsetReset = (String) consumerProperties.get("auto.offset.reset");
if ("latest".equalsIgnoreCase(offsetReset)) {
  logger.trace("Seeking to end");
  consumer.seekToEnd(Collections.singletonList(tp));
} else {
  logger.trace("Seeking to beginning");
  consumer.seekToBeginning(Collections.singletonList(tp));
}

And that value is extracted from the consumer-prefixed properties:

return simpleConfig.originalsWithPrefix("consumer.");

Would it make sense to make the default the same as a normal consumer, to keep it consistent with normal consumer groups?
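For reference, until/unless the default changes, the standard consumer behavior can be requested explicitly by adding the consumer-prefixed property to the connector config:

```
"consumer.auto.offset.reset": "latest"
```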

mirus.connector.auto.restart.enabled doesn't work

Dear Mirus Team,

Setting the options "mirus.connector.auto.restart.enabled" and "mirus.task.auto.restart.enabled" in the connect-distributed.properties configuration file seems not to work. Tasks always show "RESTART", the connector is created, and the logs show "The configuration 'mirus.connector.auto.restart.enabled' / 'mirus.task.auto.restart.enabled' was supplied but isn't a known config.", with no line that sets a default or configured value.
The default value is true (com/salesforce/mirus/config/MirusConfigDefinition.java).
Could you please check?

Thanks,
Pablo.

Mirror not correct

Hi all,
I have a problem:

(screenshot omitted)

When mirroring to the destination, the message is changed. How can I resolve this?

Thanks all.

org.apache.kafka.common.errors.RecordTooLargeException produce duplicated messages

Dear Mirus Team,

We have detected that if an org.apache.kafka.common.errors.RecordTooLargeException error occurs on the Kafka server, Mirus flushes the data, fails, and retries the flush again, causing it to run out of disk space considerably fast.

Could you please check this?

Thanks,
Pablo.

[2019-05-31 15:47:32,630] ERROR WorkerSourceTask{id=test_flight_deduplication-1} failed to send record to flight-deduplication-openstack-openstack: {} (org.apache.kafka.connect.runtime.WorkerSourceTask)
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

[2019-05-31 15:47:33,796] INFO WorkerSourceTask{id=test_flight_deduplication-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-05-31 15:47:33,797] INFO WorkerSourceTask{id=test_flight_deduplication-1} flushing 73 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-05-31 15:47:34,082] INFO WorkerSourceTask{id=test_flight_deduplication-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-05-31 15:47:34,083] INFO WorkerSourceTask{id=test_flight_deduplication-0} flushing 69 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)

Not able to convert header value of replicated topic from ByteArray to String

Hi,

This may not be a bug but something I'm not doing right.

I'm adding a header to Kafka message as shown below.

 final ProducerRecord rec = new ProducerRecord(topic, key, message);
 rec.headers().add(new RecordHeader("testHeader", "HeaderValue".getBytes(StandardCharsets.UTF_8)));
 final RecordMetadata metadata = (RecordMetadata) producer.send(rec).get();

And then at the consumer side I'm reading the header.

for (final ConsumerRecord<String, String> record : records)
{
    final Headers sourceHeaders = record.headers();
    if (sourceHeaders != null)
    {
        final Header header = sourceHeaders.lastHeader("testHeader");
        if (header != null)
        {
            System.out.printf("key = %s, value = %s%n", header.key(), new String(header.value()));
        }
    }
}

If I consume from the same topic I get the value
key = testHeader, value = HeaderValue

If I consume from the replicated topic I get the value
key = testHeader, value = SGVhZGVyVmFsdWU=

I see that mirus does header conversion using ByteArrayConverter. So how do I get the original string value from the replicated topic?
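As a stopgap, note that the replicated value shown above is simply the Base64 encoding of the original bytes (SGVhZGVyVmFsdWU= decodes to HeaderValue), so the original string can be recovered client-side; a minimal shell sketch:

```shell
# Decode the Base64-encoded header value read from the replicated topic
printf 'SGVhZGVyVmFsdWU=' | base64 --decode
# prints: HeaderValue
```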

Applying transforms

Hello,

First of all thanks for this tool, it looks super nice!

I've been trying a different setup where I apply a transform as part of the mirroring. The problem is that my transform receives a null record in apply, as the Kafka Connect interface suggests it may. Here is the example:

kafka-connect_1            | Caused by: java.lang.NullPointerException
kafka-connect_1            | 	at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:187)
kafka-connect_1            | 	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
kafka-connect_1            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
kafka-connect_1            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

Configuration:

{
   "connector.class": "com.salesforce.mirus.MirusSourceConnector",
   "tasks.max": "1",
   "topics.whitelist": "legacy_data_syncer.op.accounts",
   "destination.topic.name.suffix": ".mirror",
   "destination.bootstrap.servers": "kafka:9092",
   "consumer.bootstrap.servers": "kafka:9092",
   "connector.key.converter": "org.apache.kafka.connect.json.JsonConverter",
   "connector.value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "consumer.client.id": "mirus-legacy_data_syncer.op.accounts1",
   "consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
   "consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
   "transforms":"unwrap",
   "transforms.unwrap.type":"io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
   "transforms.unwrap.array.encoding":"document",
   "transforms.unwrap.flatten.struct":"false"
}

Am I trying to do something unsupported, or is there more going on?

Thanks for any help!

Getting Mirus JMX Metrics

Dear Mirus Team,

First of all, thanks for this excellent project.

We started a Connect cluster (consisting of 3 worker servers), copied the Mirus jar to the plugin directory, and successfully replicated topics from one cluster (origin) to another (destination).

We would like to use JMX metrics to monitor mirus status in the connector cluster.

We found in project source class (AbstractMirusJmxReporter):
metrics.addReporter(new JmxReporter("mirus"));

and in ConnectorJmxReporter class:
private static final String CONNECTOR_JMX_GROUP_NAME = "connector-metrics";

We tried to find the MBean path using VisualVM, but couldn't see a mirus domain among the JVM MBeans.
Do we need to enable something in the configuration?

Regards,
Pablo
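Not an official answer, but a common first step worth checking: the Connect worker JVM usually needs remote JMX enabled before VisualVM can attach and browse its MBeans. A sketch using the KAFKA_JMX_OPTS hook that the Kafka launch scripts honor (port 9999 is an arbitrary choice; the insecure flags are for lab use only):

```shell
# Expose JMX remotely before launching the Connect worker.
export JMX_PORT=9999
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Dcom.sun.management.jmxremote.port=$JMX_PORT"
# then start the worker as usual, e.g.:
# bin/connect-distributed.sh config/connect-distributed.properties
```

With that in place, VisualVM (or jconsole) connecting to host:9999 should show the worker's MBean domains, including any Mirus metrics registered by the reporter classes mentioned above.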

Mirroring not working

I have 2 clusters running on my machine and have created a connector with the details below

curl localhost:8083/connectors/mirus-quickstart-source-1/config \
      -X PUT \
      -H 'Content-Type: application/json' \
      -d '{
           "name": "mirus-quickstart-source-1",
           "connector.class": "com.salesforce.mirus.MirusSourceConnector",
           "tasks.max": "5",
           "topics.whitelist": "test-mir",
           "destination.topic.name.suffix": ".mirror",
           "destination.bootstrap.servers": "localhost:9093",
           "consumer.bootstrap.servers": "localhost:9092",
           "consumer.client.id": "mirus-quickstart",
           "consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
           "consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
       }'

However, the mirroring is not happening.
One broker runs with listener port 9092 and its ZooKeeper on port 2181;
the other broker runs with listener port 9093 and its ZooKeeper on port 2182.
source broker -- the one with listener port 9092
destination broker -- the one with listener port 9093

Is my connector configuration correct? What am I missing?
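One thing worth checking (an assumption based on how the quickstart sets things up, not a diagnosis): Mirus replicates into a destination topic named after the source topic plus the configured suffix, and that topic may need to exist on the destination cluster before partitions are assigned. A sketch deriving the expected name from the config above:

```shell
# Derive the destination topic name from the connector config above.
SRC_TOPIC="test-mir"
SUFFIX=".mirror"                  # destination.topic.name.suffix
DEST_TOPIC="${SRC_TOPIC}${SUFFIX}"
echo "$DEST_TOPIC"                # → test-mir.mirror

# Create it on the destination cluster (9093) if it is missing, e.g.:
# bin/kafka-topics.sh --create --bootstrap-server localhost:9093 \
#   --topic "$DEST_TOPIC" --partitions 1 --replication-factor 1
```

Checking the connector status endpoint (`curl localhost:8083/connectors/mirus-quickstart-source-1/status`) for failed tasks is also a quick way to rule out task-level errors.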

Producing a message to topic test does not show up on test.mirror

Hey guys,
After starting Kafka locally and starting Mirus, I get to this state:
curl localhost:8083/connectors/mirus-quickstart-source/status
{"name":"mirus-quickstart-source","connector":{"state":"RUNNING","worker_id":"10.126.22.44:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"10.126.22.44:8083"}],"type":"source"}

$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
This is my first event

$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
This is my first event
Processed a total of 1 messages

--- sad story starts here:
$ bin/kafka-console-consumer.sh --topic test.mirror --from-beginning --bootstrap-server localhost:9092
^CProcessed a total of 0 messages

I was using all default configs, thanks~

How should I resolve the issue?

Mirus tasks failing with error "Could not initialize class org.xerial.snappy.Snappy"

Hi Team,

We are seeing some of the tasks go to the "FAILED" state (with a few tasks in the RUNNING state), with the error details below:

          "id": 1,
            "state": "FAILED",
            "worker_id": "<ip>:<port>",
            "trace": "java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
                at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)
                at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466)
                at java.io.DataInputStream.readByte(DataInputStream.java:265)
                at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:168)
                at org.apache.kafka.common.record.DefaultRecord.readFrom(DefaultRecord.java:292)
                at org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(DefaultRecordBatch.java:263)
                at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:568)
                at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:537)
                at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1210)
                at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1245)
                at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1099)
                at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:545)
                at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:506)
                at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)
                at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
                at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)
                at com.salesforce.mirus.MirusSourceTask.poll(MirusSourceTask.java:173)
                at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
                at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)"
        },

We tried installing libc6-compat in our pods, but it didn't help. Please advise on how to fix it.
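Not a confirmed fix, but this failure pattern often comes from snappy-java being unable to extract and load its bundled native library (typical on musl-based images like Alpine, or when the JVM temp dir is mounted noexec). One hedged workaround is pointing snappy-java at an exec-enabled temp dir via its `org.xerial.snappy.tempdir` property (the path below is arbitrary, chosen for illustration):

```shell
# Give snappy-java a writable, exec-enabled directory to unpack its native lib.
SNAPPY_TMP="$HOME/snappy-tmp"     # arbitrary path for illustration
mkdir -p "$SNAPPY_TMP"
export KAFKA_OPTS="-Dorg.xerial.snappy.tempdir=$SNAPPY_TMP"
# then restart the Connect worker; switching to a glibc-based base image
# is another option if the pods run Alpine.
```

If the tasks only fail on some partitions, that would also be consistent with this theory: only the tasks fetching snappy-compressed batches hit the native decompression path.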

Connector in distributed mode

Dear Mirus team,

We followed the quick-start and ran it successfully.
We then followed the Kafka distributed connect steps (connect distributed); the worker and the connector are created and run with no errors, but events are not replicated. We also ran "mirus-start.sh" with the same result.

Is there a guide for running in a "distributed mode"?

Regards,
Pablo.

MirrorMaker migration documentation

Regarding the Medium post

Mirus completely replaced Mirror Maker across all production data-centers at Salesforce in April 2018. Since then our data volumes have continued to grow.

For those who are running MirrorMaker and have active consumer group offsets for their data, and would prefer not to have duplicates after starting Mirus: is there migration documentation available, or a run-book that Salesforce applied for the replacement?

failed to run the quick start example

I gave Mirus a try, but failed to run the quick-start example; I always fail at step 4:
curl localhost:8083
curl: (7) Failed to connect to localhost port 8083: Connection refused

From the source code in Mirus.java, I do not find any code that starts the REST server; is there anything I missed?

BTW, I can run the quick-start example after adding some code to Mirus.java to start the REST server.

API end point for health check

Is there any endpoint for Mirus that can be used as a service health check?

Basically, I have deployed Mirus on K8s and am looking for ways to monitor the health of my service.
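Not a Mirus-specific answer, but since Mirus runs inside a Kafka Connect worker, the Connect REST API's root path (which returns worker version info) is commonly used as a liveness signal; port 8083 is assumed from the quickstart config. A sketch of the check a K8s probe could run:

```shell
# curl's exit status doubles as a health signal for a liveness probe.
if curl -sf localhost:8083/ > /dev/null; then
  echo healthy
else
  echo unhealthy
fi
```

For deeper checks, polling `/connectors/<name>/status` and alerting on FAILED tasks covers the replication layer, not just the worker process.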
