.NET中管理单独(单个)线程上任务队列的最佳方式

Jos*_*osh 23 .net c# concurrency asynchronous task-parallel-library

我知道异步编程多年来已经发生了很多变化.我有点尴尬,我让自己在34岁时就生锈了,但我依靠StackOverflow让我加快速度.

我想要做的是在一个单独的线程上管理一个"工作"队列,但是这样一次只能处理一个项目.我想在这个线程上发布工作,它不需要将任何内容传递给调用者.当然,我可以简单地旋转一个新Thread对象并让它在一个共享Queue对象上循环,使用睡眠,中断,等待句柄等.但是我知道事情从那以后变得更好.我们有BlockingCollection,Task,async/ await,更不用提的NuGet包,可能抽象了很多的.

我知道"什么是最好的..."这些问题通常是不受欢迎的,所以我会通过说"目前推荐的是什么......"的方式来改进它,最好使用内置的.NET机制来完成这样的事情.但是如果第三方NuGet包简化了一堆东西,它也是如此.

我认为一个TaskScheduler固定最大并发度为1 的实例,但似乎现在可能没那么笨重的方法了.

背景

具体来说,我在这种情况下尝试做的是在Web请求期间排队IP地理定位任务.相同的IP可能会多次排队等待地理定位,但是任务将知道如何检测并尽快跳过,如果它已经解决了.但请求处理程序只是将这些() => LocateAddress(context.Request.UserHostAddress)调用抛入队列,让该LocateAddress方法处理重复的工作检测.我正在使用的地理位置API不喜欢被请求轰炸,这就是我想一次将它限制为单个并发任务的原因.但是,如果允许通过简单的参数更改轻松扩展到更多并发任务,那将是很好的.

Ser*_*rvy 43

要创建异步单度并行工作队列,您可以简单地创建一个SemaphoreSlim,初始化为一个,然后await在开始所请求的工作之前获取该信号量的enqueing方法.

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

当然,要有一个固定的并行度而不是简单地将信号量初始化为其他数字.

  • 请注意,此代码*不会*在单独的专用线程上执行任务(如标题中所述),而是保证任务一个接一个地运行,因为OP实际需要. (12认同)
  • 这段代码甚至可以保证FIFO吗? (4认同)
  • @ThibaultD。不,不是的。 (4认同)

i3a*_*non 16

你最好的选择,因为我看到它使用TPL DataflowActionBlock:

var actionBlock = new ActionBlock<string>(address =>
{
    if (!IsDuplicate(address))
    {
        LocateAddress(address);
    }
});

actionBlock.Post(context.Request.UserHostAddress);
Run Code Online (Sandbox Code Playgroud)

TPL Dataflow是一个健壮的,线程安全的,已经async完全可配置的基于actor的框架(可用作nuget)

这是一个更复杂案例的简单示例.我们假设你想:

  • 启用并发(仅限于可用内核).
  • 限制队列大小(这样你就不会耗尽内存).
  • 两者都有LocateAddress,队列插入async.
  • 一小时后取消所有内容.

var actionBlock = new ActionBlock<string>(async address =>
{
    if (!IsDuplicate(address))
    {
        await LocateAddressAsync(address);
    }
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10000,
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});

await actionBlock.SendAsync(context.Request.UserHostAddress);
Run Code Online (Sandbox Code Playgroud)

  • 非常好!我一定会检查一下.我从未仔细研究过TPL Dataflow,但听起来它比专用线程有很多优点. (3认同)

Ale*_*lov 15

实际上,您不需要在一个线程中运行任务,您需要它们以串行方式(一个接一个地)和FIFO运行.TPL没有类,但这是我非常轻量级,非阻塞的测试实现.https://github.com/Gentlee/SerialQueue

在那里也有@Servy实现,测试显示它比我的慢两倍并且它不保证FIFO.

例:

private readonly SerialQueue queue = new SerialQueue();

async Task SomeAsyncMethod()
{
    var result = await queue.Enqueue(DoSomething);
}
Run Code Online (Sandbox Code Playgroud)


Zer*_*er0 5

使用BlockingCollection<Action>来创建一个消费者(同时运行像你想的只有一件事)和一个或多个生产者生产者/消费者模式.

首先在某处定义共享队列:

BlockingCollection<Action> queue = new BlockingCollection<Action>();
Run Code Online (Sandbox Code Playgroud)

在您的消费者中ThreadTask您从中获取:

//This will block until there's an item available
Action itemToRun = queue.Take()
Run Code Online (Sandbox Code Playgroud)

然后从其他线程上的任意数量的生成器,只需添加到队列:

queue.Add(() => LocateAddress(context.Request.UserHostAddress));
Run Code Online (Sandbox Code Playgroud)

  • @ Zer0这是一个异步*生产者*,但是一个完全同步的*消费者*.线程仍然很昂贵,为了让它们无所事事而旋转很多只是昂贵的应该避免. (3认同)
  • 这要求消费者同步处理任务.当没有工作时,必须有一个线程在那里无所事事,而不是异步地消耗操作,这样当没有工作要做时没有线程. (2认同)
  • @Servy如果没有工作要做,线程将处于等待状态.它不会浪费任何CPU时间,所以我没有看到任何问题.虽然我仍然不理解你对"同步"或"异步"的评论.这是一种异步设计. (2认同)
  • @ Zer0只是它*CPU根本不需要关心*并行性发生.并行性可以在没有任何线程的情况下发生.例如,您可以有多个挂起的IO请求,每个请求都在工作,根本没有使用任何线程.在不使用CPU的情况下,有很多方法可以"工作". (2认同)

小智 5

我在这里发布了另一种解决方案。老实说,我不确定这是否是一个好的解决方案。

我习惯于使用BlockingCollection来实现生产者/消费者模式,并使用专用线程来消耗这些项目。如果总是有数据传入并且使用者线程不会坐在那里什么也不做就很好。

我遇到一种情况,其中一个应用程序希望在另一个线程上发送电子邮件,但是电子邮件总数并不那么大。我最初的解决方案是拥有一个专用的使用者线程(由Task.Run()创建),但是很多时候它只是坐在那里,什么也不做。

旧解决方案:

private readonly BlockingCollection<EmailData> _Emails =
    new BlockingCollection<EmailData>(new ConcurrentQueue<EmailData>());

// producer can add data here
public void Add(EmailData emailData)
{
    _Emails.Add(emailData);
}

public void Run()
{
    // create a consumer thread
    Task.Run(() => 
    {
        foreach (var emailData in _Emails.GetConsumingEnumerable())
        {
            SendEmail(emailData);
        }
    });
}

// sending email implementation
private void SendEmail(EmailData emailData)
{
    throw new NotImplementedException();
}
Run Code Online (Sandbox Code Playgroud)

如您所见,如果没有足够的电子邮件要发送(这是我的情况),那么消费者线程将把大部分电子邮件都花在那儿,而什么也不做。

我将实现更改为:

// create an empty task
private Task _SendEmailTask = Task.Run(() => {});

// caller will dispatch the email to here
// continuewith will use a thread pool thread (different to
// _SendEmailTask thread) to send this email
private void Add(EmailData emailData)
{
    _SendEmailTask = _SendEmailTask.ContinueWith((t) =>
    {
        SendEmail(emailData);
    });
}

// actual implementation
private void SendEmail(EmailData emailData)
{
    throw new NotImplementedException();
}
Run Code Online (Sandbox Code Playgroud)

它不再是生产者/消费者模式,但是那里没有线程,什么也不做,相反,每次发送电子邮件时,它将使用线程池线程来完成。