MySQL异步?

use*_*889 4 c++ mysql boost asynchronous boost-asio

我基本上面临阻塞问题.我的服务器编码基于C++ Boost.ASIO,使用8个线程,因为服务器有8个逻辑核心.

我的问题是一个线程可能面临一个MySQL查询阻塞0.2~1.5秒我真的不知道如何解决,因为MySQL C++ Connector不支持异步查询,我不知道如何设计服务器"正确"使用多个线程进行查询.

这就是我要求在这种情况下做什么的意见.为异步'查询sql创建100个线程?我可以就这个问题得到专家的意见吗?

seh*_*ehe 5

好的,正确的解决方案是扩展Asio并编写mysql_service实现来集成它.我几乎要知道如何立即完成,但我想开始使用"仿真".

这个想法是有的

  • io_service正在使用的业务流程(正如您已经在做的那样)
  • 数据库"facade"接口,它将异步查询分派到不同的队列(io_service)并将完成处理程序发布回business_processio_service

这里需要一个微妙的调整则需要从尽快关停保留io_service对象在业务流程方面,因为它的工作队列为空,因为它可能仍在等待来自数据库层的响应.

因此,将其建模为快速演示:

namespace database
{
    // data types
    struct sql_statement { std::string dml; };
    struct sql_response { std::string echo_dml; }; // TODO cover response codes, resultset data etc.
Run Code Online (Sandbox Code Playgroud)

我希望你能原谅我的粗略简化:/

struct service
{
    service(unsigned max_concurrent_requests = 10)
        : work(io_service::work(service_)),
        latency(mt19937(), uniform_int<int>(200, 1500)) // random 0.2 ~ 1.5s
    {
        for (unsigned i = 0; i < max_concurrent_requests; ++i)
            svc_threads.create_thread(boost::bind(&io_service::run, &service_));
    }

    friend struct connection;

private:
    void async_query(io_service& external, sql_statement query, boost::function<void(sql_response response)> completion_handler)
    {
        service_.post(bind(&service::do_async_query, this, ref(external), std::move(query), completion_handler));
    }

    void do_async_query(io_service& external, sql_statement q, boost::function<void(sql_response response)> completion_handler)
    {
        this_thread::sleep_for(chrono::milliseconds(latency())); // simulate the latency of a db-roundtrip

        external.post(bind(completion_handler, sql_response { q.dml }));
    }

    io_service service_;
    thread_group svc_threads; // note the order of declaration
    optional<io_service::work> work;

    // for random delay
    random::variate_generator<mt19937, uniform_int<int> > latency;
};
Run Code Online (Sandbox Code Playgroud)

该服务用于协调最大并发请求数(在"数据库io_service"端)并ping/ping完成另一个io_service(async_query/do_async_query组合).这个存根实现以明显的方式模拟0.2~1.5s的延迟:)

现在客户端"门面"

struct connection
{
    connection(int connection_id, io_service& external, service& svc)
        : connection_id(connection_id),
          external_(external), 
          db_service_(svc)
    { }

    void async_query(sql_statement query, boost::function<void(sql_response response)> completion_handler)
    {
        db_service_.async_query(external_, std::move(query), completion_handler);
    }
  private:
    int connection_id;
    io_service& external_;
    service& db_service_;
};
Run Code Online (Sandbox Code Playgroud)

connection实际上只是一个方便,所以我们不必明确处理调用站点上的各种队列.

现在,让我们以良好的旧Asio风格实现演示业务流程:

namespace domain
{
    struct business_process : id_generator
    {
        business_process(io_service& app_service, database::service& db_service_) 
            : id(generate_id()), phase(0), 
            in_progress(io_service::work(app_service)),
            db(id, app_service, db_service_)
        { 
            app_service.post([=] { start_select(); });
        }

    private:
        int id, phase;
        optional<io_service::work> in_progress;

        database::connection db;

        void start_select() {
            db.async_query({ "select * from tasks where completed = false" }, [=] (database::sql_response r) { handle_db_response(r); });
        }

        void handle_db_response(database::sql_response r) {
            if (phase++ < 4)
            {
                if ((id + phase) % 3 == 0) // vary the behaviour slightly
                {
                    db.async_query({ "insert into tasks (text, completed) values ('hello', false)" }, [=] (database::sql_response r) { handle_db_response(r); });
                } else
                {
                    db.async_query({ "update * tasks set text = 'update' where id = 123" }, [=] (database::sql_response r) { handle_db_response(r); });
                }
            } else
            {
                in_progress.reset();
                lock_guard<mutex> lk(console_mx);
                std::cout << "business_process " << id << " has completed its work\n";
            }
        }
    };

}
Run Code Online (Sandbox Code Playgroud)

此业务流程首先在应用服务上发布.然后,它会连续执行多个数据库查询,并最终退出(通过执行in_progress.reset()应用程序服务可以了解这一点).

演示主体,在单个线程上启动10个业务流程:

int main()
{
    io_service app;
    database::service db;

    ptr_vector<domain::business_process> bps;
    for (int i = 0; i < 10; ++i)
    {
        bps.push_back(new domain::business_process(app, db));
    }

    app.run();
}
Run Code Online (Sandbox Code Playgroud)

在我的示例中,business_processes不执行任何CPU密集型工作,因此在CPU上安排它们是没有用的,但如果您希望通过将app.run()行替换为以下内容,可以轻松实现此目的:

thread_group g;
for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
    g.create_thread(boost::bind(&io_service::run, &app));
g.join_all();
Run Code Online (Sandbox Code Playgroud)

查看运行Live On Coliru的演示


Mic*_*l J 0

我不是 MySQL 专家,但以下是通用的多线程建议。

  • NumberOfThreads == NumberOfCores当没有任何线程阻塞并且您只是将负载分散到所有 CPU 上时,拥有是合适的。

  • 一种常见的模式是每个 CPU 有多个线程,因此一个线程正在执行,而另一个线程正在等待某些事情。

  • 在你的情况下,我倾向于设置NumberOfThreads = n * NumberOfCores从配置文件、注册表项或其他用户可设置值读取“n”的位置。您可以使用不同的“n”值来测试系统,以找到最佳方案。我建议初步猜测为 3 左右。