This sample application demonstrates how to connect to multiple Kafka clusters with security enabled by using multiple binders. It uses two Kafka clusters, both of which have security enabled (JAAS - SASL/PLAINTEXT).
If you already have two clusters with security (SASL/PLAINTEXT) enabled, you can skip this section. However, it may still be worth going over these instructions to avoid any inconsistencies with your environment.
-
Download the latest Apache Kafka distribution (version 1.0.0 or above)
-
Unzip the same downloaded archive into two directories (let's call them cluster-1 and cluster-2 for the purposes of this sample)
-
cd cluster-1/config
-
Create a new file
kafka_server_jaas.conf
and add the following content:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
-
Create another file called
zookeeper_jaas.conf
and add the following content:
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
-
Edit the file server.properties and make sure that the following properties are set. Some of them already exist in the file; if so, find them and change their values.
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
listeners=SASL_PLAINTEXT://localhost:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
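Because some of these properties already exist in the file, hand edits can easily leave two conflicting definitions behind. The following is a minimal sketch of a helper that replaces an active definition or appends a new one. The function name `set_prop` is hypothetical and not part of the Kafka distribution.

```shell
#!/bin/sh
# Hypothetical helper (not part of Kafka): set KEY=VALUE in a .properties file.
# An active "KEY=..." line is dropped and the new definition is appended;
# commented-out lines (starting with '#') and other keys are left untouched.
set_prop() {
  file=$1 key=$2 value=$3
  tmp=$(mktemp) || return 1
  # Keep every line that does not start with "KEY=".
  awk -v k="$key" 'index($0, k "=") != 1' "$file" > "$tmp" || return 1
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  # Write back through the original path so its permissions are preserved.
  cat "$tmp" > "$file" && rm -f "$tmp"
}

# Example usage, run from the cluster-1 directory:
#   set_prop config/server.properties listeners SASL_PLAINTEXT://localhost:9092
#   set_prop config/server.properties sasl.enabled.mechanisms PLAIN
```

Matching on the literal prefix `KEY=` rather than on a regular expression keeps dots in property names from being treated as wildcards, and it leaves `advertised.listeners` alone when only `listeners` is being set.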
-
Edit the file zookeeper.properties and add the following properties:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
-
Edit the file producer.properties and add the following content:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
-
Edit the file consumer.properties and add the following content:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
-
Terminal 1 - Start ZooKeeper for cluster-1.
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-1>/config/zookeeper_jaas.conf"
$ <PATH-TO-CLUSTER-1>/bin/zookeeper-server-start.sh <PATH-TO-CLUSTER-1>/config/zookeeper.properties
-
Terminal 2 - Start the Kafka broker for cluster-1.
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-1>/config/kafka_server_jaas.conf"
$ <PATH-TO-CLUSTER-1>/bin/kafka-server-start.sh <PATH-TO-CLUSTER-1>/config/server.properties
-
Now we need to do the same for the second cluster. Follow along…
-
cd cluster-2/config
-
Create a new file
kafka_server_jaas.conf
and add the following content:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
-
Create another file called
zookeeper_jaas.conf
and add the following content:
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
-
Edit the file server.properties and make sure that the following properties are set. Some of them already exist in the file; if so, find them and change their values.
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
listeners=SASL_PLAINTEXT://localhost:9093
advertised.listeners=SASL_PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-cluster2-logs
zookeeper.connect=localhost:2182
-
Edit the file zookeeper.properties and add the following properties:
clientPort=2182
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
-
Edit the file producer.properties and add the following content:
bootstrap.servers=localhost:9093
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
-
Edit the file consumer.properties and add the following content:
bootstrap.servers=localhost:9093
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
-
Terminal 3 - Start ZooKeeper for cluster-2.
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-2>/config/zookeeper_jaas.conf"
$ <PATH-TO-CLUSTER-2>/bin/zookeeper-server-start.sh <PATH-TO-CLUSTER-2>/config/zookeeper.properties
-
Terminal 4 - Start the Kafka broker for cluster-2.
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-2>/config/kafka_server_jaas.conf"
$ <PATH-TO-CLUSTER-2>/bin/kafka-server-start.sh <PATH-TO-CLUSTER-2>/config/server.properties
-
At this point, you should have two ZooKeeper instances and two Kafka instances running. Verify this by running jps (or other OS tools).
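The jps check can be scripted as a rough sanity test. The sketch below assumes that jps lists brokers under the main class name `Kafka` and ZooKeeper servers under `QuorumPeerMain`, which is what the stock start scripts usually produce. The function names `count_jvms` and `check_clusters` are hypothetical.

```shell
#!/bin/sh
# Count the lines of jps output (read from stdin) whose second column,
# the main class name, equals the given name.
count_jvms() {
  awk -v name="$1" '$2 == name { n++ } END { print n + 0 }'
}

# Read jps output from stdin and succeed only if exactly two brokers and
# two ZooKeeper servers are listed (assumed class names, see above).
check_clusters() {
  out=$(cat)
  brokers=$(printf '%s\n' "$out" | count_jvms Kafka)
  zks=$(printf '%s\n' "$out" | count_jvms QuorumPeerMain)
  echo "Kafka brokers: $brokers, ZooKeeper servers: $zks"
  [ "$brokers" -eq 2 ] && [ "$zks" -eq 2 ]
}

# Example usage:
#   jps | check_clusters && echo "both clusters are up"
```

The check only confirms that the processes exist; it does not prove that the brokers accepted the SASL configuration, so the broker logs are still worth a look.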
The application’s configuration matches the SASL/PLAINTEXT settings configured above (please review the application's yml file).
The application contains two StreamListener methods. The first one receives records from a topic in cluster-1 and outputs them to a topic in cluster-2.
The second StreamListener method receives records from a topic in cluster-2 and outputs them to a topic in cluster-1.
The cluster-1 input topic is named kafka1-in and its output topic is kafka1-out. Similarly, for cluster-2 they are kafka2-in and kafka2-out. Data therefore flows from kafka1-in to kafka2-out, and from kafka2-in to kafka1-out.
Run the application MultiBinderKafkaJaasSample from your IDE or from the CLI.
-
Terminal 5 - Create a file called kafka_client_jaas.conf and add the following content (save it to any directory; for this example, let's say it is in /home/username):
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
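If you prefer to create this file from the command line, the following sketch writes it with a here-document and prints the matching KAFKA_OPTS value used in the next terminals. The function name `write_client_jaas` is hypothetical; the file content is the same as shown above.

```shell
#!/bin/sh
# Hypothetical helper: write kafka_client_jaas.conf into the given directory
# and print the JVM option that points the Kafka tools at it.
write_client_jaas() {
  dir=$1
  cat > "$dir/kafka_client_jaas.conf" <<'EOF'
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret";
};
EOF
  printf '%s\n' "-Djava.security.auth.login.config=$dir/kafka_client_jaas.conf"
}

# Example usage:
#   export KAFKA_OPTS="$(write_client_jaas /home/username)"
```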
-
Terminal 6 - Produce data to the kafka1 input topic.
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/username/kafka_client_jaas.conf"
Go to the Kafka installation directory (it doesn’t matter whether you are in cluster-1 or cluster-2 for the instructions below).
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka1-in --producer.config=config/producer.properties
-
Terminal 7 (or split the above terminal into two panes) - Consume data from the kafka2 output topic, where the data produced above is expected to arrive through the processor.
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/username/kafka_client_jaas.conf"
Go to the Kafka installation directory (it doesn’t matter whether you are in cluster-1 or cluster-2 for the instructions below).
$ ./bin/kafka-console-consumer.sh --topic kafka2-out --consumer.config=config/consumer.properties --bootstrap-server=localhost:9093
-
Terminal 8 - Produce data to the kafka2 input topic.
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/username/kafka_client_jaas.conf"
Go to the Kafka installation directory (it doesn’t matter whether you are in cluster-1 or cluster-2 for the instructions below).
$ ./bin/kafka-console-producer.sh --broker-list localhost:9093 --topic kafka2-in --producer.config=config/producer.properties
-
Terminal 9 (or split the above terminal into two panes) - Consume data from the kafka1 output topic, where the data produced above is expected to arrive through the second processor in the application.
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/username/kafka_client_jaas.conf"
Go to the Kafka installation directory (it doesn’t matter whether you are in cluster-1 or cluster-2 for the instructions below).
$ ./bin/kafka-console-consumer.sh --topic kafka1-out --consumer.config=config/consumer.properties --bootstrap-server=localhost:9092
-
Now type some text into the terminal session where the console producer for kafka1-in is running. Then verify that the exact same text appears in the terminal session where the console consumer for kafka2-out is running.
Similarly, type some text into the terminal session where the console producer for kafka2-in is running. Then verify that the exact same text appears in the terminal session where the console consumer for kafka1-out is running.
-
Once you are done testing, remember to stop the application, the console consumers and producers, and the local Kafka clusters used for testing.
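For the local clusters, a shutdown could look like the sketch below, which stops the broker before ZooKeeper using the stop scripts shipped in the Kafka bin directory. The function name `stop_cluster` and the `STOP_DELAY` variable are hypothetical, and the five-second default pause is an arbitrary assumption.

```shell
#!/bin/sh
# Hypothetical helper: stop the Kafka broker first, then ZooKeeper,
# using the stop scripts found under the given cluster directory.
stop_cluster() {
  cluster_dir=$1
  "$cluster_dir/bin/kafka-server-stop.sh"
  # Give the broker a moment to shut down before ZooKeeper goes away.
  sleep "${STOP_DELAY:-5}"
  "$cluster_dir/bin/zookeeper-server-stop.sh"
}

# Example usage (substitute the two installation directories):
#   stop_cluster <PATH-TO-CLUSTER-1>
#   stop_cluster <PATH-TO-CLUSTER-2>
```

The stock stop scripts appear to locate processes by name rather than by installation directory, so the first call may already stop the processes of both clusters; in that case the second call simply reports that there is nothing left to stop.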