如何正确使用 ConcurrentQueue 中的块

mad*_*nul 4 c# multithreading producer-consumer task-parallel-library tpl-dataflow

我需要实现一个可以从多个线程填充的请求队列。当此队列大于 1000 个已完成的请求时,应将此请求存储到数据库中。这是我的实现:

public class RequestQueue
{
    private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
    private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();

    private static volatile bool isLoading = false;
    private static object _lock = new object();

    public static void Launch()
    {
        Task.Factory.StartNew(execute);
    }

    public static void Add(VerificationRequest request)
    {
        _queue.Add(request);
    }

    public static void AddRange(List<VerificationRequest> requests)
    {
        Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3},
            (request) => { _queue.Add(request); });
    }


    private static void execute()
    {
        Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest );
    }

    private static void EnqueueSaveRequest(VerificationRequest request)
    {
        _storageQueue.Enqueue( new RequestExecuter().ExecuteVerificationRequest( request ) );
        if (_storageQueue.Count > 1000 && !isLoading)
        {
            lock ( _lock )
            {
                if ( _storageQueue.Count > 1000 && !isLoading )
                {
                    isLoading = true;

                    var requestChunck = new List<VerificationRequest>();
                    VerificationRequest req;
                    for (var i = 0; i < 1000; i++)
                    {
                        if( _storageQueue.TryDequeue(out req))
                            requestChunck.Add(req);
                    }
                    new VerificationRequestRepository().InsertRange(requestChunck);

                    isLoading = false;
                }
            }
        }            
    }
}
Run Code Online (Sandbox Code Playgroud)

有没有办法在没有锁定和 isLoading 的情况下实现这个?

Pan*_*vos 5

满足您的要求的最简单方法是使用TPL Dataflow库中的块。例如

var batchBlock = new BatchBlock<VerificationRequest>(1000);
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{
               new VerificationRequestRepository().InsertRange(records);
};

batchBlock.LinkTo(exportBlock , new DataflowLinkOptions { PropagateCompletion = true });
Run Code Online (Sandbox Code Playgroud)

就是这样。

您可以使用以下命令将消息发送到起始块

batchBlock.Post(new VerificationRequest(...));
Run Code Online (Sandbox Code Playgroud)

完成工作后,您可以取消整个管道并通过调用batchBlock.Complete();并等待最后一个块完成来刷新任何剩余的消息:

batchBlock.Complete();
await exportBlock.Completion;
Run Code Online (Sandbox Code Playgroud)

BatchBlock批处理多达1000条记录到的1000个项目的阵列,并将它们传递到下一个块。一个ActionBlock默认只使用 1 个任务,所以它是线程安全的。您可以使用存储库的现有实例而无需担心跨线程访问:

var repository=new VerificationRequestRepository();
var exportBlock = new ActionBlock<VerificationRequest[]>(records=>{
               repository.InsertRange(records);
};
Run Code Online (Sandbox Code Playgroud)

几乎所有块都有一个并发输入缓冲区。每个块在自己的 TPL 任务上运行,因此每个步骤彼此并发运行。这意味着您可以“免费”获得异步执行,如果您有多个链接步骤,这可能很重要,例如您使用TransformBlock修改流经管道的消息。

我使用这样的管道来创建调用外部服务、解析响应、生成最终记录、批处理它们并使用使用 SqlBulkCopy 的块将它们发送到数据库的管道。