
Comments (7)

ispeters commented on June 30, 2024

Hi @georgikoyrushki95, I was able to wrap a cppcoro::async_generator<int> in a Stream here: https://godbolt.org/z/Kh3rWTMao.

I don't think it makes sense to add that particular Stream type to Unifex because we intend to be free of dependencies, but a type that's analogous to cppcoro::async_generator<T> would be a welcome addition.
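
In case the link rots, here's a minimal sketch of the shape of that wrapper, assuming Unifex's stream protocol dispatches unifex::next/unifex::cleanup to next()/cleanup() members; the details may differ from the godbolt version, and completing the next-sender with done via co_await unifex::just_done() is an assumption about unifex::task semantics:

#include <cppcoro/async_generator.hpp>
#include <unifex/just_done.hpp>
#include <unifex/task.hpp>

// Sketch: adapt a cppcoro::async_generator<int> to Unifex's Stream shape.
struct async_generator_stream {
  cppcoro::async_generator<int> gen_;
  cppcoro::async_generator<int>::iterator it_{nullptr};
  bool started_ = false;

  // next-sender: completes with the next value, or with set_done at end-of-stream
  unifex::task<int> next() {
    if (!started_) {
      it_ = co_await gen_.begin();
      started_ = true;
    } else {
      co_await ++it_;
    }
    if (it_ == gen_.end()) {
      // awaiting a done-sender completes this task with set_done
      co_await unifex::just_done();
    }
    co_return *it_;
  }

  // cleanup-sender: the generator's destructor tears down the coroutine frame
  auto cleanup() { return unifex::just_done(); }
};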


ccotter commented on June 30, 2024

> Hi @georgikoyrushki95, I was able to wrap a cppcoro::async_generator<int> in a Stream here: https://godbolt.org/z/Kh3rWTMao.

Really nice! Thanks for sharing this example.


georgikoyrushki95 commented on June 30, 2024

Also, if not, should it be built in a way that interoperates with cppcoro::async_generator? Basically, being able to go both ways: cppcoro::async_generator <-> libunifex::async_stream?


georgikoyrushki95 commented on June 30, 2024

@ispeters thanks so much! This is exactly what I'm looking for!

> I don't think it makes sense to add that particular Stream type to Unifex because we intend to be free of dependencies, but a type that's analogous to cppcoro::async_generator would be a welcome addition.

For this, do you mean a libunifex-specific async generator, adapted to a stream? Basically identical to what you have done in your godbolt, the only difference being that we'd add the equivalent of cppcoro::async_generator<T> to libunifex as a prerequisite?

Or do you want the stream to be backed by "pure" senders?

Both would work for us, and it appears they'd be equivalent for consumers of the stream values. The difference would be in how the values are produced, I guess.

Let me know your thoughts; this is something I'd be interested in contributing :)


ispeters commented on June 30, 2024

@georgikoyrushki95, I think a coroutine-based async sequence and a "raw sender"-based one would serve different needs and offer different trade-offs so, if I get to be greedy, I want both.

Our experience at Meta has been that coroutines are easier to read, write, debug, and just generally maintain than composition-of-sender algorithms-style code. The cost of that ease is basically overhead; coroutines don't optimize as well as raw senders (either for size or speed). The advice we give to internal teams adopting Unifex is that they should prefer coroutines until they know that the overheads are unacceptable, at which point they can refactor to the lower-level abstraction of raw senders.

My ideal coroutine-based async sequence that's analogous to cppcoro::async_generator<T> would (see the usage sketch after this list):

  • support co_await and co_yield inside the generating coroutine
  • feel like a unifex::task<> (have scheduler affinity, support awaiting Senders, support similar unwind-on-cancellation semantics, etc.)
    • note: this would also mean parameterizing the type to make it possible to opt in to a noexcept version like unifex::nothrow_task<>
  • map naturally to a Stream (possibly by being a Stream—I'm not sure what interfaces make sense)
    • note: this implies that an async generator coroutine type probably ought to support async clean-up, and it's not obvious to me how you'd express the clean-up work from within the coroutine; maybe unifex::at_coroutine_exit is the way?
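
To make that wishlist concrete, a usage sketch follows; unifex::async_generator does not exist yet, so the type name, closeResourceAsync(), and someScheduler are all hypothetical:

// Hypothetical: unifex::async_generator<T> is the wished-for type, not an existing API.
unifex::async_generator<int> ints() {
  // async clean-up expressed from within the coroutine, perhaps via the
  // existing unifex::at_coroutine_exit facility
  co_await unifex::at_coroutine_exit([]() -> unifex::task<void> {
    co_await closeResourceAsync();  // hypothetical asynchronous clean-up work
  });
  for (int i = 0; i < 3; ++i) {
    co_await unifex::schedule(someScheduler);  // awaiting Senders, as in unifex::task<>
    co_yield i;                                // produce the next element
  }
}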

I'm not sure what an ideal sender-based Stream would look like; I kind of suspect there's room for several different implementations. @janondrusek plans to upstream an internal type that's currently named unicast_flow; it's essentially a one-slot async queue with an interface like this:

template <typename T>
struct unicast_flow {
  /**
   * Sends t through the flow.
   *
   * If all previously sent values have been consumed, this saves t
   * as the value within the flow and then completes immediately, waking
   * a pending consumer if there is one. If there's a previously saved
   * value waiting to be consumed, the sender will suspend until a
   * consumer notifies that the previously saved value has been consumed.
   *
   * The returned sender completes with set_done if the stream consumer
   * has requested stop on a next-sender or awaited the cleanup-sender.
   */
  sender auto notify(const T& t);
  sender auto notify(T&& t);

  /**
   * Puts the flow in a "done" state.
   *
   * This works like "notifying an end-of-flow sentinel"; if there's a
   * consumer waiting for the next value, that consumer will be resumed
   * with set_done. If there's no waiting consumer, the next consumer to
   * arrive will complete with done. If there's a saved value waiting to
   * be consumed, the sender returned from stop() will suspend until that
   * value has been consumed.
   */
  sender auto stop();

  /**
   * Returns a stream whose next-senders produce the values sent
   * through the flow by calls to notify(). The returned stream completes
   * when a next-sender observes the done signal sent when the producer
   * awaits the sender returned from stop().
   */
  stream auto start();
};
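
To show how the two halves interact, here's a hypothetical producer/consumer pair written against the interface above; unicast_flow isn't upstreamed yet, and driving the stream with unifex::for_each is an assumption about how you'd consume it:

// Hypothetical usage of the unicast_flow interface sketched above.
unifex::task<void> producer(unicast_flow<int>& flow) {
  for (int i = 0; i < 3; ++i) {
    // suspends if the previously notified value hasn't been consumed yet
    co_await flow.notify(i);
  }
  // signal end-of-flow; suspends until any saved value has been consumed
  co_await flow.stop();
}

unifex::task<int> consumer(unicast_flow<int>& flow) {
  int sum = 0;
  // for_each connects successive next-senders until one completes with set_done
  co_await unifex::for_each(flow.start(), [&sum](int v) { sum += v; });
  co_return sum;
}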

unicast_flow works pretty well for us but I think it leaves plenty of holes in the design space that could be explored with other types. One such hole is a Stream-shaped view of an unbounded queue, which would allow you to provide a synchronous, non-blocking producer interface. Another hole is a Stream that works like the coroutine-based design I described above, but implemented in terms of Senders; I'm not really sure what this would look like.
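
For the first of those holes, the shape might be something like the following; the type and its members are made up to illustrate the idea, not a planned API:

// Hypothetical: a stream backed by an unbounded queue, so the producer
// side is synchronous and non-blocking.
template <typename T>
struct unbounded_queue_stream {
  void push(T t);       // enqueue; never blocks and never suspends
  void close();         // mark end-of-stream
  stream auto start();  // next-senders dequeue values, completing with done after close()
};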


georgikoyrushki95 commented on June 30, 2024

Hey, so I have been looking into this over the past couple of days. I wanted to double-check something with you about the expected scheduler affinity of the async_generator. Basically, do we want affinity to span suspensions of the generator when a co_yield happens? That sounds like the most intuitive behavior, but it potentially involves some overhead (we could end up always rescheduling whenever we resume the generator). In other words, the below:

void foo()
{
    // assume the thread where foo() executes has thread id == 0
    static unifex::single_thread_context genCtx; // assume thread id == 1

    auto generator = []() -> async_generator<int> {
        std::cout << "thread id before scheduling = " << std::this_thread::get_id() << std::endl; // prints 0
        co_await unifex::schedule(genCtx.get_scheduler());
        std::cout << "thread id after scheduling = " << std::this_thread::get_id() << std::endl; // prints 1
        for (int i = 0; i < 5; ++i) {
            co_yield i;
            std::cout << "thread id after resuming gen = " << std::this_thread::get_id() << std::endl; // always prints 1
        }
    }();

    auto result = unifex::sync_wait([&generator]() -> unifex::task<int> {
        int sum = 0;
        for (auto itr = co_await generator.begin(); itr != generator.end(); co_await ++itr) {
            std::cout << "thread id in the outer task = " << std::this_thread::get_id() << std::endl; // always prints 0
            sum += *itr;
        }
        co_return sum;
    }());

    // do something with result
}

In essence, when the generator resumes, do we always switch onto the latest thread it was scheduled on? And if that's the case, where would be a good place to do the switch? My thinking is within the generator's awaiter::await_suspend, but I'm not 100% sure how that can be expressed: basically I need to resume the schedule coroutine, which in turn needs to resume the generator. Is something like this possible?


ispeters commented on June 30, 2024

Sorry for the delayed response; holiday PTO and performance review season have me inundated.

The short reply is that I think we need to provide the semantics @georgikoyrushki95 described in his example code—I think anything else would violate the principle of least surprise.

One thing: I think it was a mistake to make co_await unifex::schedule(s) as magical as we did. Coroutines probably should support a way to explicitly switch schedulers, but we should do it with something that looks more like co_await reschedule_current_coroutine(s), or something, so that it's distinctly different from schedule(s).

> And if that's the case, where would be a good place to do the switch?

If I've understood the coroutine spec correctly, co_yield <expr> lowers to co_await promise.yield_value(<expr>), which means you have an opportunity to inject a custom Awaitable: the result of yield_value can be of a type that does what you want.

Without having tried to implement a generator like this before, I suspect you'll need to enforce the scheduler affinity invariant in the await_resume method of the awaitable; it'd be nice if you could somehow figure out when a reschedule is unnecessary (because you're already on the right context), but a brute-force solution is to reschedule on every resumption.

I think the way to build "reschedule on each resumption" is to write a little custom receiver that resumes the generator in set_value() and then connect an instance of that receiver to the result of schedule(gen.currentScheduler_). You can put the resulting operation state somewhere in the awaitable and start() it when you want the reschedule to happen.
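
A minimal sketch of that shape, assuming Unifex's connect/start vocabulary; resume_receiver and reschedule_op are made-up names, and a real generator would need proper error and cancellation handling:

#include <unifex/scheduler_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <coroutine>
#include <exception>
#include <new>
#include <utility>

// Hypothetical receiver that resumes the generator coroutine once the
// schedule-sender completes on the target scheduler.
struct resume_receiver {
  std::coroutine_handle<> gen_;
  void set_value() noexcept { gen_.resume(); }
  void set_error(std::exception_ptr) noexcept { std::terminate(); }  // sketch only
  void set_done() noexcept { gen_.resume(); }  // a real generator would unwind instead
};

// Hypothetical home for the operation state; this would live somewhere in the
// awaitable returned by promise.yield_value() and be started when the consumer
// wants the generator resumed on its current scheduler.
template <typename Scheduler>
struct reschedule_op {
  using sender_t = decltype(unifex::schedule(std::declval<Scheduler&>()));
  using op_t = unifex::connect_result_t<sender_t, resume_receiver>;

  union { op_t op_; };  // operation states are immovable; construct lazily in place
  bool constructed_ = false;

  reschedule_op() noexcept {}
  ~reschedule_op() { if (constructed_) op_.~op_t(); }

  // Resume `gen` on `sched` instead of resuming it inline.
  void start(Scheduler& sched, std::coroutine_handle<> gen) {
    // guaranteed copy elision constructs the operation state inside the union
    ::new (static_cast<void*>(&op_)) op_t(
        unifex::connect(unifex::schedule(sched), resume_receiver{gen}));
    constructed_ = true;
    unifex::start(op_);
  }
};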

