在处理大量数据时降低CPU使用率

Ram*_*rai 0 c# multithreading winforms task-parallel-library tpl-dataflow

我正在编写一个实时应用程序,它每秒接收大约2000条消息,这些消息被推入队列中.我编写了一个后台线程来处理队列中的消息.

private void ProcessSocketMessage()
{
    while (!this.shouldStopProcessing)
    {
        while (this.messageQueue.Count > 0)
        {
            string message;
            bool result = this.messageQueue.TryDequeue(out message);
            if (result)
            {
               // Process the string and do some other stuff
               // Like updating the received message in a datagrid  
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

上述代码的问题在于它使用了大约12%CPU(2.40 GHz双核处理器)的疯狂处理能力.

我有4个类似于上面的块,它实际上占用了50%的CPU计算能力.在上面的代码中有什么可以优化的吗?

在第二次循环结束之前添加100 ms的线程休眠似乎会使性能提高50%.但我做错了吗?

Pan*_*vos 5

Dataflow库的 ActionBlock类中已经提供了此功能.ActionBlock有一个输入缓冲区,它接收消息并通过为每个消息调用一个动作来处理它们.默认情况下,一次只处理一条消息.它不使用繁忙的等待.

void MyActualProcessingMethod(string it)
{
  // Process the string and do some other stuff
}

var myBlock = new ActionBlock<string>( someString =>MyActualProcessingMethod(someString));

//Simulate a lot of messages
for(int i=0;i<100000;i++)
{
    myBlock.Post(someMessage);
}
Run Code Online (Sandbox Code Playgroud)

当消息完成和/或我们不再需要消息时,我们通过拒绝任何新消息并处理输入缓冲区中剩余的任何内容来命令它完成:

myBlock.Complete();
Run Code Online (Sandbox Code Playgroud)

在我们完成之前,我们需要实际等待块完成剩余的处理:

await myBlock.Completion;
Run Code Online (Sandbox Code Playgroud)

所有Dataflow块都可以接受来自多个客户端的消息.

块也可以组合在一起.一个块的输出可以提供另一个块.TransformBlock接受将输入转换为输出的函数.

通常,每个块都使用线程池中的任务.默认情况下,一个块一次只处理一条消息.不同的块在不同的任务或甚至不同的TaskScheduler上运行.这样,您可以让一个块执行一些繁重的处理,并将结果推送到另一个更新UI的块.

string MyActualProcessingMethod(string it)
{
  // Process the string and do some other stuff
  // and send a progress message downstream
  return SomeProgressMessage;
}

void UpdateTheUI(string msg)
{
  statusBar1.Text = msg;
}

var myProcessingBlock = new TransformBlock<string,string>(msg =>MyActualProcessingMethod(msg));
Run Code Online (Sandbox Code Playgroud)

UI将由在UI线程上运行的另一个块更新.这通过ExecutionDataflowBlockOptions表示:

var runOnUI=new ExecutionDataflowBlockOptions { 
                  TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() 
            };
var myUpdater = new ActionBlock<string>(msg => UpdateTheUI(msg),runOnUI);

//Pass progress messages from the processor to the updater
myProcessingBlock.LinkTo(myUpdater,new DataflowLinkOptions { PropagateCompletion = true });
Run Code Online (Sandbox Code Playgroud)

将消息发布到管道的第一个块的代码不会更改:

//Simulate a lot of messages
for(int i=0;i<100000;i++)
{
    myProcessingBlock.Post(someMessage);
}


//We are finished, tell the block to process any leftover messages
myProcessingBlock.Complete();
Run Code Online (Sandbox Code Playgroud)

在这种情况下,一旦处理器完成,它将通知管道中的下一个块完成.我们还需要等待最后一个块完成

//Wait for the block to finish
await myUpdater.Completion;
Run Code Online (Sandbox Code Playgroud)

如何使第一个块并行工作?我们可以指定最多10个任务将用于通过其执行选项处理输入消息:

var dopOptions = new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 10};
var myProcessingBlock = new TransformBlock<string,string>(msg =>MyActualProcessingMethod(msg),dopOptions);
Run Code Online (Sandbox Code Playgroud)

处理器将并行处理多达10条消息,但更新程序仍将在UI线程中逐个处理它们.