标签: task-parallel-library

为什么Task.WhenAll的continuations是同步执行的?

Task.WhenAll在 .NET Core 3.0 上运行时,我只是对该方法进行了一个奇怪的观察。我将一个简单的Task.Delay任务作为单个参数传递给Task.WhenAll,并且我预计包装的任务将与原始任务的行为相同。但这种情况并非如此。原始任务的延续是异步执行的(这是可取的),多个Task.WhenAll(task)包装器的延续是一个接一个同步执行的(这是不可取的)。

这是此行为的演示。四个工作任务正在等待同一个Task.Delay任务完成,然后继续进行繁重的计算(由 a 模拟Thread.Sleep)。

var task = Task.Delay(500);
var workers = Enumerable.Range(1, 4).Select(async x =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}" +
        $" [{Thread.CurrentThread.ManagedThreadId}] Worker{x} before await");

    await task;
    //await Task.WhenAll(task);

    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}" +
        $" [{Thread.CurrentThread.ManagedThreadId}] Worker{x} after await");

    Thread.Sleep(1000); // Simulate some heavy CPU-bound computation
}).ToArray();
Task.WaitAll(workers);
Run Code Online (Sandbox Code Playgroud)

这是输出。四个延续在不同的线程(并行)中按预期运行。

var task = Task.Delay(500);
var workers = Enumerable.Range(1, 4).Select(async x =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}" +
        $" [{Thread.CurrentThread.ManagedThreadId}] Worker{x} before await"); …
Run Code Online (Sandbox Code Playgroud)

c# continuations task-parallel-library async-await .net-core

18
推荐指数
1
解决办法
957
查看次数

在TPL中中止长时间运行的任务

我们的应用程序使用TPL来序列化(可能)长时间运行的工作单元.工作(任务)的创建是用户驱动的,可以随时取消.为了拥有响应式用户界面,如果不再需要当前的工作,我们想放弃我们正在做的事情,并立即开始另一项任务.

任务排队等同于:

private Task workQueue;
private void DoWorkAsync
    (Action<WorkCompletedEventArgs> callback, CancellationToken token) 
{
   if (workQueue == null)
   {
      workQueue = Task.Factory.StartWork
          (() => DoWork(callback, token), token);
   }
   else 
   {
      workQueue.ContinueWork(t => DoWork(callback, token), token);
   }
}
Run Code Online (Sandbox Code Playgroud)

DoWork方法包含长时间运行的呼叫,因此,token.IsCancellationRequested如果/当检测到取消时,它不会像不断检查状态和挽救一样简单.长时间运行的工作将阻止任务继续,直到它完成,即使任务被取消.

我已经提出了两个样本方法来解决这个问题,但我不相信这两种方法都是正确的.我创建了简单的控制台应用程序来演示它们如何工

需要注意的重要一点是,在原始任务完成之前会继续触发.

尝试#1:内部任务

static void Main(string[] args)
{
   CancellationTokenSource cts = new CancellationTokenSource();
   var token = cts.Token;
   token.Register(() => Console.WriteLine("Token cancelled"));
   // Initial work
   var t = Task.Factory.StartNew(() =>
     {
        Console.WriteLine("Doing work");

      // Wrap the long running …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library cancellation

17
推荐指数
1
解决办法
8388
查看次数

为什么这个Parallel.ForEach代码会冻结程序?

更多新手问题:

这段代码从主窗口的列表中抓取了许多代理(我无法弄清楚如何在不同的函数之间使变量可用)并检查每个代理(简单的httpwebrequest),然后将它们添加到名为的列表中finishedProxies.

出于某种原因,当我按下开始按钮时,整个程序挂起.我的印象是Parallel为每个动作创建单独的线程,只留下UI线程,以便它响应?

private void start_Click(object sender, RoutedEventArgs e)
        {
            // Populate a list of proxies
            List<string> proxies = new List<string>();
            List<string> finishedProxies = new List<string>();

            foreach (string proxy in proxiesList.Items)
            {
                proxies.Add(proxy);
            }

            Parallel.ForEach<string>(proxies, (i) =>
            {
                string checkResult;
                checkResult = checkProxy(i);

                finishedProxies.Add(checkResult);
                // update ui
                /*
                 status.Dispatcher.Invoke(
                  System.Windows.Threading.DispatcherPriority.Normal,
                  new Action(
                    delegate()
                    {
                        status.Content = "hello" + checkResult;
                    }
                )); */
                // update ui finished


                //Console.WriteLine("[{0}] F({1}) = {2}", Thread.CurrentThread.Name, i, CalculateFibonacciNumber(i));
            });


        }
Run Code Online (Sandbox Code Playgroud)

我已经尝试使用已注释掉的代码来更改Parallel.Foreach中的UI,它会在按下启动按钮后冻结程序.它之前对我有用,但我使用的是Thread类.

如何从Parallel.Foreach内部更新UI,如何使Parallel.Foreach工作,以便它在工作时不会使UI冻结?

这是整个代码.

c# user-interface multithreading visual-studio-2010 task-parallel-library

17
推荐指数
2
解决办法
2万
查看次数

代码契约和异步

将后置条件添加到返回的异步方法的推荐方法是什么Task<T>

我已经阅读了以下建议:

http://social.msdn.microsoft.com/Forums/hu-HU/async/thread/52fc521c-473e-4bb2-a666-6c97a4dd3a39

帖子建议将每个方法实现为同步,签约,然后将异步对应实现为简单的包装器.不幸的是,我不认为这是一个可行的解决方案(也许是通过我自己的误解):

  1. 异步方法虽然被假定为同步方法的包装器,但没有任何真正的代码契约,因此可以按照自己的意愿进行.
  2. 致力于异步的代码库不太可能为所有内容实现同步对应.因此,实现包含await其他异步方法的新方法因此被强制为异步.这些方法本质上是异步的,不能轻易转换为同步.它们不仅仅是包装纸.

即使我们通过说我们可以使用.Result.Wait()代替await(这实际上会导致某些SyncContexts死锁,并且无论如何都必须在异步方法中重写)来使后一点无效,我仍然相信第一点.

有没有其他想法,或者有什么我错过的代码合同和TPL?

.net c# code-contracts task-parallel-library async-ctp

17
推荐指数
1
解决办法
1252
查看次数

表观BufferBlock.Post/Receive/ReceiveAsync种族/错误

交叉发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

我知道......我并没有真正使用TplDataflow来发挥它的最大潜力.ATM我只是BufferBlock用作消息传递的安全队列,其中生产者和消费者以不同的速率运行.我看到一些奇怪的行为,让我难以理解如何继续.

private BufferBlock<object> messageQueue = new BufferBlock<object>();

public void Send(object message)
{
    var accepted=messageQueue.Post(message);
    logger.Info("Send message was called qlen = {0} accepted={1}",
    messageQueue.Count,accepted);
}

public async Task<object> GetMessageAsync()
{
    try
    {
        var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
        //despite messageQueue.Count>0 next line 
        //occasionally does not execute
        logger.Info("message received");
        //.......
    }
    catch(TimeoutException)
    {
        //do something
    }
}
Run Code Online (Sandbox Code Playgroud)

在上面的代码中(它是2000行分布式解决方案的一部分),Send每100ms左右定期调用一次.这意味着一个项目被Post编到messageQueue在约10次.这已经过验证.但是,偶尔看起来ReceiveAsync在超时内没有完成(即Post没有导致ReceiveAsync完成)并且TimeoutException在30秒后被提升.在这一点上,messageQueue.Count是数百.这是出乎意料的.这个问题已经在较慢的发布率(1个帖子/秒)中观察到,并且通常在1000个项目通过之前发生BufferBlock.

因此,要解决此问题,我使用以下代码,它可以工作,但偶尔会在接收时导致1秒延迟(由于上面发生的错误)

    public async …
Run Code Online (Sandbox Code Playgroud)

c# dataflow task-parallel-library async-await tpl-dataflow

17
推荐指数
1
解决办法
1727
查看次数

为什么迭代GetConsumingEnumerable()不会完全清空底层的阻塞集合

我使用任务并行库,可量化和可重复的问题BlockingCollection<T>,ConcurrentQueue<T>GetConsumingEnumerable试图创建一个简单的管道.

简而言之,从一个线程向默认值BlockingCollection<T>(引擎盖下依赖于a ConcurrentQueue<T>)添加条目并不能保证它们会BlockingCollection<T>从调用GetConsumingEnumerable()Method的另一个线程中弹出.

我创建了一个非常简单的Winforms应用程序来重现/模拟它,它只是将整数打印到屏幕上.

  • Timer1负责排队工作项...它使用一个被调用的并发字典,_tracker以便它知道它已经添加到阻塞集合中的内容.
  • Timer2只记录两个BlockingCollection&的计数状态_tracker
  • START按钮启动一个Paralell.ForEach简单地遍历阻塞集合GetConsumingEnumerable()并开始将它们打印到第二个列表框的按钮.
  • STOP按钮会停止Timer1阻止将更多条目添加到阻止集合中.
public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory(); 
        InitializeComponent(); …
Run Code Online (Sandbox Code Playgroud)

.net c# wpf task-parallel-library blockingcollection

17
推荐指数
2
解决办法
1万
查看次数

跳过Dataflow TransformBlock中的项目

TPL Dataflow提供TransformBlock转换输入,例如:

var tb = new TransformBlock<int, int>(i => i * 2);
Run Code Online (Sandbox Code Playgroud)

是否有可能不输出一些输入,例如,如果输入未通过某些验证测试?

var tb = new TransformBlock<InputType, OutputType>(i =>
{
    if (!ValidateInput(i))
    {
        // Do something to not output anything for this input
    }
    // Normal output
}
Run Code Online (Sandbox Code Playgroud)

如果那是不可能的,那么实现这一目标的最佳模式是什么?
像下面这样的东西?

BufferBlock<OutputType> output = new BufferBlock<OutputType>();

var ab = new ActionBlock<InputType>(i =>
{
    if (ValidateInput(i)) 
    {
        output.Post(MyTransform(i));
    }
}
Run Code Online (Sandbox Code Playgroud)

.net c# task-parallel-library tpl-dataflow

17
推荐指数
3
解决办法
4353
查看次数

CancellationToken.ThrowIfCancellationRequested后出现故障与已取消的任务状态

通常我不会回答问题,但这次我想引起一些我认为可能是一个模糊而又常见的问题的注意.它是由这个问题引发的,从那以后我查看了我自己的旧代码,发现其中一些也受此影响.

下面的代码开始,等待着两个任务,task1并且task2,这几乎是相同的.task1只是task2因为它运行一个永无止境的循环.对于执行CPU限制工作的一些现实场景,这两种情况都非常典型.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    public class Program
    {
        static async Task TestAsync()
        {
            var ct = new CancellationTokenSource(millisecondsDelay: 1000);
            var token = ct.Token;

            // start task1
            var task1 = Task.Run(() =>
            {
                for (var i = 0; ; i++)
                {
                    Thread.Sleep(i); // simulate work item #i
                    token.ThrowIfCancellationRequested();
                }
            });

            // start task2
            var task2 = Task.Run(() =>
            {
                for (var i = 0; …
Run Code Online (Sandbox Code Playgroud)

.net c# multithreading task-parallel-library async-await

17
推荐指数
1
解决办法
4632
查看次数

异步MVVM命令

我一直在关注Stephen Cleary在MSDN杂志(异步MVVM应用程序模式)中的一系列相当优秀的文章,并一直IAsyncCommand在"hello world"风格的应用程序中使用他的模式.

但是,他没有解决的一个领域是当需要传入命令参数时(使用此模式).对于一个简单的示例,请考虑身份验证,其中出于安全原因,密码控件可能不受数据限制.

我想知道是否有人设法让他的AsyncCommand参与工作,如果是这样,他们会分享他们的发现吗?

wpf asynchronous mvvm task-parallel-library async-await

17
推荐指数
1
解决办法
8601
查看次数

并行运行异步方法

我有一个异步方法,GetExpensiveThing()它执行一些昂贵的I/O工作.这就是我使用它的方式:

// Serial execution
public async Task<List<Thing>> GetThings()
{
    var first = await GetExpensiveThing();
    var second = await GetExpensiveThing();
    return new List<Thing>() { first, second };
}
Run Code Online (Sandbox Code Playgroud)

但由于这是一种昂贵的方法,我想并行执行这些调用.我本以为移动等待会解决这个问题:

// Serial execution
public async Task<List<Thing>> GetThings()
{
    var first = GetExpensiveThing();
    var second = GetExpensiveThing();
    return new List<Thing>() { await first, await second };
}
Run Code Online (Sandbox Code Playgroud)

这不起作用,所以我将它们包装在一些任务中,这有效:

// Parallel execution
public async Task<List<Thing>> GetThings()
{
    var first = Task.Run(() =>
    {
        return GetExpensiveThing();
    });

    var second = Task.Run(() =>
    { …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library async-await

17
推荐指数
2
解决办法
2万
查看次数