Mar*_*ery 8 c# parallel-processing c#-4.0
我有多个枚举器枚举平面文件.我最初在并行调用中有每个枚举器,每个Action都添加到a,BlockingCollection<Entity>并且该集合返回一个ConsumingEnumerable();
public interface IFlatFileQuery
{
IEnumerable<Entity> Run();
}
public class FlatFile1 : IFlatFileQuery
{
public IEnumerable<Entity> Run()
{
// loop over a flat file and yield each result
yield return Entity;
}
}
public class Main
{
public IEnumerable<Entity> DoLongTask(ICollection<IFlatFileQuery> _flatFileQueries)
{
// do some other stuff that needs to be returned first:
yield return Entity;
// then enumerate and return the flat file data
foreach (var entity in GetData(_flatFileQueries))
{
yield return entity;
}
}
private IEnumerable<Entity> GetData(_flatFileQueries)
{
var buffer = new BlockingCollection<Entity>(100);
var actions = _flatFileQueries.Select(fundFileQuery => (Action)(() =>
{
foreach (var entity in fundFileQuery.Run())
{
buffer.TryAdd(entity, Timeout.Infinite);
}
})).ToArray();
Task.Factory.StartNew(() =>
{
Parallel.Invoke(actions);
buffer.CompleteAdding();
});
return buffer.GetConsumingEnumerable();
}
}
Run Code Online (Sandbox Code Playgroud)
但经过一些测试后发现,下面的代码更改速度提高了大约20-25%.
private IEnumerable<Entity> GetData(_flatFileQueries)
{
return _flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run());
}
Run Code Online (Sandbox Code Playgroud)
代码更改的麻烦在于它等待所有平面文件查询被枚举之后才返回整个批次,然后可以枚举和生成.
是否有可能以某种方式产生上述代码以使其更快?
我应该补充一点,所有平面文件查询的组合结果最多只能是1000个实体.
编辑:将其更改为以下内容对运行时间没有影响.(R#甚至建议回到原来的样子)
private IEnumerable<Entity> GetData(_flatFileQueries)
{
foreach (var entity in _flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run()))
{
yield return entity;
}
}
Run Code Online (Sandbox Code Playgroud)
代码更改的问题在于,它会等到枚举所有平面文件查询后才返回可以枚举和生成的全部数据。
我们通过一个简单的例子来证明它是错误的。首先,让我们创建一个TestQuery类,它将在给定时间后生成单个实体。其次,让我们并行执行多个测试查询并测量生成结果所需的时间。
public class TestQuery : IFlatFileQuery {
private readonly int _sleepTime;
public IEnumerable<Entity> Run() {
Thread.Sleep(_sleepTime);
return new[] { new Entity() };
}
public TestQuery(int sleepTime) {
_sleepTime = sleepTime;
}
}
internal static class Program {
private static void Main() {
Stopwatch stopwatch = Stopwatch.StartNew();
var queries = new IFlatFileQuery[] {
new TestQuery(2000),
new TestQuery(3000),
new TestQuery(1000)
};
foreach (var entity in queries.AsParallel().SelectMany(ffq => ffq.Run()))
Console.WriteLine("Yielded after {0:N0} seconds", stopwatch.Elapsed.TotalSeconds);
Console.ReadKey();
}
}
Run Code Online (Sandbox Code Playgroud)
此代码打印:
1秒后
屈服 2秒后
屈服 3秒后屈服
您可以看到此输出AsParallel()将在每个结果可用时立即生成它,因此一切正常。请注意,根据并行度,您可能会得到不同的计时(例如并行度为 1 的“2s、5s、6s”,实际上使整个操作根本不并行)。该输出来自 4 核机器。
如果线程之间没有共同的瓶颈(例如共享锁定资源),您的长时间处理可能会随着核心数量的增加而扩展。您可能想要分析您的算法,看看是否有可以使用dotTrace等工具改进的缓慢部分。