
spring-cloud-stream-demo's Introduction

Problem

You just want to write the logic for your event-driven application, but the boilerplate messaging code keeps getting in the way and it's costing you time. Connecting your apps to messaging servers is cumbersome, and you need to work with multiple messaging technologies in your organisation depending on which team you're working in.

Solution

Spring Cloud Stream takes care of the complicated boilerplate code for you, leaving you free to create nice, clean business logic which anyone can maintain. Spring Cloud Stream seamlessly unifies many different messaging protocols behind one easy-to-use API, and it smooths away subtle differences in approach or features (such as partitioning or exchanges) so that you can concentrate on building event-driven solutions that "just work".
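
For illustration only, here is a minimal sketch of the functional style Spring Cloud Stream supports: a plain java.util.function.Function bean that the framework binds to the configured middleware, so the class contains no broker-specific code. This example is not taken from this repository (which, per the startup log further below, uses the older annotation-based @EnableBinding style); the package and class names are hypothetical.

// Hypothetical example of Spring Cloud Stream's functional binding style;
// names are illustrative and not from this repository.
package com.example.demo;

import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class UppercaseApplication {

    public static void main(String[] args) {
        SpringApplication.run(UppercaseApplication.class, args);
    }

    // Spring Cloud Stream binds this function's input and output to the
    // configured binder (Kafka, RabbitMQ, ...) without any messaging code here.
    @Bean
    public Function<String, String> uppercase() {
        return String::toUpperCase;
    }
}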

For the rest of this recipe see the website here.

Pre-requisites

These event-driven applications are built on: Spring Boot, Spring Cloud Stream, Maven, and Java 8.

The server side runs on Docker for Mac and includes: Kafka, Zookeeper, RabbitMQ, and KafDrop (image by Obsidian Dynamics).

spring-cloud-stream-demo's People

Contributors

benwilcock, rseroter

spring-cloud-stream-demo's Issues

Add unit test

Adding test cases to cover the business logic, along with an example of testing stream applications, would be a huge plus for this repository.
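
As a starting point, here is a hedged sketch of the kind of unit test being asked for. It assumes a LoanApplication class with a no-argument constructor, a getStatus() getter, and the setStatus(String) validation discussed in the next issue below; all names are illustrative rather than taken from the repository.

// Hypothetical JUnit 5 test for the status validation logic.
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;

class LoanApplicationTest {

    @Test
    void acceptsKnownStatus() {
        // Assumes a no-arg constructor and a getStatus() getter exist.
        LoanApplication loan = new LoanApplication();
        loan.setStatus("APPROVED");
        assertEquals("APPROVED", loan.getStatus());
    }

    @Test
    void rejectsUnknownStatus() {
        LoanApplication loan = new LoanApplication();
        assertThrows(IllegalArgumentException.class, () -> loan.setStatus("UNKNOWN"));
    }
}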

Statuses.valueOf instead of checking against all the enum values

public void setStatus(String status) {
    if (status.equals(Statuses.APPROVED.name())
            || status.equals(Statuses.DECLINED.name())
            || status.equals(Statuses.PENDING.name())
            || status.equals(Statuses.REJECTED.name())) {
        this.status = status;
    } else {
        throw new IllegalArgumentException("Cannot set the LoanApplication's status to " + status);
    }
}

Instead of checking against all of the Statuses enum values, we can check whether the status exists with the help of Statuses.valueOf, which itself throws an IllegalArgumentException for unknown values.

try {
    this.status = Statuses.valueOf(status).name();
} catch (IllegalArgumentException ex) {
    throw new IllegalArgumentException("Cannot set the LoanApplication's status to " + status);
}

Profile not getting honoured

@benwilcock: I built and executed with the rabbit profile, but the kafka binder still gets created by DefaultBinderFactory.

Application start up log:

/home/raj/.jdks/openjdk-16.0.2/bin/java -XX:TieredStopAtLevel=1 -noverify -Dspring.profiles.active=rabbit -Dspring.output.ansi.enabled=always -javaagent:/home/raj/.local/share/JetBrains/Toolbox/apps/IDEA-U/ch-1/212.4746.92/lib/idea_rt.jar=38079:/home/raj/.local/share/JetBrains/Toolbox/apps/IDEA-U/ch-1/212.4746.92/bin -Dcom.sun.management.jmxremote -Dspring.jmx.enabled=true -Dspring.liveBeansView.mbeanDomain -Dspring.application.admin.enabled=true -Dfile.encoding=UTF-8 -classpath /home/raj/Documents/repo/spring-cloud-stream-demo/loancheck/target/classes:/home/raj/.m2/repository/org/springframework/cloud/spring-cloud-stream/3.1.3/spring-cloud-stream-3.1.3.jar:/home/raj/.m2/repository/org/springframework/boot/spring-boot-starter-validation/2.4.3/spring-boot-starter-validation-2.4.3.jar:/home/raj/.m2/repository/org/glassfish/jakarta.el/3.0.3/jakarta.el-3.0.3.jar:/home/raj/.m2/repository/org/hibernate/validator/hibernate-validator/6.1.7.Final/hibernate-validator-6.1.7.Final.jar:/home/raj/.m2/repository/jakarta/validation/jakarta.validation-api/2.0.2/jakarta.validation-api-2.0.2.jar:/home/raj/.m2/repository/org/jboss/logging/jboss-logging/3.4.1.Final/jboss-logging-3.4.1.Final.jar:/home/raj/.m2/repository/com/fasterxml/classmate/1.5.1/classmate-1.5.1.jar:/home/raj/.m2/repository/org/springframework/spring-messaging/5.3.4/spring-messaging-5.3.4.jar:/home/raj/.m2/repository/org/springframework/spring-beans/5.3.4/spring-beans-5.3.4.jar:/home/raj/.m2/repository/org/springframework/integration/spring-integration-core/5.4.4/spring-integration-core-5.4.4.jar:/home/raj/.m2/repository/org/springframework/spring-aop/5.3.4/spring-aop-5.3.4.jar:/home/raj/.m2/repository/org/springframework/spring-context/5.3.4/spring-context-5.3.4.jar:/home/raj/.m2/repository/org/springframework/spring-expression/5.3.4/spring-expression-5.3.4.jar:/home/raj/.m2/repository/org/springframework/spring-tx/5.3.4/spring-tx-5.3.4.jar:/home/raj/.m2/repository/io/projectreactor/reactor-core/3.4.3/reactor-core-3.4.3.jar:/home/raj/.m2/repository/org/reactivestreams/reactive-streams/1.0.3/reactive-streams-1.0.3.jar:/home/raj/.m2/repository/org/springframework/integration/spring-integration-jmx/5.4.4/spring-integration-jmx-5.4.4.jar:/home/raj/.m2/repository/org/springframework/retry/spring-retry/1.3.1/spring-retry-1.3.1.jar:/home/raj/.m2/repository/javax/annotation/javax.annotation-api/1.3.2/javax.annotation-api-1.3.2.jar:/home/raj/.m2/repository/org/springframework/cloud/spring-cloud-function-context/3.1.3/spring-cloud-function-context-3.1.3.jar:/home/raj/.m2/repository/net/jodah/typetools/0.6.2/typetools-0.6.2.jar:/home/raj/.m2/repository/org/springframework/cloud/spring-cloud-function-core/3.1.3/spring-cloud-function-core-3.1.3.jar:/home/raj/.m2/repository/org/springframework/boot/spring-boot-starter/2.4.3/spring-boot-starter-2.4.3.jar:/home/raj/.m2/repository/org/springframework/boot/spring-boot/2.4.3/spring-boot-2.4.3.jar:/home/raj/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.4.3/spring-boot-starter-logging-2.4.3.jar:/home/raj/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar:/home/raj/.m2/repository/ch/qos/logback/logback-core/1.2.3/logback-core-1.2.3.jar:/home/raj/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.13.3/log4j-to-slf4j-2.13.3.jar:/home/raj/.m2/repository/org/apache/logging/log4j/log4j-api/2.13.3/log4j-api-2.13.3.jar:/home/raj/.m2/repository/org/slf4j/jul-to-slf4j/1.7.30/jul-to-slf4j-1.7.30.jar:/home/raj/.m2/repository/jakarta/annotation/jakarta.annotation-api/1
.3.5/jakarta.annotation-api-1.3.5.jar:/home/raj/.m2/repository/org/yaml/snakeyaml/1.27/snakeyaml-1.27.jar:/home/raj/.m2/repository/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar:/home/raj/.m2/repository/org/springframework/spring-core/5.3.4/spring-core-5.3.4.jar:/home/raj/.m2/repository/org/springframework/spring-jcl/5.3.4/spring-jcl-5.3.4.jar:/home/raj/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/2.4.3/spring-boot-autoconfigure-2.4.3.jar:/home/raj/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.11.4/jackson-databind-2.11.4.jar:/home/raj/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.11.4/jackson-annotations-2.11.4.jar:/home/raj/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.11.4/jackson-core-2.11.4.jar:/home/raj/.m2/repository/org/springframework/cloud/spring-cloud-stream-binder-kafka/3.1.3/spring-cloud-stream-binder-kafka-3.1.3.jar:/home/raj/.m2/repository/org/springframework/cloud/spring-cloud-stream-binder-kafka-core/3.1.3/spring-cloud-stream-binder-kafka-core-3.1.3.jar:/home/raj/.m2/repository/org/springframework/integration/spring-integration-kafka/5.4.4/spring-integration-kafka-5.4.4.jar:/home/raj/.m2/repository/org/apache/kafka/kafka-clients/2.6.0/kafka-clients-2.6.0.jar:/home/raj/.m2/repository/com/github/luben/zstd-jni/1.4.4-7/zstd-jni-1.4.4-7.jar:/home/raj/.m2/repository/org/lz4/lz4-java/1.7.1/lz4-java-1.7.1.jar:/home/raj/.m2/repository/org/xerial/snappy/snappy-java/1.1.7.3/snappy-java-1.1.7.3.jar:/home/raj/.m2/repository/org/springframework/kafka/spring-kafka/2.6.6/spring-kafka-2.6.6.jar io.pivotal.loancheck.LoanCheckApplication
OpenJDK 64-Bit Server VM warning: Options -Xverify:none and -noverify were deprecated in JDK 13 and will likely be removed in a future release.

                                                                      ___.  ___.   .__  __
_____________  ____   ____  ____   ______ ______         ____________ \_ |__\_ |__ |__|/  |_
\____ \_  __ \/  _ \_/ ___\/ __ \ /  ___//  ___/  ______ \_  __ \__  \ | __ \| __ \|  \   __\
|  |_> >  | \(  <_> )  \__\  ___/ \___ \ \___ \  /_____/  |  | \// __ \| \_\ \ \_\ \  ||  |
|   __/|__|   \____/ \___  >___  >____  >____  >          |__|  (____  /___  /___  /__||__|
|__|                     \/    \/     \/     \/                      \/    \/    \/
--------------------------------------------------------------------------------
Spring Boot Version :  (v2.4.3)
--------------------------------------------------------------------------------

2021-08-20 13:20:53.022  INFO 385163 --- [           main] i.p.loancheck.LoanCheckApplication       : Starting LoanCheckApplication using Java 16.0.2 on 5CD123B3QS with PID 385163 (/home/raj/Documents/repo/spring-cloud-stream-demo/loancheck/target/classes started by raj in /home/raj/Documents/repo/spring-cloud-stream-demo/loancheck)
2021-08-20 13:20:53.023  INFO 385163 --- [           main] i.p.loancheck.LoanCheckApplication       : The following profiles are active: rabbit
2021-08-20 13:20:53.896  INFO 385163 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-08-20 13:20:53.901  INFO 385163 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-08-20 13:20:53.905  INFO 385163 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-08-20 13:20:53.979  INFO 385163 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-20 13:20:53.983  INFO 385163 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-20 13:20:53.991  INFO 385163 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-20 13:20:53.995  INFO 385163 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-20 13:20:54.001  INFO 385163 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-20 13:20:54.005  INFO 385163 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-20 13:20:54.506  INFO 385163 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2021-08-20 13:20:54.519  INFO 385163 --- [           main] onConfiguration$FunctionBindingRegistrar : Functional binding is disabled due to the presense of @EnableBinding annotation in your configuration
2021-08-20 13:20:54.650  INFO 385163 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel output
2021-08-20 13:20:54.689  INFO 385163 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel approved
2021-08-20 13:20:54.696  INFO 385163 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel declined
2021-08-20 13:20:54.702  INFO 385163 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel errorChannel
2021-08-20 13:20:54.764  INFO 385163 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel nullChannel
2021-08-20 13:20:54.779  INFO 385163 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageHandler _org.springframework.integration.errorLogger
2021-08-20 13:20:54.814  INFO 385163 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'loancheck-rabbit.output' has 1 subscriber(s).
2021-08-20 13:20:54.816  INFO 385163 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-20 13:20:54.817  INFO 385163 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'loancheck-rabbit.errorChannel' has 1 subscriber(s).
2021-08-20 13:20:54.817  INFO 385163 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-08-20 13:20:54.818  INFO 385163 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka
2021-08-20 13:20:54.988  INFO 385163 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: kafka
2021-08-20 13:20:54.988  INFO 385163 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: kafka
2021-08-20 13:20:55.099  INFO 385163 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: approved
2021-08-20 13:20:55.104  INFO 385163 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
	bootstrap.servers = [localhost:9092]
	client.dns.lookup = use_all_dns_ips
	client.id = 
	connections.max.idle.ms = 300000
	default.api.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

2021-08-20 13:20:55.209  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-08-20 13:20:55.210  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-08-20 13:20:55.211  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1629445855207
2021-08-20 13:20:55.525  INFO 385163 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = true
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2021-08-20 13:20:55.555  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-08-20 13:20:55.556  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-08-20 13:20:55.556  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1629445855555
2021-08-20 13:20:55.576  INFO 385163 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: dgz5wA2vRduHKV46x3-5lQ
2021-08-20 13:20:55.577  INFO 385163 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2021-08-20 13:20:55.598  INFO 385163 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'loancheck-rabbit.approved' has 1 subscriber(s).
2021-08-20 13:20:55.599  INFO 385163 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: kafka
2021-08-20 13:20:55.600  INFO 385163 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: declined
2021-08-20 13:20:55.601  INFO 385163 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
	bootstrap.servers = [localhost:9092]
	client.dns.lookup = use_all_dns_ips
	client.id = 
	connections.max.idle.ms = 300000
	default.api.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

2021-08-20 13:20:55.612  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-08-20 13:20:55.612  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-08-20 13:20:55.612  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1629445855612
2021-08-20 13:20:55.635  INFO 385163 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-2
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = true
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2021-08-20 13:20:55.640  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-08-20 13:20:55.640  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-08-20 13:20:55.640  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1629445855639
2021-08-20 13:20:55.642  INFO 385163 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: dgz5wA2vRduHKV46x3-5lQ
2021-08-20 13:20:55.643  INFO 385163 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 30000 ms.
2021-08-20 13:20:55.646  INFO 385163 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'loancheck-rabbit.declined' has 1 subscriber(s).
2021-08-20 13:20:55.646  INFO 385163 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: kafka
2021-08-20 13:20:55.663  INFO 385163 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
	bootstrap.servers = [localhost:9092]
	client.dns.lookup = use_all_dns_ips
	client.id = 
	connections.max.idle.ms = 300000
	default.api.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS

2021-08-20 13:20:55.681  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-08-20 13:20:55.681  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-08-20 13:20:55.681  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1629445855681
2021-08-20 13:20:55.717  INFO 385163 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 100
	auto.offset.reset = latest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = anonymous.75d8e926-152a-4945-af97-2a79320acc11
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2021-08-20 13:20:55.762  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-08-20 13:20:55.762  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-08-20 13:20:55.762  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1629445855762
2021-08-20 13:20:55.773  INFO 385163 --- [           main] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-1, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Cluster ID: dgz5wA2vRduHKV46x3-5lQ
2021-08-20 13:20:55.803  INFO 385163 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'output.anonymous.75d8e926-152a-4945-af97-2a79320acc11.errors' has 1 subscriber(s).
2021-08-20 13:20:55.804  INFO 385163 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'output.anonymous.75d8e926-152a-4945-af97-2a79320acc11.errors' has 0 subscriber(s).
2021-08-20 13:20:55.804  INFO 385163 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'output.anonymous.75d8e926-152a-4945-af97-2a79320acc11.errors' has 1 subscriber(s).
2021-08-20 13:20:55.804  INFO 385163 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'output.anonymous.75d8e926-152a-4945-af97-2a79320acc11.errors' has 2 subscriber(s).
2021-08-20 13:20:55.832  INFO 385163 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 100
	auto.offset.reset = latest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = anonymous.75d8e926-152a-4945-af97-2a79320acc11
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2021-08-20 13:20:55.838  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-08-20 13:20:55.838  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-08-20 13:20:55.838  INFO 385163 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1629445855838
2021-08-20 13:20:55.839  INFO 385163 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Subscribed to topic(s): output
2021-08-20 13:20:55.841  INFO 385163 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2021-08-20 13:20:55.854  INFO 385163 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@5f36c8e3
2021-08-20 13:20:55.885  INFO 385163 --- [container-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Cluster ID: dgz5wA2vRduHKV46x3-5lQ
2021-08-20 13:20:55.883  INFO 385163 --- [           main] i.p.loancheck.LoanCheckApplication       : Started LoanCheckApplication in 3.358 seconds (JVM running for 4.317)
2021-08-20 13:20:55.888  INFO 385163 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-08-20 13:20:55.890  INFO 385163 --- [           main] i.p.loancheck.LoanCheckApplication       : The Loancheck Application has started...
2021-08-20 13:20:55.891  INFO 385163 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] (Re-)joining group
2021-08-20 13:20:55.907  INFO 385163 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-08-20 13:20:55.907  INFO 385163 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] (Re-)joining group
2021-08-20 13:20:55.911  INFO 385163 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Finished assignment for group at generation 1: {consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2-ca97d9a4-44c7-401b-b128-59a284e75bab=Assignment(partitions=[output-0])}
2021-08-20 13:20:55.917  INFO 385163 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Successfully joined group with generation 1
2021-08-20 13:20:55.917  INFO 385163 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Notifying assignor about the new Assignment(partitions=[output-0])
2021-08-20 13:20:55.919  INFO 385163 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Adding newly assigned partitions: output-0
2021-08-20 13:20:55.927  INFO 385163 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Found no committed offset for partition output-0
2021-08-20 13:20:55.931  INFO 385163 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Found no committed offset for partition output-0
2021-08-20 13:20:55.946  INFO 385163 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-anonymous.75d8e926-152a-4945-af97-2a79320acc11-2, groupId=anonymous.75d8e926-152a-4945-af97-2a79320acc11] Resetting offset for partition output-0 to offset 0.
2021-08-20 13:20:55.968  INFO 385163 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$2  : anonymous.75d8e926-152a-4945-af97-2a79320acc11: partitions assigned: [output-0]
