Comments (7)
Hi @georgikoyrushki95, I was able to wrap a cppcoro::async_generator<int> in a Stream here: https://godbolt.org/z/Kh3rWTMao.

I don't think it makes sense to add that particular Stream type to Unifex because we intend to be free of dependencies, but a type that's analogous to cppcoro::async_generator<T> would be a welcome addition.
from libunifex.
> Hi @georgikoyrushki95, I was able to wrap a cppcoro::async_generator<int> in a Stream here: https://godbolt.org/z/Kh3rWTMao.

Really nice! Thanks for sharing this example.
Also, if not, should it be built in a way that interoperates with cppcoro::async_generator? Basically, being able to go both ways: cppcoro::async_generator <-> libunifex::async_stream?
@ispeters thanks so much! This is exactly what I'm looking for!
> I don't think it makes sense to add that particular Stream type to Unifex because we intend to be free of dependencies, but a type that's analogous to cppcoro::async_generator would be a welcome addition.
For this, do you mean a libunifex-specific async generator, and adapting that to a stream? Basically identical to what you have done in your godbolt, the only difference being that we add the equivalent of cppcoro::async_generator<T> in libunifex as a prerequisite?

Or do you want the stream to be backed by "pure" senders?

Both would work for us, and it appears they'd be equivalent for the consumers of the stream values; the difference would be in how the values are produced, I guess.

Let me know your thoughts; this is something I'd be interested in contributing. :)
@georgikoyrushki95, I think a coroutine-based async sequence and a "raw sender"-based one would serve different needs and offer different trade-offs so, if I get to be greedy, I want both.
Our experience at Meta has been that coroutines are easier to read, write, debug, and just generally maintain than composition-of-sender algorithms-style code. The cost of that ease is basically overhead; coroutines don't optimize as well as raw senders (either for size or speed). The advice we give to internal teams adopting Unifex is that they should prefer coroutines until they know that the overheads are unacceptable, at which point they can refactor to the lower-level abstraction of raw senders.
My ideal coroutine-based async sequence that's analogous to cppcoro::async_generator<T> would:

- support co_await and co_yield inside the generating coroutine
- feel like a unifex::task<> (have scheduler affinity, support awaiting Senders, support similar unwind-on-cancellation semantics, etc.)
  - note: this would also mean parameterizing the type to make it possible to opt in to a noexcept version like unifex::nothrow_task<>
- map naturally to a Stream (possibly by being a Stream; I'm not sure what interfaces make sense)
  - note: this implies that an async generator coroutine type probably ought to support async clean-up, and it's not obvious to me how you'd express the clean-up work from within the coroutine; maybe unifex::at_coroutine_exit is the way?
I'm not sure what an ideal sender-based Stream would look like; I kind of suspect there's room for several different implementations. @janondrusek plans to upstream an internal type that's currently named unicast_flow; it's essentially a one-slot async queue with an interface like this:
```cpp
template <typename T>
struct unicast_flow {
  /**
   * Sends t through the flow.
   *
   * If all previous values of t have been consumed then this saves t
   * as a value within the flow and then completes immediately, waking
   * a pending consumer if there is one. If there's a previously saved t
   * waiting to be consumed, the sender will suspend until a consumer
   * notifies that the previously saved value has been consumed.
   *
   * The returned sender completes with set_done if the stream consumer
   * has requested stop on a next-sender or awaited the cleanup-sender.
   */
  sender auto notify(const T& t);
  sender auto notify(T&& t);

  /**
   * Puts the flow in a "done" state.
   *
   * This works like "notifying an end-of-flow sentinel"; if there's a
   * consumer waiting for the next value, that consumer will be resumed
   * with set_done. If there's no waiting consumer, the next consumer to
   * arrive will complete with done. If there's a saved t value waiting to be
   * consumed, the sender returned from stop() will suspend until that t
   * has been consumed.
   */
  sender auto stop();

  /**
   * Returns a stream whose next-senders produce the values sent
   * through the flow by calls to notify(). The returned stream completes
   * when a next-sender observes the done signal sent when the producer
   * awaits the sender returned from stop().
   */
  stream auto start();
};
```
unicast_flow works pretty well for us, but I think it leaves plenty of holes in the design space that could be explored with other types. One such hole is a Stream-shaped view of an unbounded queue, which would allow you to provide a synchronous, non-blocking producer interface. Another hole is a Stream that works like the coroutine-based design I described above, but implemented in terms of Senders; I'm not really sure what this would look like.
Hey, so I have been looking into this over the past couple of days. I wanted to double-check something with you about the expected scheduler affinity of the async_generator. Basically, do we want it to span suspensions of the generator when a co_yield happens? This sounds like the most intuitive way, but it does potentially involve some overhead (we could end up always rescheduling when we resume the generator). In other words, the below:
```cpp
void foo() {
  // assume the thread where foo() executes is thread id == 0
  static unifex::single_thread_context genCtx;  // assume thread id == 1

  auto generator = []() -> async_generator<int> {
    std::cout << "thread id before scheduling = " << std::this_thread::get_id() << std::endl;  // prints 0
    co_await unifex::schedule(genCtx.get_scheduler());
    std::cout << "thread id after scheduling = " << std::this_thread::get_id() << std::endl;  // prints 1
    for (int i = 0; i < 5; ++i) {
      co_yield i;
      std::cout << "thread id after resuming gen = " << std::this_thread::get_id() << std::endl;  // always prints 1
    }
  }();

  auto result = unifex::sync_wait([&generator]() -> task<int> {
    int sum = 0;
    for (auto itr = co_await generator.begin(); itr != generator.end(); co_await ++itr) {
      std::cout << "thread id in the outer task = " << std::this_thread::get_id() << std::endl;  // always prints 0
      sum += *itr;
    }
    co_return sum;
  }());

  // do something with result
}
```
In essence, when the generator resumes, do we always switch onto the latest thread it was scheduled on? And if that's the case, where would be a good place to do the switch? My thinking is within the generator's awaiter::await_suspend, but I'm not 100% sure how that can be expressed: basically I need to resume the schedule coroutine, which in itself needs to resume the generator. Is something like this possible?
Sorry for the delayed response; holiday PTO and performance review season have me inundated.
The short reply is that I think we need to provide the semantics @georgikoyrushki95 described in his example code—I think anything else would violate the principle of least surprise.
One thing: I think it was a mistake to make co_await unifex::schedule(s) as magical as we did. Coroutines probably should support a way to explicitly switch schedulers, but we should do it with something that looks more like co_await reschedule_current_coroutine(s), or something, so that it's distinctly different from schedule(s).
> And if that's the case, where would be a good place to do the switch?
If I've understood the coroutine spec correctly, co_yield <expr> lowers to co_await promise.yield_value(<expr>), which means you have an opportunity to inject a custom Awaitable: the result of yield_value can be of a type that does what you want.
Without having tried to implement a generator like this before, I suspect you'll need to enforce the scheduler-affinity invariant in the await_resume method of the awaitable; it'd be nice if you could somehow figure out when a reschedule is unnecessary (because you're already on the right context), but a brute-force solution is to reschedule on every resumption.
I think the way to build "reschedule on each resumption" is to write a little custom receiver that resumes the generator in set_value() and then connect an instance of that receiver to the result of schedule(gen.currentScheduler_). You can put the resulting operation state somewhere in the awaitable and start() it when you want the reschedule to happen.