我正在开发一个基于gRPC的服务,该服务需要高吞吐量。但目前我的程序在使用 C++ 同步 gRPC 时吞吐量较低。
我已通读 gRPC 文档,但没有找到有关同步/异步 API 之间差异的明确解释。除了异步可以控制完成队列之外,它对同步 API 是透明的。
我想知道同步gRPC是否将消息发送到TCP层,并等待其“ack”,从而导致下一条消息被阻塞?同时,异步 API 会异步发送它们,而不会等待后面的消息吗?
Mii*_*gon 11
TLDR:是的,异步 API 会异步发送消息,而不会等待后面的消息,而同步 API 会在发送/接收一条消息时阻塞整个线程。
gRPC 使用CompletionQueue进行异步操作。您可以在这里找到官方教程:https://grpc.io/docs/languages/cpp/async/
CompletionQueue 是一个事件队列。这里的“事件”可以是请求数据接收的完成或警报(定时器)到期等(基本上是任何异步操作的完成)。
以官方 gRPC 异步 API 示例为例,重点关注CallData类 和HandleRpcs():
void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
Run Code Online (Sandbox Code Playgroud)
HandleRpcs() 是服务器的主循环。这是一个无限循环,通过使用不断从完成队列中获取下一个事件cq->Next(),并调用它的Proceed()方法(我们自定义的方法来处理不同状态的客户端请求)。
类CallData(其实例代表客户端请求的完整处理周期):
class CallData {
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// Make this instance progress to the PROCESS state.
status_ = PROCESS;
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallData
// instances can serve different requests concurrently), in this case
// the memory address of this CallData instance.
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
this);
} else if (status_ == PROCESS) {
// Spawn a new CallData instance to serve new clients while we process
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new CallData(service_, cq_);
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name());
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
} else {
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (CallData).
delete this;
}
}
private:
// The means of communication with the gRPC runtime for an asynchronous
// server.
Greeter::AsyncService* service_;
// The producer-consumer queue where for asynchronous server notifications.
ServerCompletionQueue* cq_;
// Context for the rpc, allowing to tweak aspects of it such as the use
// of compression, authentication, as well as to send metadata back to the
// client.
ServerContext ctx_;
// What we get from the client.
HelloRequest request_;
// What we send back to the client.
HelloReply reply_;
// The means to get back to the client.
ServerAsyncResponseWriter<HelloReply> responder_;
// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_; // The current serving state.
};
Run Code Online (Sandbox Code Playgroud)
我们可以看到,aCallData有 3 个状态:CREATE、PROCESS 和 FINISH。
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this)会调用 ,这会告诉 gRPC 准备接收一个 SayHello请求。RequestSayHello告诉 gRPC 在收到请求后将上下文、请求正文和请求响应者放在哪里,以及通知使用哪个完成队列以及应将哪些标签附加到通知事件(在本例中,this用作标签)。HandleRpcs()阻止cq->Next(). 等待事件发生。一段时间之后....
SayHello客户端向服务器发出请求, gRPC 开始接收并解码该请求。(IO操作)一段时间之后....
request_CallData 对象的字段中(通过前面提供的指针),然后创建一个事件(使用the pointer to the CallData objectas 标记,如前面最后一个参数所询问的那样RequestSayHello)。然后 gRPC将该事件放入完成队列中cq_。HandleRpcs()接收到事件(之前阻塞的调用cq->Next()现在返回),调用CallData::Proceed()处理请求。status_CallData 的值为PROCESS,因此它执行以下操作:HandleRpcs()进入下一次迭代并再次阻塞cq->Next(),等待新事件发生。一段时间之后....
cq->Next() 接收事件并返回,CallData::Proceed()释放 CallData 对象(通过使用delete this;)。HandleRpcs()再次循环并阻塞cq->Next(),等待新事件。看起来该过程与同步 API 基本相同,只是对完成队列进行了额外的访问。然而,通过这样做,每次some time later....(通常是等待 IO 操作完成或等待请求发生),cq->Next()实际上不仅可以接收该请求的操作完成事件,还可以接收其他请求的操作完成事件。
因此,如果在第一个请求正在等待回复数据传输完成时收到新请求,则将cq->Next()获取新请求发出的事件,并立即并发地开始处理新请求,而不是等待第一个请求完成其传输。
另一方面,同步 API 将始终等待一个请求完全完成(从开始接收到完成回复),然后再开始接收另一个请求。这意味着接收请求正文数据和发回回复数据(IO 操作)时 CPU 利用率接近 0%。本来可以用来处理其他请求的宝贵 CPU 时间却浪费在等待上。
这确实很糟糕,因为如果互联网连接较差(往返 100 毫秒)的客户端向服务器发送请求,我们将不得不为来自该客户端的每个请求花费至少 200 毫秒,只是主动等待 TCP 传输完成。这将使我们的服务器性能下降到每秒仅约 5 个请求。
然而,如果我们使用异步 API,我们只是不会主动等待任何事情。我们告诉 gRPC:“请将此数据发送到客户端,但我们不会在这里等你完成。相反,只需在完成后向完成队列中放入一个小字母,我们稍后会检查它。” 并继续处理其他请求。
您可以看到如何为同步 API和异步 API编写一个简单的服务器
gRPC C++ 性能节点建议的最佳性能实践是生成与 CPU 核心数相等的线程数量,并为每个线程使用一个 CompletionQueue。
| 归档时间: |
|
| 查看次数: |
6269 次 |
| 最近记录: |