实现线程在C#中快速,批量和连续读取的最佳实践?

Sem*_*ore 2 c# io usb multithreading hid

如何在.NET 4.0中处理从C#中的设备读取批量数据?具体来说,我需要从USB HID设备快速读取,该设备发出超过26个数据包的报告,其中必须保留订单.

我试过在BackgroundWorker线程中这样做.它一次从设备读取一个数据包,然后在阅读更多信息之前对其进行处理.这提供了相当好的响应时间,但它可能会丢失数据包,并且单个数据包读取的开销成本会增加.

while (!( sender as BackgroundWorker ).CancellationPending) {
       //read a single packet
       //check for header or footer
       //process packet data
    }
}
Run Code Online (Sandbox Code Playgroud)

C#中读取这样的设备的最佳做法是什么?


背景:

我的USB HID设备不断报告大量数据.数据分为26个数据包,我必须保留订单.不幸的是,设备只标记每个报告中的最后一个数据包,因此我需要能够捕获其间的所有其他数据包.

Mat*_*son 5

对于.Net 4,您可以使用a BlockingCollection来提供可供生产者和使用者使用的线程安全队列.该BlockingCollection.GetConsumingEnumerable()方法提供了一个枚举器,当队列被标记为已完成CompleteAdding()并且为空时,该枚举器自动终止.

这是一些示例代码.在此示例中,有效负载是一个int数组,但您当然可以使用所需的任何数据类型.

请注意,对于您的特定示例,您可以使用其重载GetConsumingEnumerable()接受类型的参数CancellationToken.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public static class Program
    {
        private static void Main()
        {
            var queue = new BlockingCollection<int[]>();

            Task.Factory.StartNew(() => produce(queue));

            consume(queue);

            Console.WriteLine("Finished.");
        }

        private static void consume(BlockingCollection<int[]> queue)
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming " + item[0]);
                Thread.Sleep(25);
            }
        }

        private static void produce(BlockingCollection<int[]> queue)
        {
            for (int i = 0; i < 1000; ++i)
            {
                Console.WriteLine("Producing " + i);
                var payload = new int[100];
                payload[0] = i;
                queue.Add(payload);
                Thread.Sleep(20);
            }

            queue.CompleteAdding();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

对于.Net 4.5及更高版本,您可以使用Microsoft的任务并行库中的更高级别的类,它具有丰富的功能(初看起来有点令人生畏).

以下是使用TPL DataFlow的相同示例:

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    public static class Program
    {
        private static void Main()
        {
            var queue = new BufferBlock<int[]>();

            Task.Factory.StartNew(() => produce(queue));
            consume(queue).Wait();

            Console.WriteLine("Finished.");
        }

        private static async Task consume(BufferBlock<int[]> queue)
        {
            while (await queue.OutputAvailableAsync())
            {
                var payload = await queue.ReceiveAsync();
                Console.WriteLine("Consuming " + payload[0]);
                await Task.Delay(25);
            }
        }

        private static void produce(BufferBlock<int[]> queue)
        {
            for (int i = 0; i < 1000; ++i)
            {
                Console.WriteLine("Producing " + i);
                var payload = new int[100];
                payload[0] = i;
                queue.Post(payload);
                Thread.Sleep(20);
            }

            queue.Complete();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)