This is a repository that contains a collection of runnable Spring Boot applications to demonstrate the usage of Spring and Apache Kafka together.
To run these demo applications, you must have Kafka running on localhost:9092.
If you have Kafka running elsewhere, please update the bootstrap server property (spring.kafka.bootstrap-servers) accordingly.
It is recommended to use a three-node cluster to run these applications.
For convenience, we provide a docker-compose script that allows you to run a three-node Kafka cluster locally and make one of the nodes available at localhost:9092.
- Check out this repository
- Go to the root of this repository on a command line and run: docker-compose up
This repository contains several standalone Spring Boot applications for demonstrating various features mentioned during the talk. You can just navigate to the README on each application for more details.
This application creates a Kafka topic named spring-kafka-app0-demo1 with a single partition.
Then it publishes random text data to this topic using KafkaTemplate and consumes it using a KafkaListener.
KafkaTemplate and all its necessary components, such as the ProducerFactory, are auto-configured through Spring Boot.
KafkaListener relies on a ConcurrentKafkaListenerContainerFactory auto-configured through Boot to create its message listener container.
The application uses the Java Faker library to generate 100 random book recommendations each time it runs. The listener consumes the records and prints them on the console.
This application also includes a basic JUnit 5-based test using EmbeddedKafkaBroker.
We have two publishers in this application, both using custom KafkaTemplate beans.
One KafkaTemplate uses Long as the key type, the other uses UUID, while both use String as the value type.
The publishers are initiated using REST endpoints.
The application creates two Kafka topics, spring-kafka-app1-demo1 and spring-kafka-app1-demo2, both with 10 partitions.
Two listeners are provided using KafkaListener.
The first listener uses the TopicPartition and PartitionOffset annotations to control selective partitions and initial assignments on them.
The second listener is a variant of the first, demonstrating how wildcard patterns can be used for partitions and initial assignments.
Look at the listeners to see how they override the deserializer properties that Spring Boot would otherwise auto-configure.
To see the application in action, you can run it (e.g., in an IDE) and then issue the following two CURL commands.
curl -X POST http://localhost:8080/publish/demo1
curl -X POST http://localhost:8080/publish/demo2
The first command invokes the first publisher, which publishes to the topic spring-kafka-app1-demo1, and the second invokes the publisher for the topic spring-kafka-app1-demo2.
The listeners log the received data on the console along with the key, partition, and offset.
In this app, we publish and consume a POJO using the JsonSerializer and JsonDeserializer from Spring for Apache Kafka.
The application creates a Kafka topic named spring-kafka-app2-demo.
It publishes a domain object Foo to the topic and then consumes it using a KafkaListener.
Look at the application.properties configuration file to see how the serializer/deserializer properties are configured.
In this application, we have a publisher and a listener.
The listener extends AbstractConsumerSeekAware, which implements ConsumerSeekAware.
The application creates a topic named spring-kafka-app3-demo with a single partition.
We always seek the partition to offset 20 each time the listener starts. The publisher sends 100 records each time the app starts.
This application demonstrates how container error handling works in Spring for Apache Kafka.
The application has a publisher, which is invoked through a REST endpoint.
Each time the REST endpoint is called, the publisher sends ten records to the topic spring-kafka-app4-demo.
On the consumer side, we have contrived logic that throws an exception each time the offset is > 0 and divisible by 10.
By default, when the exception happens, the DefaultErrorHandler kicks in with a maximum of ten tries and no backoff.
However, we provide a custom bean declaration for DefaultErrorHandler, which the Boot auto-configuration picks up and configures on the container factory.
This custom handler retries a maximum of 3 times with two seconds of backoff each time.
The handler is also configured with a DeadLetterPublishingRecoverer, which sends the record in error to a default DLT topic.
When the app runs, you will notice that the record matching the offset % 9 == 0 condition fails, is retried three times, and is then recovered by being sent to the DLT.
There is also a convenient KafkaListener method for the DLT, which logs the DLT messages.
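The retry-then-recover flow described above can be pictured with a small plain-Java model. This is only a sketch of the control flow, not the DefaultErrorHandler implementation: the class name, the deliver method, and the in-memory deadLetters list (standing in for the DLT topic) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of "retry N times, then recover"; not Spring code.
final class RetryThenRecoverSketch {

    // Stands in for the dead letter topic (assumption for illustration).
    final List<String> deadLetters = new ArrayList<>();

    // Delivers the record once, retries up to maxRetries times with a fixed
    // pause between attempts, and recovers the record if every attempt fails.
    void deliver(String record, Consumer<String> listener, int maxRetries, long backoffMillis) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                listener.accept(record);
                return; // processed successfully, nothing to recover
            } catch (RuntimeException e) {
                if (attempt < maxRetries) {
                    pause(backoffMillis);
                }
            }
        }
        deadLetters.add(record); // retries exhausted
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        RetryThenRecoverSketch sketch = new RetryThenRecoverSketch();
        // The text describes a two-second backoff; 10 ms keeps the demo quick.
        sketch.deliver("bad", r -> { throw new IllegalStateException("boom"); }, 3, 10L);
        sketch.deliver("good", r -> { }, 3, 10L);
        System.out.println(sketch.deadLetters); // [bad]
    }
}
```

With three retries, a record that always fails is attempted four times in total (the initial delivery plus three retries) before it is recovered.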
To exercise the app, run it first and then issue the following CURL command.
curl -X POST http://localhost:8080/publish/demo
This application demonstrates the ErrorHandlingDeserializer in action.
We have a publisher that sends data as String, but the consumer expects an Integer, which causes a deserialization error.
Look at the application.properties file to see how the ErrorHandlingDeserializer and the actual delegate types are configured.
The application creates a topic named spring-kafka-app5-demo.
When the application starts, it sends a text record to the topic; the consumer receives it and runs into a deserialization exception.
Since deserialization exceptions are not retried by default, the DefaultErrorHandler will not retry the records but will try to recover them.
We configure the error handler with a DeadLetterPublishingRecoverer, which sends the records in error to the DLT.
The application provides a convenient KafkaListener to consume from the DLT topic (spring-kafka-app5-demo.DLT).
In this application, we see how non-blocking retries work. Two records are sent, with values "one" and "two"; the listener throws an exception when it receives "one." This record then goes through the non-blocking retry chain until finally ending up in the dead letter topic. We can see that the "two" record is successfully processed before "one" is retried.
To run the app, start SpringKafkaApp6.
On the console, you should see the record delivery attempts, with "one" being retried after 2 seconds, then 3, then 4.5, then 6.75, and finally sent to the DLT.
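The delays listed above are consistent with an exponential backoff that starts at 2 seconds and grows by a factor of 1.5. That reading is inferred from the numbers alone, not from the app's configuration. A minimal sketch of the arithmetic, using a hypothetical helper class:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reproduces the delay sequence; not Spring's backoff code.
final class RetryDelaySketch {

    // Returns `retries` delays in milliseconds, each `multiplier` times the previous one.
    static List<Long> delays(long initialMillis, double multiplier, int retries) {
        List<Long> result = new ArrayList<>();
        double current = initialMillis;
        for (int i = 0; i < retries; i++) {
            result.add(Math.round(current));
            current *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed settings: 2000 ms initial delay, multiplier 1.5, four retries.
        System.out.println(delays(2000, 1.5, 4)); // [2000, 3000, 4500, 6750]
    }
}
```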
This application demonstrates batch listeners and manual acknowledgement.
To run the app, start SpringKafkaApp7.
The listener prints the number of records in each batch received. In addition, for educational purposes, we also show how to manually acknowledge each record in the batch.
This application demonstrates observability with Zipkin.
First, make sure that you run the Zipkin server using one of the methods described here.
Here is how to start it using Docker: docker run -d -p 9411:9411 openzipkin/zipkin
Then run the app by starting SpringKafkaApp8.
Go to the Zipkin UI (default: localhost:9411) and observe the traces and spans.
spring-integration-kafka-app - Spring Integration Kafka Outbound Channel Adapter and Message Driven Channel Adapter Demo
This is an application in which we demonstrate the usage of Spring Integration Kafka support.
This basic application shows the Kafka outbound channel adapter and message-driven channel adapter.
The outbound channel adapter is configured with a KafkaTemplate auto-configured through Spring Boot.
The Kafka message-driven channel adapter uses a custom message listener container created in the app.
The application runner bean sends messages to a channel called toKafka, which the outbound adapter listens to and forwards to a Kafka topic.
The message-driven channel adapter consumes from the topic and puts the messages on another message channel (fromKafka).
The application runner in the app receives from the fromKafka channel and prints the information on the console.
This is an introductory Spring Cloud Stream application in which we demonstrate the functioning of a supplier, function and consumer.
The supplier produces the current time in milliseconds as a Long value.
By default, the supplier in Spring Cloud Stream runs every second, and we use that default.
A function receives this supplied data and converts it to UTC-based time.
Then a consumer receives this UTC-based time and prints it on the console.
Look at the application properties to see how the functions are activated.
Run the application, and you will see the time data logged on the console in UTC format.
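The supplier, function, and consumer described above map directly onto the java.util.function interfaces. The following is a minimal plain-Java sketch of that pipeline, wired by hand. The class and method names are hypothetical, and the Spring bean registration and binding properties that the actual app relies on are omitted.

```java
import java.time.Instant;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-alone version of the supplier -> function -> consumer chain.
final class TimePipelineSketch {

    // Supplier: current time as epoch milliseconds.
    static Supplier<Long> currentTime() {
        return System::currentTimeMillis;
    }

    // Function: epoch milliseconds -> ISO-8601 timestamp in UTC.
    static Function<Long, String> toUtc() {
        return millis -> Instant.ofEpochMilli(millis).toString();
    }

    // Consumer: prints the UTC timestamp on the console.
    static Consumer<String> logTime() {
        return utc -> System.out.println("UTC time: " + utc);
    }

    public static void main(String[] args) {
        // Wired by hand here; in Spring Cloud Stream the binder connects the
        // three functions through Kafka topics instead.
        logTime().accept(toUtc().apply(currentTime().get()));
    }
}
```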
This application shows how non-functional style suppliers can be written for on-demand triggering using the StreamBridge API.
The application has a REST endpoint that, when invoked, triggers the publishing of data through the StreamBridge API.
StreamBridge creates all the necessary output bindings.
This demo app also has a consumer that consumes from the Kafka topic to which the StreamBridge is publishing.
You can just run the application and use the REST endpoint as below.
curl -X POST -H "Content-Type: text/plain" --data "StreamBridge Demo" http://localhost:8080/publish/demo
The consumer prints the data published to the topic through StreamBridge.
This application shows how Spring Cloud Stream Kafka Streams binder works.
The example is based on the canonical word count application. It is written using Spring Cloud Stream binder for Kafka Streams using java.util.function.Function to represent a processor. It uses a single input and a single output. In essence, the application receives text messages from an input topic, computes word occurrence counts in a configurable time window, and reports that in an output topic.
You can just run the application first.
The Kafka Streams processor is named countWords.
The application also uses the regular Kafka binder to produce data every second, using the same book recommendation data from the Java Faker library that we used in the other apps.
This supplier function is called provideWords, and it produces to a topic called words, from which the Kafka Streams processor consumes data.
The countWords processor writes the count information to a topic called counts.
We have another consumer function using the regular Kafka binder that listens on this counts topic and prints the word count information on the console.
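To make the counting step concrete, here is a plain-Java sketch of word counting expressed as a java.util.function.Function. It is a stand-in for the logic only: it uses the Java streams API rather than Kafka Streams, it does not model the time window, and the class name and tokenization rule (lower-casing, splitting on non-word characters) are assumptions.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-Java word counter; not the app's Kafka Streams topology.
final class WordCountSketch {

    // Splits a line into lower-case words and counts the occurrences of each.
    static Function<String, Map<String, Long>> countWords() {
        return line -> Arrays.stream(line.toLowerCase().split("\\W+"))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countWords().apply("the quick fox and the lazy dog"));
        // {and=1, dog=1, fox=1, lazy=1, quick=1, the=2}
    }
}
```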
curl localhost:8080/actuator/metrics/kafka.stream.thread.commit.total | jq .
curl localhost:8080/actuator/kafkastreamstopology | jq .
curl localhost:8080/actuator/kafkastreamstopology/clicks-applicationId
curl localhost:8080/actuator/kafkastreamstopology/updates-applicationId
Popular UI tool for visualizing the topology: https://zz85.github.io/kafka-streams-viz/
This advanced sample of a Spring Cloud Stream processor using Kafka Streams support shows both KStream and KTable bindings.
The following are the two applications in this sample.
- Spring Cloud Stream-based Kafka Streams processor
- Spring Cloud Stream producer application to generate data for the processor
The Kafka Streams processor uses java.util.function.BiFunction to demonstrate two inputs and an output.
The processor consumes user region data as a KTable and user click information as a KStream.
Then it produces the clicks-per-region information on the outbound.
The same outbound information is stored in a state store to demonstrate the interactive query capabilities of Kafka Streams, exposed as InteractiveQueryService in Spring Cloud Stream.
The application also has a second processor that listens on the outbound topic and logs the information. In addition, the application exposes a REST endpoint through which the user click data per region can be queried.
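The two-inputs-one-output shape can be sketched in plain Java with a BiFunction over in-memory maps. This only illustrates the join-and-aggregate idea and is not the app's Kafka Streams code: the maps stand in for the KStream and KTable, and the class name and the "UNKNOWN" fallback region are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory model of joining clicks with regions; not Kafka Streams.
final class ClicksPerRegionSketch {

    // Looks up each user's region and sums the clicks per region.
    static BiFunction<Map<String, Long>, Map<String, String>, Map<String, Long>> clicksPerRegion() {
        return (clicksByUser, regionByUser) -> {
            Map<String, Long> totals = new HashMap<>();
            clicksByUser.forEach((user, clicks) -> {
                // Users without a known region fall into "UNKNOWN" (assumption).
                String region = regionByUser.getOrDefault(user, "UNKNOWN");
                totals.merge(region, clicks, Long::sum);
            });
            return totals;
        };
    }

    public static void main(String[] args) {
        Map<String, Long> clicks = Map.of("alice", 12L);
        Map<String, String> regions = Map.of("alice", "asia");
        System.out.println(clicksPerRegion().apply(clicks, regions)); // {asia=12}
    }
}
```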
Run the SpringCloudStreamApp4Application app first, then the producer app, UserClicksRegionProducerApplication.
The producer app has two REST endpoints that allow you to publish user-region and user-click information to Kafka topics.
First, enter some data for the user region.
curl -X POST localhost:8090/user-region/alice/asia
At this point, Alice lives in Asia.
Now send some click impression data from Alice.
curl -X POST localhost:8090/user-clicks/alice/12
Watch the console of the SpringCloudStreamApp4Application and see that the clicks per region information is logged from the test processor.
Invoke the REST endpoint to extract this same information through an interactive query.
curl localhost:8080/updates/asia | jq .
You can just enter more POST data as above and verify that you see the correct output.
curl localhost:8080/actuator/metrics/kafka.stream.thread.commit.total | jq .
curl localhost:8080/actuator/kafkastreamstopology | jq .
curl localhost:8080/actuator/kafkastreamstopology/clicks-applicationId
curl localhost:8080/actuator/kafkastreamstopology/updates-applicationId
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/clicks-in-0
Note: All bindings corresponding to this Kafka Streams application id will be stopped.