Wil*_*sch 7 c# asynchronous c#-8.0 iasyncenumerable
我想知道是否有一种方法可以创建 Source对象IAsyncEnumerable<T>或IAsyncEnumerator<T>通过 Source 对象创建,就像TaskCompletionSource允许人们执行任务一样。特别是,TaskCompletionSource可以像任何其他参数一样传递。
也许是这样的:
public class AsyncEnumerables {
public Task HandlerTask { get; set; }
public async Task<string> ParentMethod() {
var source = new AsyncEnumerableSource<int>();
IAsyncEnumerable asyncEnumerable = source.GetAsyncEnumerable();
HandlerTask = Task.Run(() => handleAsyncResultsAsTheyHappen(asyncEnumerable));
int n = await someOtherTask();
source.YieldReturn(n);
var r = await ChildMethod(source);
source.Complete(); // this call would cause the HandlerTask to complete.
return r;
}
private async Task<string> ChildMethod(AsyncEnumerableSource<int> source) {
source.YieldReturn(5);
await SomeOtherCall();
source.YieldReturn(10);
return "hello";
}
}
Run Code Online (Sandbox Code Playgroud)
使用上面的代码,handleAsyncResultsAsTheyHappen任务将看到传递到 YieldReturn 的任何值。所以它会看到n上面代码中的 ,以及来自5的 和。10ChildMethod
这是该类的另一个实现AsyncEnumerableSource,它不依赖于 Rx 库。这一类依赖于Channel<T>, 类,该类本身就可以在 .NET 标准库中使用。它与基于 Rx 的实现具有相同的行为。
该类AsyncEnumerableSource可以将通知传播给多个订阅者。每个订阅者都可以按照自己的节奏枚举这些通知。这是可能的,因为每个订阅都有自己专用的Channel<T>底层存储。订阅的生命周期实际上与单个订阅的生命周期相关await foreach循环的生命周期相关。由于任何原因(包括抛出异常)提前退出循环,订阅将立即结束。
用技术术语来说,第一次调用MoveNextAsyncan 方法时就会创建新的订阅。IAsyncEnumerator<T>单独调用该方法GetAsyncEnumerable不会创建订阅,调用该GetAsyncEnumerator方法也不会创建订阅。IAsyncEnumerator<T>当关联被处置时,订阅结束。
public class AsyncEnumerableSource<T>
{
private readonly List<Channel<T>> _channels = new();
private bool _completed;
private Exception _exception;
public async IAsyncEnumerable<T> GetAsyncEnumerable(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Channel<T> channel;
lock (_channels)
{
if (_exception != null) throw _exception;
if (_completed) yield break;
channel = Channel.CreateUnbounded<T>(
new() { SingleWriter = true, SingleReader = true });
_channels.Add(channel);
}
try
{
await foreach (var item in channel.Reader.ReadAllAsync()
.WithCancellation(cancellationToken).ConfigureAwait(false))
{
yield return item;
cancellationToken.ThrowIfCancellationRequested();
}
}
finally { lock (_channels) _channels.Remove(channel); }
}
public void YieldReturn(T value)
{
lock (_channels)
{
if (_completed) return;
foreach (var channel in _channels) channel.Writer.TryWrite(value);
}
}
public void Complete()
{
lock (_channels)
{
if (_completed) return;
foreach (var channel in _channels) channel.Writer.TryComplete();
_completed = true;
}
}
public void Fault(Exception error)
{
lock (_channels)
{
if (_completed) return;
foreach (var channel in _channels) channel.Writer.TryComplete(error);
_completed = true;
_exception = error;
}
}
}
Run Code Online (Sandbox Code Playgroud)
原因cancellationToken.ThrowIfCancellationRequested();是因为这个问题:ChannelReader.ReadAllAsync(CancellationToken) 实际上没有取消 mid-iteration。
注意:如果您在YieldReturn任何消费者订阅之前开始传播值AsyncEnumerableSource,这些值将会丢失。没有订阅者会观察它们。为了防止这种情况,您应该确保在启动生产者之前所有消费者都已订阅。最简单的方法是让消费者成为async方法,其中是方法中的await foreach第一个:awaitasync
// Correct, synchronous subscription
async Task Consume()
{
await foreach (var item in source.GetAsyncEnumerable())
{
//...
}
}
Task consumer = Consume();
Run Code Online (Sandbox Code Playgroud)
避免使用该方法的诱惑Task.Run,因为在这种情况下,订阅将在线程上异步发生ThreadPool,而不是与消费者的创建同步:
// Wrong, delayed subscription (possibility for unobserved values)
Task consumer = Task.Run(async () =>
{
await foreach (var item in source.GetAsyncEnumerable())
{
//...
}
});
Run Code Online (Sandbox Code Playgroud)
如果您不想同步进行订阅,可以将它们卸载到ThreadPool,并await在启动生产者之前建立它们:
// Correct, awaited subscription
Task consumer = await Task.Factory.StartNew(async () =>
{
HeavySynchronousComputation();
await foreach (var item in source.GetAsyncEnumerable())
{
//...
}
}, default, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
Run Code Online (Sandbox Code Playgroud)
该Task.Factory.StartNew(async方法创建一个嵌套的Task<Task>. 外部任务代表订阅,内部任务代表消费循环。
如果您可以构建代码以利用 和 ,那么您的情况会yield return好得多await foreach。例如,这段代码几乎做了同样的事情:
public async Task Consume()
{
var source = ParentMethod();
HandlerTask = Task.Run(async () => { await foreach (var item in source) { Console.WriteLine(item); } });
}
public async IAsyncEnumerable<int> ParentMethod()
{
await Task.Yield();
yield return 13;
await foreach (var item in ChildMethod())
yield return item;
}
private async IAsyncEnumerable<int> ChildMethod()
{
yield return 5;
await Task.Yield();
yield return 10;
}
Run Code Online (Sandbox Code Playgroud)
但是,如果您确实需要“异步可枚举源”,则需要首先认识到一件事。TaskCompletionSource<T>保存结果,即T(或异常)。它充当容器。可以在等待任务之前设置结果。这与“异步可枚举源”是一样的——您需要它能够在从中获取任何项目之前保存结果。“异步可枚举源”需要保存多个结果 - 在本例中是一个集合。
所以你实际上要求的是“一个可以作为异步枚举使用的集合”。这里有几种可能性,但我推荐的是Channel:
public async Task<string> ParentMethod()
{
var source = Channel.CreateUnbounded<int>();
var sourceWriter = source.Writer;
IAsyncEnumerable<int> asyncEnumerable = source.Reader.ReadAllAsync();
HandlerTask = Task.Run(async () => { await foreach (var item in asyncEnumerable) Console.WriteLine(item); });
await Task.Yield();
await sourceWriter.WriteAsync(13);
var r = await ChildMethod(sourceWriter);
sourceWriter.Complete();
return r;
}
private async Task<string> ChildMethod(ChannelWriter<int> sourceWriter)
{
await sourceWriter.WriteAsync(5);
await Task.Yield();
await sourceWriter.WriteAsync(10);
return "hello";
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3732 次 |
| 最近记录: |