我想知道以下代码是否可以优化以更快地执行.在一个非常简单的数据流结构中,我目前似乎每秒大约有140万条简单消息.我知道这个示例进程同步传递/转换消息,但是,我目前正在测试TPL Dataflow作为我自己的基于任务和并发集合的自定义解决方案的替代.我知道术语"并发"已经建议我并行运行,但是出于当前的测试目的,我通过同步推送消息在我自己的解决方案上,每秒钟我得到大约510万条消息.我在这里缺少什么,我读过TPL Dataflow被推为高吞吐量,低延迟的解决方案,但到目前为止我必须忽略性能调整.有谁能指出我正确的方向吗?
class TPLDataFlowExperiments
{
public TPLDataFlowExperiments()
{
var buf1 = new BufferBlock<int>();
var transform = new TransformBlock<int, string>(t =>
{
return "";
});
var action = new ActionBlock<string>(s =>
{
//Thread.Sleep(100);
//Console.WriteLine(s);
});
buf1.LinkTo(transform);
transform.LinkTo(action);
//Propagate all Completions down the flow
buf1.Completion.ContinueWith(t =>
{
transform.Complete();
transform.Completion.ContinueWith(u =>
{
action.Complete();
});
});
Stopwatch watch = new Stopwatch();
watch.Start();
int cap = 10000000;
for (int i = 0; i < cap; i++)
{
buf1.Post(i);
}
//Mark Buffer as Complete
buf1.Complete(); …Run Code Online (Sandbox Code Playgroud) 我打算玩TPL数据流,但我似乎无法在VS 2012中找到它(包括扩展和更新/ nuget对话框内).他们剪了它,还是我只是在错误的地方?
我试图在 TPL 数据流中控制数据流。我有一个非常快的生产者和一个非常慢的消费者。(我的实际代码更复杂,但无论如何,这是一个非常好的模型,它重现了问题。)
当我运行它时,代码开始消耗内存,就像它已经过时一样——并且生产者上的输出队列尽可能快地填满。我真正希望看到的是生产者停止运行一段时间,直到消费者有机会要求它。从我对文档的阅读来看,这就是应该发生的事情:也就是说,我认为生产者会等到消费者有空间。
事实并非如此,很明显。如何修复它以便队列不会发疯?
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;
namespace MemoryLeakTestCase
{
class Program
{
static void Main(string[] args)
{
var CreateData = new TransformManyBlock<int, string>(ignore =>
{
return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
});
var ParseFile = new TransformManyBlock<string, string>(fileContent =>
{
Thread.Sleep(1000);
return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
);
var EndOfTheLine = new ActionBlock<object>(f …Run Code Online (Sandbox Code Playgroud) 我是TPL数据流的新手,请原谅我,如果这是一个简单的问题.
我有一个输入缓冲区块,它采用基类.如何根据派生类型从那里分支到块?例如:
var inputBlock = new BufferBlock<EventBase>();
//if EventBase is Meeting then go to block X
//if EventBase is Appointment the go to block Y
Run Code Online (Sandbox Code Playgroud)
谢谢!
我正在尝试使用链接到操作块的有界批处理块.我知道批处理块中的项目进料结束时我想触发完成链.
问题是:如果我BatchBlock<T>是给定的,BoundedCapacity我将无法在动作块中触发所有项目.
这是我的问题的一个示例,它应该(在我对TPL数据流的理解......)中打印0到124,但最终打印0到99.
必须有一些我缺少的东西...也许BoundedCapacity意味着"当队列数超过xxx时丢弃项目......"如果是这样的话我怎样才能实现保证的最大内存消耗?
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApplication
{
class Program
{
static void Main(string[] args)
{
int itemsCount = 125;
List<int> ints = new List<int>(itemsCount);
for (int i = 0; i < itemsCount; i++)
ints.Add(i);
BatchBlock<int> batchBlock = new BatchBlock<int>(50,new GroupingDataflowBlockOptions(){BoundedCapacity = 100});
ActionBlock<int[]> actionBlock = new ActionBlock<int[]>(intsBatch =>
{
Thread.Sleep(1000);
foreach (int i in intsBatch)
Console.WriteLine(i);
});
batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });
// …Run Code Online (Sandbox Code Playgroud) 我是C#TPL和DataFlow的新手,我正在努力研究如何实现TPL DataFlow TransformManyBlock.我正在将其他一些代码翻译成DataFlow.我的(简化)原始代码是这样的:
private IEnumerable<byte[]> ExtractFromByteStream(Byte[] byteStream)
{
yield return byteStream; // Plus operations on the array
}
Run Code Online (Sandbox Code Playgroud)
在另一种方法中,我会这样称呼它:
foreach (byte[] myByteArray in ExtractFromByteStream(byteStream))
{
// Do stuff with myByteArray
}
Run Code Online (Sandbox Code Playgroud)
我正在尝试创建一个TransformManyBlock来生成来自较大输入数组(实际上是二进制流)的多个小数组(实际上是数据包),因此in和out都是类型byte[].
我尝试了下面的内容,但我知道我错了.我想使用与以前相同的函数构造这个块,并且只是将TransformManyBlock它包裹起来.我收到一个错误"这个电话很模糊......"
var streamTransformManyBlock = new TransformManyBlock<byte[], byte[]>(ExtractFromByteStream);
Run Code Online (Sandbox Code Playgroud) 出于某种原因,当 anOperationCanceledException被抛出到 内时IDataflowBlock,块不会将此异常传播到其IDataflowBlock.Completion任务。运行下面的代码示例会返回一个意外的IDataflowBlock.Completion.Status == TaskStatus.RanToCompletion.
但是,如果块中抛出的异常类型更改为 an ArgumentNullException,则IDataflowBlock.Completion.Status更改TaskStatus.Faulted和 异常将保存在其InnerException属性中。
任何想法为什么OperationCanceledException被吞下?
[TestFixture]
public class TplDataBlockExceptionTest
{
[Test]
public void ShouldThrowException()
{
// Arrange
var block = new TransformBlock<int, string>(i =>
{
throw new OperationCanceledException();
return i.ToString();
});
// Act
block.Post(1);
block.Complete();
try
{
block.Completion.Wait();
}
catch (Exception)
{
// ignored
}
// Assert
Assert.That(block.Completion.IsFaulted);
}
}
Run Code Online (Sandbox Code Playgroud) 我的问题的标题说明了一切。
我正在寻找不需要输入的 TPL 数据流块。
现在我正在使用转换块,但它的输入未使用。
c# multithreading asynchronous task-parallel-library tpl-dataflow
我需要实现一个可以从多个线程填充的请求队列。当此队列大于 1000 个已完成的请求时,应将此请求存储到数据库中。这是我的实现:
public class RequestQueue
{
private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();
private static volatile bool isLoading = false;
private static object _lock = new object();
public static void Launch()
{
Task.Factory.StartNew(execute);
}
public static void Add(VerificationRequest request)
{
_queue.Add(request);
}
public static void AddRange(List<VerificationRequest> requests)
{
Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3},
(request) => { _queue.Add(request); });
}
private static void execute()
{
Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest …Run Code Online (Sandbox Code Playgroud) c# multithreading producer-consumer task-parallel-library tpl-dataflow
我正在创建一个使用TPL DataFlow的任务处理器。我将遵循生产者消费者模型,在生产者模型中,生产者偶尔会生产一些要处理的商品,而消费者则一直在等待新商品的到来。这是我的代码:
async Task Main()
{
var runner = new Runner();
CancellationTokenSource cts = new CancellationTokenSource();
Task runnerTask = runner.ExecuteAsync(cts.Token);
await Task.WhenAll(runnerTask);
}
public class Runner
{
public async Task ExecuteAsync(CancellationToken cancellationToken) {
var random = new Random();
ActionMeshProcessor processor = new ActionMeshProcessor();
await processor.Init(cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more
int[] items = GetItems(random.Next(3, 7));
await processor.ProcessBlockAsync(items);
}
}
private int[] GetItems(int count)
{
Random randNum = new Random();
int[] arr = new int[count];
for …Run Code Online (Sandbox Code Playgroud)