具有指定结果的任务并行库WaitAny

Ada*_*ger 13 c# task-parallel-library

我正在尝试编写一些代码,这些代码将使Web服务并行调用多个不同的服务器,因此TPL似乎是一个明显的选择.

只有我的一个Web服务调用将返回我想要的结果,而其他所有调用都不会.我正试图找出一种有效的方法,Task.WaitAnyTask匹配条件的第一个返回时,只有解锁.

我尝试过,WaitAny但无法确定过滤器的位置.我到目前为止:

public void SearchServers()
{
    var servers = new[] {"server1", "server2", "server3", "server4"};
    var tasks = servers
                 .Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server), s))
                 .ToArray();

    Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"?

    //Omitted: cancel any outstanding tasks since the correct server has been found
}

private bool CallServer(string server)
{
    //... make the call to the server and return the result ...
}
Run Code Online (Sandbox Code Playgroud)

编辑:快速澄清,以防上面有任何混淆.我正在尝试执行以下操作:

  1. 对于每个服务器,启动a Task进行检查
  2. 或者,等到服务器返回true(最多只有1个服务器将返回true)
  3. 或者,等到所有服务器都返回false,即没有匹配.

Joh*_*ger 10

我能想到的最好的是ContinueWith为每个指定一个Task,检查结果,以及true取消其他任务.要取消任务,您可能需要使用CancellationToken.

var tasks = servers
    .Select(s => Task.Run(...)
        .ContinueWith(t =>
            if (t.Result) {
                // cancel other threads
            }
        )
    ).ToArray();
Run Code Online (Sandbox Code Playgroud)

更新:另一种解决方案是WaitAny直到正确的任务完成(但它有一些缺点,例如从列表中删除已完成的任务,并从剩余的任务中创建一个新阵列是一个非常繁重的操作):

List<Task<bool>> tasks = servers.Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server), s)).ToList();

bool result;
do {
    int idx = Task.WaitAny(tasks.ToArray());
    result = tasks[idx].Result;
    tasks.RemoveAt(idx);
} while (!result && tasks.Count > 0);

// cancel other tasks
Run Code Online (Sandbox Code Playgroud)

更新2:现在我会用Rx做到这一点:

[Fact]
public async Task AwaitFirst()
{
    var servers = new[] { "server1", "server2", "server3", "server4" };
    var server = await servers
        .Select(s => Observable
            .FromAsync(ct => CallServer(s, ct))
            .Where(p => p)
            .Select(_ => s)
        )
        .Merge()
        .FirstAsync();
    output.WriteLine($"Got result from {server}");
}

private async Task<bool> CallServer(string server, CancellationToken ct)
{
    try
    {
        if (server == "server1")
        {
            await Task.Delay(TimeSpan.FromSeconds(1), ct);
            output.WriteLine($"{server} finished");
            return false;
        }
        if (server == "server2")
        {
            await Task.Delay(TimeSpan.FromSeconds(2), ct);
            output.WriteLine($"{server} finished");
            return false;
        }
        if (server == "server3")
        {
            await Task.Delay(TimeSpan.FromSeconds(3), ct);
            output.WriteLine($"{server} finished");
            return true;
        }
        if (server == "server4")
        {
            await Task.Delay(TimeSpan.FromSeconds(4), ct);
            output.WriteLine($"{server} finished");
            return true;
        }
    }
    catch(OperationCanceledException)
    {
        output.WriteLine($"{server} Cancelled");
        throw;
    }

    throw new ArgumentOutOfRangeException(nameof(server));
}
Run Code Online (Sandbox Code Playgroud)

我的机器上的测试需要3.32秒(这意味着它没有等待第4台服务器)并得到以下输出:

server1 finished
server2 finished
server3 finished
server4 Cancelled
Got result from server3
Run Code Online (Sandbox Code Playgroud)