Ale*_*ffe 9 .net c# parallel-processing parallel-extensions plinq
假设我有两个序列返回整数1到5.
第一个返回1,2和3非常快,但4和5每个需要200ms.
public static IEnumerable<int> FastFirst()
{
for (int i = 1; i < 6; i++)
{
if (i > 3) Thread.Sleep(200);
yield return i;
}
}
Run Code Online (Sandbox Code Playgroud)
第二个返回1,2和3,延迟时间为200ms,但快速返回4和5.
public static IEnumerable<int> SlowFirst()
{
for (int i = 1; i < 6; i++)
{
if (i < 4) Thread.Sleep(200);
yield return i;
}
}
Run Code Online (Sandbox Code Playgroud)
联合这两个序列只给出数字1到5.
FastFirst().Union(SlowFirst());
Run Code Online (Sandbox Code Playgroud)
我不能保证两种方法中的哪一种在什么时候有延迟,所以执行的顺序不能保证为我提供解决方案.因此,我想将联盟并行化,以便最小化我的例子中的(人为的)延迟.
一个真实场景:我有一个返回一些实体的缓存,以及一个返回所有实体的数据源.我希望能够从一个方法返回一个迭代器,该方法将请求内部并行化到缓存和数据源,以便缓存的结果尽可能快地生成.
注1:我意识到这仍然在浪费CPU周期; 我不是问我怎么能阻止序列迭代它们的慢元素,我怎么能尽可能快地结合它们.
更新1:我已经定制了achitaka-san对接受多个生成器的响应,并使用ContinueWhenAll将BlockingCollection的CompleteAdding设置为一次.我只是把它放在这里,因为它会因缺少注释格式而丢失.任何进一步的反馈都会很棒!
public static IEnumerable<TResult> SelectAsync<TResult>(
params IEnumerable<TResult>[] producer)
{
var resultsQueue = new BlockingCollection<TResult>();
var taskList = new HashSet<Task>();
foreach (var result in producer)
{
taskList.Add(
Task.Factory.StartNew(
() =>
{
foreach (var product in result)
{
resultsQueue.Add(product);
}
}));
}
Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());
return resultsQueue.GetConsumingEnumerable();
}
Run Code Online (Sandbox Code Playgroud)
看看这个。第一个方法只是按照结果的顺序返回所有内容。第二个检查唯一性。如果你把它们链接起来,我想你会得到你想要的结果。
public static class Class1
{
public static IEnumerable<TResult> SelectAsync<TResult>(
IEnumerable<TResult> producer1,
IEnumerable<TResult> producer2,
int capacity)
{
var resultsQueue = new BlockingCollection<TResult>(capacity);
var producer1Done = false;
var producer2Done = false;
Task.Factory.StartNew(() =>
{
foreach (var product in producer1)
{
resultsQueue.Add(product);
}
producer1Done = true;
if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
});
Task.Factory.StartNew(() =>
{
foreach (var product in producer2)
{
resultsQueue.Add(product);
}
producer2Done = true;
if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
});
return resultsQueue.GetConsumingEnumerable();
}
public static IEnumerable<TResult> SelectAsyncUnique<TResult>(this IEnumerable<TResult> source)
{
HashSet<TResult> knownResults = new HashSet<TResult>();
foreach (TResult result in source)
{
if (knownResults.Contains(result)) {continue;}
knownResults.Add(result);
yield return result;
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1012 次 |
| 最近记录: |