Spring Boot starter for Apache Pulsar
Getting started takes only 3 simple steps.
<dependency>
    <groupId>io.github.majusko</groupId>
    <artifactId>pulsar-java-spring-boot-starter</artifactId>
    <version>${version}</version>
</dependency>
Create a configuration class that registers every producer you want to use.
@Configuration
public class TestProducerConfiguration {
    @Bean
    public ProducerFactory producerFactory() {
        return new ProducerFactory()
            .addProducer("my-topic", MyMsg.class)
            .addProducer("other-topic", String.class);
    }
}
Use registered producers by simply injecting the PulsarTemplate into your service.
@Service
class MyProducer {
    @Autowired
    private PulsarTemplate<MyMsg> producer;

    void send(MyMsg msg) {
        producer.send("my-topic", msg);
    }
}
Annotate your service method with the @PulsarConsumer annotation.
@Service
class MyConsumer {
    @PulsarConsumer(topic="my-topic", clazz=MyMsg.class)
    void consume(MyMsg msg) {
        // Handle the received message here.
    }
}
Minimal configuration:
pulsar.service-url=pulsar://localhost:6650
Default configuration:
#PulsarClient
pulsar.service-url=pulsar://localhost:6650
pulsar.io-threads=10
pulsar.listener-threads=10
pulsar.enable-tcp-no-delay=false
pulsar.keep-alive-interval-sec=20
pulsar.connection-timeout-sec=10
pulsar.operation-timeout-sec=15
pulsar.starting-backoff-interval-ms=100
pulsar.max-backoff-interval-sec=10
pulsar.consumer-name-delimiter=
pulsar.namespace=default
pulsar.tenant=public
#Consumer
pulsar.consumer.default.dead-letter-policy-max-redeliver-count=-1
pulsar.consumer.default.ack-timeout-ms=3000
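To make the two backoff defaults above concrete (100 ms starting interval, 10 s maximum), here is a minimal sketch of how a capped retry delay could grow between reconnection attempts. It assumes plain doubling with no jitter; the real Pulsar client may add randomization, and `BackoffSketch` is a hypothetical helper, not part of this starter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: not part of the starter or the Pulsar client.
final class BackoffSketch {

    // Returns the delay in milliseconds before each of the first `attempts` retries,
    // assuming the delay doubles after every attempt and never exceeds maxMs.
    static List<Long> schedule(long startMs, long maxMs, int attempts) {
        List<Long> delays = new ArrayList<>();
        long delay = startMs;
        for (int i = 0; i < attempts; i++) {
            delays.add(Math.min(delay, maxMs));
            delay = Math.min(delay * 2, maxMs);
        }
        return delays;
    }

    public static void main(String[] args) {
        // starting-backoff-interval-ms=100, max-backoff-interval-sec=10
        System.out.println(schedule(100, 10_000, 9));
        // prints [100, 200, 400, 800, 1600, 3200, 6400, 10000, 10000]
    }
}
```

Under these assumptions the delay reaches the 10 s cap on the eighth retry and stays there.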
- pulsar.service-url - URL used to connect to the Pulsar cluster.
- pulsar.io-threads - Number of threads used for handling connections to brokers.
- pulsar.listener-threads - Number of threads used for message listeners/subscribers.
- pulsar.enable-tcp-no-delay - Whether to set the TCP no-delay flag on the connection, which disables Nagle's algorithm.
- pulsar.keep-alive-interval-sec - Keep-alive interval for each client-broker connection.
- pulsar.connection-timeout-sec - How long to wait for a connection to a broker to be established. If this time passes without a response from the broker, the connection attempt is dropped.
- pulsar.operation-timeout-sec - Operation timeout.
- pulsar.starting-backoff-interval-ms - Starting duration of the backoff interval (retry algorithm).
- pulsar.max-backoff-interval-sec - Maximum duration of the backoff interval (retry algorithm).
- pulsar.consumer-name-delimiter - Consumer names are built by joining the bean name and the method name with a delimiter. By default there is no delimiter, so the two are concatenated directly.
- pulsar.namespace - Namespace separation, for example app1/app2 or dev/staging/prod. More in the Namespaces docs.
- pulsar.tenant - Pulsar multi-tenancy support. More in the Multi Tenancy docs.
- pulsar.consumer.default.dead-letter-policy-max-redeliver-count - How many times Pulsar retries delivering a message to the consumer.
- pulsar.consumer.default.ack-timeout-ms - How long to wait for a message to be acknowledged before the dead-letter mechanism tries to redeliver it.
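As an illustration of the pulsar.tenant, pulsar.namespace and pulsar.consumer-name-delimiter settings described above, the sketch below shows how such values typically combine into names. Pulsar addresses topics as `persistent://tenant/namespace/topic`; whether the starter builds exactly this string, and the exact form of the bean name it uses, are assumptions. `NamingSketch` and its methods are hypothetical helpers.

```java
// Hypothetical illustration only: the starter's real naming code may differ.
final class NamingSketch {

    // Builds a fully qualified Pulsar topic name from tenant, namespace and topic.
    // Assumes a persistent topic.
    static String topicUrl(String tenant, String namespace, String topic) {
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    // Joins the bean name and method name with the configured delimiter.
    // An empty delimiter concatenates the two words directly.
    static String consumerName(String beanName, String methodName, String delimiter) {
        return beanName + delimiter + methodName;
    }

    public static void main(String[] args) {
        // Defaults: pulsar.tenant=public, pulsar.namespace=default
        System.out.println(topicUrl("public", "default", "my-topic"));
        // prints persistent://public/default/my-topic

        // Default: pulsar.consumer-name-delimiter= (empty)
        System.out.println(consumerName("myConsumer", "consume", ""));
        // prints myConsumerconsume

        System.out.println(consumerName("myConsumer", "consume", "-"));
        // prints myConsumer-consume
    }
}
```

Setting a non-empty delimiter mainly makes consumer names easier to read in broker statistics and logs.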
If you need access to Pulsar metadata, use PulsarMessage as a wrapper around your message type and the data will be injected for you.
@Service
class MyConsumer {
    @PulsarConsumer(topic="my-topic", clazz=MyMsg.class)
    void consume(PulsarMessage<MyMsg> myMsg) {
        // The payload is available through getValue(); metadata lives on the wrapper.
        MyMsg value = myMsg.getValue();
    }
}
You can configure topic names in application.properties and reference them with a ${...} placeholder:
my.custom.topic.name=foo
@PulsarConsumer(topic = "${my.custom.topic.name}", clazz = MyMsg.class)
public void consume(MyMsg myMsg) {
}
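The placeholder in the example above is replaced by the property value at startup. The sketch below illustrates that idea with only the JDK: it is not how Spring or this starter implements resolution (Spring uses its own Environment and property resolvers), and `PlaceholderSketch` is a hypothetical helper.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only: Spring resolves placeholders through its Environment.
final class PlaceholderSketch {

    // Matches ${some.key} and captures the key.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces every ${key} in `value` with the matching property,
    // and fails fast when a referenced property is missing.
    static String resolve(String value, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String replacement = props.getProperty(key);
            if (replacement == null) {
                throw new IllegalArgumentException("Missing property: " + key);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("my.custom.topic.name", "foo");

        System.out.println(resolve("${my.custom.topic.name}", props)); // prints foo
        System.out.println(resolve("my-topic", props));                // prints my-topic
    }
}
```

A literal topic name without a placeholder passes through unchanged, so both styles can be mixed across consumers.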
All contributors are welcome. If you have never contributed to open source, start by reading the GitHub Flow guide.
- Pick a task from the simple roadmap in the Projects section.
- Create a pull request with a reference (URL) to the task in the Projects section.
- Rest and enjoy the great feeling of being a contributor.
- Create an issue.
- Create a pull request with a reference to the issue.
- Rest and enjoy the great feeling of being a contributor.