Comments (7)
Probably a fair bit late with my answer, but I was faced with a similar challenge as you last week: Several web clients connecting to a backend server sent events stream. I implemented this using the ConnectableFlux construct from Reactor, as documented here: http://projectreactor.io/docs/core/release/reference/#advanced-broadcast-multiple-subscribers-connectableflux.
ConnectableFlux allows you to keep the KafkaReceiver open and subscribe clients through SSE dynamically. For my use case I implemented this as follows (an extract from my Spring WebFlux RequestHandler):
// the wrapped KafkaReceiver
private final ConnectableFlux<ServerSentEvent<NetworkPointEvent>> eventPublisher;
public RequestHandler(Map<String, Object> config) {
// conversion to ConnectableFlux.
// Alternative to "publish is "replay"
// which resends all past received Kafka Messages to each new observer
eventPublisher = kafkaReceiver.receive()
.map(consumerRecord -> ServerSentEvent.builder(consumerRecord.value()).build())
.publish();
// subscribes to the KafkaReceiver -> starts consumption (without observers attached)
eventPublisher.connect();
}
public RouterFunction<ServerResponse> getRouterFunction() {
return RouterFunctions.route(GET("/kafka-messages"), this::subscribeToMessages);
}
private Mono<ServerResponse> subscribeToMessages(ServerRequest serverRequest) {
return ServerResponse.status(HttpStatus.OK).body(BodyInserters.fromServerSentEvents(eventPublisher));
}
Hope this still provides some use to anyone!
from reactor-kafka.
For the reference - here similar issue with some solution as well #56
from reactor-kafka.
I got some success with following workaround to keep kafkaReceiver
alive. i.e., declaring logStream
as local variable in StreamHandler
I still need to create a fake subscriber to prevent it from unsubscribing when all web clients are closed.
@Component
class StreamHandler(final val kafkaDataReceiver: KafkaReceiver<String, String>,
final val objectMapper: ObjectMapper) {
final val logStream = kafkaDataReceiver.receive().map { objectMapper.readValue(it.value(), Log::class.java) }.log("StreamHandler").share()
init{
logStream.subscribe {
//TODO: fake subscriber to keep kafka connect active
}
}
fun fetchLogsSSE(req: ServerRequest) = ok()
.contentType(TEXT_EVENT_STREAM)
.body(logStream, Log::class.java)
fun fetchLogs(req: ServerRequest) = ok()
.contentType(APPLICATION_STREAM_JSON)
.body(logStream, Log::class.java)
}
from reactor-kafka.
Hi @xmlking
Thanks for reporting and sorry for the delay responding to your question.
By design, the Kafka Receiver doesn't support multiple subscribers subscribing to the Flux on Kafka Receiver. One of the main reasons is to use the Kafka consumer model natively.
On the first one you posted, I don't see how you subscribe to the logStream
. Maybe you are trying to create the Receiver's consumer Flux more than one times.
In the case where you worked around this situation, it is made sure that there is a single subscriber based on your fake subscriber invocation.
It might be worth conditionally operating your routes from the actual subscribe method itself. This way you keep the single subscription model as well you have all your subscriber logic inside the actual subscribe method.
You can see a sample here
from reactor-kafka.
@xmlking Closing this for now. Please feel free to re-open if you need more information or have any suggestion on this. Thanks!
from reactor-kafka.
Thanks @ThomasBorghs will try this solution
Here is my code base
from reactor-kafka.
I solved the problem by transforming the KafkaFlux to a shared flux
from reactor-kafka.
Related Issues (20)
- reactor-kafka docs from 1.3.11 above referencing old 1.1.0.RELEASE docs HOT 4
- Reference doc format error in "Sample Scenarios" header HOT 4
- Micrometer Metrics HOT 3
- GH-321 / PR 325 - Observation propagation HOT 4
- Documentation Feedback on Reactor Kafka's Observation API HOT 6
- GH-321 / PR 325 - No trace observed in neither log, neither trace aggregation system HOT 10
- Rebalancing always waits until maxDelayRebalance with AckMode.EXACTLY_ONCE
- GH-321 / PR 325 - Observation propagation issue, traces are the same while it should not HOT 4
- Allow SenderRecord to take into account existing Observation HOT 13
- ClassCastException on attempt to get bootstrap server from SenderOption/ReceiverOptions HOT 3
- Reactive kafka consumer doesn't pause on calling the consumer.pause() method
- Metrics reactor_{receiver|sender}_active{sum|count|bucket} always shows 0 HOT 1
- Provide option for KafkaReceiver's graceful shutdown HOT 2
- Expose KafkaConsumer in ReceiverOptionsCustomizer.addAssignListener() - parity with KafkaBindingRebalanceListener
- Multiple kafka Consumers in Same Consumer Group receiving same messages
- Micrometer tracing context propagation problem HOT 1
- Reactor Kafka Producer Not Following Round Robin Assignment
- Application Pod Crashed: Kafka Producer Exhausted 3GB Heap Memory When Broker Failed
- Consumption stopped with slow consumer
- reactor.kafka.receiver.internals.MockReceiverTest > consumerMethods FAILED
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from reactor-kafka.