如何聚合来自异步生产者的数据并将其写入文件?

Ale*_*Sin 7 .net c# producer-consumer task-parallel-library async-await

我正在学习C#中的异步/等待模式.目前我正在尝试解决这样的问题:

  • 有一个生产者(硬件设备)每秒生成1000个数据包.我需要将此数据记录到文件中.

  • 该设备仅具有一次ReadAsync()报告单个数据包的方法.

  • 我需要缓冲数据包并按照它们生成的顺序将它们写入文件,每秒只执行一次.

  • 如果写入过程没有在下一批数据包准备好写入时及时完成,则写操作应该失败.

到目前为止,我写了类似下面的内容.它有效,但我不确定这是否是解决问题的最佳方法.有任何意见或建议吗?在消费者需要汇总从生产者处收到的数据时,采用这种生产者/消费者问题的最佳做法是什么?

static async Task TestLogger(Device device, int seconds)
{
    const int bufLength = 1000;
    bool firstIteration = true;
    Task writerTask = null;

    using (var writer = new StreamWriter("test.log")))
    {
        do
        {
            var buffer = new byte[bufLength][];

            for (int i = 0; i < bufLength; i++)
            {
                buffer[i] = await device.ReadAsync();
            }

            if (!firstIteration)
            {
                if (!writerTask.IsCompleted)
                    throw new Exception("Write Time Out!");
            }

            writerTask = Task.Run(() =>
                {
                    foreach (var b in buffer)
                        writer.WriteLine(ToHexString(b));
                });

            firstIteration = false;
        } while (--seconds > 0);
    }
}
Run Code Online (Sandbox Code Playgroud)

avo*_*avo 1

您可以使用以下想法,前提是刷新的标准是数据包的数量(最多 1000 个)。我没有测试它。它利用了 Stephen Cleary在这个问题AsyncProducerConsumerQueue<T>中的特色。

AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;

// producer
async Task ReceiveAsync(CancellationToken token)
{
    while (true)
    {
       var list = new List<byte>();
       while (true)
       {
           token.ThrowIfCancellationRequested(token);
           var packet = await _device.ReadAsync(token);
           list.Add(packet);
           if (list.Count == 1000)
               break;
       }
       // push next batch
       await _queue.EnqueueAsync(list.ToArray(), token);
    }
}

// consumer
async Task LogAsync(CancellationToken token)
{
    Task previousFlush = Task.FromResult(0); 
    CancellationTokenSource cts = null;
    while (true)
    {
       token.ThrowIfCancellationRequested(token);
       // get next batch
       var nextBatch = await _queue.DequeueAsync(token);
       if (!previousFlush.IsCompleted)
       {
           cts.Cancel(); // cancel the previous flush if not ready
           throw new Exception("failed to flush on time.");
       }
       await previousFlush; // it's completed, observe for any errors
       // start flushing
       cts = CancellationTokenSource.CreateLinkedTokenSource(token);
       previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token);
    }
}
Run Code Online (Sandbox Code Playgroud)

如果您不想让记录器失败,而是希望取消刷新并继续进行下一批,则可以通过对此代码进行最小的更改来实现。

回复 @l3arnon 评论:

  1. 数据包不是字节,而是字节[]。2. 你没有使用OP的ToHexString。3. AsyncProducerConsumerQueue 的健壮性和测试能力远不如 .Net 的 TPL Dataflow。4. 在抛出异常后,您等待 previousFlush 发现错误,这使得该行变得多余。简而言之:我认为可能的附加值并不能证明这个非常复杂的解决方案是合理的。
  1. “数据包不是字节,它是字节[]” - 数据包一个字节,这从OP的代码中显而易见:buffer[i] = await device.ReadAsync()。那么,一批数据包就是byte[]
  2. “你还没有使用OP的ToHexString。” - 目标是展示如何使用Stream.WriteAsync接受取消令牌的方法,而不是WriteLineAsync不允许取消的方法。ToHexString与取消支持一起使用Stream.WriteAsync并仍然利用取消支持很简单:

    var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + 
        Environment.NewLine);
    _stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
    
    Run Code Online (Sandbox Code Playgroud)
  3. “AsyncProducerConsumerQueue 远不如 .Net 的 TPL 数据流健壮且经过测试” - 我不认为这是一个确定的事实。但是,如果OP关心它,他可以使用常规BlockingCollection,这不会阻塞生产者线程。在等待下一批时阻塞消费者线程是可以的,因为写入是并行完成的。与此相反,您的 TPL 数据流版本承载一个冗余CPU 和锁密集型操作:将数据从生产者管道移动到写入器管道logAction.Post(packet),逐字节移动。我的代码没有这样做。

  4. “在抛出异常后,您会等待 previousFlush 错误,这使得该行变得多余。” - 这条线不是多余的。也许,你忽略了这一点:previousFlush.IsCompleted可以是truepreviousFlush.IsFaulted或是previousFlush.IsCancelled也是true。因此,await previousFlush观察已完成任务的任何错误(例如写入失败)是相关的,否则这些错误将会丢失。