Jos*_*osh 23 .net c# concurrency asynchronous task-parallel-library
我知道异步编程多年来已经发生了很多变化.我有点尴尬,我让自己在34岁时就生锈了,但我依靠StackOverflow让我加快速度.
我想要做的是在一个单独的线程上管理一个"工作"队列,但是这样一次只能处理一个项目.我想在这个线程上发布工作,它不需要将任何内容传递给调用者.当然,我可以简单地旋转一个新Thread对象并让它在一个共享Queue对象上循环,使用睡眠,中断,等待句柄等.但是我知道事情从那以后变得更好.我们有BlockingCollection,Task,async/ await,更不用提的NuGet包,可能抽象了很多的.
我知道"什么是最好的..."这些问题通常是不受欢迎的,所以我会通过说"目前推荐的是什么......"的方式来改进它,最好使用内置的.NET机制来完成这样的事情.但是如果第三方NuGet包简化了一堆东西,它也是如此.
我认为一个TaskScheduler固定最大并发度为1 的实例,但似乎现在可能没那么笨重的方法了.
背景
具体来说,我在这种情况下尝试做的是在Web请求期间排队IP地理定位任务.相同的IP可能会多次排队等待地理定位,但是任务将知道如何检测并尽快跳过,如果它已经解决了.但请求处理程序只是将这些() => LocateAddress(context.Request.UserHostAddress)调用抛入队列,让该LocateAddress方法处理重复的工作检测.我正在使用的地理位置API不喜欢被请求轰炸,这就是我想一次将它限制为单个并发任务的原因.但是,如果允许通过简单的参数更改轻松扩展到更多并发任务,那将是很好的.
Ser*_*rvy 43
要创建异步单度并行工作队列,您可以简单地创建一个SemaphoreSlim,初始化为一个,然后await在开始所请求的工作之前获取该信号量的enqueing方法.
public class TaskQueue
{
private SemaphoreSlim semaphore;
public TaskQueue()
{
semaphore = new SemaphoreSlim(1);
}
public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
{
await semaphore.WaitAsync();
try
{
return await taskGenerator();
}
finally
{
semaphore.Release();
}
}
public async Task Enqueue(Func<Task> taskGenerator)
{
await semaphore.WaitAsync();
try
{
await taskGenerator();
}
finally
{
semaphore.Release();
}
}
}
Run Code Online (Sandbox Code Playgroud)
当然,要有一个固定的并行度而不是简单地将信号量初始化为其他数字.
i3a*_*non 16
你最好的选择,因为我看到它使用TPL Dataflow的ActionBlock:
var actionBlock = new ActionBlock<string>(address =>
{
if (!IsDuplicate(address))
{
LocateAddress(address);
}
});
actionBlock.Post(context.Request.UserHostAddress);
Run Code Online (Sandbox Code Playgroud)
TPL Dataflow是一个健壮的,线程安全的,已经async完全可配置的基于actor的框架(可用作nuget)
这是一个更复杂案例的简单示例.我们假设你想:
LocateAddress,队列插入async.var actionBlock = new ActionBlock<string>(async address =>
{
if (!IsDuplicate(address))
{
await LocateAddressAsync(address);
}
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});
await actionBlock.SendAsync(context.Request.UserHostAddress);
Run Code Online (Sandbox Code Playgroud)
Ale*_*lov 15
实际上,您不需要在一个线程中运行任务,您需要它们以串行方式(一个接一个地)和FIFO运行.TPL没有类,但这是我非常轻量级,非阻塞的测试实现.https://github.com/Gentlee/SerialQueue
在那里也有@Servy实现,测试显示它比我的慢两倍并且它不保证FIFO.
例:
private readonly SerialQueue queue = new SerialQueue();
async Task SomeAsyncMethod()
{
var result = await queue.Enqueue(DoSomething);
}
Run Code Online (Sandbox Code Playgroud)
使用BlockingCollection<Action>来创建一个消费者(同时运行像你想的只有一件事)和一个或多个生产者生产者/消费者模式.
首先在某处定义共享队列:
BlockingCollection<Action> queue = new BlockingCollection<Action>();
Run Code Online (Sandbox Code Playgroud)
在您的消费者中Thread或Task您从中获取:
//This will block until there's an item available
Action itemToRun = queue.Take()
Run Code Online (Sandbox Code Playgroud)
然后从其他线程上的任意数量的生成器,只需添加到队列:
queue.Add(() => LocateAddress(context.Request.UserHostAddress));
Run Code Online (Sandbox Code Playgroud)
小智 5
我在这里发布了另一种解决方案。老实说,我不确定这是否是一个好的解决方案。
我习惯于使用BlockingCollection来实现生产者/消费者模式,并使用专用线程来消耗这些项目。如果总是有数据传入并且使用者线程不会坐在那里什么也不做就很好。
我遇到一种情况,其中一个应用程序希望在另一个线程上发送电子邮件,但是电子邮件总数并不那么大。我最初的解决方案是拥有一个专用的使用者线程(由Task.Run()创建),但是很多时候它只是坐在那里,什么也不做。
旧解决方案:
private readonly BlockingCollection<EmailData> _Emails =
new BlockingCollection<EmailData>(new ConcurrentQueue<EmailData>());
// producer can add data here
public void Add(EmailData emailData)
{
_Emails.Add(emailData);
}
public void Run()
{
// create a consumer thread
Task.Run(() =>
{
foreach (var emailData in _Emails.GetConsumingEnumerable())
{
SendEmail(emailData);
}
});
}
// sending email implementation
private void SendEmail(EmailData emailData)
{
throw new NotImplementedException();
}
Run Code Online (Sandbox Code Playgroud)
如您所见,如果没有足够的电子邮件要发送(这是我的情况),那么消费者线程将把大部分电子邮件都花在那儿,而什么也不做。
我将实现更改为:
// create an empty task
private Task _SendEmailTask = Task.Run(() => {});
// caller will dispatch the email to here
// continuewith will use a thread pool thread (different to
// _SendEmailTask thread) to send this email
private void Add(EmailData emailData)
{
_SendEmailTask = _SendEmailTask.ContinueWith((t) =>
{
SendEmail(emailData);
});
}
// actual implementation
private void SendEmail(EmailData emailData)
{
throw new NotImplementedException();
}
Run Code Online (Sandbox Code Playgroud)
它不再是生产者/消费者模式,但是那里没有线程,什么也不做,相反,每次发送电子邮件时,它将使用线程池线程来完成。
| 归档时间: |
|
| 查看次数: |
38323 次 |
| 最近记录: |