strands如何保证在boost.asio中正确执行挂起事件

Sla*_*ish 6 c++ multithreading boost-asio

考虑使用Boost.asio实现的echo服务器.从连接的客户端读取事件会导致数据块被放置到到达事件队列中.线程池通过这些事件工作 - 对于每个事件,线程获取事件中的数据并将其回送到连接的客户端.

在此输入图像描述

如上图所示,事件队列中可能存在多个来自单个客户端的事件.为了确保按顺序执行和交付给定客户端的这些事件,使用了股数.在这种情况下,来自给定连接客户端的所有事件都在客户端的链中执行.

我的问题是:股如何保证事件处理的正确顺序?我认为必须存在某种锁定链,但即使这样也是不够的,所以必须有更多,我希望有人可以解释一下我们指向一些代码来做到这一点?

我找到了这个文档: 如何工作以及为什么要使用它们

它揭示了这种机制的一些亮点,但是在一条链中说"处理程序执行顺序无法保证".这是否意味着我们最终会收到"永远的草莓."字段?

此外 - 每当新客户端连接时,我们是否必须创建一个新的链,以便每个客户端有一个链?

最后 - 当读取事件到来时,我们如何知道将其添加到哪个链?使用连接作为关键,必须从所有股线中查找钢绞线?

Ric*_*ges 6

strand是一个执行上下文,它在正确的线程上执行临界区内的处理程序.

使用互斥锁实现(或多或少)该关键部分.

它有点聪明,因为如果一个调度程序检测到一个线程已经在该链中,它会将处理程序附加到一个处理程序队列,以便在关键部分离开之前执行,但是在当前处理程序完成之后.

因此,在这种情况下,新的处理程序是"排序"发布到当前正在执行的线程.

订购时有一些保证.

strand::post/dispatch(x);
strand::post/dispatch(y);
Run Code Online (Sandbox Code Playgroud)

总会导致x在y之前发生.

但如果x在执行期间调度处理程序z,那么执行顺序将是:

x,z,y

请注意,使用strands处理io完成处理程序的惯用方法不是将工作发布到完成处理程序中的strand,而是将完成处理程序包装在strand中,并在那里完成工作.

asio包含检测此问题的代码,并将做正确的事情,确保正确的排序和消除不必要的中间帖子.

例如:

async_read(sock, mystrand.wrap([](const auto& ec, auto transferred)
{
  // this code happens in the correct strand, in the correct order.
});
Run Code Online (Sandbox Code Playgroud)


Tan*_*ury 5

strand为非并发和处理程序的调用顺序提供保证;strand不控制执行和多路分解操作的顺序。strand如果您有以下两种情况,请使用a :

  • 多个线程访问不是线程安全的共享对象
  • 需要保证处理程序的顺序排序

io_service会提供的缓冲区的期望和预期订货填充或者在操作启动的顺序使用。例如,如果socket具有“永远的草莓字段”。可供阅读,然后给出:

buffer1.resize(11); // buffer is a std::vector managed elsewhere
buffer2.resize(7);  // buffer is a std::vector managed elsewhere
buffer3.resize(8);  // buffer is a std::vector managed elsewhere
socket.async_read_some(boost::asio::buffer(buffer1), handler1);
socket.async_read_some(boost::asio::buffer(buffer2), handler2);
socket.async_read_some(boost::asio::buffer(buffer3), handler3);
Run Code Online (Sandbox Code Playgroud)

操作完成后:

  • handler1被调用,buffer1将包含“草莓”
  • handler2被调用,buffer2将包含“字段”
  • handler3被调用时,buffer3将包含“永远”。

但是,未指定完成处理程序的调用顺序。即使使用,这种未指定的顺序仍然适用strand


操作解复用

Asio使用Proactor设计模式[1]对操作进行解复用。在大多数平台上,这是根据Reactor实现的。在官方文件中提到的组件和他们的责任。考虑以下示例:

socket.async_read_some(buffer, handler);
Run Code Online (Sandbox Code Playgroud)

调用方是发起方,它启动async_read_some异步操作并创建handler完成处理程序。异步操作由StreamSocketService操作处理器执行:

  • 在启动函数中,如果套接字没有其他未完成的异步读取操作,并且有可用数据,则StreamSocketService将从套接字读取并将handler完成处理程序排队到io_service
  • 否则,读取操作将排队到套接字上,并且一旦套接字上的数据可用,就会通知反应堆以通知Asio。当io_service是RAN和数据是可用的插座上,然后将反应器将通知短耳。接下来,Asio将从套接字中取出未完成的读取操作,执行该操作,并将handler完成处理程序排队到io_service

io_service摄将出列完成处理,解复用处理程序线程正在运行的io_service,从中handler完成处理程序将被执行。未指定完成处理程序的调用顺序。

多种操作

如果在套接字上启动了相同类型的多个操作,则当前未指定使用或填充缓冲区的顺序。但是,在当前实现中,每个套接字针对每种类型的挂起操作使用FIFO队列(例如,用于读取操作的队列;用于写入操作的队列;等等)。该网络-TS草案,这是部分基于短耳,指定:

buffers按照发出这些操作的顺序进行填写。这些操作的完成处理程序的调用顺序未指定。

鉴于:

socket.async_read_some(buffer1, handler1); // op1
socket.async_read_some(buffer2, handler2); // op2
Run Code Online (Sandbox Code Playgroud)

正如op1之前所发起的那样op2,那么buffer1可以保证其中包含的数据流早于流中包含的数据buffer2,但是handler2可以在之前调用handler1

组合作战

组合运算由零个或多个中间运算组成。例如,async_read()组成的异步操作由零个或多个中间stream.async_read_some()操作组成。

当前实现使用操作链来创建一个连续性,在该连续性中async_read_some()启动单个操作,并在其内部完成句柄内确定是启动另一个async_read_some()操作还是调用用户的完成处理程序。由于存在延续性,因此async_read文档要求在完成组合操作之前,不得进行其他读取:

程序必须确保在此操作完成之前,该流不执行其他任何读取操作(例如async_read,该流的async_read_some功能或执行读取的任何其他组合操作)。

如果程序违反了这一要求,由于前面提到的缓冲区填充顺序,可能会观察到交织的数据。

举一个具体的例子,考虑async_read()启动操作以从套接字读取26字节数据的情况:

buffer.resize(26); // buffer is a std::vector managed elsewhere
boost::asio::async_read(socket, boost::asio::buffer(buffer), handler);
Run Code Online (Sandbox Code Playgroud)

如果套接字接收到“ Strawberry”,“ fields”,然后是“ forever。”,则该async_read()操作可能由一个或多个socket.async_read_some()操作组成。例如,它可以由3个中间操作组成:

  • 第一个async_read_some()操作从偏移量0开始将11个包含“ Strawberry”的字节读入缓冲区。不满足读取26个字节的完成条件,因此async_read_some()启动了另一个操作以继续操作
  • 第二个async_read_some()操作从偏移量11开始将7个包含“字段”的字节读入缓冲区。不满足读取26个字节的完成条件,因此async_read_some()启动了另一个操作以继续操作
  • 第三个async_read_some()操作读取8个包含“永远”的字节。以18的偏移量开始进入缓冲区。由于已满足读取26个字节的完成条件,因此handler将其排队。io_service

handler调用完成处理程序,buffer包括“永远的草莓田”。


股线

strand用于以保证的顺序提供处理程序的序列化执行。鉴于:

  • 股线对象 s
  • 一个功能对象f1被添加到钢绞线s通过s.post(),或s.dispatch()s.running_in_this_thread() == false
  • 一个功能对象f2被添加到钢绞线s通过s.post(),或s.dispatch()s.running_in_this_thread() == false

然后,该链提供了有序和非并发的保证,使得f1f2不会被同时调用。此外,如果添加的f1发生在添加之前f2f1则将在之前调用f2

带有:

auto wrapped_handler1 = strand.wrap(handler1);
auto wrapped_handler2 = strand.wrap(handler2);
socket.async_read_some(buffer1, wrapped_handler1); // op1
socket.async_read_some(buffer2, wrapped_handler2); // op2
Run Code Online (Sandbox Code Playgroud)

正如op1之前所发起的那样op2,那么buffer1可以保证包含的流中早于包含的数据的数据要包含在中buffer2,但未指定wrapped_handler1和的 wrapped_handler2调用顺序。在strand保证:

  • handler1并且handler2不会被同时调用
  • 如果wrapped_handler1在之前调用wrapped_handler2handler1则将在之前调用handler2
  • 如果wrapped_handler2在之前调用wrapped_handler1handler2则将在之前调用handler1

与组合的操作实现类似,该strand实现使用操作链来创建延续。该strand管理发布到它在FIFO队列中的所有处理程序。当队列为空且将处理程序发布到链上时,链将在上发布内部句柄io_service。在内部处理程序中,处理程序将从strand的FIFO队列中出队,执行,然后,如果队列不为空,则内部处理程序将自身回发到io_service

考虑阅读答案,以查找组合操作如何用于在完成处理程序asio_handler_invoke()的相同上下文(即strand)内包装中间处理程序。可以在对此问题的评论中找到实现细节。


1. [POSA2] D. Schmidt等人,面向模式的软件体系结构,第2卷,Wiley,2000年。