And*_*ndy 5 .net c# .net-4.0 plinq
我有一组字符串,我需要执行两个操作.
第一个可以安全地以任何顺序(yay)独立处理,但是输出必须按原始顺序顺序处理(boo).
下面的Plinq让我大部分都在那里:
myStrings.AsParallel().AsOrdered()
.Select( str => Operation1(str) )
.AsSequential()
.Select( str => Operation2(str) );
//immagine Operation2() maintains some sort of state and must take the outputs from Operation1 in the original order
Run Code Online (Sandbox Code Playgroud)
这让我大部分都在那里,但问题是因为AsOrdered(),Operation1首先在每个字符串上执行,然后结果元素被排序回原始顺序,最后Operation2开始执行.
理想情况下,只要第一个字符串(即myStrings [0],而不是返回的第一个字符串)由Operation1调用返回,我就希望Operation2开始工作.
所以这是我试图解决这个问题的一般方法:
public static class ParallelHelper
{
public static IEnumerable<U> SelectAsOrdered<T, U>(this ParallelQuery<T> query, Func<T, U> func)
{
var completedTasks = new Dictionary<int, U>();
var queryWithIndexes = query.Select((x, y) => new { Input = x, Index = y })
.AsParallel()
.Select(t => new { Value = func(t.Input), Index = t.Index })
.WithMergeOptions(ParallelMergeOptions.NotBuffered);
int i = 0;
foreach (var task in queryWithIndexes)
{
if (i==task.Index)
{
Console.WriteLine("immediately yielding task: {0}", i);
i++;
yield return task.Value;
U previouslyCompletedTask;
while (completedTasks.TryGetValue(i, out previouslyCompletedTask))
{
completedTasks.Remove(i);
Console.WriteLine("delayed yielding task: {0}", i);
yield return previouslyCompletedTask;
i++;
}
}
else
{
completedTasks.Add(task.Index, task.Value);
}
}
yield break;
}
}
Run Code Online (Sandbox Code Playgroud)
然后我可以重新编写我的原始代码块:
myStrings.AsParallel()
.SelectAsOrdered( str => Operation1(str) )
.Select(str => Operation2(str));
Run Code Online (Sandbox Code Playgroud)
一旦myStrings [0]从Operation1中出来,Operation2就会启动.
我想知道的是:
谢谢!
安迪
以防你感兴趣:
如果没有调用.WithMergeOptions(ParallelMergeOptions.NotBuffered),则在所有Operation1调用都已启动之前,Operation2不会开始工作(这比原来的代码要好,直到它们全部完成).
现实生活中的问题:
Operation1正在寻找大型文本中的法律引用和引用(例如:"儿童行为1989").
这些引用通常是独立的,但有时候成绩单将包含类似"前面提到的行为的第6部分"的内容.Operation2依赖于Operation1的捕获来获取这些部分引用.
如果您需要速度,您可以并行化所有过程(加载数据、准备数据、处理数据和聚合数据),我认为最好使用生产者/消费者模式。
但是,如果您使用“Linq”,则无法生成(以一种很好的方式执行完整并行工作流程)并行数据(但可以:准备、处理和恢复)。
另一方面,我认为尝试使用“Linq”作为“并行(A)+顺序(B)”是错误的(你可以,是的),你的过程(我认为)是
B = f(A)
Run Code Online (Sandbox Code Playgroud)
那么,B必须等待A。
为什么不简单地“并行(A/B)”呢?
您可以做一个助手(扩展),但我认为它通常没有用。
在您的实际情况中,只需使用 aSemaphore来防止过早访问“文章 ID”。
并行准备、处理和恢复(无生成)的完整代码是:
class Text {
public static Regex rx = new Regex(@" (PREVID|ACTID\=([0-9]+)) ");
private Text prv; // previous article
private string ot; // original text
private int id; // act id on text
private Semaphore isComputed = new Semaphore(0, 1);
public int ActID {
get {
isComputed.WaitOne();
int _id = id;
isComputed.Release();
return _id;
}
}
public bool ProcessText() {
var mx = rx.Match(ot);
var prev = mx.Groups [1].Value == "PREVID";
if(prev)
id = prv == null ? 0 : prv.ActID;
else
if(!int.TryParse(mx.Groups [2].Value, out id))
throw new Exception(string.Format(@"Incorrect article id ""{0}""", mx.Groups [0].Value));
isComputed.Release();
return !prev;
}
public Text(string original_text, Text previous) {
prv = previous;
ot = original_text;
}
}
public static void Main(String [] args) {
// same random stream (for debugging)
var rnd = new Random(1);
var noise = @"These references are usually independent, but occasionally";
// some noise text
var bit = new Func<string>(() =>
noise.Substring(0, rnd.Next(noise.Length)));
// random article
var text = new Func<string>(() =>
string.Format(@"{0}{1}{2}", bit(),
rnd.Next() % 2 == 0 ? " PREVID "
: string.Format(@" ACTID={0} ", rnd.Next()), bit()));
// random data input
var data = new List<Text>();
Text prv = null;
for(var n = 0; n < 1000000; n++)
// producer / consumer is better to parallelize load data step
data.Add(prv = new Text(text(), prv));
Console.Write("Press key to start...");
Console.ReadKey();
// parallel processing
Console.WriteLine("{0} unique ID's", data.AsParallel().Where(n => n.ProcessText()).Count());
Console.WriteLine("Process completed.");
}
Run Code Online (Sandbox Code Playgroud)
如您所见,ProcessText并行处理所有文章。只有 PREVID 文章会等到其上一篇文章计算自己的 id 为止。
抽象这种行为的真正问题(我认为)是项目关系(一个项目依赖于另一个项目),在 Linq 中,自然的方式是“无项目关系”(您必须使用“group by”来执行它)。
我建议您使用生产者/消费者模式。
| 归档时间: |
|
| 查看次数: |
775 次 |
| 最近记录: |