使用相同的模型并行(高效)反序列化

dam*_*boy 3 .net c# parallel-processing protobuf-net task-parallel-library

我正在尝试使用.NET任务读取使用ProtoBuf.NET序列化的多个文件,如下所示:

public static ResultsDump Amalgamate(RuntimeTypeModel model, IEnumerable<string> files)
{      
  var readDumpTasks = 
    files.Select(fn =>
      Task<ResultsDump>.Factory.StartNew(() => {
        try {
          using (var dumpFile = new FileStream(fn, FileMode.Open))
          {
            var miniDump = (ResultsDump)model.Deserialize(dumpFile, null, typeof(ResultsDump));
            if (miniDump == null) {
              throw new Exception(string.Format("Failed to deserialize dump file {0}", fn));
            }
            //readDumps.Add(miniDump);
            return miniDump;
          }
        }
        catch (Exception e) {
          throw new Exception(string.Format("cannot read dump file {0}: {1}", fn, e.Message), e);
        }
      })).ToArray();

  Task.WaitAll(readDumpTasks);

  var allDumps = readDumpTasks.Select(t => t.Result).ToList();

  // Goes on.. irrelevant to the question
}
Run Code Online (Sandbox Code Playgroud)

出于某种原因,CPU使用率实际上并不高于单个核心.Protobuf.NET中是否存在一些不同于同时对多个文件进行反序列化的固有锁?

我已尝试使用多个RuntimeTypeModel实例以及一个,并且它似乎总是在非常"低"的CPU使用级别达到峰值.

我甚至错误地指责ProtoBuf.NET?这是.NET内存分配器/ TPL吗?

Mar*_*ell 6

protobuf网中有意锁定非常有限; 它只在检查类型(第一次运行)时才真正锁定,以查看所需的内容.一旦理解了模型,它就是无锁的,并且设计成平凡的并行.

如上所述(评论),IO很可能是您的瓶颈.事实上,parallelising访问同一个物理磁盘/主轴通常会大大降低吞吐量,缓冲区更辩称,它必须做更多的追求,而不是连续的读数.

这应该很容易测试/验证:对于测试运行,而不是从磁盘读取,首先将它们全部加载到内存中;

var ms = new MemoryStream(
    File.ReadAllBytes(path));
Run Code Online (Sandbox Code Playgroud)

加载所有文件后,现在执行相同的代码,但将MemoryStreams输入作为输入.如果它仍然无法扩展,则可能是一个错误.然而,我强烈怀疑你会发现它在那时很好地并行.


这是一个有用的例子,对我来说,通过并发反序列化使我的所有内核饱和:

using System.Collections.Generic;
using ProtoBuf;
using System;
using System.IO;
using System.Threading.Tasks;


internal class Program
{
    private static void Main()
    {
        var foo = new Foo { Bars = new List<Bar>() };
        var rand = new Random(1234);
        for (int i = 0; i < 1000; i++)
        {
            var bar = new Bar
            {
                A = rand.Next(),
                B = rand.Next(),
                C = rand.Next(),
                D = rand.Next(),
                E = rand.Next(),
                F = rand.Next(),
                G = rand.Next(),
                H = rand.Next()
            };
            foo.Bars.Add(bar);
        }
        var ms = new MemoryStream();
        Serializer.Serialize(ms, foo);
        var bytes = ms.ToArray();
        const int count = 100000;
        Parallel.For(0, count, x =>
        {
            Serializer.Deserialize<Foo>(new MemoryStream(bytes));
        });
    }
}
[ProtoContract]
internal class Foo
{
    [ProtoMember(1)]
    public List<Bar> Bars { get; set; }
}
[ProtoContract]
internal class Bar
{
    [ProtoMember(1)]
    public int A { get; set; }
    [ProtoMember(2)]
    public int B { get; set; }
    [ProtoMember(3)]
    public int C { get; set; }
    [ProtoMember(4)]
    public int D { get; set; }
    [ProtoMember(5)]
    public int E { get; set; }
    [ProtoMember(6)]
    public int F { get; set; }
    [ProtoMember(7)]
    public int G { get; set; }
    [ProtoMember(8)]
    public int H { get; set; }
}
Run Code Online (Sandbox Code Playgroud)

  • @ damageboy本地测试,不使用IO进行反序列化,我可以运行8个8核,总计95%的机器活动CPU(我还有其他各种运行).我的结论是:没有 - protobuf-net没有内部并行化限制.添加示例来回答. (2认同)