背景:
我问这个是因为我目前有一个包含许多(数百到数千)个线程的应用程序.这些线程中的大多数在很长一段时间内处于空闲状态,等待将工作项放入队列中.当工作项可用时,然后通过调用一些任意复杂的现有代码来处理它.在某些操作系统配置中,应用程序会遇到控制最大用户进程数的内核参数,因此我想尝试减少工作线程数的方法.
我建议的解决方案
它似乎是一个基于协程的方法,我用协程替换每个工作线程,将有助于实现这一目标.然后,我可以拥有一个由实际(内核)工作线程池支持的工作队列.当项目被放置在特定协程的队列中进行处理时,会将一个条目放入线程池的队列中.然后它将恢复相应的协同程序,处理其排队的数据,然后再次暂停它,释放工作线程以执行其他工作.
实施细节:
在考虑如何做到这一点时,我无法理解无堆栈和堆栈协程之间的功能差异.我有一些使用Boost.Coroutine库的堆栈协同程序的经验.我发现从概念层面理解它相对容易:对于每个协同程序,它维护CPU上下文和堆栈的副本,当你切换到协程时,它切换到保存的上下文(就像内核模式调度程序一样) ).
对我来说不太清楚的是无堆栈协程与此有何不同.在我的应用程序中,与上述工作项排队相关的开销量非常重要.我见过的大多数实现,比如新的CO2库,都表明无堆栈协程提供了更低开销的上下文切换.
因此,我想更清楚地理解无堆栈和堆栈协程之间的功能差异.具体来说,我想到了这些问题:
像这样的参考文献表明,区别在于你可以在一个堆栈与无堆栈协程中产生/恢复.是这样的吗?有一个简单的例子,我可以在一个堆栈的协程中做但不能在无堆栈的协同程序中吗?
使用自动存储变量(即"堆栈中的变量")是否有任何限制?
我可以从无堆协程中调用哪些函数有任何限制吗?
如果无堆栈协程没有保存堆栈上下文,那么当协同程序运行时,自动存储变量会去哪里?
查看Boost.Coroutine的性能页面,对称协程似乎比不对称协同程序要便宜得多,大约50倍.这似乎令人惊讶,因为对称协程似乎提供了更一般的抽象.在不对称协程中是否有一些功能可以证明这个成本是合理的?
我还要补充一点,在不对称协程的情况下,构建成本约为上下文切换成本的500倍,因此很容易成为应用程序的瓶颈.
我正在做一些C++绿色线程的研究,大多数boost::coroutine2
和类似的POSIX函数一样makecontext()/swapcontext()
,并计划在其上实现一个C++绿色线程库boost::coroutine2
.两者都需要用户代码为每个新函数/协同程序分配一个堆栈.
我的目标平台是x64/Linux.我希望我的绿色线程库适合一般用途,因此堆栈应该根据需要进行扩展(合理的上限很好,例如10MB),如果堆栈在未使用太多内存时可能会收缩,那就太好了(不需要) ).我还没有想出一个合适的算法来分配堆栈.
经过一些谷歌搜索,我自己想出了几个选项:
mmap()
希望内核足够聪明,可以保留未分配的物理内存,只在访问堆栈时分配.在这种情况下,我们受内核的支配.mmap(PROT_NONE)
并设置一个SIGSEGV
信号处理程序.在信号处理程序中,当SIGSEGV
由堆栈访问引起时(被访问的内存在保留的大内存空间内),分配所需的内存mmap(PROT_READ | PROT_WRITE)
.这是这种方法的问题:mmap()
不是异步安全,不能在信号处理程序内部调用.它仍然可以实现,但非常棘手:在程序启动期间为内存分配创建另一个线程,并用于pipe() + read()/write()
从信号处理程序向线程发送内存分配信息.关于选项3的更多问题:
mmap()
调用而极度分散时,内核/ CPU的性能/坏程度如何?read()
被召唤?绿色线程的堆栈分配还有其他(更好的)选项吗?如何在其他实现中分配绿色线程堆栈,例如Go/Java?
分段堆栈如何工作?这个问题也适用于Boost.Coroutine
我所以我也在使用C++标签.主要的疑问来自这篇文章看起来他们所做的是在堆栈的底部保留一些空间并通过在那里分配的内存(可能通过mmap
和mprotect
?)注册某种信号处理程序来检查它是否已经损坏了当他们发现他们已经耗尽了空间时,他们继续分配更多的内存,然后从那里继续.3个问题
这不是构建用户空间的东西吗?它们如何控制新堆栈的分配位置以及如何编译程序的指令以了解它?
push指令基本上只是向堆栈指针添加一个值,然后将值存储在堆栈中的寄存器中,那么push指令如何知道新堆栈的启动位置以及相应的pop如何知道它何时必须将堆栈指针移回旧堆栈?
他们也说
在我们得到一个新的堆栈段之后,我们
goroutine
通过重试导致我们用完堆栈的函数来重新启动它
这是什么意思?他们重启整个goroutine吗?这不可能导致非确定性行为吗?
他们如何检测到程序已超出堆栈?如果他们在底部保留一个canary-ish内存区域,那么当用户程序创建一个足够大的数组溢出时会发生什么?这不会导致堆栈溢出并且是一个潜在的安全漏洞吗?
如果Go和Boost的实现不同,我很高兴知道它们中的任何一个如何处理这种情况
我最近发布了一个关于堆栈分段和提升协同程序的问题, 但似乎-fsplit-stack方法只适用于使用该标志编译的源文件,当你分支到另一个尚未编译的函数时,运行时会崩溃 -的fsplit堆栈.对于例如
这意味着运行时使用函数本地技术来检测何时超过当前堆栈.而不是"保护页面信号"技巧,其中堆栈的末尾总是有一个保护页面,它会在写入或读取时产生一个信号,告诉运行时分配一个新的堆栈帧并分支到那个.
那么这个国旗的用途是什么?如果我链接到任何其他未使用它构建的库,代码将会中断(甚至libstdc ++和libc),那么人们如何使用大型项目?
从阅读关于拆分堆栈的gcc wiki看来,从拆分堆栈函数调用非拆分堆栈函数会导致分配64KB堆栈帧.好.
但似乎尚未实现从函数指针调用非拆分堆栈函数以遵循上述方案.
这个标志有什么用呢?如果我继续调用任何虚函数我的程序会中断吗?
从下面的答案进一步看来,clang似乎没有实现拆分堆栈?
我无法形成关于控制流如何与spawn发生的心理图像.
当我调用时spawn(io_service, my_coroutine)
,它是否为io_service
队列添加了一个新的处理程序来包装对my_coroutine
?的调用?
在协同程序中我调用异步函数传递它yield_context
,它是否暂停协程直到异步操作完成?
void my_coroutine(yield_context yield) { ... async_foo(params ..., yield); ... // control comes here only once the async_foo operation completes }
我不明白的是我们如何避免等待.假设如果my_coroutine
服务于TCP连接,my_coroutine
在特定实例上挂起的其他实例如何被挂起,等待async_foo
完成?
在阅读coroutine2
我的文档时,我发现了一段很好的代码片段,展示了如何使用它asio
这里的参考是文档中的代码:
void session(boost::asio::io_service& io_service){
// construct TCP-socket from io_service
boost::asio::ip::tcp::socket socket(io_service);
try{
for(;;){
// local data-buffer
char data[max_length];
boost::system::error_code ec;
// read asynchronous data from socket
// execution context will be suspended until
// some bytes are read from socket
std::size_t length=socket.async_read_some(
boost::asio::buffer(data),
boost::asio::yield[ec]);
if (ec==boost::asio::error::eof)
break; //connection closed cleanly by peer
else if(ec)
throw boost::system::system_error(ec); //some other error
// write some bytes asynchronously
boost::asio::async_write(
socket,
boost::asio::buffer(data,length),
boost::asio::yield[ec]);
if (ec==boost::asio::error::eof)
break; //connection closed cleanly …
Run Code Online (Sandbox Code Playgroud) 为了在boost中尝试新的协同程序功能,我创建了以下程序:
#include <boost/coroutine/all.hpp>
#include <string>
#include <vector>
typedef boost::coroutines::coroutine<int(char)> coroutine_t;
void f(coroutine_t::caller_type & ca)
{
std::vector<int> vec = {1, 2, 3};
for (int i : vec)
{
char c = ca.get();
std::cout << "c: " << c << std::endl;
ca(i);
}
}
int main()
{
coroutine_t cr(f);
std::string str("abc");
for (char c : str)
{
std::cout << c << std::flush;
cr(c);
int n = cr.get();
std::cout << n << std::endl;
}
}
Run Code Online (Sandbox Code Playgroud)
我的构建命令如下:
$ g++ …
Run Code Online (Sandbox Code Playgroud) 在这个问题中,我描述了boost :: asio和boost :: coroutine使用模式,它导致我的应用程序随机崩溃,我发布了我的代码和valgrind以及GDB输出的提取.
为了进一步研究这个问题,我创建了一个较小的概念证明应用程序,它应用了相同的模式.我看到在我发布的源代码较小的程序中出现了同样的问题.
代码启动几个线程并创建一个带有几个虚拟连接的连接池(用户提供的数字).其他参数是无符号整数,它们扮演虚假请求的角色.sendRequest
函数的虚拟实现只是启动异步计时器,等待输入数字的等待秒数和函数的yileds.
有人可以看到这个代码的问题,他可以提出一些解决方案吗?
#include "asiocoroutineutils.h"
#include "concurrentqueue.h"
#include <iostream>
#include <thread>
#include <boost/lexical_cast.hpp>
using namespace std;
using namespace boost;
using namespace utils;
#define id this_thread::get_id() << ": "
// ---------------------------------------------------------------------------
/*!
* \brief This is a fake Connection class
*/
class Connection
{
public:
Connection(unsigned connectionId)
: _id(connectionId)
{
}
unsigned getId() const
{
return _id;
}
void sendRequest(asio::io_service& …
Run Code Online (Sandbox Code Playgroud) 我正在使用Simple-Web-Server库来创建简单的 Web 服务,以便将XML转换为JSON,反之亦然。反过来,它使用了几个boost库以及其中的boost::coroutine。对于XML<->JSON转换,我使用boost::property_tree库进行中间表示。这是代码:
#include <iostream>
#include <sstream>
#include <server_http.hpp>
#define BOOST_SPIRIT_THREADSAFE
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/xml_parser.hpp>
using namespace std;
using namespace boost::property_tree;
using HttpServer = SimpleWeb::Server<SimpleWeb::HTTP>;
int main()
{
HttpServer server(8080, 1);
server.resource["^/json_to_xml$"]["POST"] = [](auto& response, auto request) {
try
{
ptree pt;
read_json(request->content, pt);
ostringstream json, xml;
write_json(json, pt);
clog << "JSON request content:" << endl << json.str() << endl;
write_xml(xml, pt, xml_writer_make_settings<ptree::key_type>(' ', …
Run Code Online (Sandbox Code Playgroud) 我的服务器基于 boost spawn echo server 示例,并在此线程中进行了改进。真正的服务器很复杂,我做了一个更简单的服务器来显示问题:
服务器侦听端口 12345,从新连接接收 0x4000 字节数据。
客户端运行 1000 个线程,连接到服务器并发送 0x4000 字节数据。
问题:当客户端运行时,在 1 秒后通过控制台中的Ctrl-C杀死客户端进程,然后服务器io_context
将停止,服务器运行到无限循环并消耗 100% 的 CPU。如果这没有发生,重复启动客户端并杀死它几次,它会发生。可能几次后它的TCP端口用完了,等几分钟再试一次,它是在我的机器上杀死客户端3~15次后发生的。
该升压文件说,io_context.stopped()
是用来判断它是否停止
通过显式调用 stop() 或由于工作用完
我从不调用io_context.stop()
,并使用 amake_work_guard(io_context)
来保持io_context
不停止,但为什么它仍然停止?
我的环境:Win10-64bit,boost 1.71.0
服务器代码:
#include <iostream>
using namespace std;
#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
namespace ba=boost::asio;
#define SERVER_PORT 12345
#define DATA_LEN 0x4000
struct session : public std::enable_shared_from_this<session> …
Run Code Online (Sandbox Code Playgroud) 我继承了一个正在boost 1.75上工作的代码库。此代码在 boost 1.75 上运行没有问题:
using CompletionTokenType = boost::asio::yield_context;
using FunctionType = void(boost::system::error_code);
using AsyncResultType = boost::asio::async_result<CompletionTokenType, FunctionType>;
using HandlerType = typename AsyncResultType::completion_handler_type;
[[maybe_unused]] ResultOrErrorType
read(CompletionTokenType token, StatementType const& statement)
{
auto handler = HandlerType{token};
auto result = AsyncResultType{handler};
auto const future = handle_.get().asyncExecute(statement, [handler](auto const&) mutable {
boost::asio::post(boost::asio::get_associated_executor(handler), [handler]() mutable {
handler(boost::system::error_code{});
});
});
result.get(); // suspends coroutine until handler called
if (auto res = future.get(); res) {
return res;
} else {
// handle error
} …
Run Code Online (Sandbox Code Playgroud) boost-coroutine ×13
c++ ×12
boost ×7
boost-asio ×6
c++11 ×1
concurrency ×1
coroutine ×1
fiber ×1
go ×1
goroutine ×1
split-stacks ×1
valgrind ×1