Sla*_*ish 6 c++ multithreading boost-asio
考虑使用Boost.asio实现的echo服务器.从连接的客户端读取事件会导致数据块被放置到到达事件队列中.线程池通过这些事件工作 - 对于每个事件,线程获取事件中的数据并将其回送到连接的客户端.
如上图所示,事件队列中可能存在多个来自单个客户端的事件.为了确保按顺序执行和交付给定客户端的这些事件,使用了股数.在这种情况下,来自给定连接客户端的所有事件都在客户端的链中执行.
我的问题是:股如何保证事件处理的正确顺序?我认为必须存在某种锁定链,但即使这样也是不够的,所以必须有更多,我希望有人可以解释一下我们指向一些代码来做到这一点?
我找到了这个文档: 如何工作以及为什么要使用它们
它揭示了这种机制的一些亮点,但是在一条链中说"处理程序执行顺序无法保证".这是否意味着我们最终会收到"永远的草莓."字段?
此外 - 每当新客户端连接时,我们是否必须创建一个新的链,以便每个客户端有一个链?
最后 - 当读取事件到来时,我们如何知道将其添加到哪个链?使用连接作为关键,必须从所有股线中查找钢绞线?
strand是一个执行上下文,它在正确的线程上执行临界区内的处理程序.
使用互斥锁实现(或多或少)该关键部分.
它有点聪明,因为如果一个调度程序检测到一个线程已经在该链中,它会将处理程序附加到一个处理程序队列,以便在关键部分离开之前执行,但是在当前处理程序完成之后.
因此,在这种情况下,新的处理程序是"排序"发布到当前正在执行的线程.
订购时有一些保证.
strand::post/dispatch(x);
strand::post/dispatch(y);
Run Code Online (Sandbox Code Playgroud)
总会导致x在y之前发生.
但如果x在执行期间调度处理程序z,那么执行顺序将是:
x,z,y
请注意,使用strands处理io完成处理程序的惯用方法不是将工作发布到完成处理程序中的strand,而是将完成处理程序包装在strand中,并在那里完成工作.
asio包含检测此问题的代码,并将做正确的事情,确保正确的排序和消除不必要的中间帖子.
例如:
async_read(sock, mystrand.wrap([](const auto& ec, auto transferred)
{
// this code happens in the correct strand, in the correct order.
});
Run Code Online (Sandbox Code Playgroud)
strand
为非并发和处理程序的调用顺序提供保证;strand
不控制执行和多路分解操作的顺序。strand
如果您有以下两种情况,请使用a :
该io_service
会提供的缓冲区的期望和预期订货填充或者在操作启动的顺序使用。例如,如果socket
具有“永远的草莓字段”。可供阅读,然后给出:
buffer1.resize(11); // buffer is a std::vector managed elsewhere
buffer2.resize(7); // buffer is a std::vector managed elsewhere
buffer3.resize(8); // buffer is a std::vector managed elsewhere
socket.async_read_some(boost::asio::buffer(buffer1), handler1);
socket.async_read_some(boost::asio::buffer(buffer2), handler2);
socket.async_read_some(boost::asio::buffer(buffer3), handler3);
Run Code Online (Sandbox Code Playgroud)
操作完成后:
handler1
被调用,buffer1
将包含“草莓”handler2
被调用,buffer2
将包含“字段”handler3
被调用时,buffer3
将包含“永远”。但是,未指定完成处理程序的调用顺序。即使使用,这种未指定的顺序仍然适用strand
。
Asio使用Proactor设计模式[1]对操作进行解复用。在大多数平台上,这是根据Reactor实现的。在官方文件中提到的组件和他们的责任。考虑以下示例:
socket.async_read_some(buffer, handler);
Run Code Online (Sandbox Code Playgroud)
调用方是发起方,它启动async_read_some
异步操作并创建handler
完成处理程序。异步操作由StreamSocketService操作处理器执行:
handler
完成处理程序排队到io_service
io_service
是RAN和数据是可用的插座上,然后将反应器将通知短耳。接下来,Asio将从套接字中取出未完成的读取操作,执行该操作,并将handler
完成处理程序排队到io_service
该io_service
摄将出列完成处理,解复用处理程序线程正在运行的io_service
,从中handler
完成处理程序将被执行。未指定完成处理程序的调用顺序。
如果在套接字上启动了相同类型的多个操作,则当前未指定使用或填充缓冲区的顺序。但是,在当前实现中,每个套接字针对每种类型的挂起操作使用FIFO队列(例如,用于读取操作的队列;用于写入操作的队列;等等)。该网络-TS草案,这是部分基于短耳,指定:
将
buffers
按照发出这些操作的顺序进行填写。这些操作的完成处理程序的调用顺序未指定。
鉴于:
socket.async_read_some(buffer1, handler1); // op1
socket.async_read_some(buffer2, handler2); // op2
Run Code Online (Sandbox Code Playgroud)
正如op1
之前所发起的那样op2
,那么buffer1
可以保证其中包含的数据流早于流中包含的数据buffer2
,但是handler2
可以在之前调用handler1
。
组合运算由零个或多个中间运算组成。例如,async_read()
组成的异步操作由零个或多个中间stream.async_read_some()
操作组成。
当前实现使用操作链来创建一个连续性,在该连续性中async_read_some()
启动单个操作,并在其内部完成句柄内确定是启动另一个async_read_some()
操作还是调用用户的完成处理程序。由于存在延续性,因此async_read
文档要求在完成组合操作之前,不得进行其他读取:
程序必须确保在此操作完成之前,该流不执行其他任何读取操作(例如
async_read
,该流的async_read_some
功能或执行读取的任何其他组合操作)。
如果程序违反了这一要求,由于前面提到的缓冲区填充顺序,可能会观察到交织的数据。
举一个具体的例子,考虑async_read()
启动操作以从套接字读取26字节数据的情况:
buffer.resize(26); // buffer is a std::vector managed elsewhere
boost::asio::async_read(socket, boost::asio::buffer(buffer), handler);
Run Code Online (Sandbox Code Playgroud)
如果套接字接收到“ Strawberry”,“ fields”,然后是“ forever。”,则该async_read()
操作可能由一个或多个socket.async_read_some()
操作组成。例如,它可以由3个中间操作组成:
async_read_some()
操作从偏移量0开始将11个包含“ Strawberry”的字节读入缓冲区。不满足读取26个字节的完成条件,因此async_read_some()
启动了另一个操作以继续操作async_read_some()
操作从偏移量11开始将7个包含“字段”的字节读入缓冲区。不满足读取26个字节的完成条件,因此async_read_some()
启动了另一个操作以继续操作async_read_some()
操作读取8个包含“永远”的字节。以18的偏移量开始进入缓冲区。由于已满足读取26个字节的完成条件,因此handler
将其排队。io_service
当handler
调用完成处理程序,buffer
包括“永远的草莓田”。
strand
用于以保证的顺序提供处理程序的序列化执行。鉴于:
s
f1
被添加到钢绞线s
通过s.post()
,或s.dispatch()
当s.running_in_this_thread() == false
f2
被添加到钢绞线s
通过s.post()
,或s.dispatch()
当s.running_in_this_thread() == false
然后,该链提供了有序和非并发的保证,使得f1
和f2
不会被同时调用。此外,如果添加的f1
发生在添加之前f2
,f1
则将在之前调用f2
。
带有:
auto wrapped_handler1 = strand.wrap(handler1);
auto wrapped_handler2 = strand.wrap(handler2);
socket.async_read_some(buffer1, wrapped_handler1); // op1
socket.async_read_some(buffer2, wrapped_handler2); // op2
Run Code Online (Sandbox Code Playgroud)
正如op1
之前所发起的那样op2
,那么buffer1
可以保证包含的流中早于包含的数据的数据要包含在中buffer2
,但未指定wrapped_handler1
和的 wrapped_handler2
调用顺序。在strand
保证:
handler1
并且handler2
不会被同时调用wrapped_handler1
在之前调用wrapped_handler2
,handler1
则将在之前调用handler2
wrapped_handler2
在之前调用wrapped_handler1
,handler2
则将在之前调用handler1
与组合的操作实现类似,该strand
实现使用操作链来创建延续。该strand
管理发布到它在FIFO队列中的所有处理程序。当队列为空且将处理程序发布到链上时,链将在上发布内部句柄io_service
。在内部处理程序中,处理程序将从strand
的FIFO队列中出队,执行,然后,如果队列不为空,则内部处理程序将自身回发到io_service
。
考虑阅读此答案,以查找组合操作如何用于在完成处理程序asio_handler_invoke()
的相同上下文(即strand
)内包装中间处理程序。可以在对此问题的评论中找到实现细节。
1. [POSA2] D. Schmidt等人,面向模式的软件体系结构,第2卷,Wiley,2000年。