Eri*_*ick 5 c# parallel-processing multithreading xmlwriter
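I have a use case where I need to:

1. Read the Input nodes from an XML file.
2. Perform a calculation on each Input node.
3. Write each result as an Output node to an XML file.

The input looks like this: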
```xml
<Root>
  <Input>
    <Case>ABC123</Case>
    <State>MA</State>
    <Investor>Goldman</Investor>
  </Input>
  <Input>
    <Case>BCD234</Case>
    <State>CA</State>
    <Investor>Goldman</Investor>
  </Input>
</Root>
```
And the output:
```xml
<Results>
  <Output>
    <Case>ABC123</Case>
    <State>MA</State>
    <Investor>Goldman</Investor>
    <Price>75.00</Price>
    <Product>Blah</Product>
  </Output>
  <Output>
    <Case>BCD234</Case>
    <State>CA</State>
    <Investor>Goldman</Investor>
    <Price>55.00</Price>
    <Product>Ack</Product>
  </Output>
</Results>
```
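I want to run the calculations in parallel. A typical input file might have 50,000 Input nodes, and the total processing time without threading might be 90 minutes. About 90% of the processing time is spent in step #2 (the calculation).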
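Here is a rough estimate of what parallelism can buy with these numbers. It assumes the remaining 10% (reading and writing) is strictly serial and the calculation scales perfectly across N cores, which makes it a simple Amdahl's-law bound. The per-node figure is just the average implied by the numbers above.

$$
t_{\text{node}} \approx \frac{90 \times 60\ \text{s}}{50\,000} = 0.108\ \text{s}, \qquad
S(N) \le \frac{1}{(1 - p) + p/N}, \quad p = 0.9
$$

$$
S(8) \le \frac{1}{0.1 + 0.9/8} = \frac{1}{0.2125} \approx 4.7, \qquad
T(8) \gtrsim 0.1 \times 90 + \frac{0.9 \times 90}{8} \approx 19.1\ \text{min}
$$

Under that assumption, no number of cores gets the run below about 9 minutes (the serial 10%). Overlapping the reading and writing with the calculations, as a pipeline does, can shrink the effective serial fraction.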
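```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Linq;

// Streams each <axis> element as a standalone XElement
// without loading the whole document into memory.
static IEnumerable<XElement> EnumerateAxis(XmlReader reader, string axis)
{
    reader.MoveToContent();
    while (!reader.EOF)
    {
        if (reader.NodeType == XmlNodeType.Element && reader.Name == axis)
        {
            // ReadFrom consumes the whole element and leaves the reader on the
            // node after it, so Read() must not be called again here; otherwise
            // an <Input> that directly follows another one would be skipped.
            if (XNode.ReadFrom(reader) is XElement el)
                yield return el;
        }
        else
        {
            reader.Read();
        }
    }
}

// ...

Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
{
    // do calc
    // lock the XmlWriter, write, unlock
});
```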
I'm currently leaning towards putting a lock around writes to the XmlWriter to make it thread-safe.
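The lock-based approach described above could look roughly like the following minimal sketch. `LockedWriterSketch` and `Calculate` are hypothetical names. `Calculate` only copies the input fields and appends placeholder `Price`/`Product` elements in place of the real calculation. `Parallel.ForEach` completes items in whatever order the threads finish, so the `Output` elements are not guaranteed to follow input order.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Linq;

public static class LockedWriterSketch
{
    // Hypothetical stand-in for the real calculation (step #2): copies the
    // Input children and appends placeholder Price/Product elements.
    public static XElement Calculate(XElement input) =>
        new XElement("Output",
            input.Elements(),
            new XElement("Price", "0.00"),
            new XElement("Product", "TBD"));

    public static void Run(IEnumerable<XElement> inputs, XmlWriter writer)
    {
        var writerLock = new object();
        writer.WriteStartElement("Results");
        Parallel.ForEach(inputs, input =>
        {
            // The expensive work runs concurrently, outside the lock.
            XElement output = Calculate(input);

            // Only the short write is serialized; XmlWriter is not thread-safe.
            lock (writerLock)
            {
                output.WriteTo(writer);
            }
        });
        writer.WriteEndElement();
    }
}
```

The lock is held only for the write, which should be brief compared with a calculation that dominates the run time, so contention is likely to be low.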
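Is there a more elegant way to handle the XmlWriter in this scenario? Specifically, should I have the Parallel.ForEach code pass the results back to the original thread and let that thread handle the XmlWriter, avoiding the need for locking? If so, I'm not sure what the right approach is.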
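This is my favourite kind of problem: one that can be solved with a pipeline.

Note that, depending on your circumstances, this approach may actually hurt performance. However, since you explicitly asked how to use the writer on a dedicated thread, the code below demonstrates exactly that.

Disclaimer: ideally you would consider TPL Dataflow for this, but it's not something I'm proficient in, so I'm taking the familiar Task + BlockingCollection<T> route.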
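For comparison, a TPL Dataflow version of the same idea might look like the sketch below. It assumes the `System.Threading.Tasks.Dataflow` library is available, which is a separate NuGet package on some target frameworks. `DataflowSketch` and `Calculate` are hypothetical names, and the bounded capacity of 100 simply mirrors the `BlockingCollection` used in the answer's code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml;
using System.Xml.Linq;

public static class DataflowSketch
{
    // Hypothetical stand-in for the real calculation (step #2).
    static XElement Calculate(XElement input) =>
        new XElement("Output", input.Elements(), new XElement("Price", "0.00"));

    public static async Task RunAsync(IEnumerable<XElement> inputs, XmlWriter writer)
    {
        // Calculation stage: several inputs are processed at once, and
        // BoundedCapacity stops the reader from racing far ahead.
        var calculate = new TransformBlock<XElement, XElement>(
            input => Calculate(input),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 100
            });

        // Write stage: the default MaxDegreeOfParallelism of 1 means only one
        // message at a time touches the XmlWriter, so no lock is needed.
        var write = new ActionBlock<XElement>(output => output.WriteTo(writer));

        calculate.LinkTo(write, new DataflowLinkOptions { PropagateCompletion = true });

        writer.WriteStartElement("Results");
        foreach (var input in inputs)
        {
            // SendAsync waits (asynchronously) while the block is at capacity.
            await calculate.SendAsync(input);
        }
        calculate.Complete();
        await write.Completion;
        writer.WriteEndElement();
    }
}
```

By default a `TransformBlock` emits results in the order their inputs arrived, so unlike the `Parallel.ForEach` variants this keeps the `Output` elements in input order.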
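At first I was going to suggest a three-stage pipeline (read, process, write). Then I realized you have already combined the first two stages through the way you "stream" nodes as you read them and feed them to your Parallel.ForEach. (Yes, you have already implemented a pipeline of sorts.) Even better: less thread synchronization.

With that in mind, the code now becomes: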
```csharp
public class Result
{
    public string Case { get; set; }
    public string State { get; set; }
    public string Investor { get; set; }
    public decimal Price { get; set; }
    public string Product { get; set; }
}
```
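```csharp
// ...

using (var reader = CreateXmlReader())
{
    // I highly doubt that this collection will
    // ever reach its bounded capacity since
    // the processing stage takes so long,
    // but in case it does, Parallel.ForEach
    // will be throttled.
    using (var handover = new BlockingCollection<Result>(boundedCapacity: 100))
    {
        var processStage = Task.Run(() =>
        {
            try
            {
                Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
                {
                    // Do calc (simulated here with a one-second sleep).
                    Thread.Sleep(1000);

                    // Hand over to the writer.
                    // This handover is not blocking (unless our
                    // blocking collection has reached its bounded
                    // capacity, which would indicate that the
                    // writer is running slower than expected).
                    handover.Add(new Result
                    {
                        Case = (string)node.Element("Case"),
                        State = (string)node.Element("State"),
                        Investor = (string)node.Element("Investor"),
                        // Price and Product would come from the real calculation.
                    });
                });
            }
            finally
            {
                // Lets GetConsumingEnumerable below finish once every
                // result has been handed over (or if processing fails).
                handover.CompleteAdding();
            }
        });

        var writeStage = Task.Run(() =>
        {
            // CreateXmlWriter is a placeholder helper, like CreateXmlReader.
            using (var writer = CreateXmlWriter())
            {
                writer.WriteStartElement("Results");
                foreach (var result in handover.GetConsumingEnumerable())
                {
                    // Write element. Only this task ever touches the writer.
                    writer.WriteStartElement("Output");
                    writer.WriteElementString("Case", result.Case);
                    writer.WriteElementString("State", result.State);
                    writer.WriteElementString("Investor", result.Investor);
                    writer.WriteElementString("Price", XmlConvert.ToString(result.Price));
                    writer.WriteElementString("Product", result.Product);
                    writer.WriteEndElement();
                }
                writer.WriteEndElement();
            }
        });

        // Note: the two stages are now running in parallel.
        // You could technically use Parallel.Invoke to
        // achieve the same result with a bit less code.
        Task.WaitAll(processStage, writeStage);
    }
}
```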
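PLINQ is a swapped-in alternative that does what the question literally asks: it passes results back to the original thread and lets that thread own the `XmlWriter`. It is not part of the answer above. This is a sketch under assumptions: `PlinqSketch` and `Calculate` are hypothetical names, and whether it beats the `BlockingCollection` pipeline depends on the workload. One difference worth noting is that the pipeline above writes results in completion order, whereas `AsOrdered()` here preserves input order at the cost of buffering results that finish early.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml;
using System.Xml.Linq;

public static class PlinqSketch
{
    // Hypothetical stand-in for the real calculation (step #2).
    static XElement Calculate(XElement input) =>
        new XElement("Output", input.Elements(), new XElement("Price", "0.00"));

    public static void Run(IEnumerable<XElement> inputs, XmlWriter writer)
    {
        // Calculations run on thread-pool threads; AsOrdered() makes PLINQ
        // yield the results in the same order as the inputs.
        IEnumerable<XElement> outputs = inputs
            .AsParallel()
            .AsOrdered()
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            .Select(Calculate);

        writer.WriteStartElement("Results");
        // This loop runs on the calling thread, which is therefore the only
        // thread that touches the XmlWriter: no lock is needed.
        foreach (XElement output in outputs)
        {
            output.WriteTo(writer);
        }
        writer.WriteEndElement();
    }
}
```

With the question's code, `inputs` would be `EnumerateAxis(reader, "Input")`. PLINQ pulls from that streaming source one item at a time under its own synchronization.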