如何在某些任务非常昂贵的任务中加载平衡并行性?

Mud*_*uds 3 c# parallel-processing multithreading task-parallel-library

我有一个需要处理的对象列表.所以说列表是所有客户的列表,我需要对所有客户执行CPU密集型计算.虽然在此计算之前和之后我需要将数据提取并提交回数据库,因此它不仅仅是一个CPU任务.

所以我做的是

Parallel.ForEach(list, action);

行动在字面上

1 Fetch customer data
2 Process calculate (time and memory intensive task)
3 Commit back customer data
Run Code Online (Sandbox Code Playgroud)

代码中使用的工作不错,但最近时有时多个客户与非常高的多项纪录被处理,我们得到系统内存不足.

那么有没有办法对此进行负载均衡?大多数客户都得到了快速处理,但很少有人将所有资源都拿走了.我可以避免其中一些一起跑吗?

我可以实现这一点的一种方法是根据大小对列表进行排序,然后尝试选择第一个和最后一个项目并自己控制并行性,但是想看看我在这里有哪些选项.

Ser*_*rvy 6

因为你说在实际完成之前你已经计算了大小,所以它简化了操作.此时,您只需要一个同步原语,该原语不限制要执行的操作,而是具有一些总权重值,并确保所有当前运行的操作的总和小于指定的权重值.然后,您可以使用给定的权重值请求给定的操作运行,并且在它有足够的未使用权重值之前它实际上不会运行.

没有现有的原语可以做到这一点(信号量非常接近,但并不完全存在).但是,您可以很容易地从现有的同步原语中创建一个.

public class WeightedSemaphore
{
    public WeightedSemaphore(int totalWeight)
    {
        currentWeight = TotalWeight = totalWeight;
    }

    private ManualResetEvent signal = new ManualResetEvent(false);
    private int currentWeight;
    public int TotalWeight { get; }
    public int CurrentWeight { get { lock (signal) return currentWeight; } }

    public void Wait(int weight)
    {
        while (true)
        {
            lock (signal)
            {
                if (currentWeight >= weight)
                {
                    currentWeight -= weight;
                    return;
                }
            }

            signal.Reset();
            signal.WaitOne();
        }
    }
    public void Release(int weight)
    {
        lock (signal)
        {
            currentWeight += weight;
            signal.Set();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

现在,您可以完成每项操作,确保在完成工作之前等待它并提供"大小"值.从那里开始,我们只需要进行一些实验就可以确定当前系统可以支持的总重量.

请注意,这样做的副作用是您会发现更快的操作往往会更快地获得优先级.当一些空间被释放时,较短的操作更有可能与那里的内容一起运行,这意味着它将在更昂贵的操作之前保留该空间,甚至可以获得运行的机会.在许多情况下,这实际上是一个理想的属性,因为当您将更快的操作优先于更昂贵的操作时,平均响应时间实际上会下降.