任务并行库 - 自定义任务计划程序

Fen*_*Fen 5 c# parallel-extensions task-parallel-library

我要求将Web服务请求发送到在线api,我认为Parallel Extensions非常适合我的需求.

有问题的Web服务被设计为重复调用,但是如果您每秒超过一定数量的呼叫,则有一种机制可以向您收费.我显然希望尽量减少我的收费,因此想知道是否有人看过可以应对以下要求的TaskScheduler:

  1. 限制每个时间跨度计划的任务数.我想如果请求的数量超过这个限制那么它需要丢弃任务或可能阻止?(停止任务的后退日志)
  2. 检测相同的请求是否已经在要执行的调度程序中但尚未执行,如果是,则不对第二个任务进行排队,而是返回第一个任务.

人们是否觉得这些是任务调度员应该处理的那种责任,还是我在咆哮错误的树?如果您有其他选择,我愿意接受建议.

svi*_*ick 7

我同意其他人认为TPL Dataflow听起来是一个很好的解决方案.

要限制处理,您可以创建一个TransformBlock实际上不以任何方式转换数据的数据,如果它在之前的数据之后很快到达,它就会延迟它:

static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
    DateTime lastItem = DateTime.MinValue;
    return new TransformBlock<T, T>(
        async x =>
                {
                    var waitTime = lastItem + delay - DateTime.UtcNow;
                    if (waitTime > TimeSpan.Zero)
                        await Task.Delay(waitTime);

                    lastItem = DateTime.UtcNow;

                    return x;
                },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
Run Code Online (Sandbox Code Playgroud)

然后创建一个生成数据的方法(例如从0开始的整数):

static async Task Producer(ITargetBlock<int> target)
{
    int i = 0;
    while (await target.SendAsync(i))
        i++;
}
Run Code Online (Sandbox Code Playgroud)

它是异步编写的,因此如果目标块现在无法处理项目,它将等待.

然后编写一个消费者方法:

static void Consumer(int i)
{
    Console.WriteLine(i);
}
Run Code Online (Sandbox Code Playgroud)

最后,将它们连接起来并启动它:

var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));

var consumerBlock = new ActionBlock<int>(
    (Action<int>)Consumer,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });

Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);
Run Code Online (Sandbox Code Playgroud)

这里,delayBlock每500毫秒最多可接受一个项目,并且该Consumer()方法可以并行运行多次.要完成处理,请致电delayBlock.Complete().

如果你想为你的#2添加一些缓存,你可以创建另一个TransformBlock在那里工作并将其链接到其他块.