Comments (6)
No worries at all! Family always comes first, and I hope your little one gets well soon. I'm just grateful for the work you've put into the library and am looking forward to your insights whenever you have the time. Take care and thank you for the update!
from ractor.
My only concern with pushing logic into the sender space, is that we effect QoS for all downstream targets for a single slow target, which kind of breaks the point of pub-sub models. Probably the correct approach (imho) is dealing with the LAGGED signal as you suggest.
I hadn't considered that, which makes a lot of sense. Here's some code to reproduce the issue:
use std::time::Duration;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use ractor::{Actor, ActorProcessingErr, ActorRef, OutputPort};
struct Counter;
struct CounterState {
count: i64,
output_port: OutputPort<i64>,
}
enum CounterMessage {
Increment(i64),
Subscribe(ActorRef<CounterMessage>),
DoSomethingSlow(i64),
}
#[ractor::async_trait]
impl Actor for Counter {
type Msg = CounterMessage;
type State = CounterState;
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
tracing::info!("Starting the counter actor");
// create the initial state
Ok(CounterState {
count: 0,
output_port: OutputPort::default(),
})
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
CounterMessage::Increment(how_much) => {
if state.count < 5 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
if state.count == 5 {
tracing::info!("Producer speed up, to ensure that we fill the 10 items broadcast buffer, receiver loop will be broken from lagging messages and silently drop");
}
state.count += how_much;
tracing::info!("Producer current count: {}", state.count);
state.output_port.send(how_much);
}
CounterMessage::Subscribe(subscriber) => {
state
.output_port
.subscribe(subscriber, |i64| Some(CounterMessage::DoSomethingSlow(i64)));
}
CounterMessage::DoSomethingSlow(value) => {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
state.count += value;
tracing::info!("Consumer current count: {}", state.count);
}
}
Ok(())
}
}
fn init_logging() {
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();
}
#[tokio::main]
async fn main() {
init_logging();
let (actor, handle) = Actor::spawn(Some("test_name".to_string()), Counter, ())
.await
.expect("Failed to start actor!");
let (actor_subscriber, handle_subscriber) =
Actor::spawn(Some("test_subscriber".to_string()), Counter, ())
.await
.expect("Failed to start actor!");
actor
.send_message(CounterMessage::Subscribe(actor_subscriber.clone()))
.expect("Failed to send message");
// +5 +10 -5 a few times, printing the value via RPC
for _i in 0..25 {
actor
.send_message(CounterMessage::Increment(1))
.expect("Failed to send message");
}
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(25)).await;
actor.stop(None);
actor_subscriber.stop(None);
});
handle.await.expect("Actor failed to exit cleanly");
handle_subscriber
.await
.expect("Actor failed to exit cleanly");
}
I'm happy to share the real use case because it's open source. It's a server that consumes direct messages from the Nostr network.
The server processes messages that can arrive very quickly from a websocket port connected to the Nostr network relays. Initially, I encountered no issues because the test messages from my local test server were fewer than 10. However, as soon as I exceeded that limit, the initial query returned too much data too quickly, so my message handler here wasn't being called due to a broken loop. I was puzzled because my subscriber wasn't being called. Eventually, I "solved" it by adding a limit to the initial query here. This workaround mostly prevents such initial large bursts, which would be rare for the moment, but it's not a real long term solution.
I'm pretty new to Rust and the actor model, so any feedback or suggestion on how to use the crate or anything else will always be welcome.
from ractor.
I just wanted to state I'm seeing these posts, and am thrilled with the progress. However a sick 5-month-old has prevented me from doing proper reviews here. Rest assured I will get to it, sorry for the delay
from ractor.
OK now I've actually had 20 minutes to read this!
I'm actually surprised this is an issue. In the OutputPort
, the spawned task is just doing a recv() -> map() -> send()
to the target actor, which has an unbounded buffer. Do you have a repro you can share of where this is missing messages under heavy load? (I understand if it's proprietary or something of course).
However if this is indeed a problem, it means the tokio scheduler is really busy that it can't do a simple dequeue of a port, then two blocking operations.
My only concern with pushing logic into the sender space, is that we effect QoS for all downstream targets for a single slow target, which kind of breaks the point of pub-sub models. Probably the correct approach (imho) is dealing with the LAGGED
signal as you suggest.
from ractor.
I'm trying to understand what's going on here, we're creating a broadcast channel with a "buffer" of 10 messages. We were exiting the subscription if we got a Lagged(num_dropped)
error which isn't right I think. In the event we're lagged, we should log it and probably try and continue on.
However I'm honestly surprised this is happening, because we spawn a dedicated task to take the output port's received messages and forward them to an actor's (unbounded) channel. The only thing I could think of is if the converter is really slow converting messages from type A -> B.
Like as long as the subscription is running, we shouldn't ever fill the buffer of 10 unless we're under super heavy cpu pressure I would hope.
from ractor.
I added a branch to handle the lagged error, but i'm not in love with the idea of dropping messages. It makes me wonder if I should write my own implementation of the broadcast channel...
from ractor.
Related Issues (20)
- About section needs update for async-std
- Support `async fn` in traits. HOT 1
- Request for TCP Echo Server Example with TcpListener and TcpStream as Actors HOT 1
- Enum contains type of itself HOT 4
- Add support to downcast a BoxedMessage to get a reference to it's wrapped type without consuming it HOT 4
- When panic=abort is on, panic is not captured. HOT 1
- Lifetimes do not match method in trait HOT 2
- Subscriber-Driven OutputPort Subscriptions
- `post_stop` of children are being called when supervisor fails. HOT 1
- Not depend protobuf-src on windows HOT 1
- Compiler panics HOT 2
- Handle multiple message types HOT 1
- Awaiting input in an actor HOT 1
- SpawnErr when spawning named actor will permanently pollute that name
- As a subscriber, I should can subscribe multiple type of messages from publisher HOT 2
- With async-trait turned off, it is impossible to use factory HOT 3
- flush() method to help testing HOT 1
- call_t! not working with predefined trait type HOT 1
- Add a pre_stop fn or some way to wait until post_stop has finished HOT 3
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from ractor.