Rza*_*sar 8 c# multithreading task-parallel-library .net-4.5 tpl-dataflow
我想实现优先顺序ActionBlock<T>.因此我可以TInput通过使用a 来有条件地优先考虑某些项目Predicate<T>.
我阅读了 Parallel Extensions Extras示例和实现自定义TPL数据流块的指南.
但仍然没有弄清楚如何实现这种情况.
---------------------------- EDIT --------------------- ------
有一些任务,其中5个可以同时运行.当用户按下按钮时,一些(取决于谓词函数)任务应该以最高优先级运行.
实际上我写了这段代码
TaskScheduler taskSchedulerHighPriority;
ActionBlock<CustomObject> actionBlockLow;
ActionBlock<CustomObject> actionBlockHigh;
...
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5);
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0);
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1);
...
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow });
...
if (predicate(customObject))
actionBlockHigh.Post(customObject);
else
actionBlockLow.Post(customObject);
Run Code Online (Sandbox Code Playgroud)
但似乎优先权根本没有实现.
----------------------------编辑------------------
我找到了事实上,当我使用这行代码时:
actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerLow });
Run Code Online (Sandbox Code Playgroud)
原因是应用程序正确地观察了任务的优先级,但是一次只能执行一个任务,同时使用流程中显示的第一个代码块,导致应用程序同时运行5个任务,但处于不适当的优先级顺序.
actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerLow });
Run Code Online (Sandbox Code Playgroud)
更新:
坦克svick,我应指定MaxMessagesPerTask的taskSchedulerLow.
您的问题不包含许多细节,因此以下只是您可能需要的猜测.
我认为最简单的方法是ActionBlock在QueuedTaskSchedulerParallelExtensionsExtras上运行两个不同的优先级.您将使用谓词链接到高优先级,然后使用低优先级链接.此外,为确保高优先级Tasks不等待,请设置MaxMessagesPerTask低优先级块.
在代码中,它看起来像:
static ITargetBlock<T> CreatePrioritizedActionBlock<T>(
Action<T> action, Predicate<T> isPrioritizedPredicate)
{
var buffer = new BufferBlock<T>();
var scheduler = new QueuedTaskScheduler(1);
var highPriorityScheduler = scheduler.ActivateNewQueue(0);
var lowPriorityScheduler = scheduler.ActivateNewQueue(1);
var highPriorityBlock = new ActionBlock<T>(
action, new ExecutionDataflowBlockOptions
{
TaskScheduler = highPriorityScheduler
});
var lowPriorityBlock = new ActionBlock<T>(
action, new ExecutionDataflowBlockOptions
{
TaskScheduler = lowPriorityScheduler,
MaxMessagesPerTask = 1
});
buffer.LinkTo(highPriorityBlock, isPrioritizedPredicate);
buffer.LinkTo(lowPriorityBlock);
return buffer;
}
Run Code Online (Sandbox Code Playgroud)
这只是您可以执行的操作的草图,例如,Completion返回的块的行为不正确.