同步和异步 gRPC 之间的区别

Tin*_*den 3 c++ tcp grpc

我正在开发一个基于gRPC的服务,该服务需要高吞吐量。但目前我的程序在使用 C++ 同步 gRPC 时吞吐量较低。

我已通读 gRPC 文档,但没有找到有关同步/异步 API 之间差异的明确解释。除了异步可以控制完成队列之外,它对同步 API 是透明的。

我想知道同步gRPC是否将消息发送到TCP层,并等待其“ack”,从而导致下一条消息被阻塞?同时,异步 API 会异步发送它们,而不会等待后面的消息吗?

Mii*_*gon 11

TLDR:是的,异步 API 会异步发送消息,而不会等待后面的消息,而同步 API 会在发送/接收一条消息时阻塞整个线程。

gRPC 使用CompletionQueue进行异步操作。您可以在这里找到官方教程:https://grpc.io/docs/languages/cpp/async/

CompletionQueue 是一个事件队列。这里的“事件”可以是请求数据接收的完成或警报(定时器)到期等(基本上是任何异步操作的完成)。

官方 gRPC 异步 API 示例为例,重点关注CallData类 和HandleRpcs()

  void HandleRpcs() {
    // Spawn a new CallData instance to serve new clients.
    new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while (true) {
      // Block waiting to read the next event from the completion queue. The
      // event is uniquely identified by its tag, which in this case is the
      // memory address of a CallData instance.
      // The return value of Next should always be checked. This return value
      // tells us whether there is any kind of event or cq_ is shutting down.
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }
Run Code Online (Sandbox Code Playgroud)

HandleRpcs() 是服务器的主循环。这是一个无限循环,通过使用不断从完成队列中获取下一个事件cq->Next(),并调用它的Proceed()方法(我们自定义的方法来处理不同状态的客户端请求)。

CallData(其实例代表客户端请求的完整处理周期):

  class CallData {
   public:
    // Take in the "service" instance (in this case representing an asynchronous
    // server) and the completion queue "cq" used for asynchronous communication
    // with the gRPC runtime.
    CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
      // Invoke the serving logic right away.
      Proceed();
    }

    void Proceed() {
      if (status_ == CREATE) {
        // Make this instance progress to the PROCESS state.
        status_ = PROCESS;

        // As part of the initial CREATE state, we *request* that the system
        // start processing SayHello requests. In this request, "this" acts are
        // the tag uniquely identifying the request (so that different CallData
        // instances can serve different requests concurrently), in this case
        // the memory address of this CallData instance.
        service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                  this);
      } else if (status_ == PROCESS) {
        // Spawn a new CallData instance to serve new clients while we process
        // the one for this CallData. The instance will deallocate itself as
        // part of its FINISH state.
        new CallData(service_, cq_);

        // The actual processing.
        std::string prefix("Hello ");
        reply_.set_message(prefix + request_.name());

        // And we are done! Let the gRPC runtime know we've finished, using the
        // memory address of this instance as the uniquely identifying tag for
        // the event.
        status_ = FINISH;
        responder_.Finish(reply_, Status::OK, this);
      } else {
        GPR_ASSERT(status_ == FINISH);
        // Once in the FINISH state, deallocate ourselves (CallData).
        delete this;
      }
    }

   private:
    // The means of communication with the gRPC runtime for an asynchronous
    // server.
    Greeter::AsyncService* service_;
    // The producer-consumer queue where for asynchronous server notifications.
    ServerCompletionQueue* cq_;
    // Context for the rpc, allowing to tweak aspects of it such as the use
    // of compression, authentication, as well as to send metadata back to the
    // client.
    ServerContext ctx_;

    // What we get from the client.
    HelloRequest request_;
    // What we send back to the client.
    HelloReply reply_;

    // The means to get back to the client.
    ServerAsyncResponseWriter<HelloReply> responder_;

    // Let's implement a tiny state machine with the following states.
    enum CallStatus { CREATE, PROCESS, FINISH };
    CallStatus status_;  // The current serving state.
  };
Run Code Online (Sandbox Code Playgroud)

我们可以看到,aCallData有 3 个状态:CREATE、PROCESS 和 FINISH。

请求例程如下所示:

  1. 启动时,为未来传入的客户端预分配一个CallData。
  2. 在构造该 CallData 对象期间,service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this)会调用 ,这会告诉 gRPC 准备接收一个 SayHello请求。
    此时我们不知道请求将来自哪里或何时到来,我们只是告诉 gRPC 我们已经准备好在请求实际到达时进行处理,并让 gRPC 在请求发生时通知我们。
    参数 toRequestSayHello告诉 gRPC 在收到请求后将上下文、请求正文和请求响应者放在哪里,以及通知使用哪个完成队列以及应将哪些标签附加到通知事件(在本例中,this用作标签)。
  3. HandleRpcs()阻止cq->Next(). 等待事件发生。

一段时间之后....

  1. SayHello客户端向服务器发出请求, gRPC 开始接收并解码该请求。(IO操作)

一段时间之后....

  1. gRPC 已完成接收请求。它将请求正文放入request_CallData 对象的字段中(通过前面提供的指针),然后创建一个事件(使用the pointer to the CallData objectas 标记,如前面最后一个参数所询问的那样RequestSayHello)。然后 gRPC将该事件放入完成队列中cq_
  2. 循环HandleRpcs()接收到事件(之前阻塞的调用cq->Next()现在返回),调用CallData::Proceed()处理请求。
  3. status_CallData 的值为PROCESS,因此它执行以下操作:
    6.1。创建一个新的 CallData 对象,以便可以处理此之后的新客户端请求。
    6.2. 生成请求的回复,告诉 gRPC 我们已完成处理,请将回复发送回客户端。
    6.3 gRPC 开始传输回复。(IO操作)
    6.4 循环HandleRpcs()进入下一次迭代并再次阻塞cq->Next(),等待新事件发生。

一段时间之后....

  1. gRPC 已经完成了回复的传输,并告诉我们,再次将一个事件放入完成队列中,并以 CallData 的指针作为标记。
  2. cq->Next() 接收事件并返回CallData::Proceed()释放 CallData 对象(通过使用delete this;)。HandleRpcs()再次循环并阻塞cq->Next(),等待新事件。

看起来该过程与同步 API 基本相同,只是对完成队列进行了额外的访问。然而,通过这样做,每次some time later....(通常是等待 IO 操作完成或等待请求发生),cq->Next()实际上不仅可以接收该请求的操作完成事件,还可以接收其他请求的操作完成事件。

因此,如果在第一个请求正在等待回复数据传输完成时收到新请求,则将cq->Next()获取新请求发出的事件,并立即并发地开始处理新请求,而不是等待第一个请求完成其传输。

另一方面,同步 API 将始终等待一个请求完全完成(从开始接收到完成回复),然后再开始接收另一个请求。这意味着接收请求正文数据和发回回复数据(IO 操作)时 CPU 利用率接近 0%。本来可以用来处理其他请求的宝贵 CPU 时间却浪费在等待上。

这确实很糟糕,因为如果互联网连接较差(往返 100 毫秒)的客户端向服务器发送请求,我们将不得不为来自该客户端的每个请求花费至少 200 毫秒,只是主动等待 TCP 传输完成。这将使我们的服务器性能下降到每秒仅约 5 个请求。

然而,如果我们使用异步 API,我们只是不会主动等待任何事情。我们告诉 gRPC:“请将此数据发送到客户端,但我们不会在这里等你完成。相反,只需在完成后向完成队列中放入一个小字母,我们稍后会检查它。” 并继续处理其他请求。

相关信息

您可以看到如何为同步 API异步 API编写一个简单的服务器

最佳绩效实践

gRPC C++ 性能节点建议的最佳性能实践是生成与 CPU 核心数相等的线程数量,并为每个线程使用一个 CompletionQueue。