dam*_*boy 3 .net c# parallel-processing protobuf-net task-parallel-library
我正在尝试使用.NET任务读取使用ProtoBuf.NET序列化的多个文件,如下所示:
public static ResultsDump Amalgamate(RuntimeTypeModel model, IEnumerable<string> files)
{
var readDumpTasks =
files.Select(fn =>
Task<ResultsDump>.Factory.StartNew(() => {
try {
using (var dumpFile = new FileStream(fn, FileMode.Open))
{
var miniDump = (ResultsDump)model.Deserialize(dumpFile, null, typeof(ResultsDump));
if (miniDump == null) {
throw new Exception(string.Format("Failed to deserialize dump file {0}", fn));
}
//readDumps.Add(miniDump);
return miniDump;
}
}
catch (Exception e) {
throw new Exception(string.Format("cannot read dump file {0}: {1}", fn, e.Message), e);
}
})).ToArray();
Task.WaitAll(readDumpTasks);
var allDumps = readDumpTasks.Select(t => t.Result).ToList();
// Goes on.. irrelevant to the question
}
Run Code Online (Sandbox Code Playgroud)
出于某种原因,CPU使用率实际上并不高于单个核心.Protobuf.NET中是否存在一些不同于同时对多个文件进行反序列化的固有锁?
我已尝试使用多个RuntimeTypeModel实例以及一个,并且它似乎总是在非常"低"的CPU使用级别达到峰值.
我甚至错误地指责ProtoBuf.NET?这是.NET内存分配器/ TPL吗?
protobuf网中有意锁定非常有限; 它只在检查类型(第一次运行)时才真正锁定,以查看所需的内容.一旦理解了模型,它就是无锁的,并且设计成平凡的并行.
如上所述(评论),IO很可能是您的瓶颈.事实上,parallelising访问同一个物理磁盘/主轴通常会大大降低吞吐量,缓冲区更辩称,它必须做更多的追求,而不是连续的读数.
这应该很容易测试/验证:对于测试运行,而不是从磁盘读取,首先将它们全部加载到内存中;
var ms = new MemoryStream(
File.ReadAllBytes(path));
Run Code Online (Sandbox Code Playgroud)
加载所有文件后,现在执行相同的代码,但将MemoryStreams输入作为输入.如果它仍然无法扩展,则可能是一个错误.然而,我强烈怀疑你会发现它在那时很好地并行.
这是一个有用的例子,对我来说,通过并发反序列化使我的所有内核饱和:
using System.Collections.Generic;
using ProtoBuf;
using System;
using System.IO;
using System.Threading.Tasks;
internal class Program
{
private static void Main()
{
var foo = new Foo { Bars = new List<Bar>() };
var rand = new Random(1234);
for (int i = 0; i < 1000; i++)
{
var bar = new Bar
{
A = rand.Next(),
B = rand.Next(),
C = rand.Next(),
D = rand.Next(),
E = rand.Next(),
F = rand.Next(),
G = rand.Next(),
H = rand.Next()
};
foo.Bars.Add(bar);
}
var ms = new MemoryStream();
Serializer.Serialize(ms, foo);
var bytes = ms.ToArray();
const int count = 100000;
Parallel.For(0, count, x =>
{
Serializer.Deserialize<Foo>(new MemoryStream(bytes));
});
}
}
[ProtoContract]
internal class Foo
{
[ProtoMember(1)]
public List<Bar> Bars { get; set; }
}
[ProtoContract]
internal class Bar
{
[ProtoMember(1)]
public int A { get; set; }
[ProtoMember(2)]
public int B { get; set; }
[ProtoMember(3)]
public int C { get; set; }
[ProtoMember(4)]
public int D { get; set; }
[ProtoMember(5)]
public int E { get; set; }
[ProtoMember(6)]
public int F { get; set; }
[ProtoMember(7)]
public int G { get; set; }
[ProtoMember(8)]
public int H { get; set; }
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1044 次 |
| 最近记录: |