Task.WhenAll在 .NET Core 3.0 上运行时,我只是对该方法进行了一个奇怪的观察。我将一个简单的Task.Delay任务作为单个参数传递给Task.WhenAll,并且我预计包装的任务将与原始任务的行为相同。但这种情况并非如此。原始任务的延续是异步执行的(这是可取的),多个Task.WhenAll(task)包装器的延续是一个接一个同步执行的(这是不可取的)。
这是此行为的演示。四个工作任务正在等待同一个Task.Delay任务完成,然后继续进行繁重的计算(由 a 模拟Thread.Sleep)。
var task = Task.Delay(500);
var workers = Enumerable.Range(1, 4).Select(async x =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}" +
$" [{Thread.CurrentThread.ManagedThreadId}] Worker{x} before await");
await task;
//await Task.WhenAll(task);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}" +
$" [{Thread.CurrentThread.ManagedThreadId}] Worker{x} after await");
Thread.Sleep(1000); // Simulate some heavy CPU-bound computation
}).ToArray();
Task.WaitAll(workers);
Run Code Online (Sandbox Code Playgroud)
这是输出。四个延续在不同的线程(并行)中按预期运行。
var task = Task.Delay(500);
var workers = Enumerable.Range(1, 4).Select(async x =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}" +
$" [{Thread.CurrentThread.ManagedThreadId}] Worker{x} before await"); …Run Code Online (Sandbox Code Playgroud) c# continuations task-parallel-library async-await .net-core
我们的应用程序使用TPL来序列化(可能)长时间运行的工作单元.工作(任务)的创建是用户驱动的,可以随时取消.为了拥有响应式用户界面,如果不再需要当前的工作,我们想放弃我们正在做的事情,并立即开始另一项任务.
任务排队等同于:
private Task workQueue;
private void DoWorkAsync
(Action<WorkCompletedEventArgs> callback, CancellationToken token)
{
if (workQueue == null)
{
workQueue = Task.Factory.StartWork
(() => DoWork(callback, token), token);
}
else
{
workQueue.ContinueWork(t => DoWork(callback, token), token);
}
}
Run Code Online (Sandbox Code Playgroud)
该DoWork方法包含长时间运行的呼叫,因此,token.IsCancellationRequested如果/当检测到取消时,它不会像不断检查状态和挽救一样简单.长时间运行的工作将阻止任务继续,直到它完成,即使任务被取消.
我已经提出了两个样本方法来解决这个问题,但我不相信这两种方法都是正确的.我创建了简单的控制台应用程序来演示它们如何工
需要注意的重要一点是,在原始任务完成之前会继续触发.
尝试#1:内部任务
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
var token = cts.Token;
token.Register(() => Console.WriteLine("Token cancelled"));
// Initial work
var t = Task.Factory.StartNew(() =>
{
Console.WriteLine("Doing work");
// Wrap the long running …Run Code Online (Sandbox Code Playgroud) 更多新手问题:
这段代码从主窗口的列表中抓取了许多代理(我无法弄清楚如何在不同的函数之间使变量可用)并检查每个代理(简单的httpwebrequest),然后将它们添加到名为的列表中finishedProxies.
出于某种原因,当我按下开始按钮时,整个程序挂起.我的印象是Parallel为每个动作创建单独的线程,只留下UI线程,以便它响应?
private void start_Click(object sender, RoutedEventArgs e)
{
// Populate a list of proxies
List<string> proxies = new List<string>();
List<string> finishedProxies = new List<string>();
foreach (string proxy in proxiesList.Items)
{
proxies.Add(proxy);
}
Parallel.ForEach<string>(proxies, (i) =>
{
string checkResult;
checkResult = checkProxy(i);
finishedProxies.Add(checkResult);
// update ui
/*
status.Dispatcher.Invoke(
System.Windows.Threading.DispatcherPriority.Normal,
new Action(
delegate()
{
status.Content = "hello" + checkResult;
}
)); */
// update ui finished
//Console.WriteLine("[{0}] F({1}) = {2}", Thread.CurrentThread.Name, i, CalculateFibonacciNumber(i));
});
}
Run Code Online (Sandbox Code Playgroud)
我已经尝试使用已注释掉的代码来更改Parallel.Foreach中的UI,它会在按下启动按钮后冻结程序.它之前对我有用,但我使用的是Thread类.
如何从Parallel.Foreach内部更新UI,如何使Parallel.Foreach工作,以便它在工作时不会使UI冻结?
c# user-interface multithreading visual-studio-2010 task-parallel-library
将后置条件添加到返回的异步方法的推荐方法是什么Task<T>?
我已经阅读了以下建议:
http://social.msdn.microsoft.com/Forums/hu-HU/async/thread/52fc521c-473e-4bb2-a666-6c97a4dd3a39
帖子建议将每个方法实现为同步,签约,然后将异步对应实现为简单的包装器.不幸的是,我不认为这是一个可行的解决方案(也许是通过我自己的误解):
await其他异步方法的新方法因此被强制为异步.这些方法本质上是异步的,不能轻易转换为同步.它们不仅仅是包装纸.即使我们通过说我们可以使用.Result或.Wait()代替await(这实际上会导致某些SyncContexts死锁,并且无论如何都必须在异步方法中重写)来使后一点无效,我仍然相信第一点.
有没有其他想法,或者有什么我错过的代码合同和TPL?
我知道......我并没有真正使用TplDataflow来发挥它的最大潜力.ATM我只是BufferBlock用作消息传递的安全队列,其中生产者和消费者以不同的速率运行.我看到一些奇怪的行为,让我难以理解如何继续.
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
Run Code Online (Sandbox Code Playgroud)
在上面的代码中(它是2000行分布式解决方案的一部分),Send每100ms左右定期调用一次.这意味着一个项目被Post编到messageQueue在约10次.这已经过验证.但是,偶尔看起来ReceiveAsync在超时内没有完成(即Post没有导致ReceiveAsync完成)并且TimeoutException在30秒后被提升.在这一点上,messageQueue.Count是数百.这是出乎意料的.这个问题已经在较慢的发布率(1个帖子/秒)中观察到,并且通常在1000个项目通过之前发生BufferBlock.
因此,要解决此问题,我使用以下代码,它可以工作,但偶尔会在接收时导致1秒延迟(由于上面发生的错误)
public async …Run Code Online (Sandbox Code Playgroud) 我使用任务并行库,可量化和可重复的问题BlockingCollection<T>,ConcurrentQueue<T>与GetConsumingEnumerable试图创建一个简单的管道.
简而言之,从一个线程向默认值BlockingCollection<T>(引擎盖下依赖于a ConcurrentQueue<T>)添加条目并不能保证它们会BlockingCollection<T>从调用GetConsumingEnumerable()Method的另一个线程中弹出.
我创建了一个非常简单的Winforms应用程序来重现/模拟它,它只是将整数打印到屏幕上.
Timer1负责排队工作项...它使用一个被调用的并发字典,_tracker以便它知道它已经添加到阻塞集合中的内容.Timer2只记录两个BlockingCollection&的计数状态_trackerParalell.ForEach简单地遍历阻塞集合GetConsumingEnumerable()并开始将它们打印到第二个列表框的按钮.Timer1阻止将更多条目添加到阻止集合中.public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent(); …Run Code Online (Sandbox Code Playgroud) TPL Dataflow提供TransformBlock转换输入,例如:
var tb = new TransformBlock<int, int>(i => i * 2);
Run Code Online (Sandbox Code Playgroud)
是否有可能不输出一些输入,例如,如果输入未通过某些验证测试?
var tb = new TransformBlock<InputType, OutputType>(i =>
{
if (!ValidateInput(i))
{
// Do something to not output anything for this input
}
// Normal output
}
Run Code Online (Sandbox Code Playgroud)
如果那是不可能的,那么实现这一目标的最佳模式是什么?
像下面这样的东西?
BufferBlock<OutputType> output = new BufferBlock<OutputType>();
var ab = new ActionBlock<InputType>(i =>
{
if (ValidateInput(i))
{
output.Post(MyTransform(i));
}
}
Run Code Online (Sandbox Code Playgroud) 通常我不会回答问题,但这次我想引起一些我认为可能是一个模糊而又常见的问题的注意.它是由这个问题引发的,从那以后我查看了我自己的旧代码,发现其中一些也受此影响.
下面的代码开始,等待着两个任务,task1并且task2,这几乎是相同的.task1只是task2因为它运行一个永无止境的循环.对于执行CPU限制工作的一些现实场景,这两种情况都非常典型.
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
public class Program
{
static async Task TestAsync()
{
var ct = new CancellationTokenSource(millisecondsDelay: 1000);
var token = ct.Token;
// start task1
var task1 = Task.Run(() =>
{
for (var i = 0; ; i++)
{
Thread.Sleep(i); // simulate work item #i
token.ThrowIfCancellationRequested();
}
});
// start task2
var task2 = Task.Run(() =>
{
for (var i = 0; …Run Code Online (Sandbox Code Playgroud) 我一直在关注Stephen Cleary在MSDN杂志(异步MVVM应用程序模式)中的一系列相当优秀的文章,并一直IAsyncCommand在"hello world"风格的应用程序中使用他的模式.
但是,他没有解决的一个领域是当需要传入命令参数时(使用此模式).对于一个简单的示例,请考虑身份验证,其中出于安全原因,密码控件可能不受数据限制.
我想知道是否有人设法让他的AsyncCommand参与工作,如果是这样,他们会分享他们的发现吗?
我有一个异步方法,GetExpensiveThing()它执行一些昂贵的I/O工作.这就是我使用它的方式:
// Serial execution
public async Task<List<Thing>> GetThings()
{
var first = await GetExpensiveThing();
var second = await GetExpensiveThing();
return new List<Thing>() { first, second };
}
Run Code Online (Sandbox Code Playgroud)
但由于这是一种昂贵的方法,我想并行执行这些调用.我本以为移动等待会解决这个问题:
// Serial execution
public async Task<List<Thing>> GetThings()
{
var first = GetExpensiveThing();
var second = GetExpensiveThing();
return new List<Thing>() { await first, await second };
}
Run Code Online (Sandbox Code Playgroud)
这不起作用,所以我将它们包装在一些任务中,这有效:
// Parallel execution
public async Task<List<Thing>> GetThings()
{
var first = Task.Run(() =>
{
return GetExpensiveThing();
});
var second = Task.Run(() =>
{ …Run Code Online (Sandbox Code Playgroud) c# ×9
async-await ×5
.net ×4
tpl-dataflow ×2
wpf ×2
.net-core ×1
async-ctp ×1
asynchronous ×1
cancellation ×1
dataflow ×1
mvvm ×1