我面临以下问题:
我有一个Foo对象数据流,并将这些对象流式传输到几个并发的进程内任务/线程,这些任务/线程依次处理对象和输出FooResult对象。每个都FooResult包含在其他成员中与Foo在创建FooResult. 但是,并非每个人都Foo必须创建一个FooResult.
我的问题是,我想从整个过程传递一个包装对象,该对象包含原始对象Foo和可能FooResult从Foo并发任务中创建的所有对象(如果有)。
注意:我目前使用 TPL 数据流,而每个并发进程都发生在ActionBlock<Foo>从BroadCastBlock<Foo>. 它使用SendAsync()目标数据流块来发送可能创建的FooResult. 显然,并发数据流块FooResult在不可预测的时间产生,这正是我目前所面临的问题。我似乎无法弄清楚FooResult总共创建了多少个,ActionBlock<Foo>以便我可以将它们与原始Foo对象捆绑在一起并将其作为包装对象传递。
在伪代码中,它目前如下所示:
BroadCastBlock<Foo> broadCastBlock;
ActionBlock<Foo> aBlock1;
ActionBlock<Foo> aBlock2;
ActionBlock<FooResult> targetBlock;
broadCastBlock.LinkTo(aBlock1); broadCastBlock.LinkTo(aBlock2);
aBlock1 = new ActionBlock<Foo>(foo =>
{
//do something here. Sometimes create a FooResult. If then
targetBlock.SendAsync(fooResult);
});
//similar for aBlock2
Run Code Online (Sandbox Code Playgroud)
但是,当前代码的问题在于,如果在任何操作块中都Foo没有生成单个,则 targetBlock …
我正在寻找一个轻量级,进程中,异步消息总线,并遇到了TPL Dataflow.
我目前的实现如下(https://gist.github.com/4416655上的完整示例).
public class Bus
{
private readonly BroadcastBlock<object> broadcast =
new BroadcastBlock<object>(message => message);
private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
= new ConcurrentDictionary<Guid, IDisposable>();
public Task SendAsync<TMessage>(TMessage message)
{
return SendAsync<TMessage>(message, CancellationToken.None);
}
public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
{
return broadcast.SendAsync(message, cancellationToken);
}
public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
{
var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));
var subscription = broadcast.LinkTo(handler,
new DataflowLinkOptions { PropagateCompletion = true },
message => message is TMessage);
return AddSubscription(subscription);
}
public void …Run Code Online (Sandbox Code Playgroud) 我目前正在尝试使用TransformBlocks来使我的代码运行得更快.相反,我发现我基本上没有实现并行化:

正如您所看到的,存在相当多的死空间,很少有I/O或其他问题阻止事物并行运行(注意:所有绿色块都是主线程).
调用代码的基本结构如下:
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 };
var download = new TransformBlock<string, Tuple<string, string>>(s => sendAndReciveRequest(s), options);
var process = new TransformBlock<Tuple<string, string, TransformBlock<string, Tuple<string, string>>>, List<string>>(s => Helpers.ParseKBDL(s), options);
var toObjects = new TransformBlock<List<string>, List<Food>>(list => toFood(list), options);
for (char char1 = 'a'; char1 < 'z' + 1; char1++)
download.Post(char1.ToString());
while ((download.InputCount != 0 || download.OutputCount != 0 || process.InputCount != 0) || (Form1.downloadCount != Form1.processCount))
{
if (download.OutputCount == 0 && download.InputCount …Run Code Online (Sandbox Code Playgroud) concurrency performance task-parallel-library c#-5.0 tpl-dataflow
我想设置一个TransformBlock并行处理它的项目.因此,我设置ExecutionDataflowBlockOptions.MaxDegreeOfParallelism为> 1.我不关心消息的顺序,但文档说:
如果指定的最大并行度大于1,则会同时处理多个消息,因此,可能无法按接收顺序处理消息.但是,从块输出消息的顺序将被正确排序.
"正确排序"是否意味着如果队列中有一条消息需要很长的处理时间,则在处理这一条消息之前不会输出更多消息?
如果是这样,我如何指定一个TransformBlock不关心排序的执行块(例如a )?或者我必须在消费端指定我不关心订购?
我正在使用TPL数据流来处理Azure辅助角色中的队列中的项目.我应该有一个长时间运行的数据流,还是为我收到的每封邮件生成一个新流?
如果块中抛出错误,该块将停止接受新消息.这意味着如果块中存在异常,则整个数据流将停止处理.
我需要能够在不锁定数据流的情况下承受无效队列输入等异常.我看到两个选项之一:
我已经看到很多关于如何在异常后完成数据流的文档,但很少有关于如何从异常中恢复的文档.
这个问题是关于ContinueWith()用于处理TPL数据块完成时的最佳实践.
该ITargetBlock<TInput>.Completion()方法允许您使用异步处理数据块的完成ContinueWith().
请考虑以下Console应用程序代码,它演示了一个非常基本的用法
private static void Main()
{
test().Wait();
}
static async Task test()
{
var transform = new TransformBlock<int, double>(i => i/2.0);
var output = new ActionBlock<double>(d => Console.WriteLine(d));
// Warning CS4014 here:
transform.Completion.ContinueWith(continuation => output.Complete());
transform.LinkTo(output);
for (int i = 0; i < 10; ++i)
await transform.SendAsync(i);
transform.Complete();
await output.Completion;
}
Run Code Online (Sandbox Code Playgroud)
代码有一个非常简单的方法TransformBlock,它将整数除以2.0并将它们转换为双精度数.转换的数据由ActionBlock只处理控制台窗口的值处理.
输出是:
0
0.5
1
1.5
2
2.5
3
3.5
4
4.5
Run Code Online (Sandbox Code Playgroud)
当TransformBlock完成后,我也想完成ActionBlock.这样做很方便: …
我想知道并行执行多个异步方法的推荐方法是什么?
在System.Threading.Tasks.Dataflow中,我们可以指定最大并行度,但Unbounded可能是Task.WhenAll的默认值?
这个 :
var tasks = new List<Task>();
foreach(var item in items)
{
tasks.Add(myAsyncMethod(item));
}
await Task.WhenAll(tasks.ToArray());
Run Code Online (Sandbox Code Playgroud)
或者那个 :
var action = new ActionBlock<string>(myAsyncMethod, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
BoundedCapacity = DataflowBlockOptions.Unbounded,
MaxMessagesPerTask = DataflowBlockOptions.Unbounded
});
foreach (var item in items) { }
{
action.Post(item);
}
action.Complete();
await action.Completion;
Run Code Online (Sandbox Code Playgroud) c# asynchronous task-parallel-library async-await tpl-dataflow
有什么方法可以通过TPL节流来限制性能下降吗?
我有一个复杂的组件管道,并试图限制所需的内存需求。我从多个文件中并行读取,管道中的组件可能会从这些文件的随机部分中读取一些内容,其余组件则进行CPU绑定操作。
我使用通用测试方法将性能测试平台简化为这些测试。
private void TPLPerformaceTest(int generateNumbers, ExecutionDataflowBlockOptions transformBlockOptions)
{
var transformBlock = new TransformBlock<int, int>(i => i, transformBlockOptions);
var storedCount = 0;
var generatedCount = 0;
var store = new ActionBlock<int>(i => Interlocked.Increment(ref storedCount));
transformBlock.LinkTo(store);
transformBlock.Completion.ContinueWith(_ => store.Complete());
for (int i = 0; i < generateNumbers; i++)
{
transformBlock.SendAsync(i).Wait(); //To ensure delivery
Interlocked.Increment(ref generatedCount);
}
transformBlock.Complete();
store.Completion.Wait();
Assert.IsTrue(generatedCount == generateNumbers);
Assert.IsTrue(storedCount == generateNumbers);
}
Run Code Online (Sandbox Code Playgroud)
第一个没有节流。在我的CPU上,大约需要12秒钟才能完成,消耗约800MB的RAM,平均CPU利用率约为35%。
[Test]
public void TPLPerformaceUnlimitedTest()
{
var …Run Code Online (Sandbox Code Playgroud) c# parallel-processing performance performance-testing tpl-dataflow
我有一连串的TPL Dataflow块,并希望观察系统内部的进度。
我知道我可以将a TransformBlock塞入要观察的网格中,将其发布到各种进度更新器中,然后将消息原封不动地返回到下一个块。我不喜欢这种解决方案,因为该块纯粹是因为它的副作用而存在,而且我还必须在我想观察的任何地方更改块链接逻辑。
因此,我想知道是否可以ISourceBlock<T>.AsObservable用来观察网格内消息的传递而无需更改它,也无需消耗消息。如果可行的话,这似乎是一种更纯净,更实用的解决方案。
从我对Rx的(有限的)理解中,这意味着我需要可观察的对象是热的而不是冷的,以便我的progress更新程序可以看到该消息但不使用它。并且.Publish().RefCount()似乎是使可观察到的热点的方法。但是,它根本不起作用按预期-而不是要么block2或progress接收并消耗每条消息。
// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call. …Run Code Online (Sandbox Code Playgroud) 我正在编写一个小的记录器,我想打开一次日志文件,在日志消息到达时继续写入,并在程序终止时处理所有内容.
我不确定如何保持FileStream打开并在消息到达时反应性地写入消息.
我想从我以前的解决方案更新设计,其中我有一个ConcurrentQueue充当缓冲区,并且在using消耗队列的语句中有一个循环.
具体来说,我想同时利用using语句构造,所以我不必显式关闭流和编写器,以及反应式,无循环编程风格.目前我只知道如何同时使用这些结构之一:using/ loop组合,或显式流关闭/反应组合.
这是我的代码:
BufferBlock<LogEntry> _buffer = new BufferBlock<LogEntry>();
// CONSTRUCTOR
public DefaultLogger(string folder)
{
var filePath = Path.Combine(folder, $"{DateTime.Now.ToString("yyyy.MM.dd")}.log");
_cancellation = new CancellationTokenSource();
var observable = _buffer.AsObservable();
using (var stream = File.Create(_filePath))
using (var writer = new StreamWriter(stream))
using (var subscription = observable.Subscribe(entry =>
writer.Write(GetFormattedString(entry))))
{
while (!_cancellation.IsCancellationRequested)
{
// what do I do here?
}
}
}
Run Code Online (Sandbox Code Playgroud) tpl-dataflow ×10
c# ×8
.net ×4
async-await ×3
concurrency ×2
performance ×2
asynchronous ×1
c#-5.0 ×1
filestream ×1
merge ×1
rx.net ×1