open-net-libraries / open.channelextensions
A set of extensions for optimizing/simplifying System.Threading.Channels usage.
License: MIT License
I do not understand why this test involving a ChannelReader keeps reaching case 1 in the switch and still has two elements in the batch. Any ideas? Every time I reach batch.Should().HaveCount(1), the batch still has 2 items. The result is the same whether I use Run Test or Debug Test, so I am assuming it's not a timing issue? I don't understand this (I'm an intern, so quite new to all of these libraries).
Thanks.
[Fact]
public async Task BatchTimeoutIsReached()
{
    var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
    _ = Task.Run(async () =>
    {
        c.Writer.TryWrite(1);
        c.Writer.TryWrite(2);
        await Task.Delay(200);
        c.Writer.TryWrite(3);
        await Task.Delay(200);
        c.Writer.TryWrite(4);
        c.Writer.TryWrite(5);
        c.Writer.TryWrite(6);
    });
    using var tokenSource = new CancellationTokenSource();
    BatchingChannelReader<int, System.Collections.Generic.List<int>> batchReaderWithTimeout = c.Reader.Batch(2).WithTimeout(TimeSpan.FromMilliseconds(100));
    await batchReaderWithTimeout.ReadAllAsync(async (batch, i) =>
    {
        switch (i)
        {
            case 0:
                batch.Should().HaveCount(2);
                Assert.Equal(1, batch[0]);
                Assert.Equal(2, batch[1]);
                break;
            case 1:
                batch.Should().HaveCount(1);
                Assert.Equal(3, batch[0]);
                break;
            case 2:
                batch.Should().HaveCount(2);
                Assert.Equal(4, batch[0]);
                Assert.Equal(5, batch[1]);
                break;
            case 3:
                batch.Should().HaveCount(1);
                Assert.Equal(6, batch[0]);
                c.Writer.Complete();
                break;
            default:
                throw new Exception("Shouldn't arrive here.");
        }
        await Task.Delay(500);
    });
}
Today I encountered the following exception:
System.ObjectDisposedException: Cannot access a disposed object.
at System.Threading.TimerQueueTimer.Change(UInt32 dueTime, UInt32 period)
at System.Threading.Timer.Change(Int64 dueTime, Int64 period)
at Open.ChannelExtensions.BatchingChannelReader`1.TryPipeItems(Boolean flush)
at Open.ChannelExtensions.BufferingChannelReader`2.TryRead(TOut& item)
I think this can occur because of a race: TryPipeItems reads the _timer value on thread A, then the last item is pushed to the channel on thread B, which disposes and nulls _timer. TryPipeItems on thread A then still uses the reference it loaded earlier, which has now been disposed.
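The race described above can be reproduced in isolation, and one defensive pattern is to load the timer reference once and treat ObjectDisposedException as "the timer is already gone". The following is a minimal sketch using only System.Threading.Timer; the `timer` variable and the `TryRestart` helper are hypothetical stand-ins and are not taken from the library's source.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the reader's timer field; not the library's code.
Timer? timer = new Timer(_ => { }, null, Timeout.Infinite, Timeout.Infinite);

// Re-arms the timer if it is still usable; returns false if it was cleared or disposed.
bool TryRestart(TimeSpan due)
{
    Timer? t = Volatile.Read(ref timer); // load the reference exactly once
    if (t is null) return false;
    try
    {
        return t.Change(due, Timeout.InfiniteTimeSpan);
    }
    catch (ObjectDisposedException)
    {
        // Another thread disposed the timer after the reference was loaded.
        return false;
    }
}

bool armed = TryRestart(TimeSpan.FromSeconds(30));

// Simulate the race: the timer is disposed while a stale reference is still reachable.
timer!.Dispose();
bool armedAfterDispose = TryRestart(TimeSpan.FromSeconds(30));

Console.WriteLine($"{armed} {armedAfterDispose}");
```

Catching the exception keeps the hot path lock-free; guarding both the dispose and the Change call with a lock would be an alternative if the extra contention is acceptable.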
I got the same behaviour described here with .ForceBatch(): it doesn't complete the batch immediately when called, but only when the next item is written.
I couldn't find tests of this method showing how it is supposed to be used, so I concluded that the behaviour is wrong.
Hello, and first of all, awesome library. It is making my life a lot easier already. But I seem to be missing something. I batched results to perform database lookups efficiently (I don't want 500 DB lookups, I want 5 queries that fetch 100 rows each). Now I want to split the batches back into individual items, but I can't find how to do this.
Am I missing or overlooking something? Is there a pattern to do it (returning IAsyncEnumerable<T>)? Or is there no extension to easily do this?
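Independently of what the library offers, splitting batches back into single items can be sketched with an async iterator over a plain ChannelReader. The `Unbatch` helper below is a hypothetical name; it uses only System.Threading.Channels, so it makes no claim about the library's own operators.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Flattens a reader of batches into a stream of individual items.
static async IAsyncEnumerable<T> Unbatch<T>(ChannelReader<List<T>> batches)
{
    await foreach (List<T> batch in batches.ReadAllAsync())
        foreach (T item in batch)
            yield return item;
}

// Usage: two batches go in, five items come out in order.
var batches = Channel.CreateUnbounded<List<int>>();
batches.Writer.TryWrite(new List<int> { 1, 2, 3 });
batches.Writer.TryWrite(new List<int> { 4, 5 });
batches.Writer.Complete();

var items = new List<int>();
await foreach (int item in Unbatch(batches.Reader))
    items.Add(item);

Console.WriteLine(string.Join(", ", items));
```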
I'm having issues with proper handling of exceptions thrown from pipeline steps.
In some scenarios the exception is swallowed instead of being propagated to the caller.
From my observations, it seems to be related to the .Batch() step; the moment at which the exception is thrown may also matter.
Am I doing something wrong? How should this be handled so that the exception propagates up?
using System.Threading.Channels;
using Open.ChannelExtensions;

var test = new Test();
try
{
    //await test.Scenario1(); //exception caught
    //await test.Scenario2(); //exception swallowed
    //await test.Scenario3(); //exception caught
    //await test.Scenario4(); //exception sometimes caught (~25% chance)
}
catch (Exception)
{
    Console.WriteLine("Got exception");
}

class Test
{
    public async Task Scenario1()
    {
        var channel = Channel.CreateBounded<int>(10000);
        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        var task = channel.Reader.Pipe(1, (element) =>
            {
                throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            //.Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });
        channel.Writer.TryComplete();
        await task;
        Console.WriteLine("end");
    }

    public async Task Scenario2()
    {
        var channel = Channel.CreateBounded<int>(10000);
        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        var task = channel.Reader.Pipe(1, (element) =>
            {
                throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            .Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });
        channel.Writer.TryComplete();
        await task;
    }

    public async Task Scenario3()
    {
        var channel = Channel.CreateBounded<int>(10000);
        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        var task = channel.Reader.Pipe(1, (element) =>
            {
                if (element == 20)
                    throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            //.Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });
        channel.Writer.TryComplete();
        await task;
    }

    public async Task Scenario4()
    {
        var channel = Channel.CreateBounded<int>(10000);
        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        var task = channel.Reader.Pipe(1, (element) =>
            {
                if (element == 20)
                    throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            .Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });
        channel.Writer.TryComplete();
        await task;
    }
}
The current implementation of ReadUntilCancelledAsync, which is widely used to implement other methods (PipeAsync etc.), has a prefetching mechanism: it acquires the next item before waiting for the current item to complete. When items differ in processing complexity, this ends up blocking at the end of the channel. There are free processors, but they do nothing because one processor holds two items: the one it is still processing and the one it has just taken.
Current code:
do
{
    var next = new ValueTask();
    while (
        !cancellationToken.IsCancellationRequested
        && reader.TryRead(out T? item))
    {
        await next.ConfigureAwait(false);
        next = receiver(item, index++);
    }
    await next.ConfigureAwait(false);
}
while (
    !cancellationToken.IsCancellationRequested
    && await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
I simplified it to:
do
{
    while (
        !cancellationToken.IsCancellationRequested
        && reader.TryRead(out T? item))
    {
        await receiver(item, index++).ConfigureAwait(false);
    }
}
while (
    !cancellationToken.IsCancellationRequested
    && await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));
This solves the tail blocking, and all tests still pass. But I wonder whether there are cases this prefetching was designed to solve, given that it adds intentional complexity to the code.
In the following code, the reader.Completion task never completes, resulting in an infinite await. I would expect the completion task to be canceled. Looking at the source code for Channels, calling TryComplete without reading the remaining items off the queue will not complete this task.
var cts = new CancellationTokenSource();
var reader = Enumerable.Range(0, 10_000).ToChannel(10, true, cts.Token);
try
{
    cts.Cancel();
    await reader.ReadAll(_ => {}, cts.Token);
}
catch (Exception)
{
    // Catches the OperationCanceledException
}
finally
{
    // Will await forever here
    await reader.Completion;
}
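The underlying System.Threading.Channels behaviour mentioned above can be seen in isolation: Completion only finishes once the writer has completed and the buffered items have been consumed. The sketch below uses a plain bounded channel rather than ToChannel, so it illustrates the framework behaviour only and says nothing about how the library should handle cancellation.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(10);
for (int i = 0; i < 5; i++)
    channel.Writer.TryWrite(i);
channel.Writer.TryComplete();

// The writer is done, but five items are still buffered.
bool completedBeforeDrain = channel.Reader.Completion.IsCompleted;

// Draining the leftovers is what lets Completion finish.
while (channel.Reader.TryRead(out _)) { }
await channel.Reader.Completion;
bool completedAfterDrain = channel.Reader.Completion.IsCompleted;

Console.WriteLine($"{completedBeforeDrain} {completedAfterDrain}");
```

A caller that abandons a reader after cancellation can therefore either drain it as shown or avoid awaiting Completion in the finally block.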
I like the library a lot and would like to use it in my projects. However, I do have a small issue: when there is an unhandled exception inside the .Transform method, the execution simply hangs. For example:
var range = Enumerable.Range(0, 10000);
var pipe = range.ToChannel().Transform(i =>
{
    if ((i + 1) % 100 == 0) throw new Exception();
    return i.ToString();
});
var result = pipe.ReadAll(i => { });
Now, I do realize that exceptions should be handled with try/catch inside .Transform; however, I believe it should not hang if something unexpected happens. But perhaps I'm doing something wrong.
Hi there,
please consider the following snippet:
channel.ReadAllConcurrentlyAsync(maxConcurrency: ?,
    receiver: ?,
    cancellationToken: ?)
This call fails with CS0121 (ambiguous call).
Referenced methods:
Additionally, TaskReadAllConcurrentlyAsync is also affected.
Since System.Threading.Channels has been part of the framework since .NET Core 3.0, the NuGet reference can be made conditional on netstandard2.0 and netstandard2.1:
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0' or '$(TargetFramework)' == 'netstandard2.1'">
    <PackageReference Include="System.Threading.Channels" Version="8.*" />
</ItemGroup>
This way the .NET 6 and .NET 8 builds of Open.ChannelExtensions won't have any dependencies.
As per Microsoft:
The System.Threading.Channels namespace provides a set of synchronization data structures for passing data between producers and consumers asynchronously. The library targets .NET Standard and works on all .NET implementations. This library is available in the System.Threading.Channels NuGet package. However, if you're using .NET Core 3.0 or later, the package is included as part of the framework.
Stephen Toub also recommends using the System.Threading.Channels that ships with the framework over the NuGet package when possible, since the built-in version is optimized and performs better (paraphrasing). Direct quote below:
System.Threading.Channels is part of the .NET Core shared framework, meaning a .NET Core app can start using it without installing anything additional. It’s also available as a separate NuGet package, though the separate implementation doesn’t have all of the optimizations that built-in implementation has, in large part because the built-in implementation is able to take advantage of additional runtime and library support in .NET Core.
Source: https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/
Hey,
My producer creates disposable messages that are consumed by ReadAllAsync.
Is it possible for the library to dispose them automatically, so that I don't need to do it manually?
Thanks!
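Until something like this exists in the library, disposal can be handled by a small wrapper on the consumer side. The sketch below is one possible shape, built on plain System.Threading.Channels; `ReadAllAndDisposeAsync` is a hypothetical helper name, and MemoryStream merely stands in for a disposable message type.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;

// Runs the handler for every item and disposes the item afterwards, even if the handler throws.
static async Task ReadAllAndDisposeAsync<T>(ChannelReader<T> reader, Func<T, ValueTask> handler)
    where T : IDisposable
{
    await foreach (T item in reader.ReadAllAsync())
    {
        using (item)
        {
            await handler(item);
        }
    }
}

var channel = Channel.CreateUnbounded<MemoryStream>();
var produced = new List<MemoryStream>();
for (int i = 0; i < 3; i++)
{
    var message = new MemoryStream(new byte[] { (byte)i });
    produced.Add(message);
    channel.Writer.TryWrite(message);
}
channel.Writer.Complete();

long totalBytes = 0;
await ReadAllAndDisposeAsync(channel.Reader, m =>
{
    totalBytes += m.Length;
    return ValueTask.CompletedTask;
});

Console.WriteLine(totalBytes);
```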
I would like to clarify whether the following behaviour is by design.
Imagine that a channel is created from an IAsyncEnumerable. If DoB() throws an exception, then all subsequently produced values are only handled in the PipeAsync that calls DoA(). They never reach the PipeAsync with DoB() or ReadAllAsync().
await .....
    .ToChannel()
    .PipeAsync(async x =>
    {
        return await DoA();
    })
    .PipeAsync(async x =>
    {
        return await DoB();
    })
    .ReadAllAsync(async x =>
    {
        await Finish();
    });
Am I supposed to try/catch all exceptions in the Pipe methods and then return some error result that is handled by a Filter method?
Hi there,
could you possibly update the reference to System.Threading.Channels to 5.0.0?
Hi guys,
I have a problem upgrading from 6.x to 8.x: it appears that the ToChannel extension is no longer visible for IAsyncEnumerable<>.
I noticed that the test project targets .NET 8, so I downloaded the project and added a test method to see whether the code would build:
[Theory]
[InlineData(testSize1)]
[InlineData(testSize2)]
[SuppressMessage("Reliability", "CA2012:Use ValueTasks correctly", Justification = "Testing only.")]
[SuppressMessage("CodeQuality", "IDE0079:Remove unnecessary suppression", Justification = "<Pending>")]
public static async Task ToChannelTest(int testSize)
{
    var range = CreateAsyncEnumerableRange(testSize);
    var result = new List<int>(testSize);
    var startedAt = TimeProvider.System.GetTimestamp();
    ChannelReader<int> reader = range
        .ToChannel(singleReader: true, deferredExecution: true);
    _ = reader.ReadAll(i => result.Add(i), true);
    await reader.Completion;
    var elapsed = TimeProvider.System.GetElapsedTime(startedAt);
}

static async IAsyncEnumerable<int> CreateAsyncEnumerableRange(int testSize)
{
    foreach (var i in Enumerable.Range(0, testSize))
        yield return i;
    await Task.CompletedTask;
}
Hi,
I didn't see any discussion options, hence I'm opening this as an issue. I'm primarily looking for suggestions on how to do this using ChannelExtensions.
Currently I am using TPL Dataflow with multiple BufferBlocks, and the code was written almost 7 years ago. I am now trying to convert it to channels, and I came across this library, which seemed to simplify the task. But I am not clear on how to use it, as I need multiple intermediate channels to process the whole pipeline.
The main thing I cannot figure out is that 1 entry on the 1st channel can generate 10K-100K records for the 2nd channel. When I use PipeAsync, how do I code for this, given that it seems to assume 1 input item on the source channel is transformed into 1 output item? I don't want to write 100K rows as a single item on the 2nd channel. So the primary question is: how do I achieve this? Any suggestions would be very helpful.
I was referring to examples from this repo and the link below.
https://blog.maartenballiauw.be/post/2020/08/26/producer-consumer-pipelines-with-system-threading-channels.html
Thanks!
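One way to express a one-to-many stage is to run a small loop that reads each entry and writes its records individually to a second, bounded channel. The sketch below uses only System.Threading.Channels; `Expand` is a hypothetical placeholder for whatever turns one entry into many records, and the capacity of 100 is an arbitrary assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical expansion step: one entry yields many records (three here, to keep the demo small).
static IEnumerable<string> Expand(int entry) =>
    Enumerable.Range(0, 3).Select(i => $"{entry}-{i}");

var entries = Channel.CreateUnbounded<int>();
var records = Channel.CreateBounded<string>(100); // bounded, so a slow consumer applies backpressure

Task expander = Task.Run(async () =>
{
    try
    {
        await foreach (int entry in entries.Reader.ReadAllAsync())
            foreach (string record in Expand(entry))
                await records.Writer.WriteAsync(record); // one record per write, never one huge item
        records.Writer.TryComplete();
    }
    catch (Exception ex)
    {
        records.Writer.TryComplete(ex); // surface the failure to the consumer instead of hanging
        throw;
    }
});

entries.Writer.TryWrite(1);
entries.Writer.TryWrite(2);
entries.Writer.Complete();

var seen = new List<string>();
await foreach (string record in records.Reader.ReadAllAsync())
    seen.Add(record);
await expander;

Console.WriteLine(string.Join(", ", seen));
```

The second channel's reader can then be handed to the rest of the pipeline like any other ChannelReader.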
I'm trying to determine if this project can help me construct a channel that does the following:
That way the consumer efficiently processes batches of data, but is not permitted to get stale if the flow slows down.
Or simply batch item reading based on a time interval alone, without any regard for size?
(i.e. read batches of items from the channel every 1 second assuming there is at least 1 item to read)
Any advice would be appreciated! Thanks
Note: I found this article that does exactly what I need; however, it relies on an exception to pulse the read, which is not great for tight intervals like 1 second or less.
https://stackoverflow.com/questions/63881607/how-to-read-remaining-items-in-channel-less-than-batch-size-if-there-is-no-new
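Interval-only batching can be sketched without exceptions by letting a timer drive the reads. The example below assumes .NET 6 or later (for PeriodicTimer) and uses only framework types; `BatchByInterval` is a hypothetical helper, not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// On every tick, yields whatever is currently buffered; ends once the channel is completed and empty.
static async IAsyncEnumerable<List<T>> BatchByInterval<T>(ChannelReader<T> reader, TimeSpan interval)
{
    using var timer = new PeriodicTimer(interval);
    while (await timer.WaitForNextTickAsync())
    {
        var batch = new List<T>();
        while (reader.TryRead(out T item))
            batch.Add(item);

        if (batch.Count > 0)
        {
            yield return batch;
        }
        else if (reader.Completion.IsCompleted)
        {
            await reader.Completion; // rethrows if the writer completed with an error
            yield break;
        }
    }
}

var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
channel.Writer.TryWrite(3);
channel.Writer.Complete();

int batchCount = 0, itemCount = 0;
await foreach (List<int> batch in BatchByInterval(channel.Reader, TimeSpan.FromMilliseconds(20)))
{
    batchCount++;
    itemCount += batch.Count;
}

Console.WriteLine($"{batchCount} batch(es), {itemCount} item(s)");
```

A size cap could be added by stopping the inner loop once the batch reaches a maximum count, at the cost of leaving items for the next tick.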
Is there a plan to target .NET 6 and remove the dependency on System.Threading.Channels 6.0.0, since .NET 6 and later ship with System.Threading.Channels?
var sourceFileChannel = ConsumeFilesToChannel(metaJson.Files);
var dlChannel = Channel.CreateBounded<FileMetaDataDto>(10);
var checkTask = sourceFileChannel.ReadAllConcurrently(maxConcurrency: 30, async file =>
{
    var filePath = Path.Combine(rootDirectory, file.Path);
    if (File.Exists(filePath))
    {
        var existingMd5 = await CalculateMd5Async(filePath);
        if (existingMd5 == file.Md5)
        {
            //Console.WriteLine($"Skipping {file.Path} because it already exists and has the same MD5");
            return;
        }
        else
        {
            // Write to DL channel
            await dlChannel.Writer.WriteAsync(file);
        }
    }
    else
    {
        // Write to DL channel
        await dlChannel.Writer.WriteAsync(file);
    }
}).ContinueWith(async x =>
{
    // This is required since the ReadAllConcurrently completes before the last task completed
    await dlChannel.Writer.WaitToWriteAsync();
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    // End
    dlChannel.Writer.Complete();
});
Awaiting ReadAllConcurrently is apparently not enough to ensure that all the tasks it started have completed.
Without this hack, dlChannel complains about already being closed when a task tries to write to it.
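For comparison, here is a minimal sketch of the same shape built only on System.Threading.Channels, where the output channel is completed strictly after every worker has returned. The worker count, the even/odd filter, and all names are assumptions made for the demo; it does not diagnose why the snippet above needs the delay.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var source = Channel.CreateUnbounded<int>();
var downloads = Channel.CreateBounded<int>(10);

for (int i = 0; i < 100; i++)
    source.Writer.TryWrite(i);
source.Writer.Complete();

// Each worker awaits its own writes, so it returns only after its last write has landed.
async Task WorkerAsync()
{
    await foreach (int file in source.Reader.ReadAllAsync())
    {
        await Task.Yield(); // stands in for the asynchronous MD5 check
        if (file % 2 == 0)
            await downloads.Writer.WriteAsync(file);
    }
}

Task checkTask = Task.Run(async () =>
{
    try
    {
        await Task.WhenAll(Enumerable.Range(0, 4).Select(_ => WorkerAsync()));
        downloads.Writer.TryComplete(); // safe: no worker can still be writing
    }
    catch (Exception ex)
    {
        downloads.Writer.TryComplete(ex);
        throw;
    }
});

int received = 0;
await foreach (int file in downloads.Reader.ReadAllAsync())
    received++;
await checkTask;

Console.WriteLine(received);
```

Two general C# points may be worth checking in the original snippet: an async lambda bound to a synchronous delegate type runs as async void and cannot be awaited by the caller, and ContinueWith with an async lambda returns a Task<Task>, so awaiting it without Unwrap() waits only for the outer task.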
The example below works "as expected" on netcoreapp2.1, net5.0, etc., but hangs on .NET Framework (tested net472, net48).
"As expected" means items are streamed through the pipeline as soon as they are available, i.e. StartProcessingTask2(...) returns immediately and the int values from queue go through ReadAll(IncrementCount2) as soon as the first int is available and before all ints have been added to queue.
When your NuGet package is referenced by .NET Framework projects, StartProcessingTask2(...) does not return because queue.CompleteAdding() has not been called; your library is waiting for the IEnumerable given to .Source(source, true) to complete. If its body is wrapped in a Task so that StartProcessingTask2(...) can return and queue can subsequently be populated, IncrementCount2(int c) is not called until all items from queue have been read completely into some intermediate buffer; only then does ReadAll(IncrementCount2) execute IncrementCount2.
When I cloned your code and referenced it as a project, it worked "as expected". Therefore I believe there is a bug/incompatibility in how MSBuild builds this library for netstandard2.0 and netstandard2.1; those DLLs do not function correctly with .NET Framework.
I suggest/ask that you include .NET Framework-specific DLLs in the NuGet package, e.g. add DLLs built specifically for 4.6.2, 4.7, 4.7.1, 4.7.2, 4.8.
Great ideas executed in this library, thank you!
Open.ChannelExtensions.Tests.zip
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using NUnit.Framework;

namespace Open.ChannelExtensions.Tests
{
    // The class declaration is missing from the original post; the name here is assumed.
    public class ExampleTests
    {
        [Test]
        public void Example()
        {
            var queue = new BlockingCollection<int>();
            var processingTask = StartProcessingTask2(queue.GetConsumingEnumerable());
            for (var i = 0; i < 100000000; i++)
                queue.Add(i);
            queue.CompleteAdding();
            processingTask.Wait();
        }

        private int count_;

        private Task StartProcessingTask2(IEnumerable<int> source)
        {
            return Channel.CreateUnbounded<int>()
                .Source(source, true)
                .ReadAll(IncrementCount2)
                .AsTask();
        }

        private void IncrementCount2(int c)
        {
            Interlocked.Increment(ref count_);
        }
    }
}
Hello!
I found your extensions library quite useful and expressive!
I also think that BatchingChannelReader lacks a way to flush its buffer on a timeout.
Could you please add it?