Comments (12)
Ok. I'll investigate.
from open.channelextensions.
Did you experience this bug with 3.3.2?
from open.channelextensions.
This test passes: https://github.com/electricessence/Open.ChannelExtensions/blob/master/Open.ChannelExtensions.Tests/BatchTests.cs#L112-L153
from open.channelextensions.
Now that the other bug is fixed, you should be able to 'flush' the reader periodically with a timer/interval.
var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
var reader = c.Reader.Batch(2);

// Flush any partial batch once per second until the reader completes.
_ = Task.Run(async () =>
{
    await Task.Delay(1000);
    while (!reader.Completion.IsCompleted)
    {
        reader.ForceBatch();
        await Task.Delay(1000);
    }
});

await reader.ReadAllAsync(async batch => { /* ... */ });
from open.channelextensions.
Here's my example:
var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
var reader = c.Reader.Batch(3);
Task.Run(async () =>
{
    await Task.Delay(1000);
    c.Writer.TryWrite(1);
    c.Writer.TryWrite(2);
    c.Writer.TryWrite(3);
    c.Writer.TryWrite(4);
    c.Writer.TryWrite(5);
    await Task.Delay(3000);
    Console.WriteLine(reader.ForceBatch());
});
await reader.ReadAllAsync(async batch =>
{
    Console.WriteLine($"Batch: {string.Join(",", batch.Select(i => i.ToString()))}");
    await Task.Delay(500);
});
from open.channelextensions.
K. Adding to tests.
from open.channelextensions.
Thanks again for helping with this. This is a tricky one: if the implementation buffered all incoming channel content into another channel, it probably wouldn't be an issue.
I want to retain the BatchingChannelReader as a 'reader' that only pulls from its source and subsequently batches on demand. As you might imagine, there could be unintended side effects if that wasn't the case.
The difference between your test and mine is that mine delays the read operation and yours doesn't.
Yours reveals a situation that would only occur with .ForceBatch(), where I'm 'awaiting' the source before proceeding, but because the source doesn't have anything new, the await is stuck.
What I was trying to avoid before was having to double await, which typically requires a Task.WhenAny instead of simply awaiting a ValueTask.
Again, appreciate the help. I'm investigating further to find a solution.
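For readers following along, the "double await" mentioned above can be sketched roughly as follows. This is only an illustration built on System.Threading.Channels, not the library's actual code: the names DoubleAwaitSketch, WaitForDataOrForceAsync and forceSignal are hypothetical. It shows why Task.WhenAny pulls in Task allocations: the ValueTask from WaitToReadAsync has to be converted with AsTask() before it can be raced against a second signal.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DoubleAwaitSketch
{
    // Hypothetical helper: waits until the source has data, the source
    // completes, or an external "force" signal fires, whichever comes first.
    public static async Task<string> WaitForDataOrForceAsync(
        ChannelReader<int> source, Task forceSignal)
    {
        // Task.WhenAny only accepts Task, so the ValueTask must be converted.
        Task<bool> dataReady = source.WaitToReadAsync().AsTask();
        Task first = await Task.WhenAny(dataReady, forceSignal);

        if (first == forceSignal)
            return "forced";

        return (await dataReady) ? "data" : "completed";
    }

    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();
        var force = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Nothing is ever written, so awaiting the source alone would hang.
        _ = Task.Run(async () =>
        {
            await Task.Delay(100);
            force.TrySetResult(true);
        });

        // prints "forced"
        Console.WriteLine(await WaitForDataOrForceAsync(channel.Reader, force.Task));
    }
}
```

Awaiting only the source's ValueTask avoids these allocations, but as the example shows, it cannot be released when no new item arrives.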
from open.channelextensions.
I can suggest that BatchingChannelReader could actually work with "wrapped" items, and .ForceBatch() could post some predefined "command" item that causes completion of the batch. Then you don't need to mess with tasks.
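A minimal sketch of that idea, assuming a hypothetical Envelope<T> wrapper and a standalone BatchAsync helper (neither is part of Open.ChannelExtensions), might look like this. A flush "command" travels through the same channel as the data, so the batching loop only ever awaits one thing.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical wrapper: either carries a value or acts as a flush command.
public readonly struct Envelope<T>
{
    private Envelope(T value, bool isFlush) { Value = value; IsFlush = isFlush; }
    public T Value { get; }
    public bool IsFlush { get; }
    public static Envelope<T> Item(T value) => new Envelope<T>(value, false);
    public static Envelope<T> Flush() => new Envelope<T>(default!, true);
}

public static class WrappedBatchSketch
{
    // Collects batches until the channel completes. A batch is emitted when
    // it is full or when a flush command arrives while it is non-empty.
    public static async Task<List<List<T>>> BatchAsync<T>(
        ChannelReader<Envelope<T>> reader, int batchSize)
    {
        var batches = new List<List<T>>();
        var current = new List<T>(batchSize);
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var e))
            {
                if (!e.IsFlush) current.Add(e.Value);
                if (current.Count == batchSize || (e.IsFlush && current.Count > 0))
                {
                    batches.Add(current);
                    current = new List<T>(batchSize);
                }
            }
        }
        if (current.Count > 0) batches.Add(current); // trailing partial batch
        return batches;
    }

    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<Envelope<int>>();
        for (var i = 1; i <= 5; i++) channel.Writer.TryWrite(Envelope<int>.Item(i));
        channel.Writer.TryWrite(Envelope<int>.Flush()); // force out the partial batch
        channel.Writer.TryWrite(Envelope<int>.Item(6));
        channel.Writer.Complete();

        var batches = await BatchAsync(channel.Reader, 3);
        // prints "1,2,3 | 4,5 | 6"
        Console.WriteLine(string.Join(" | ", batches.Select(b => string.Join(",", b))));
    }
}
```

One trade-off to keep in mind: posting the command requires write access to the source channel, which seems at odds with a reader that only pulls from its source.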
from open.channelextensions.
The problem is that when the buffer is implemented differently, the code is quite simple and free of additional contention. But as soon as I add the .ForceBatch() method, everything gets way more complex and ultimately less performant because of contention.
Still working on it.
from open.channelextensions.
Released an update.
https://www.nuget.org/packages/Open.ChannelExtensions/3.4.0
I tried a few different methods. I'm not 100% happy with having to create Tasks under the hood, but it's actually not too terrible. It does work as expected. All tests are passing.
I avoided incurring the same overhead on the Join operation and implemented some minor improvements I should have already.
from open.channelextensions.
@alexeynikitin Thanks again. Lemme know if there's anything else.
from open.channelextensions.
@electricessence Ok, thank you!
from open.channelextensions.