Bre*_*ann 6 c# task-parallel-library tpl-dataflow
我创建了类似于网络爬虫的东西来创建我需要管理的 1000 多个 Web 服务的报告。因此,我创建了一个 TPL 数据流管道来管理获取和处理数据。我想象的管道看起来有点像这样(对不起我的绘画技巧:D):

我已经创建了一个实现并且一切正常,直到我开始作为一个整体开始我的管道。我将 500 个对象作为管道的输入提供给管道,并希望程序运行一段时间,但程序在移动到执行块后停止执行。在检查程序的流程后,在我看来,完成快速传播到处置块。我使用相同的管道创建了一个小示例项目,以检查它是我对输入类的实现还是管道本身。示例代码是这样的:
public class Job
{
public int Ticker { get; set; }
public Type Type { get; }
public Job(Type type)
{
Type = type;
}
public Task Prepare()
{
Console.WriteLine("Preparing");
Ticker = 0;
return Task.CompletedTask;
}
public Task Tick()
{
Console.WriteLine("Ticking");
Ticker++;
return Task.CompletedTask;
}
public bool IsCommitable()
{
Console.WriteLine("Trying to commit");
return IsFinished() || ( Ticker != 0 && Ticker % 100000 == 0);
}
public bool IsFinished()
{
Console.WriteLine("Trying to finish");
return Ticker == 1000000;
}
public void IntermediateCleanUp()
{
Console.WriteLine("intermediate Cleanup");
Ticker = Ticker - 120;
}
public void finalCleanUp()
{
Console.WriteLine("Final Cleanup");
Ticker = -1;
}
}
Run Code Online (Sandbox Code Playgroud)
这是我输入到准备块中的输入类。
public class Dataflow
{
private TransformBlock<Job, Job> _preparationsBlock;
private BufferBlock<Job> _balancerBlock;
private readonly ExecutionDataflowBlockOptions _options = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 4
};
private readonly DataflowLinkOptions _linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
private TransformBlock<Job, Job> _typeATickBlock;
private TransformBlock<Job, Job> _typeBTickBlock;
private TransformBlock<Job, Job> _writeBlock;
private TransformBlock<Job, Job> _intermediateCleanupBlock;
private ActionBlock<Job> _finalCleanupBlock;
public async Task Process()
{
CreateBlocks();
ConfigureBlocks();
for (int i = 0; i < 500; i++)
{
await _preparationsBlock.SendAsync(new Job(i % 2 == 0 ? Type.A : Type.B));
}
_preparationsBlock.Complete();
await Task.WhenAll(_preparationsBlock.Completion, _finalCleanupBlock.Completion);
}
private void CreateBlocks()
{
_preparationsBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Prepare();
return job;
}, _options);
_balancerBlock = new BufferBlock<Job>(_options);
_typeATickBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Tick();
return job;
}, _options);
_typeBTickBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Tick();
await job.Tick();
return job;
}, _options);
_writeBlock = new TransformBlock<Job, Job>(job =>
{
Console.WriteLine(job.Ticker);
return job;
}, _options);
_finalCleanupBlock = new ActionBlock<Job>(job => job.finalCleanUp(), _options);
_intermediateCleanupBlock = new TransformBlock<Job, Job>(job =>
{
job.IntermediateCleanUp();
return job;
}, _options);
}
private void ConfigureBlocks()
{
_preparationsBlock.LinkTo(_balancerBlock, _linkOptions);
_balancerBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
_balancerBlock.LinkTo(_typeBTickBlock, _linkOptions, job => job.Type == Type.B);
_typeATickBlock.LinkTo(_typeATickBlock, _linkOptions, job => !job.IsCommitable());
_typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());
_typeBTickBlock.LinkTo(_typeBTickBlock, _linkOptions, job => !job.IsCommitable());
_writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
_writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());
_intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的数据流管道,代表我上面的“艺术品”:D。所有这些都在 Programm.cs 中启动的我的调度程序中执行:
public class Scheduler
{
private readonly Timer _timer;
private readonly Dataflow _flow;
public Scheduler(int intervall)
{
_timer = new Timer(intervall);
_flow = new Dataflow();
}
public void Start()
{
_timer.AutoReset = false;
_timer.Elapsed += _timer_Elapsed;
_timer.Start();
}
private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
{
try
{
_timer.Stop();
Console.WriteLine("Timer stopped");
await _flow.Process().ConfigureAwait(false);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
finally
{
Console.WriteLine("Timer started again.");
_timer.Start();
}
}
}
class Program
{
static void Main(string[] args)
{
var scheduler = new Scheduler(1000);
scheduler.Start();
Console.ReadKey();
}
}
Run Code Online (Sandbox Code Playgroud)
我得到的控制台输出是:计时器停止准备滴答试图提交尝试完成滴答尝试提交尝试完成滴答尝试提交尝试完成滴答尝试提交尝试完成滴答尝试提交尝试完成滴答尝试提交尝试完成滴答声尝试提交尝试完成滴答声尝试提交尝试完成滴答声尝试提交尝试完成滴答声尝试提交尝试完成尝试提交尝试完成
似乎程序在那时已经停止工作,因为我没有达到任何断点或进一步。我认为我所有的块都已经收到了完成信号,因此停止接受任何新项目。因此,我的问题是:如何管理 Completion 信号,以便管道仅在没有更多工作要做时才完成?
流程的主要问题是滴答块的反馈循环。这会导致两个问题。
第一:背压
当_typeATickBlock自身链接回来时,一旦达到其容量,它将停止接受所有消息。在您的情况 4 中,这意味着一旦输出缓冲区中有 3 条消息并且其中一条正在处理,它将停止接受和传递消息。您可以通过将以下行添加到块中来查看这一点:
Console.WriteLine($"Tick Block {_typeATickBlock.InputCount}/{_typeATickBlock.OutputCount}");
Run Code Online (Sandbox Code Playgroud)
并会输出:
Tick Block 0/3
Run Code Online (Sandbox Code Playgroud)
要解决此问题,您可以添加任何缓冲块、Buffer 或 Transform。关键是缓冲区的有限容量。在您的情况下,每条消息都需要重新路由回刻度块。这样您就知道您的容量需要与任何给定时间的消息量相匹配。在本例中为 500。
_printingBuffer = new TransformBlock<Job, Job>(job =>
{
Console.WriteLine($"{_printingBuffer.InputCount}/{_printingBuffer.OutputCount}");
return job;
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 500 });
Run Code Online (Sandbox Code Playgroud)
在您的真实代码中,您可能不知道该值,并且Unbounded可能是避免锁定管道的最佳选择,但您可以根据传入量调整该值。
二:完成流程
通过管道中的反馈循环,完成传播变得比简单地设置链接选项更加困难。一旦完成达到刻度块,它就会停止接受所有消息,甚至是那些仍需要处理的消息。为了避免这种情况,您需要保持传播,直到所有消息都通过循环。首先,在刻度块之前停止传播,然后检查参与循环的每个块上的缓冲区。然后,一旦所有缓冲区都为空,就会将完成和故障传播到块。
_balancerBlock.Completion.ContinueWith(tsk =>
{
while (!_typeATickBlock.Completion.IsCompleted)
{
if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
&& _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
{
_typeATickBlock.Complete();
}
}
});
Run Code Online (Sandbox Code Playgroud)
最后的
您完成的ConfigureBlocks完成设置和插入的缓冲区应该如下所示。请注意,我在这里只传递完整信息,而不是错误信息,并且删除了 B 型分支。
private void ConfigureBlocks()
{
_preparationsBlock.LinkTo(_balancerBlock, _linkOptions);
_balancerBlock.LinkTo(_typeATickBlock, job => job.Type == Type.A);
_balancerBlock.Completion.ContinueWith(tsk =>
{
while (!_typeATickBlock.Completion.IsCompleted)
{
if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
&& _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
{
_typeATickBlock.Complete();
}
}
});
_typeATickBlock.LinkTo(_printingBuffer, job => !job.IsCommitable());
_printingBuffer.LinkTo(_typeATickBlock);
_typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());
_writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
_writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());
_intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}
Run Code Online (Sandbox Code Playgroud)
我不久前写了一篇关于通过反馈循环处理完成的博客文章,博客不再活跃。它可能会提供更多帮助。从 WayBackMachine 检索。