如何强制 IAsyncEnumerable 遵守 CancellationToken

The*_*ias 7 c# cancellation c#-8.0 iasyncenumerable

编辑:这个问题的要求已经改变。请参阅下面的更新部分。

我有一个异步迭代器方法,它产生一个IAsyncEnumerable<int>(数字流),每 200 毫秒一个数字。此方法的调用者使用流,但希望在 1000 毫秒后停止枚举。因此使用了 a CancellationTokenSource,并且将令牌作为参数传递给WithCancellation扩展方法。但令牌不受尊重。枚举一直持续到所有数字都被消耗完:

static async IAsyncEnumerable<int> GetSequence()
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(200);
        yield return i;
    }
}

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}
Run Code Online (Sandbox Code Playgroud)

输出:

12:55:17.506 > 1
12:55:17.739 > 2
12:55:17.941 > 3
12:55:18.155 > 4
12:55:18.367 > 5
12:55:18.520
: > 572 18 >
18 :55:18.973 > 8
12:55:19.174 > 9
12:55:19.376 > 10

预期的输出是TaskCanceledException在数字 5 之后发生。似乎我误解了WithCancellation实际在做什么。该方法只是将提供的令牌传递给迭代器方法,如果该方法接受一个。否则,就像GetSequence()我的示例中的方法一样,令牌将被忽略。我想在我的情况下的解决方案是手动询问枚举主体内的令牌:

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
    cts.Token.ThrowIfCancellationRequested();
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}
Run Code Online (Sandbox Code Playgroud)

这很简单,效果很好。但无论如何,我想知道是否有可能创建一个扩展方法来执行我期望的WithCancellation操作,在随后的枚举中烘焙令牌。这是所需方法的签名:

public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    // Is it possible?
}
Run Code Online (Sandbox Code Playgroud)

更新:似乎当我问这个问题时,我对整个取消概念的目的有一个不正确的理解。我的印象是,取消是为打破循环的等待之后MoveNextAsync,而真正的目的是要取消的等待本身。在我的小例子中,等待只持续 200 毫秒,但在现实世界的例子中,等待可能会更长,甚至是无限的。意识到这一点后,我当前形式的问题几乎没有任何价值,我必须将其删除并打开一个具有相同标题的新问题,或者更改现有问题的要求。这两种选择在某一方面都不好。

我决定采用第二种选择。因此,我不接受当前接受的答案,并且我正在寻求一个新的解决方案,以解决更困难的问题,即以立即生效的方式强制取消。换句话说,取消令牌应该会在几毫秒内完成异步枚举。让我们举一个实际的例子来区分可取和不可取的行为:

var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
    await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
    {
        Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}
Run Code Online (Sandbox Code Playgroud)

输出(理想):

0:00.242 > 1
0:00.467 > 2
0:00.500 > 取消

输出(不需要):

0:00.242 > 1
0:00.467 > 2
0:00.707 > 取消

GetSequence与初始示例中的方法相同,每 200 毫秒传输一个数字。此方法不支持取消,前提是我们无法更改。WithEnforcedCancellation是解决此问题所需的扩展方法。

Jer*_*ert 15

IAsyncEnumerable明确地为这种机制提供了以下EnumeratorCancellation属性:

static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
    for (int i = 1; i <= 10; i++) {
        ct.ThrowIfCancellationRequested();
        await Task.Delay(200);    // or `Task.Delay(200, ct)` if this wasn't an example
        yield return i;
    }
}
Run Code Online (Sandbox Code Playgroud)

事实上,如果你给方法一个CancellationToken参数,但不添加属性,编译器足以发出警告。

请注意,传递给的令牌.WithCancellation将覆盖传递给该方法的任何本地令牌。该规范对这个细节。

当然,这仍然只有在枚举实际接受 a 的情况下才有效CancellationToken——但事实上,取消只有在合作完成时才真正有效,这一事实适用于任何async工作。Yeldar 的答案有助于将某些取消措施“强制”到不支持它的枚举中,但首选的解决方案应该是修改枚举以支持取消本身——编译器会尽一切努力帮助您。