我最近接触了 C# 语言,并致力于从 cassandra 中获取数据,因此我正在使用下面的代码从 Cassandra 中获取数据并且工作正常。
我唯一的问题是我的ProcessCassQuery方法 - 我传递CancellationToken.None给我的requestExecuter函数,这可能不是正确的做法。处理这种情况的正确方法应该是什么?我应该怎么做才能正确处理?
/**
*
* Below method does multiple async calls on each table for their corresponding id's by limiting it down using Semaphore.
*
*/
private async Task<List<T>> ProcessCassQueries<T>(IList<int> ids, Func<CancellationToken, int, Task<T>> mapperFunc, string msg) where T : class
{
var tasks = ids.Select(async id =>
{
await semaphore.WaitAsync();
try
{
ProcessCassQuery(ct => mapperFunc(ct, id), msg);
}
finally
{
semaphore.Release();
}
});
return (await Task.WhenAll(tasks)).Where(e => e != null).ToList();
}
// this might not be good idea to do it. how can I improve below method?
private Task<T> ProcessCassQuery<T>(Func<CancellationToken, Task<T>> requestExecuter, string msg) where T : class
{
return requestExecuter(CancellationToken.None);
}
Run Code Online (Sandbox Code Playgroud)
rph*_*rph 11
正如官方文档中所述,取消令牌允许传播取消信号。例如,这对于取消由于某种原因不再有意义或花费太长时间的长时间运行的操作很有用。
这CancelationTokenSource将允许您获取可以传递给 的自定义令牌requestExecutor。它还将提供取消正在运行的方法Task。
private CancellationTokenSource cts = new CancellationTokenSource();
// ...
private Task<T> ProcessCassQuery<T>(Func<CancellationToken, Task<T>> requestExecuter, string msg) where T : class
{
return requestExecuter(cts.Token);
}
Run Code Online (Sandbox Code Playgroud)
例子
让我们看一个不同的最小/虚拟示例,以便我们可以了解它的内部。
考虑以下方法,GetSomethingAsync它将每秒返回一个递增的整数。
如果此过程被外部操作取消,则调用token.ThrowIfCancellationRequested将确保抛出 a 。TaskCanceledException可以采取其他方法,例如,检查是否token.IsCancellationRequested为真并对此采取措施。
private static async IAsyncEnumerable<int> GetSomethingAsync(CancellationToken token)
{
Console.WriteLine("starting to get something");
token.ThrowIfCancellationRequested();
for (var i = 0; i < 100; i++)
{
await Task.Delay(1000, token);
yield return i;
}
Console.WriteLine("finished getting something");
}
Run Code Online (Sandbox Code Playgroud)
现在让我们构建 main 方法来调用上述方法。
public static async Task Main()
{
var cts = new CancellationTokenSource();
// cancel it after 3 seconds, just for demo purposes
cts.CancelAfter(3000);
// or: Task.Delay(3000).ContinueWith(_ => { cts.Cancel(); });
await foreach (var i in GetSomethingAsync(cts.Token))
{
Console.WriteLine(i);
}
}
Run Code Online (Sandbox Code Playgroud)
如果我们运行它,我们将得到如下所示的输出:
starting to get something
0
1
Unhandled exception. System.Threading.Tasks.TaskCanceledException: A task was canceled.
Run Code Online (Sandbox Code Playgroud)
当然,这只是一个虚拟示例,取消可以由用户操作或发生的某些事件触发,不一定是计时器。