如何为PLINQ编写线程感知扩展函数?

Joh*_*hnB 8 c# plinq

有人知道如何编写一个在PLINQ中返回ParallelQuery的扩展函数吗?

更具体地说,我有以下问题:我想在需要引擎的PLINQ查询中执行转换,该引擎的创建成本高且无法同时访问.

我可以做以下事情:

var result = source.AsParallel ().Select ( (i) => { var e = new Engine (); return e.Process(i); } )
Run Code Online (Sandbox Code Playgroud)

这里,每个项目创建一次引擎,这太昂贵了.

我希望每个线程创建一次引擎.

使用Aggregate,我可以接近我想要的东西

// helper class: engine to use plus list of results obtained in thread so far
class EngineAndResults {
   public Engine engine = null;
   public IEnumerable<ResultType> results;
}

var result = source.AsParallel ().Aggregate (

   // done once per block of items (=thread),
   // returning an empty list, but a new engine
   () => new EngineAndList () {
       engine = new Engine (),
       results = Enumerable.Empty<ResultType> ()
   },

   // we process a new item and put it to the thread-local list,
   // preserving the engine for further use
   (engineAndResults, item) => new EngineAndResults () {
       engine = engineAndResults.engine,
       results = Enumerable.Concat (
           engineAndResults.results,
           new ResultType [] { engineAndResults.engine.Process (item) }
       )
   },

   // tell linq how to aggregate across threads
   (engineAndResults1, engineAndResults2) => new EngineAndResults () {
       engine = engineAndResults1.engine,
       results = Enumerable.Concat (engineAndResults1.results, engineAndResults2.results)
   },

   // after all aggregations, how do we come to the result?
   engineAndResults => engineAndResults.results
);
Run Code Online (Sandbox Code Playgroud)

如您所见,我误用累加器为每个线程携带一个引擎.这里的问题是PLINQ最终将结果聚合成单个IEnumerable,这会导致线程同步.如果我想在之后添加另一个PLINQ扩展,这不是很好.

我会很感激

   var result = source.AsParallel ()
                  .SelectWithThreadwiseInitWhichIAmLookingFor (
                       () => new Engine (),
                       (engine, item) => engine.Process (item)
              )
Run Code Online (Sandbox Code Playgroud)

有没有人知道如何实现这一目标?

svi*_*ick 5

你可以ThreadLocal<T>用来做这件事.就像是:

var engine = new ThreadLocal<Engine>(() => new Engine());
var result = source.AsParallel()
                   .Select(item => engine.Value.Process(item));
Run Code Online (Sandbox Code Playgroud)