Comments (6)

dcadenas commented on September 16, 2024

No worries at all! Family always comes first, and I hope your little one gets well soon. I'm just grateful for the work you've put into the library and am looking forward to your insights whenever you have the time. Take care and thank you for the update!

from ractor.

dcadenas commented on September 16, 2024

> My only concern with pushing logic into the sender space is that we affect QoS for all downstream targets for a single slow target, which kind of breaks the point of pub-sub models. Probably the correct approach (imho) is dealing with the LAGGED signal as you suggest.

I hadn't considered that, which makes a lot of sense. Here's some code to reproduce the issue:

use std::time::Duration;

use tracing_subscriber::{fmt, prelude::*, EnvFilter};

use ractor::{Actor, ActorProcessingErr, ActorRef, OutputPort};

struct Counter;

struct CounterState {
    count: i64,
    output_port: OutputPort<i64>,
}

enum CounterMessage {
    Increment(i64),
    Subscribe(ActorRef<CounterMessage>),
    DoSomethingSlow(i64),
}

#[ractor::async_trait]
impl Actor for Counter {
    type Msg = CounterMessage;

    type State = CounterState;
    type Arguments = ();

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        _: (),
    ) -> Result<Self::State, ActorProcessingErr> {
        tracing::info!("Starting the counter actor");
        // create the initial state
        Ok(CounterState {
            count: 0,
            output_port: OutputPort::default(),
        })
    }

    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            CounterMessage::Increment(how_much) => {
                if state.count < 5 {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                }

                if state.count == 5 {
                    tracing::info!("Producer speeds up to fill the 10-item broadcast buffer; the receiver loop will break on the lagged error and later messages will be silently dropped");
                }

                state.count += how_much;
                tracing::info!("Producer current count: {}", state.count);
                state.output_port.send(how_much);
            }
            CounterMessage::Subscribe(subscriber) => {
                state
                    .output_port
                    // Map each published i64 into the subscriber's message type.
                    .subscribe(subscriber, |value| Some(CounterMessage::DoSomethingSlow(value)));
            }
            CounterMessage::DoSomethingSlow(value) => {
                tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                state.count += value;
                tracing::info!("Consumer current count: {}", state.count);
            }
        }
        Ok(())
    }
}

fn init_logging() {
    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(EnvFilter::from_default_env())
        .init();
}

#[tokio::main]
async fn main() {
    init_logging();

    let (actor, handle) = Actor::spawn(Some("test_name".to_string()), Counter, ())
        .await
        .expect("Failed to start actor!");

    let (actor_subscriber, handle_subscriber) =
        Actor::spawn(Some("test_subscriber".to_string()), Counter, ())
            .await
            .expect("Failed to start actor!");

    actor
        .send_message(CounterMessage::Subscribe(actor_subscriber.clone()))
        .expect("Failed to send message");

    // Queue 25 increments: the producer handles the first five slowly, then the rest in a burst
    for _i in 0..25 {
        actor
            .send_message(CounterMessage::Increment(1))
            .expect("Failed to send message");
    }

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(25)).await;
        actor.stop(None);
        actor_subscriber.stop(None);
    });

    handle.await.expect("Actor failed to exit cleanly");
    handle_subscriber
        .await
        .expect("Actor failed to exit cleanly");
}

I'm happy to share the real use case because it's open source. It's a server that consumes direct messages from the Nostr network.

The server processes messages that can arrive very quickly from a websocket port connected to the Nostr network relays. Initially I saw no issues because my local test server sent fewer than 10 test messages. As soon as I exceeded that limit, the initial query returned too much data too quickly, the forwarding loop broke, and my message handler here stopped being called, which puzzled me. Eventually I "solved" it by adding a limit to the initial query here. This workaround mostly prevents large initial bursts, which should be rare for now, but it's not a real long-term solution.

I'm pretty new to Rust and the actor model, so any feedback or suggestion on how to use the crate or anything else will always be welcome.

slawlor commented on September 16, 2024

I just wanted to say I'm seeing these posts and am thrilled with the progress. However, a sick 5-month-old has prevented me from doing proper reviews here. Rest assured I will get to it. Sorry for the delay.

slawlor commented on September 16, 2024

OK now I've actually had 20 minutes to read this!

I'm actually surprised this is an issue. In the OutputPort, the spawned task is just doing a recv() -> map() -> send() to the target actor, which has an unbounded buffer. Do you have a repro you can share of where this is missing messages under heavy load? (I understand if it's proprietary or something of course).

However, if this is indeed a problem, it means the tokio scheduler is so busy that it can't do a simple dequeue of a port, then two blocking operations.

My only concern with pushing logic into the sender space is that we affect QoS for all downstream targets for a single slow target, which kind of breaks the point of pub-sub models. Probably the correct approach (imho) is dealing with the LAGGED signal as you suggest.

slawlor commented on September 16, 2024

I'm trying to understand what's going on here. We're creating a broadcast channel with a "buffer" of 10 messages, and we were exiting the subscription if we got a Lagged(num_dropped) error, which I don't think is right. If we're lagged, we should log it and probably try to continue.

However I'm honestly surprised this is happening, because we spawn a dedicated task to take the output port's received messages and forward them to an actor's (unbounded) channel. The only thing I could think of is if the converter is really slow converting messages from type A -> B.

Like as long as the subscription is running, we shouldn't ever fill the buffer of 10 unless we're under super heavy cpu pressure I would hope.
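
To make the failure mode concrete, here is a minimal std-only model of a bounded broadcast buffer with one lagging reader. It is a sketch, not ractor's or tokio's actual code: `Ring`, `forward`, and the local `RecvError` enum are hypothetical names. The only behaviour borrowed from tokio's broadcast channel is that a lagging receiver gets a `Lagged(n)` error and then resumes from the oldest retained message. The capacity of 10 and the burst of 25 mirror the repro above.

```rust
use std::collections::VecDeque;

/// Why a receive attempt produced no value.
#[derive(Debug, PartialEq)]
enum RecvError {
    /// No unread message is available right now.
    Empty,
    /// The receiver fell behind and this many messages were overwritten.
    Lagged(u64),
}

/// Hypothetical stand-in for a bounded broadcast buffer:
/// it retains only the newest `capacity` values.
struct Ring<T> {
    capacity: usize,
    next_seq: u64,      // sequence number the next sent value will get
    slots: VecDeque<T>, // retained values, oldest first
}

impl<T: Clone> Ring<T> {
    fn new(capacity: usize) -> Self {
        Ring { capacity, next_seq: 0, slots: VecDeque::new() }
    }

    /// Sending never blocks; the oldest value is overwritten when full.
    fn send(&mut self, value: T) {
        if self.slots.len() == self.capacity {
            self.slots.pop_front();
        }
        self.slots.push_back(value);
        self.next_seq += 1;
    }

    /// Read the value at `*cursor`, reporting a lag once if it was overwritten.
    fn recv(&self, cursor: &mut u64) -> Result<T, RecvError> {
        let oldest = self.next_seq - self.slots.len() as u64;
        if *cursor < oldest {
            let missed = oldest - *cursor;
            *cursor = oldest; // resume at the oldest retained value
            return Err(RecvError::Lagged(missed));
        }
        if *cursor == self.next_seq {
            return Err(RecvError::Empty);
        }
        let value = self.slots[(*cursor - oldest) as usize].clone();
        *cursor += 1;
        Ok(value)
    }
}

/// Drain the ring into `out`, returning how many messages were lost to lag.
fn forward(ring: &Ring<i64>, stop_on_lag: bool, out: &mut Vec<i64>) -> u64 {
    let mut cursor = 0;
    let mut dropped = 0;
    loop {
        match ring.recv(&mut cursor) {
            Ok(value) => out.push(value),
            Err(RecvError::Lagged(n)) => {
                dropped += n;
                if stop_on_lag {
                    break; // the subscription ends without forwarding anything else
                }
            }
            Err(RecvError::Empty) => break,
        }
    }
    dropped
}

fn main() {
    let mut ring = Ring::new(10);
    for i in 0..25 {
        ring.send(i); // a burst of 25 while the reader is not keeping up
    }

    let mut stopped = Vec::new();
    let lost = forward(&ring, true, &mut stopped);
    println!("stop on lag: forwarded {}, lost {}", stopped.len(), lost);

    let mut continued = Vec::new();
    let lost = forward(&ring, false, &mut continued);
    println!("continue on lag: forwarded {}, lost {}", continued.len(), lost);
}
```

With these numbers the stop-on-lag policy forwards nothing after the burst, while the continue policy still delivers the 10 retained messages and can report the 15 that were overwritten.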

slawlor commented on September 16, 2024

I added a branch to handle the lagged error, but I'm not in love with the idea of dropping messages. It makes me wonder if I should write my own implementation of the broadcast channel...

d4767b2
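
One possible shape for a broadcast replacement that never drops messages is to give every subscriber its own unbounded queue, so a slow subscriber only grows its own backlog. The sketch below is an assumption for illustration, not what the commit above does: `FanOut` is a hypothetical type built on `std::sync::mpsc`, and nothing here is part of ractor's API.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out port: one unbounded queue per subscriber.
struct FanOut<T> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> FanOut<T> {
    fn new() -> Self {
        FanOut { subscribers: Vec::new() }
    }

    /// Register a subscriber and hand back the receiving end of its queue.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Clone the value into every live queue and prune subscribers
    /// whose receiving end has been dropped.
    fn send(&mut self, value: T) {
        self.subscribers.retain(|tx| tx.send(value.clone()).is_ok());
    }
}

fn main() {
    let mut port = FanOut::new();
    let slow = port.subscribe();
    let fast = port.subscribe();

    for i in 0..25_i64 {
        port.send(i);
        // The fast subscriber keeps up; the slow one reads nothing yet.
        assert_eq!(fast.try_recv(), Ok(i));
    }

    // Every message is still queued for the slow subscriber, in order.
    let backlog: Vec<i64> = slow.try_iter().collect();
    println!("slow subscriber backlog: {}", backlog.len());
}
```

The trade-off is memory: a subscriber that never catches up makes its queue grow without bound, so this swaps dropped messages for unbounded buffering rather than removing the problem.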
