jam*_*lia 1 c# queue asynchronous semaphore async-await
我有几种方法可以向数据库报告一些数据。我们希望异步调用对数据服务的所有调用。这些对数据服务的调用都结束了,因此我们要确保这些 DS 调用在任何给定时间按顺序依次执行。最初,我在这些方法中的每一个上使用 async await 并且每个调用都是异步执行的,但我们发现如果它们乱序,那么就有出错的空间。
所以,我认为我们应该将所有这些异步任务排队并在一个单独的线程中发送它们,但我想知道我们有哪些选择?我遇到了 'SemaphoreSlim' 。这在我的用例中是否合适?或者还有哪些其他选项适合我的用例?请指导我。
所以,我目前在我的代码中有什么
public static SemaphoreSlim mutex = new SemaphoreSlim(1);
//first DS call
public async Task SendModuleDataToDSAsync(Module parameters)
{
var tasks1 = new List<Task>();
var tasks2 = new List<Task>();
//await mutex.WaitAsync(); **//is this correct way to use SemaphoreSlim ?**
foreach (var setting in Module.param)
{
Task job1 = SaveModule(setting);
tasks1.Add(job1);
Task job2= SaveModule(GetAdvancedData(setting));
tasks2.Add(job2);
}
await Task.WhenAll(tasks1);
await Task.WhenAll(tasks2);
//mutex.Release(); // **is this correct?**
}
private async Task SaveModule(Module setting)
{
await Task.Run(() =>
{
// Invokes Calls to DS
...
});
}
Run Code Online (Sandbox Code Playgroud)
//在主线程的某个地方,调用对DS的第二次调用
//Second DS Call
private async Task SendInstrumentSettingsToDS(<param1>, <param2>)
{
//await mutex.WaitAsync();// **is this correct?**
await Task.Run(() =>
{
//TrackInstrumentInfoToDS
//mutex.Release();// **is this correct?**
});
if(param2)
{
await Task.Run(() =>
{
//TrackParam2InstrumentInfoToDS
});
}
}
Run Code Online (Sandbox Code Playgroud)
最初,我在这些方法中的每一个上使用 async await 并且每个调用都是异步执行的,但我们发现如果它们乱序,那么就有出错的空间。
所以,我认为我们应该将所有这些异步任务排队并在一个单独的线程中发送它们,但我想知道我们有哪些选择?我遇到了 'SemaphoreSlim' 。
SemaphoreSlim确实将异步代码限制为一次运行一个,并且是一种有效的互斥形式。但是,由于“乱序”调用会导致错误,因此SemaphoreSlim它不是一个合适的解决方案,因为它不能保证 FIFO。
从更一般的意义上讲,没有同步原语能保证 FIFO,因为这可能会由于锁护航等副作用而导致问题。另一方面,数据结构很自然地是严格的 FIFO。
因此,您需要使用自己的 FIFO 队列,而不是使用隐式执行队列。Channels 是一个不错的、高性能的、异步兼容的队列,但由于您使用的是旧版本的 C#/.NET,因此BlockingCollection<T>可以使用:
public sealed class ExecutionQueue
{
private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();
public ExecutionQueue() => Completion = Task.Run(() => ProcessQueueAsync());
public Task Completion { get; }
public void Complete() => _queue.CompleteAdding();
private async Task ProcessQueueAsync()
{
foreach (var value in _queue.GetConsumingEnumerable())
await value();
}
}
Run Code Online (Sandbox Code Playgroud)
此设置唯一棘手的部分是如何排队工作。从排队工作的代码的角度来看,他们想知道 lambda 何时执行,而不是 lambda 何时排队。从队列方法(我正在调用它Run)的角度来看,该方法需要在执行 lambda 之后才能完成其返回的任务。所以,你可以像这样编写队列方法:
public Task Run(Func<Task> lambda)
{
var tcs = new TaskCompletionSource<object>();
_queue.Add(async () =>
{
// Execute the lambda and propagate the results to the Task returned from Run
try
{
await lambda();
tcs.TrySetResult(null);
}
catch (OperationCanceledException ex)
{
tcs.TrySetCanceled(ex.CancellationToken);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
});
return tcs.Task;
}
Run Code Online (Sandbox Code Playgroud)
这种排队方法并不完美。如果任务完成时出现多个异常(这对于并行代码是正常的),则仅保留第一个异常(这对于异步代码是正常的)。在OperationCanceledException处理方面还有一个边缘情况。但是这段代码对于大多数情况来说已经足够了。
现在你可以像这样使用它:
public static ExecutionQueue _queue = new ExecutionQueue();
public async Task SendModuleDataToDSAsync(Module parameters)
{
var tasks1 = new List<Task>();
var tasks2 = new List<Task>();
foreach (var setting in Module.param)
{
Task job1 = _queue.Run(() => SaveModule(setting));
tasks1.Add(job1);
Task job2 = _queue.Run(() => SaveModule(GetAdvancedData(setting)));
tasks2.Add(job2);
}
await Task.WhenAll(tasks1);
await Task.WhenAll(tasks2);
}
Run Code Online (Sandbox Code Playgroud)