如何使用Rx.Nex扩展ForEachAsync与异步操作

Mar*_*mis 5 c# asynchronous system.reactive rx.net

我有代码从SQL流下来的数据并将其写入不同的商店.代码大致是这样的:

using (var cmd = new SqlCommand("select * from MyTable", connection))
{
     using (var reader = await cmd.ExecuteReaderAsync())
     {
         var list = new List<MyData>();
         while (await reader.ReadAsync())
         {
             var row = GetRow(reader);
             list.Add(row);
             if (list.Count == BatchSize)
             {
                 await WriteDataAsync(list);
                 list.Clear();
             }
         }
         if (list.Count > 0)
         {
             await WriteDataAsync(list);
         }
     }
 }
Run Code Online (Sandbox Code Playgroud)

我想为此目的使用Reactive扩展.理想情况下,代码看起来像这样:

await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async batch => await WriteDataAsync(batch));
Run Code Online (Sandbox Code Playgroud)

但是,似乎扩展方法ForEachAsync仅接受同步操作.是否可以编写一个可以接受异步操作的扩展?

Ste*_*ary 5

是否可以编写一个接受异步操作的扩展?

不直接。

Rx 订阅必须是同步的,因为 Rx 是一个基于推送的系统。当数据项到达时,它会遍历您的查询,直到到达最终订阅 - 在这种情况下是执行Action.

awaitRx 提供的-able 方法正在await处理序列本身——即,ForEachAsync就序列而言是异步的(您正在异步等待序列完成),但其中的订阅ForEachAsync(为每个元素采取的操作)仍然必须是同步的.

为了在您的数据管道中进行同步到异步转换,您需要有一个缓冲区。Rx 订阅可以(同步)作为生产者添加到缓冲区,而异步消费者正在检索项目并处理它们。因此,您需要一个支持同步和异步操作的生产者/消费者队列。

TPL Dataflow 中的各种块类型可以满足这种需求。这样的事情应该足够了:

var obs = StreamDataFromSql().Buffer(BatchSize);
var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch));
using (var subscription = obs.Subscribe(buffer.AsObserver()))
  await buffer.Completion;
Run Code Online (Sandbox Code Playgroud)

注意没有背压;尽可能快地StreamDataFromSql推送数据,它将被缓冲并存储在ActionBlock. 根据数据的大小和类型,这会很快使用大量内存。