使用async/await并使用TPL Dataflow返回yield

Tej*_*ora 11 c# ienumerable yield-return async-await tpl-dataflow

我正在尝试使用实现数据处理管道TPL Dataflow.但是,我对数据流相对较新,并不完全确定如何正确使用它来解决我想要解决的问题.

问题:

我试图遍历文件列表并处理每个文件以读取一些数据,然后进一步处理该数据.每个文件大致700MB1GB大小.每个文件都包含JSON数据.为了并行处理这些文件,而不是的运行内存,我试图使用IEnumerable<>yield return再进一步处理数据.

获得文件列表后,我希望一次最多处理4-5个文件.我的困惑来自:

  • 如何使用IEnumerable<>yeild return使用async/await和数据流.碰上了这个答案svick,但仍然不知道如何转换IEnumerable<>ISourceBlock,然后所有块连接在一起,并跟踪完成.
  • 在我的情况下,producer将非常快(通过文件列表),但consumer将非常慢(处理每个文件 - 读取数据,反序列化JSON).在这种情况下,如何跟踪完成情况.
  • 我应该使用LinkTo数据块的功能来连接各种块吗?或者使用诸如OutputAvailableAsync()和之类的方法ReceiveAsync()将数据从一个块传播到另一个块.

代码:

private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;

public Task ProduceAsync()
{
    PrepareDataflow(token);
    var bufferTask = ListFilesAsync(_fileBufferBlock, token);

    var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
    return Task.WhenAll(tasks);
}

private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
    ...
    // Get list of file Uris
    ...
    foreach(var fileNameUri in fileNameUris)
        await targetBlock.SendAsync(fileNameUri, token);

    targetBlock.Complete();
}

private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
    var httpClient = new HttpClient();
    try
    {
        using (var stream = await httpClient.GetStreamAsync(fileNameUri))
        using (var sr = new StreamReader(stream))
        using (var jsonTextReader = new JsonTextReader(sr))
        {
            while (jsonTextReader.Read())
            {
                if (jsonTextReader.TokenType == JsonToken.StartObject)
                {
                    try
                    {
                        var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader)
                        await _messageBufferBlock.SendAsync(data, token);
                    }
                    catch (Exception ex)
                    {
                        _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
                    }
                }
            }
        }
    }
    catch(Exception ex)
    {
        // Should throw?
        // Or if converted to block then report using Fault() method?
    }
    finally
    {
        httpClient.Dispose();
        buffer.Complete();
    }
}

private void PrepareDataflow(CancellationToken token)
{
    _fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
    {
        CancellationToken = token
    });

    var actionExecuteOptions = new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = ProcessingSize,
        MaxMessagesPerTask = 1,
        MaxDegreeOfParallelism = ProcessingSize
    };
    _processingBlock = new ActionBlock<string>(async fileName =>
    {
        try
        {
            await ProcessFileAsync(fileName, token);
        }
        catch (Exception ex)
        {
            _logger.Fatal(ex, $"Failed to process fiel: {fileName}, Error: {ex.Message}");
            // Should fault the block?
        }
    }, actionExecuteOptions);

    _fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });

    _messageBufferBlock = new BufferBlock<DataType>(new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = 50000
    });
    _messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,我没有使用IEnumerable<DataType>,yield return因为我无法使用它async/await.所以我将输入缓冲区链接ActionBlock<DataType>到另一个队列.然而,通过使用ActionBlock<>,我无法将其链接到下一个块,用于处理和手动必须Post/SendAsyncActionBlock<>BufferBlock<>.此外,在这种情况下,不确定,如何跟踪完成.

这段代码有效,但是,我确信可以有更好的解决方案然后我可以链接所有块(而不是ActionBlock<DataType>然后从它发送消息BufferBlock<DataType>)

另一种选择可能是转换IEnumerable<>IObservable<>使用Rx,但我又是没有太大的熟悉Rx,不知道究竟该如何搭配TPL DataflowRx

Kir*_*kiy 8

问题1

您可以IEnumerable<T>使用PostSendAsync直接在使用者块上将生产者插入TPL数据流链,如下所示:

foreach (string fileNameUri in fileNameUris)
{
    await _processingBlock.SendAsync(fileNameUri).ConfigureAwait(false);
}
Run Code Online (Sandbox Code Playgroud)

你也可以使用a BufferBlock<TInput>,但在你的情况下,它实际上似乎是不必要的(甚至是有害的 - 见下一部分).

问题2

你什么时候喜欢SendAsync而不是Post?如果您的生产者运行速度超过了可以处理的URI(并且您已经指明了这种情况),并且您选择提供_processingBlocka BoundedCapacity,那么当块的内部缓冲区达到指定容量时,您SendAsync将"挂起"直到缓冲区插槽释放,您的foreach循环将被限制.这种反馈机制可以产生背压并确保您不会耗尽内存.

问题3

LinkTo大多数情况下,您绝对应该使用该方法链接您的块.不幸的是,由于相互作用IDisposable和非常大(可能)的序列,你的是一个角落案例.因此,您的完成将在缓冲区和处理块之间自动流动(由于LinkTo),但在此之后 - 您需要手动传播它.这很棘手,但可行.

我将用"Hello World"示例来说明这一点,其中生产者遍历每个字符,而使用者(非常慢)将每个字符输出到Debug窗口.

注意:LinkTo不存在.

// REALLY slow consumer.
var consumer = new ActionBlock<char>(async c =>
{
    await Task.Delay(100);

    Debug.Print(c.ToString());
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

var producer = new ActionBlock<string>(async s =>
{
    foreach (char c in s)
    {
        await consumer.SendAsync(c);

        Debug.Print($"Yielded {c}");
    }
});

try
{
    producer.Post("Hello world");
    producer.Complete();

    await producer.Completion;
}
finally
{
    consumer.Complete();
}

// Observe combined producer and consumer completion/exceptions/cancellation.
await Task.WhenAll(producer.Completion, consumer.Completion);
Run Code Online (Sandbox Code Playgroud)

这输出:

Yielded H
H
Yielded e
e
Yielded l
l
Yielded l
l
Yielded o
o
Yielded  

Yielded w
w
Yielded o
o
Yielded r
r
Yielded l
l
Yielded d
d

从上面的输出可以看出,生产者受到限制,块之间的切换缓冲区永远不会变得太大.

编辑

您可能会发现通过传播完成更加清晰

producer.Completion.ContinueWith(
    _ => consumer.Complete(), TaskContinuationOptions.ExecuteSynchronously
);
Run Code Online (Sandbox Code Playgroud)

...在producer定义之后.这允许您略微减少生产者/消费者的耦合 - 但最后您仍然需要记住观察Task.WhenAll(producer.Completion, consumer.Completion).


Ste*_*ary 7

为了并行处理这些文件而不是内存运行,我试图使用带有yield return的IEnumerable <>然后进一步处理数据.

我不相信这一步是必要的.你实际上在这里避免的只是一个文件名列表.即使您有数百万个文件,文件名列表也不会占用大量内存.

我将输入缓冲区链接到ActionBlock,后者又发布到另一个队列.但是,通过使用ActionBlock <>,我无法将其链接到下一个块进行处理,并且必须从ActionBlock <>手动Post/SendAsync到BufferBlock <>.此外,在这种情况下,不确定,如何跟踪完成.

ActionBlock<TInput>是一个"行尾"块.它只接受输入并且不产生任何输出.在你的情况下,你不想要ActionBlock<TInput>; 你想要的TransformManyBlock<TInput, TOutput>,它接受输入,在它上面运行一个函数,并产生输出(每个输入项有任意数量的输出项).

要记住的另一点是所有缓冲区块都有一个输入缓冲区.所以额外BufferBlock是不必要的.

最后,如果您已经处于"数据流域"中,通常最好以实际执行某些操作的数据流块结束(例如,ActionBlock而不是BufferBlock).在这种情况下,您可以使用BufferBlock有限生成器/使用者队列,其中一些其他代码正在使用结果.就个人而言,我认为,这可能是更清洁重写消费代码的作用ActionBlock,但它也可能是更清洁,以保持独立的数据流的消费者.对于下面的代码,我离开了最后的限制BufferBlock,但是如果你使用这个解决方案,考虑将最终的块改为有界ActionBlock.

private const int ProcessingSize= 4;
private static readonly HttpClient HttpClient = new HttpClient();
private TransformBlock<string, DataType> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;

public Task ProduceAsync()
{
  PrepareDataflow(token);
  ListFiles(_fileBufferBlock, token);
  _processingBlock.Complete();
  return _processingBlock.Completion;
}

private void ListFiles(ITargetBlock<string> targetBlock, CancellationToken token)
{
  ... // Get list of file Uris, occasionally calling token.ThrowIfCancellationRequested()
  foreach(var fileNameUri in fileNameUris)
    _processingBlock.Post(fileNameUri);
}

private async Task<IEnumerable<DataType>> ProcessFileAsync(string fileNameUri, CancellationToken token)
{
  return Process(await HttpClient.GetStreamAsync(fileNameUri), token);
}

private IEnumerable<DataType> Process(Stream stream, CancellationToken token)
{
  using (stream)
  using (var sr = new StreamReader(stream))
  using (var jsonTextReader = new JsonTextReader(sr))
  {
    while (jsonTextReader.Read())
    {
      token.ThrowIfCancellationRequested();
      if (jsonTextReader.TokenType == JsonToken.StartObject)
      {
        try
        {
          yield _jsonSerializer.Deserialize<DataType>(jsonTextReader);
        }
        catch (Exception ex)
        {
          _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
        }
      }
    }
  }
}

private void PrepareDataflow(CancellationToken token)
{
  var executeOptions = new ExecutionDataflowBlockOptions
  {
    CancellationToken = token,
    MaxDegreeOfParallelism = ProcessingSize
  };
  _processingBlock = new TransformManyBlock<string, DataType>(fileName =>
      ProcessFileAsync(fileName, token), executeOptions);

  _messageBufferBlock = new BufferBlock<DataType>(new DataflowBlockOptions
  {
    CancellationToken = token,
    BoundedCapacity = 50000
  });
}
Run Code Online (Sandbox Code Playgroud)

或者,您可以使用Rx.但是,学习Rx可能非常困难,特别是对于混合异步并行数据流情况,您可以在此处使用.

至于你的其他问题:

如何使用IEnumerable <>和yeild返回async/await和dataflow.

async并且yield完全不兼容.至少在今天的语言中.在您的情况下,JSON读者必须同步读取流(它们不支持异步读取),因此实际的流处理是同步的并且可以与之一起使用yield.进行初始来回以获取流本身仍然可以是异步的并且可以与之一起使用async.这是我们今天所能得到的,直到JSON读者支持异步读取和语言支持async yield.(Rx今天可以做一个"异步产生",但是JSON阅读器仍然不支持异步读取,因此在这种特殊情况下它无济于事).

在这种情况下,如何跟踪完成情况.

如果JSON读者确实支持异步读取,那么上面的解决方案将不是最好的解决方案.在这种情况下,您可能希望使用手动SendAsync调用,并且需要仅链接这些块的完成,这可以这样做:

_processingBlock.Completion.ContinueWith(
    task =>
    {
      if (task.IsFaulted)
        ((IDataflowBlock)_messageBufferBlock).Fault(task.Exception);
      else if (!task.IsCanceled)
        _messageBufferBlock.Complete();
    },
    CancellationToken.None,
    TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
    TaskScheduler.Default);
Run Code Online (Sandbox Code Playgroud)

我应该使用数据块的LinkTo功能来连接各种块吗?或者使用诸如OutputAvailableAsync()和ReceiveAsync()之类的方法将数据从一个块传播到另一个块.

LinkTo尽可能使用.它可以为您处理所有角落案例.

//应该扔?//块应该出错吗?

这完全取决于你.默认情况下,当任何项目的任何处理失败时,块都会出错,如果您正在传播完成,则整个块链都会出错.

断层块相当剧烈; 他们扔掉任何正在进行的工作,拒绝继续处理.如果要重试,则必须构建新的数据流网格.

如果您更喜欢"更软"的错误策略,则可以使用catch异常并执行类似记录(代码当前执行的操作)的操作,或者您可以更改数据流块的性质以将异常作为数据项传递.