
xactor's Introduction

Xactor is a Rust actor framework based on async-std.

Documentation

Features

  • Async actors.
  • Actor communication in a local context.
  • Using Futures for asynchronous message handling.
  • Typed messages (No Any type). Generic messages are allowed.

Examples

use xactor::*;

#[message(result = "String")]
struct ToUppercase(String);

struct MyActor;

impl Actor for MyActor {}

#[async_trait::async_trait]
impl Handler<ToUppercase> for MyActor {
    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: ToUppercase) -> String {
        msg.0.to_uppercase()
    }
}

#[xactor::main]
async fn main() -> Result<()> {
    // Start actor and get its address
    let mut addr = MyActor.start().await?;

    // Send message `ToUppercase` to actor via addr
    let res = addr.call(ToUppercase("lowercase".to_string())).await?;
    assert_eq!(res, "LOWERCASE");
    Ok(())
}

Performance

https://github.com/sunli829/xactor-benchmarks

Installation

Xactor requires async-trait in user code.

With cargo add installed, run:

$ cargo add xactor
$ cargo add async-trait

The tokio runtime can be used instead of async-std. To use it, activate the runtime-tokio feature and disable default features.

You can edit your Cargo.toml as follows:

xactor = { version = "x.x.x", features = ["runtime-tokio"], default-features = false }



xactor's Issues

Allow `started` and `stopped` to return a Result<(), Error>

Hi,
Because the world is not perfect, I need a fallible protocol for starting an Actor.

I also have a sub-actor: when I stop it, sub_address.stop() returns a Result<(), Error>, so I can't use ? to propagate that error upward.

I suggest changing these prototypes to:

-    async fn started(&mut self, ctx: &Context<Self>) {}
-    async fn stopped(&mut self, ctx: &Context<Self>) {}

+    async fn started(&mut self, ctx: &Context<Self>) -> Result<(), Error> { Ok(()) }
+    async fn stopped(&mut self, ctx: &Context<Self>) -> Result<(), Error> { Ok(()) }

What do you think about that?
May I do a PR?

Actor Pooling

Hey there,

This library looks really cool, thanks! I was wondering if you have plans to add something like Actix's SyncArbiter. Xactor runs on tokio or other async runtimes that can be multithreaded, so it can be useful to have "replicas" of actors to allow concurrent processing of tasks. Is this something you'd consider adding?
An example is a web or database server that wants as many worker actors as there are threads on the system, all replicas of the same Actor, polling for messages from a single queue.

Thanks!
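Purely as a sketch (this is not an existing xactor feature; the Pool type and its methods below are made up), a user-level shim could start N replicas of an actor and round-robin calls across their addresses:

use std::sync::atomic::{AtomicUsize, Ordering};
use xactor::*;

/// Hypothetical pool that owns N replica addresses of the same actor type.
struct Pool<A: Actor> {
    replicas: Vec<Addr<A>>,
    next: AtomicUsize,
}

impl<A: Actor> Pool<A> {
    /// Start `size` replicas produced by the `make` closure.
    async fn start(size: usize, mut make: impl FnMut() -> A) -> Result<Self> {
        let mut replicas = Vec::with_capacity(size);
        for _ in 0..size {
            replicas.push(make().start().await?);
        }
        Ok(Self { replicas, next: AtomicUsize::new(0) })
    }

    /// Pick the next replica in round-robin order.
    fn pick(&self) -> &Addr<A> {
        let i = self.next.fetch_add(1, Ordering::Relaxed);
        &self.replicas[i % self.replicas.len()]
    }
}

Callers would then write something like pool.pick().call(msg).await?. Unlike a true shared work queue, plain round-robin can still let a backlog build up behind a slow replica.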

Can't await Caller in Handlers

Using latest master:

error: future cannot be sent between threads safely
  --> src/main.rs:35:79
   |
35 |       async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: Ping) -> usize {
   |  _______________________________________________________________________________^
36 | |         let caller = self.other_actor.caller::<Ping>();
37 | |         let result = caller.call(Ping(10)).await.unwrap();
38 | |         self.count
39 | |     }
   | |_____^ future returned by `__handle` is not `Send`
   |
   = help: the trait `std::marker::Sync` is not implemented for `(dyn std::ops::Fn(Ping) -> std::pin::Pin<std::boxed::Box<(dyn std::future::Future<Output = std::result::Result<usize, anyhow::Error>> + std::marker::Send + 'static)>> + std::marker::Send + 'static)`
note: future is not `Send` as this value is used across an await
  --> src/main.rs:37:22
   |
37 |         let result = caller.call(Ping(10)).await.unwrap();
   |                      ------^^^^^^^^^^^^^^^^^^^^^         - `caller` is later dropped here
   |                      |
   |                      await occurs here, with `caller` maybe used later
   |                      has type `&xactor::Caller<Ping>` which is not `Send`
help: consider moving this into a `let` binding to create a shorter lived borrow
  --> src/main.rs:37:22
   |
37 |         let result = caller.call(Ping(10)).await.unwrap();
   |                      ^^^^^^^^^^^^^^^^^^^^^
   = note: required for the cast to the object type `dyn std::future::Future<Output = usize> + std::marker::Send`

error: aborting due to previous error

The key issue seems to be that &xactor::Caller needs to live across the await, and async_trait wants the handler future to be Send. For &xactor::Caller to be Send, xactor::Caller has to be Sync. This can be partially mitigated by rewriting Caller::call to return the created future:

impl<T: Message> Caller<T> {
    pub fn call(&self, msg: T) -> CallerFuture<T> {
        (self.caller_fn)(msg)
    }
}

That allows the following to work, but it is still a bit awkward. We have to store fut and await it in a separate statement to keep the async generator from capturing an &xactor::Caller for self in Caller::call.

#[async_trait::async_trait]
impl Handler<Ping> for MyActor2 {
    async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: Ping) -> usize {
        let caller = self.other_actor.caller::<Ping>();
        let fut = caller.call(Ping(10));
        let result = fut.await.unwrap();
        self.count
    }
}

We can wrap the boxed caller_fn in a Mutex to make xactor::Caller Sync and thus &xactor::Caller Send. This lets the generated future for the handler capture &xactor::Caller and still be Send itself.

pub struct Caller<T: Message> {
    pub actor_id: u64,
    pub(crate) caller_fn: Mutex<CallerFn<T>>,
}

impl<T: Message> Caller<T> {
    pub fn call(&self, msg: T) -> CallerFuture<T> {
        (self.caller_fn.lock().unwrap())(msg)
    }
}

I believe the unwrap should never panic because the code run under the lock is from xactor itself and doesn't panic.

This finally allows:

#[async_trait::async_trait]
impl Handler<Ping> for MyActor2 {
    async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: Ping) -> usize {
        let caller = self.other_actor.caller::<Ping>();
        let result = caller.call(Ping(10)).await.unwrap();
        self.count
    }
}

I'm not sure if this is the best solution, but I haven't been able to figure out an alternative. We don't necessarily need Caller to be thread-safe; the Mutex is just working around the fact that &xactor::Caller has to be captured for Caller::call.

Documentation: How best to handle errors in handler?

What is best practice when a handler encounters an error in the actor model? Is it better to return a #[message(result = "Result<()>")], or to stop the Actor and let the Supervisor restart it? When is each approach better?
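A minimal sketch of the first option, where the failure becomes part of the message's result type (the Parse message and its handler below are invented for illustration):

use xactor::*;

#[message(result = "Result<u32>")]
struct Parse(String);

struct Parser;

impl Actor for Parser {}

#[async_trait::async_trait]
impl Handler<Parse> for Parser {
    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Parse) -> Result<u32> {
        // Any error is carried back to the caller instead of killing the actor.
        Ok(msg.0.trim().parse::<u32>()?)
    }
}

The caller then deals with two layers of Result, e.g. let n = addr.call(Parse("42".into())).await??; — the outer one for delivery, the inner one for the handler's own failure. Stopping the actor and letting a Supervisor restart it seems better suited to errors that invalidate the actor's internal state rather than a single request.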

Can Caller be Clone?

Since Caller is the equivalent of a channel's Sender, I would expect to be able to clone it freely. I am aware that I can get around this if I have access to the underlying Addr, but I would like to avoid that so that my code can stay abstract over the exact actor type that implements the Caller I'm after (and so that I can deal with collections of Caller<T> with heterogeneous implementors).
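Until Caller implements Clone, one workaround sketch (the component types below are invented) is to share a single Caller behind an Arc, since calling it only needs &self:

use std::sync::Arc;
use xactor::*;

#[message(result = "String")]
struct ToUppercase(String);

// Two components hold "clones" of the same handle by sharing an Arc, while staying
// abstract over whichever actor type actually implements Handler<ToUppercase>.
struct ComponentA {
    upper: Arc<Caller<ToUppercase>>,
}

struct ComponentB {
    upper: Arc<Caller<ToUppercase>>,
}

fn wire_up(caller: Caller<ToUppercase>) -> (ComponentA, ComponentB) {
    let shared = Arc::new(caller);
    (
        ComponentA { upper: Arc::clone(&shared) },
        ComponentB { upper: shared },
    )
}

This shares one handle rather than making Caller itself Clone, and sending the Arc to another thread would additionally require Caller to be Sync (see the Caller/Send issue above).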

Supervisors start intervals again on restart

I just noticed that Context intervals are not stopped explicitly when a Context or Actor is stopped. In the case of a supervisor restarting the actor, this has the effect that the interval is started a second time if it is started inside fn started().

#[xactor::main] Main not working

Hello, when I try to use the macro #[xactor::main], the app doesn't build.
I get this error:

error: macros that expand to items must be delimited with braces or followed by a semicolon
 --> src/main.rs:4:1
  |
4 | / async fn main() -> Result<()> {
5 | |     Ok(())
6 | | }
  | |_^
  |
help: change the delimiters to curly braces
  |
4 | {
5 | }
  |
help: add a semicolon
  |
6 | };
  |  ^

error: macro expansion ignores token `,` and any following
 --> src/main.rs:4:1
  |
3 |   #[xactor::main]
  |   --------------- caused by the macro expansion here
4 | / async fn main() -> Result<()> {
5 | |     Ok(())
6 | | }
  | |_^
  |
  = note: the usage of `xactor::main!` is likely invalid in item context

error: main function cannot have a return value
 --> src/main.rs:4:1
  |
4 | / async fn main() -> Result<()> {
5 | |     Ok(())
6 | | }
  | |_^

warning: unused import: `xactor::*`
 --> src/main.rs:1:5
  |
1 | use xactor::*;
  |     ^^^^^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

error[E0601]: `main` function not found in crate `popdb`
 --> src/main.rs:1:1
  |
1 | / use xactor::*;
2 | |
3 | | #[xactor::main]
4 | | async fn main() -> Result<()> {
5 | |     Ok(())
6 | | }
  | |_^ consider adding a `main` function to `src/main.rs`

error: aborting due to 4 previous errors; 1 warning emitted

My test code is pretty simple

use xactor::*;

#[xactor::main]
async fn main() -> Result<()> {
    Ok(())
}

I don't see where it fails or how to fix it... I can't make a proposal.

How to test for effects in handlers?

I'm just getting started with xactor and starting to think about how I'll unit test later on.

It looks like Context can be used to create effects with functions like send_later and send_interval. How would a unit test verify that a handler requests an effect like send_interval in a particular scenario?

Support concurrent processing of messages by actor

As far as I understand, the current implementation of start_actor() processes messages sequentially. The next message is only processed once the previous one has been completely processed, even if the actor's async fn handle() yields.

Theoretically, async fn handle() could be executed concurrently instead, for example by using something like StreamExt::for_each_concurrent() (see the sketch below).
But then handle() obviously cannot be given mutable access to self anymore, so the actor would need to use interior mutability to change state.

What are your thoughts on this?
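Purely as an illustration of that trade-off (this is not xactor code; async-std with its attributes feature plus the futures crate are assumed), a minimal sketch of concurrent dispatch with for_each_concurrent, where the state has to live behind shared interior mutability:

use async_std::sync::Mutex;
use futures::stream::{self, StreamExt};
use std::sync::Arc;

#[derive(Default)]
struct State {
    handled: usize,
}

#[async_std::main]
async fn main() {
    let state = Arc::new(Mutex::new(State::default()));

    // Pretend the integers are messages drained from an actor's mailbox.
    stream::iter(0..100usize)
        .for_each_concurrent(8, |msg| {
            let state = Arc::clone(&state);
            async move {
                // The "handler" body: up to 8 of these run concurrently,
                // so mutation goes through the async mutex instead of &mut self.
                state.lock().await.handled += msg;
            }
        })
        .await;

    println!("handled sum = {}", state.lock().await.handled);
}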

Callers/Senders can't be tested

Hi there,
I just noticed that Sender<T> and Caller<T> just return an error when sending the message.
For convenience, it would be nicer if you could check whether they can still be used without having to send a message.
How about I open a PR for this?

Sender / Caller docu

There is no real documentation about the usage of Senders / Callers, although they are a crucial part of minimizing dependencies.

If you are open to changing that, I would like to update the documentation and provide a working example using them, or maybe adjust the subscriber example to use Sender / Caller.
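In the meantime, a minimal sketch of the kind of example this asks for: a function that depends only on the handles rather than on the concrete actor type. The Addr::caller / Addr::sender constructors and Sender::send are assumed to work as in the crate's API docs:

use xactor::*;

#[message(result = "String")]
struct ToUppercase(String);

#[message]
struct Notify(String);

struct MyActor;

impl Actor for MyActor {}

#[async_trait::async_trait]
impl Handler<ToUppercase> for MyActor {
    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: ToUppercase) -> String {
        msg.0.to_uppercase()
    }
}

#[async_trait::async_trait]
impl Handler<Notify> for MyActor {
    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Notify) {
        println!("notified: {}", msg.0);
    }
}

// This function only knows about the handles, not about MyActor.
async fn use_handles(caller: Caller<ToUppercase>, sender: Sender<Notify>) -> Result<()> {
    let up = caller.call(ToUppercase("hello".into())).await?;
    sender.send(Notify(up))?;
    Ok(())
}

#[xactor::main]
async fn main() -> Result<()> {
    let mut addr = MyActor.start().await?;
    use_handles(addr.caller::<ToUppercase>(), addr.sender::<Notify>()).await
}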

Supervisors cannot be stopped (formerly "Supervisor causes panic")

Hi there,
thanks for the nice crate 👍
I was trying out Supervisors though and I don't quite understand how to use them. I seem to always get a panic inside of futures-channels mpcs code when I launch one.

here is a minimal example

use xactor::Actor;

pub struct ServiceA {
    pub session_id: u32,
}

#[async_trait::async_trait]
impl Actor for ServiceA { }

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    xactor::Supervisor::start(|| ServiceA { session_id: 4 }).await?;

    Ok(())
}

this causes a panic with the following message for me:

thread 'async-std/runtime' panicked at 'Receiver::next_message called after `None`', /home/hoodie/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-channel-0.3.5/src/mpsc/mod.rs:1141:41

do you have any idea what could be the reason for this?

thank you

Current maintenance status and any concept of a recipient?

What's the current maintenance status of this crate? If you have active plans to maintain it, I'd consider using this over actix.

Separately, Actix has the concept of a recipient, which is basically a wrapper around the address sender. Wondering if you have something similar - couldn't find anything after looking through the docs.

How to have multiple actors subscribe to the same task and not duplicate work?

Currently if I spawn multiple actors:

let _daddr = DownloadActor::start_default().await.unwrap();
let _daddr2 = DownloadActor::start_default().await.unwrap();

and then publish a job:

Broker::from_registry().await.unwrap().publish(msg)

the effect I get is that each actor picks up the job. What I'd like is for just one of them to pick up the job, as with a normal FIFO queue system.

Is that possible with xactor?

Documentation - Global Actors

The documentation is very clear on how to create a Global Actor / Service.

However, would anyone be able to share the idea behind Global Actors and Local Actors as opposed to normal Actors?

What is the advantage of having a Global Actor (Service), and why would one want one? I gather it has something to do with sharing state, but could someone please explain in more detail?

using anyhow::Error

Why use anyhow::Error? I don't see why it is needed; it leaks into the interface, so I have to add anyhow to my dependencies if I want to handle errors from xactor. Also, I would rather not have to learn about anyhow just to use xactor. Maybe a concrete Error struct would be better?

Subscription system doesn't play well with System::wait_all()

Consider example:

use xactor::*;

#[message]
#[derive(Clone)]
struct Die;

struct MyActor;

#[async_trait::async_trait]
impl Actor for MyActor {
    async fn started(&mut self, ctx: &Context<Self>) {
        ctx.subscribe::<Die>().await.unwrap();
    }
}

#[async_trait::async_trait]
impl Handler<Die> for MyActor {
    async fn handle(&mut self, ctx: &Context<Self>, _msg: Die) {
        // Stop the actor without error
        ctx.stop(None);
    }
}

#[xactor::main]
async fn main() {
    MyActor.start().await;

    Broker::from_registry().await.publish(Die).unwrap();
}

Here main() never returns because there is a Broker in the registry for the Die message. A workaround may be to store the broker's Addr somewhere and stop it explicitly when needed, but then it becomes the user's responsibility and may be confusing without documentation.

So my proposal is to either handle the Broker's lifecycle inside the crate or update the documentation so users know about their responsibility. What do you think?

Deadlock inside Service::from_registry()

Service::from_registry locks the registry until Actor::started for this service has finished. If I try to subscribe the service to some messages inside the Actor::started function, or try to start another service there, I get a deadlock. Example:

use xactor::*;

#[derive(Default)]
struct MyService;

#[async_trait::async_trait]
impl Actor for MyService {
    async fn started(&mut self, ctx: &Context<Self>) {
        AnotherService::from_registry().await; // ctx.subscribe::<AnyMessage>().await doesn't work also
    }
}

impl Service for MyService {}

#[derive(Default)]
struct AnotherService;

impl Actor for AnotherService {}

impl Service for AnotherService {}

#[xactor::main]
async fn main() {
    MyService::from_registry().await;

    println!("Unreachable!");
}

sending interval through context gives panic

I looked in the source code, and there is a comment noting this in context.rs:

    /// Returns the address of the actor.
    pub fn address(&self) -> Addr<A> {
        Addr {
            actor_id: self.actor_id,
            // This getting unwrap panics   <---- saw this
            tx: self.tx.upgrade().unwrap(),
            rx_exit: self.rx_exit.clone(),
        }
    }

If anyone has ideas, I'd be happy to try to help or I can continue to dig into it further.

I was trying to send a message to the actor itself on an interval, from within the Actor's handler for another message.

#[message]
#[derive(Debug, Clone)]
pub struct Refresh;

#[async_trait]
impl Handler<RequestSchedule> for ReqBasic {
    async fn handle(&mut self, ctx: &mut Context<Self>, msg: RequestSchedule) {
        println!("Actor::ReqBasic message<RequestSchedule> received: {:?}", msg);
        ctx.send_interval(Refresh{}, Duration::from_secs(msg.interval_sec));  // <---- creates panic
    }
}

This gives the panic:

Actor::ReqBasic message<RequestSchedule> received: RequestSchedule { name: "request_name", api_url: "https://test.com/", interval_sec: 10 }
thread 'async-std/runtime' panicked at 'called `Option::unwrap()` on a `None` value', C:\Users\...\.cargo\registry\src\github.com-1ecc6299db9ec823\xactor-0.7.11\src\context.rs:59:35

Actors not using multiple CPUs

As far as I understand, in most actor implementations different actors can process their own messages in parallel with respect to other actors, which makes things go faster.
But I can't seem to make xactor do that:

use xactor::{message, Actor, Context, Handler, Result};

const ACTORS: usize = 8;

#[message(result = "usize")]
struct Calc;

struct Calculator;

impl Actor for Calculator {}

#[async_trait::async_trait]
impl Handler<Calc> for Calculator {
    async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: Calc) -> usize {
        let mut res: usize = 1;

        for i in 2..100_000_000 {
            res = res.wrapping_mul(i);
            res /= i - 1;
        }

        res
    }
}

#[xactor::main]
async fn main() -> Result<()> {
    let mut children = Vec::with_capacity(ACTORS);
    let mut responses = Vec::with_capacity(ACTORS);

    for _ in 0..ACTORS {
        children.push(Calculator.start().await.unwrap());
    }

    for addr in &children {
        responses.push(addr.call(Calc));
    }

    let mut res = 0;

    for promise in responses {
        res += promise.await.unwrap();
    }

    println!("result: {}", res);

    Ok(())
}

This is what I get:

$ env ASYNC_STD_THREAD_COUNT=8 time -v target/release/xactor
result: 799999992
        Command being timed: "target/release/xactor"
        User time (seconds): 5.58
        System time (seconds): 0.00
        Percent of CPU this job got: 99%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:05.58
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 2804
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 211
        Voluntary context switches: 51
        Involuntary context switches: 504
        Swaps: 0
        File system inputs: 11
        File system outputs: 0
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

Since, as far as I understand, I am sending the messages to all the actors before starting to await their responses, I would have expected CPU usage to be around 800% on my 8-logical-core CPU rather than 99%.

Am I doing something wrong, or is this expected?
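One detail worth checking: if call is an ordinary async fn, the futures pushed into responses are lazy and do not dispatch anything until awaited, and the final loop awaits them one at a time, so each call only starts once the previous response has arrived. A variant of main that reuses the Calculator, Calc, and ACTORS definitions above and drives all calls together (assuming the futures crate is added as a dependency) would look roughly like this:

use futures::future::join_all;

#[xactor::main]
async fn main() -> Result<()> {
    let mut children = Vec::with_capacity(ACTORS);
    for _ in 0..ACTORS {
        children.push(Calculator.start().await?);
    }

    // Create the lazy call futures first...
    let calls: Vec<_> = children.iter().map(|addr| addr.call(Calc)).collect();
    // ...then drive them all concurrently instead of one after another.
    let res: usize = join_all(calls)
        .await
        .into_iter()
        .sum::<Result<usize>>()?;

    println!("result: {}", res);
    Ok(())
}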

Clarify benchmarks in docs?

Currently, the following table of benchmark results is featured in the documentation:

          Wait for response    Send only
Actix     1548 ms              14 ms
Xactor    930 ms               18 ms

I'm a bit confused by this. To me, "send only" sounds like the latency of sending a message, but the time given is orders of magnitude slower than what one would expect from futures channels (more like hundreds of nanoseconds). Similarly, "wait for response" and "send only" are two orders of magnitude apart, which is again unexpected (I would expect less than a 2x difference based on my testing). I ran the benchmarks in the benchmarks repo, but could not find an obvious candidate that would have generated these numbers. Could you help me by clarifying what this means and what methodology was used to measure the results?

32-bit platform error

I tried to compile xactor on a 32-bit platform, but unluckily it fails because it uses std::sync::atomic::AtomicU64, which is not available there. I suppose actor_ids should either be switched to a usize/AtomicUsize type, or a Mutex should be considered for 32-bit platforms.
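A tiny sketch of the suggested direction (the names below are hypothetical, not xactor's actual internals): base the id counter on AtomicUsize, which is available on 32-bit targets as well.

use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical replacement for an AtomicU64-based id counter.
static NEXT_ACTOR_ID: AtomicUsize = AtomicUsize::new(1);

pub fn next_actor_id() -> usize {
    NEXT_ACTOR_ID.fetch_add(1, Ordering::Relaxed)
}

A cfg-gated Mutex<u64> fallback would keep 64-bit ids everywhere at the cost of a lock on 32-bit targets.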

Stop does not clean up registered stream

When I stop an actor, its registered stream stays alive.

Example with the given Actor:

use std::time::Duration;
use tokio::io;
use tokio::net::{TcpStream, TcpListener};
use tokio::time::sleep; // tokio >= 1.0; older versions used time::delay_for instead
use xactor::*;

struct MyActor;

#[async_trait::async_trait]
impl Actor for MyActor {
    async fn started(&mut self, ctx: &Context<Self>) {
        let listener = TcpListener::bind("0.0.0.0:1234").await.unwrap();
        ctx.add_stream(listener);
        ctx.stop(None);
    }
}

#[async_trait::async_trait]
impl StreamHandler<io::Result<TcpStream>> for MyActor {
    async fn handle(&mut self, _ctx: &Context<Self>, inbound: io::Result<TcpStream>) {
        println!("echo !");
    }
}

#[xactor::main]
async fn main() {
    println!("Start");
    // The actor stops itself in started(), but the listener registered via add_stream keeps running
    let addr = MyActor.start().await.unwrap();
    addr.wait_for_stop().await;
    println!("Actor stopped");
    
    // Keep the program alive to observe that the stream is still being handled
    sleep(Duration::from_secs(3600)).await;
}

I can't fix this myself :(

  • I don't know how to wait on the stream and on a channel at the same time and resolve whichever completes first
  • Actix chose another solution and encapsulates the Stream in a dedicated sub-actor
