Ale*_*Sin 7 .net c# producer-consumer task-parallel-library async-await
我正在学习C#中的异步/等待模式.目前我正在尝试解决这样的问题:
有一个生产者(硬件设备)每秒生成1000个数据包.我需要将此数据记录到文件中.
该设备仅具有一次ReadAsync()
报告单个数据包的方法.
我需要缓冲数据包并按照它们生成的顺序将它们写入文件,每秒只执行一次.
如果写入过程没有在下一批数据包准备好写入时及时完成,则写操作应该失败.
到目前为止,我写了类似下面的内容.它有效,但我不确定这是否是解决问题的最佳方法.有任何意见或建议吗?在消费者需要汇总从生产者处收到的数据时,采用这种生产者/消费者问题的最佳做法是什么?
static async Task TestLogger(Device device, int seconds)
{
const int bufLength = 1000;
bool firstIteration = true;
Task writerTask = null;
using (var writer = new StreamWriter("test.log")))
{
do
{
var buffer = new byte[bufLength][];
for (int i = 0; i < bufLength; i++)
{
buffer[i] = await device.ReadAsync();
}
if (!firstIteration)
{
if (!writerTask.IsCompleted)
throw new Exception("Write Time Out!");
}
writerTask = Task.Run(() =>
{
foreach (var b in buffer)
writer.WriteLine(ToHexString(b));
});
firstIteration = false;
} while (--seconds > 0);
}
}
Run Code Online (Sandbox Code Playgroud)
您可以使用以下想法,前提是刷新的标准是数据包的数量(最多 1000 个)。我没有测试它。它利用了 Stephen Cleary在这个问题AsyncProducerConsumerQueue<T>
中的特色。
AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;
// producer
async Task ReceiveAsync(CancellationToken token)
{
while (true)
{
var list = new List<byte>();
while (true)
{
token.ThrowIfCancellationRequested(token);
var packet = await _device.ReadAsync(token);
list.Add(packet);
if (list.Count == 1000)
break;
}
// push next batch
await _queue.EnqueueAsync(list.ToArray(), token);
}
}
// consumer
async Task LogAsync(CancellationToken token)
{
Task previousFlush = Task.FromResult(0);
CancellationTokenSource cts = null;
while (true)
{
token.ThrowIfCancellationRequested(token);
// get next batch
var nextBatch = await _queue.DequeueAsync(token);
if (!previousFlush.IsCompleted)
{
cts.Cancel(); // cancel the previous flush if not ready
throw new Exception("failed to flush on time.");
}
await previousFlush; // it's completed, observe for any errors
// start flushing
cts = CancellationTokenSource.CreateLinkedTokenSource(token);
previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token);
}
}
Run Code Online (Sandbox Code Playgroud)
如果您不想让记录器失败,而是希望取消刷新并继续进行下一批,则可以通过对此代码进行最小的更改来实现。
回复 @l3arnon 评论:
- 数据包不是字节,而是字节[]。2. 你没有使用OP的ToHexString。3. AsyncProducerConsumerQueue 的健壮性和测试能力远不如 .Net 的 TPL Dataflow。4. 在抛出异常后,您等待 previousFlush 发现错误,这使得该行变得多余。简而言之:我认为可能的附加值并不能证明这个非常复杂的解决方案是合理的。
buffer[i] = await device.ReadAsync()
。那么,一批数据包就是byte[]
。“你还没有使用OP的ToHexString。” - 目标是展示如何使用Stream.WriteAsync
本机接受取消令牌的方法,而不是WriteLineAsync
不允许取消的方法。ToHexString
与取消支持一起使用Stream.WriteAsync
并仍然利用取消支持很简单:
var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) +
Environment.NewLine);
_stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
Run Code Online (Sandbox Code Playgroud)“AsyncProducerConsumerQueue 远不如 .Net 的 TPL 数据流健壮且经过测试” - 我不认为这是一个确定的事实。但是,如果OP关心它,他可以使用常规BlockingCollection
,这不会阻塞生产者线程。在等待下一批时阻塞消费者线程是可以的,因为写入是并行完成的。与此相反,您的 TPL 数据流版本承载一个冗余CPU 和锁密集型操作:将数据从生产者管道移动到写入器管道logAction.Post(packet)
,逐字节移动。我的代码没有这样做。
“在抛出异常后,您会等待 previousFlush 错误,这使得该行变得多余。” - 这条线不是多余的。也许,你忽略了这一点:previousFlush.IsCompleted
可以是true
当previousFlush.IsFaulted
或是previousFlush.IsCancelled
也是true
。因此,await previousFlush
观察已完成任务的任何错误(例如写入失败)是相关的,否则这些错误将会丢失。
归档时间: |
|
查看次数: |
1170 次 |
最近记录: |