Mar*_*mis 5 c# asynchronous system.reactive rx.net
我有代码从SQL流下来的数据并将其写入不同的商店.代码大致是这样的:
using (var cmd = new SqlCommand("select * from MyTable", connection))
{
using (var reader = await cmd.ExecuteReaderAsync())
{
var list = new List<MyData>();
while (await reader.ReadAsync())
{
var row = GetRow(reader);
list.Add(row);
if (list.Count == BatchSize)
{
await WriteDataAsync(list);
list.Clear();
}
}
if (list.Count > 0)
{
await WriteDataAsync(list);
}
}
}
Run Code Online (Sandbox Code Playgroud)
我想为此目的使用Reactive扩展.理想情况下,代码看起来像这样:
await StreamDataFromSql()
.Buffer(BatchSize)
.ForEachAsync(async batch => await WriteDataAsync(batch));
Run Code Online (Sandbox Code Playgroud)
但是,似乎扩展方法ForEachAsync仅接受同步操作.是否可以编写一个可以接受异步操作的扩展?
是否可以编写一个接受异步操作的扩展?
不直接。
Rx 订阅必须是同步的,因为 Rx 是一个基于推送的系统。当数据项到达时,它会遍历您的查询,直到到达最终订阅 - 在这种情况下是执行Action.
awaitRx 提供的-able 方法正在await处理序列本身——即,ForEachAsync就序列而言是异步的(您正在异步等待序列完成),但其中的订阅ForEachAsync(为每个元素采取的操作)仍然必须是同步的.
为了在您的数据管道中进行同步到异步转换,您需要有一个缓冲区。Rx 订阅可以(同步)作为生产者添加到缓冲区,而异步消费者正在检索项目并处理它们。因此,您需要一个支持同步和异步操作的生产者/消费者队列。
TPL Dataflow 中的各种块类型可以满足这种需求。这样的事情应该足够了:
var obs = StreamDataFromSql().Buffer(BatchSize);
var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch));
using (var subscription = obs.Subscribe(buffer.AsObserver()))
await buffer.Completion;
Run Code Online (Sandbox Code Playgroud)
注意没有背压;尽可能快地StreamDataFromSql推送数据,它将被缓冲并存储在ActionBlock. 根据数据的大小和类型,这会很快使用大量内存。
| 归档时间: |
|
| 查看次数: |
973 次 |
| 最近记录: |