
Comments (7)

ThomasBorghs commented on May 18, 2024

@xmlking

This is probably a fairly late answer, but last week I faced a challenge similar to yours: several web clients connecting to a server-sent events (SSE) stream on a backend server. I implemented this using Reactor's ConnectableFlux, as documented here: http://projectreactor.io/docs/core/release/reference/#advanced-broadcast-multiple-subscribers-connectableflux.

ConnectableFlux allows you to keep the KafkaReceiver open and subscribe clients through SSE dynamically. For my use case I implemented this as follows (an extract from my Spring WebFlux RequestHandler):

// the wrapped KafkaReceiver
private final ConnectableFlux<ServerSentEvent<NetworkPointEvent>> eventPublisher;

public RequestHandler(Map<String, Object> config) {
    // Conversion to ConnectableFlux.
    // An alternative to "publish" is "replay",
    // which resends all previously received Kafka messages to each new subscriber.
    eventPublisher = kafkaReceiver.receive()
        .map(consumerRecord -> ServerSentEvent.builder(consumerRecord.value()).build())
        .publish();

    // subscribes to the KafkaReceiver -> starts consumption (without observers attached)
    eventPublisher.connect();
}

public RouterFunction<ServerResponse> getRouterFunction() {
    return RouterFunctions.route(GET("/kafka-messages"), this::subscribeToMessages);
}

private Mono<ServerResponse> subscribeToMessages(ServerRequest serverRequest) {
    return ServerResponse.status(HttpStatus.OK).body(BodyInserters.fromServerSentEvents(eventPublisher));
}

Hope this still provides some use to anyone!
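
To make the publish/replay distinction in the code comment concrete, here is a toy model in plain Java. It is only a sketch of the semantics, not Reactor code: ToyBroadcaster and PublishVsReplayDemo are hypothetical names, and the real operators (publish() and replay() on Flux) also handle backpressure, threading, and bounded history, which this ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PublishVsReplayDemo {
    /** Emits "a", attaches a late subscriber, emits "b"; returns what the late subscriber saw. */
    static List<String> lateSubscriberSees(boolean replayHistory) {
        ToyBroadcaster<String> source = new ToyBroadcaster<>(replayHistory);
        List<String> seen = new ArrayList<>();
        source.emit("a");            // nobody is listening yet
        source.subscribe(seen::add); // late subscriber, e.g. a newly connected SSE client
        source.emit("b");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("publish-like: " + lateSubscriberSees(false)); // prints [b]
        System.out.println("replay-like:  " + lateSubscriberSees(true));  // prints [a, b]
    }
}

/** Hypothetical stand-in for a hot multicast source; NOT Reactor's ConnectableFlux. */
class ToyBroadcaster<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private final List<T> history = new ArrayList<>();
    private final boolean replayHistory;

    ToyBroadcaster(boolean replayHistory) {
        this.replayHistory = replayHistory;
    }

    /** Registers a subscriber; in replay mode it first receives everything emitted so far. */
    void subscribe(Consumer<T> subscriber) {
        if (replayHistory) {
            history.forEach(subscriber);
        }
        subscribers.add(subscriber);
    }

    /** Delivers one item to every currently registered subscriber. */
    void emit(T item) {
        if (replayHistory) {
            history.add(item);
        }
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }
}
```

With publish-like behaviour a client that connects late only sees messages from that point on, which is usually what a live event stream wants. Replay-like behaviour keeps the whole history in memory in this toy version, so a real setup would probably want to bound it.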

from reactor-kafka.

artemyarulin commented on May 18, 2024

For reference, issue #56 describes a similar problem and includes a possible solution.


xmlking commented on May 18, 2024

I had some success with the following workaround to keep the kafkaReceiver alive: declaring logStream as a property of StreamHandler.
I still need to create a fake subscriber to prevent it from unsubscribing when all web clients have disconnected.

@Component
class StreamHandler(
    final val kafkaDataReceiver: KafkaReceiver<String, String>,
    final val objectMapper: ObjectMapper
) {
    final val logStream = kafkaDataReceiver.receive()
        .map { objectMapper.readValue(it.value(), Log::class.java) }
        .log("StreamHandler")
        .share()

    init {
        logStream.subscribe {
            // TODO: fake subscriber to keep the Kafka connection active
        }
    }

    fun fetchLogsSSE(req: ServerRequest) = ok()
            .contentType(TEXT_EVENT_STREAM)
            .body(logStream, Log::class.java)

    fun fetchLogs(req: ServerRequest) = ok()
            .contentType(APPLICATION_STREAM_JSON)
            .body(logStream, Log::class.java)
}
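
Why the fake subscriber is needed: Reactor documents share() as an alias for publish().refCount(), so the upstream is cancelled once the subscriber count drops to zero. The following is a rough plain-Java model of that reference counting, not Reactor itself. ToyRefCountSource and RefCountKeepAliveDemo are hypothetical names, and the model is single-threaded for brevity.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class RefCountKeepAliveDemo {
    /** Two clients connect and leave; returns true if the upstream is still connected afterwards. */
    static boolean upstreamSurvives(boolean withPlaceholderSubscriber) {
        ToyRefCountSource source = new ToyRefCountSource();
        if (withPlaceholderSubscriber) {
            source.subscribe(); // never cancelled, like the empty subscriber in init { }
        }
        Runnable cancelFirst = source.subscribe();
        Runnable cancelSecond = source.subscribe();
        cancelFirst.run();
        cancelSecond.run();
        return source.isConnected();
    }

    public static void main(String[] args) {
        System.out.println("without placeholder: " + upstreamSurvives(false)); // prints false
        System.out.println("with placeholder:    " + upstreamSurvives(true));  // prints true
    }
}

/** Hypothetical model of refCount semantics; NOT thread-safe and NOT a Reactor class. */
class ToyRefCountSource {
    private int subscriberCount = 0;
    private boolean connected = false;

    /** Connects upstream on the first subscriber and returns a handle that cancels this subscription. */
    Runnable subscribe() {
        if (subscriberCount == 0) {
            connected = true;
        }
        subscriberCount++;
        AtomicBoolean cancelled = new AtomicBoolean(false);
        return () -> {
            // Guard against double cancellation so the count cannot go negative.
            if (cancelled.compareAndSet(false, true)) {
                subscriberCount--;
                if (subscriberCount == 0) {
                    connected = false; // last subscriber left: upstream is cancelled
                }
            }
        };
    }

    boolean isConnected() {
        return connected;
    }
}
```

An alternative that avoids the placeholder is to call publish() and connect() explicitly, as in the ConnectableFlux answer earlier in this thread, since a manually connected source does not depend on the subscriber count.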


ilayaperumalg commented on May 18, 2024

Hi @xmlking

Thanks for reporting, and sorry for the delay in responding to your question.

By design, KafkaReceiver does not support multiple subscribers on the Flux it returns. One of the main reasons is to stay close to the native Kafka consumer model.

In the first version you posted, I don't see how you subscribe to the logStream. Maybe you are trying to create the receiver's consumer Flux more than once.

In your workaround, the fake subscriber ensures that there is a single subscriber.

It might be worth handling your routes conditionally inside the actual subscribe method itself. This way you keep the single-subscription model and also have all your subscriber logic inside the subscribe method.

You can see a sample here
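
Separately from the linked sample, one possible reading of "conditional routing inside the single subscriber" can be sketched in plain Java. This is an assumption about what the suggestion means, not the maintainers' sample: ToyRouter and SingleSubscriberRoutingDemo are hypothetical names, and records.forEach(router) stands in for the one subscribe(...) call on the receiver's Flux.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class SingleSubscriberRoutingDemo {
    /** Feeds all records through one subscriber and returns [errors, others]. */
    static List<List<String>> dispatch(List<String> records) {
        List<String> errors = new ArrayList<>();
        List<String> others = new ArrayList<>();
        ToyRouter<String> router = new ToyRouter<String>()
                .route(r -> r.startsWith("ERROR"), errors::add)
                .route(r -> !r.startsWith("ERROR"), others::add);
        records.forEach(router); // stands in for the single subscribe(router) call
        return Arrays.asList(errors, others);
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("ERROR disk full", "INFO started", "ERROR timeout");
        // prints [[ERROR disk full, ERROR timeout], [INFO started]]
        System.out.println(dispatch(records));
    }
}

/** Hypothetical single subscriber that forwards each item to every handler whose condition matches. */
class ToyRouter<T> implements Consumer<T> {
    private final List<Predicate<T>> conditions = new ArrayList<>();
    private final List<Consumer<T>> handlers = new ArrayList<>();

    /** Registers a handler that only receives items matching the condition. */
    ToyRouter<T> route(Predicate<T> condition, Consumer<T> handler) {
        conditions.add(condition);
        handlers.add(handler);
        return this;
    }

    @Override
    public void accept(T item) {
        for (int i = 0; i < conditions.size(); i++) {
            if (conditions.get(i).test(item)) {
                handlers.get(i).accept(item);
            }
        }
    }
}
```

The point of the design is that the Kafka side only ever sees one subscriber, while all fan-out decisions live in ordinary application code.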


ilayaperumalg commented on May 18, 2024

@xmlking Closing this for now. Please feel free to re-open if you need more information or have any suggestions on this. Thanks!


xmlking commented on May 18, 2024

Thanks @ThomasBorghs, I will try this solution.
Here is my code base:

https://github.com/xmlking/microservices-observability/blob/master/stream-service/src/main/kotlin/com/example/StreamApplication.kt


yaochaoutokyo commented on May 18, 2024

I solved the problem by transforming the KafkaFlux to a shared flux
