TPL队列处理

Cli*_*int 2 c# queue task-parallel-library

我目前正在开发一个项目,我需要排队一些工作进行处理,这是要求:

  1. 必须一次处理一个作业
  2. 必须能够等待排队的项目

所以我想要类似于:

Task<result> QueueJob(params here)
{
   /// Queue the job and somehow return a waitable task that will wait until the queued job has been executed and return the result.
}
Run Code Online (Sandbox Code Playgroud)

我已经尝试过后台运行任务,只是将项目从队列中拉出来并处理作业,但难度是从后台任务到方法.

如果需要的话,我可以在QueueJob方法中使用刚刚请求完成回调的路径,但是如果我能得到一个透明的任务可以让你等待处理工作(即使有工作)也会很棒在它排队之前).

svi*_*ick 6

您可能会发现TaskCompletionSource<T>它很有用,它可以用来创建一个Task完全按照您的需要完成的东西.如果你把它结合起来BlockingCollection<T>,你就会得到你的队列:

class JobProcessor<TInput, TOutput> : IDisposable
{
    private readonly Func<TInput, TOutput> m_transform;

    // or a custom type instead of Tuple
    private readonly
        BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>>
        m_queue =
        new BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>>();

    public JobProcessor(Func<TInput, TOutput> transform)
    {
        m_transform = transform;
        Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
    }

    private void ProcessQueue()
    {
        Tuple<TInput, TaskCompletionSource<TOutput>> tuple;
        while (m_queue.TryTake(out tuple, Timeout.Infinite))
        {
            var input = tuple.Item1;
            var tcs = tuple.Item2;

            try
            {
                tcs.SetResult(m_transform(input));
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        }
    }

    public Task<TOutput> QueueJob(TInput input)
    {
        var tcs = new TaskCompletionSource<TOutput>();
        m_queue.Add(Tuple.Create(input, tcs));
        return tcs.Task;
    }

    public void Dispose()
    {
        m_queue.CompleteAdding();
    }
}
Run Code Online (Sandbox Code Playgroud)