grpc c ++中的异步模型

Whi*_*ord 7 c++ asynchronous c++11 microservices grpc

我的团队正在设计一个具有微服务架构的可扩展解决方案,并计划将gRPC用作层之间的传输通信.我们决定使用async grpc模型.如果我扩展RPC方法的数量,示例(greeter_async_server.cc)提供的设计似乎不可行,因为那时我将不得不为每个RPC方法创建一个新类,并HandleRpcs()像这样创建它们的对象. Pastebin(简短示例代码).

   void HandleRpcs() {
            new CallDataForRPC1(&service_, cq_.get());
            new CallDataForRPC2(&service_, cq_.get());
            new CallDataForRPC3(&service, cq_.get());
            // so on...
    }
Run Code Online (Sandbox Code Playgroud)

它将被硬编码,所有的灵活性都将丢失.

我有大约300-400个RPC方法实现,并且当我必须处理超过100K RPC请求/秒时,具有300-400个类将是麻烦且低效的,并且该解决方案是非常糟糕的设计.我不能承受在每个请求上以这种方式创建对象的开销.有人可以为我提供一个解决方法.async grpc c++可以像它的同步伴侣一样简单吗?

编辑:赞成使情况更加清晰,对于那些可能正在努力掌握这个异步示例流程的人,我写的是我到目前为止所理解的内容,如果在某处出错,请让我纠正.

在async grpc中,每次我们必须将unique-tag与completion-queue绑定,这样当我们轮询时,服务器可以在客户端命中特定RPC时将其返回给我们,并且我们从返回的内容中推断有关呼叫类型的唯一标记.

service_->RequestRPC2(&ctx_, &request_, &responder_, cq_, cq_,this);这里我们使用当前对象的地址作为唯一标记.这就像在完成队列上注册我们的RPC调用一样.然后我们向下调查HandleRPCs()以查看客户端是否命中RPC,如果是,则cq_->Next(&tag, &OK)填充标记.轮询代码段:

while (true) {
          GPR_ASSERT(cq_->Next(&tag, &ok));
          GPR_ASSERT(ok);
          static_cast<CallData*>(tag)->Proceed();
        }
Run Code Online (Sandbox Code Playgroud)

因为,我们在队列中注册的unique-tag是CallData对象的地址,所以我们可以调用Proceed().这对于一个内部逻辑的RPC来说很好Proceed().但是每次我们将所有这些都放在CallData中时,有了更多的RPC,那么在轮询时,我们将调用唯一一个Proceed()包含逻辑的(例如)RPC1(postgres调用),RPC2(mongodb调用),.等等.这就像在一个函数中编写我的所有程序.因此,为了避免这种情况,我使用了一个GenericCallData带有virtual void Proceed()和派生类的类,每个RPC有一个类,它们自己的逻辑Proceed().这是一个有效的解决方案,但我想避免编写许多类.

我尝试的另一个解决方案是将所有RPC函数逻辑proceed()保留在自己的函数中并保持全局std::map<long, std::function</*some params*/>>.因此,每当我将具有unique-tag的RPC注册到队列中时,我将其相应的逻辑函数(我肯定会硬编码到语句中并绑定所需的所有参数),然后将unique-tag作为键存储.在轮询时,当我得到它时,我&tag在地图中查找此键并调用相应的已保存函数.现在,还有一个障碍,我必须在函数逻辑中执行此操作:

// pseudo code
void function(reply, responder, context, service)
{
    // register this RPC with another unique tag so to serve new incoming request of the same type on the completion queue
     service_->RequestRPC1(/*params*/, new_unique_id);
    // now again save this new_unique_id and current function into the map, so when tag will be returned we can do lookup
     map.emplace(new_unique_id, function);

    // now you're free to do your logic
    // do your logic
}
Run Code Online (Sandbox Code Playgroud)

你看,代码已经扩展到另一个模块,它基于RPC.希望它能清除这种情况.我想如果有人能以更简单的方式实现这种类型的服务器.

ale*_*eer 5

这篇文章现在已经很老了,但我还没有看到任何关于这个的答案或例子,所以我将向任何其他读者展示我是如何解决它的。我有大约 30 个 RPC 调用,并且正在寻找一种在添加和删除 RPC 调用时减少占用空间的方法。我花了一些迭代才找到解决它的好方法。

因此,我从 (g)RPC 库获取 RPC 请求的接口是接收方需要实现的回调接口。界面如下所示:

class IRpcRequestHandler
{
public:
    virtual ~IRpcRequestHandler() = default;
    virtual void onZigbeeOpenNetworkRequest(const smarthome::ZigbeeOpenNetworkRequest& req,
                                            smarthome::Response& res) = 0;
    virtual void onZigbeeTouchlinkDeviceRequest(const smarthome::ZigbeeTouchlinkDeviceRequest& req,
                                                smarthome::Response& res) = 0;
    ...
};
Run Code Online (Sandbox Code Playgroud)

以及一些用于在 gRPC 服务器启动后设置/注册每个 RPC 方法的代码:

void ready() 
{
    SETUP_SMARTHOME_CALL("ZigbeeOpenNetwork", // Alias that is used for debug messages
                         smarthome::Command::AsyncService::RequestZigbeeOpenNetwork,  // Generated gRPC service method for async.
                         smarthome::ZigbeeOpenNetworkRequest, // Generated gRPC service request message
                         smarthome::Response, // Generated gRPC service response message
                         IRpcRequestHandler::onZigbeeOpenNetworkRequest); // The callback method to call when request has arrived.

    SETUP_SMARTHOME_CALL("ZigbeeTouchlinkDevice",
                         smarthome::Command::AsyncService::RequestZigbeeTouchlinkDevice,
                         smarthome::ZigbeeTouchlinkDeviceRequest,
                         smarthome::Response,
                         IRpcRequestHandler::onZigbeeTouchlinkDeviceRequest);
    ...
}
Run Code Online (Sandbox Code Playgroud)

这就是您在添加和删除 RPC 方法时需要关心的全部内容。

SETUP_SMARTHOME_CALL 是一个自制的宏,如下所示:

#define SETUP_SMARTHOME_CALL(ALIAS, SERVICE, REQ, RES, CALLBACK_FUNC) \
  new ServerCallData<REQ, RES>(                                       \
      ALIAS,                                                          \
      std::bind(&SERVICE,                                             \
                &mCommandService,                                     \
                std::placeholders::_1,                                \
                std::placeholders::_2,                                \
                std::placeholders::_3,                                \
                std::placeholders::_4,                                \
                std::placeholders::_5,                                \
                std::placeholders::_6),                               \
      mCompletionQueue.get(),                                         \
      std::bind(&CALLBACK_FUNC, requestHandler, std::placeholders::_1, std::placeholders::_2))
Run Code Online (Sandbox Code Playgroud)

我认为 ServerCallData 类看起来像 gRPCs 示例中的类,但做了一些修改。ServerCallData 派生自非模板类,具有void proceed(bool ok)用于 CompletionQueue::Next() 处理的抽象函数。创建 ServerCallData 时,它将调用该SERVICE方法在 CompletionQueue 上注册自己,并且在每次第一次proceed(ok)调用时,它都会克隆自己,这将注册另一个实例。如果有人感兴趣,我也可以为此发布一些示例代码。

编辑:在下面添加了更多示例代码。

Grpc服务器

class GrpcServer
{
 public:
  explicit GrpcServer(std::vector<grpc::Service*> services);
  virtual ~GrpcServer();

  void run(const std::string& sslKey,
           const std::string& sslCert,
           const std::string& password,
           const std::string& listenAddr,
           uint32_t port,
           uint32_t threads = 1);

 private:
  virtual void ready();  // Called after gRPC server is created and before polling CQ.
  void handleRpcs();  // Function that polls from CQ, can be run by multiple threads. Casts object to CallData and calls CallData::proceed().

  std::unique_ptr<ServerCompletionQueue> mCompletionQueue;
  std::unique_ptr<Server> mServer;
  std::vector<grpc::Service*> mServices;
  std::list<std::shared_ptr<std::thread>> mThreads;
  ...
}
Run Code Online (Sandbox Code Playgroud)

CallData对象的主要部分:

template <typename TREQUEST, typename TREPLY>
class ServerCallData : public ServerCallMethod
{
 public:
  explicit ServerCallData(const std::string& methodName,
                          std::function<void(ServerContext*,
                                             TREQUEST*,
                                             ::grpc::ServerAsyncResponseWriter<TREPLY>*,
                                             ::grpc::CompletionQueue*,
                                             ::grpc::ServerCompletionQueue*,
                                             void*)> serviceFunc,
                          grpc::ServerCompletionQueue* completionQueue,
                          std::function<void(const TREQUEST&, TREPLY&)> callback,
                          bool first = false)
      : ServerCallMethod(methodName),
        mResponder(&mContext),
        serviceFunc(serviceFunc),
        completionQueue(completionQueue),
        callback(callback)
  {
    requestNewCall();
  }

  void proceed(bool ok) override
  {
    if (!ok)
    {
      delete this;
      return;
    }

    if (callStatus() == ServerCallMethod::PROCESS)
    {    
      callStatus() = ServerCallMethod::FINISH;
      new ServerCallData<TREQUEST, TREPLY>(callMethodName(), serviceFunc, completionQueue, callback);

      try
      {
        callback(mRequest, mReply);
      }
      catch (const std::exception& e)
      {
        mResponder.Finish(mReply, Status::CANCELLED, this);
        return;
      }

      mResponder.Finish(mReply, Status::OK, this);
    }
    else
    {    
      delete this;
    }
  }

 private:
  void requestNewCall()
  {
    serviceFunc(
        &mContext, &mRequest, &mResponder, completionQueue, completionQueue, this);
  }

  ServerContext mContext;
  TREQUEST mRequest;
  TREPLY mReply;
  ServerAsyncResponseWriter<TREPLY> mResponder;
  std::function<void(ServerContext*,
                     TREQUEST*,
                     ::grpc::ServerAsyncResponseWriter<TREPLY>*,
                     ::grpc::CompletionQueue*,
                     ::grpc::ServerCompletionQueue*,
                     void*)>
      serviceFunc;
  std::function<void(const TREQUEST&, TREPLY&)> callback;
  grpc::ServerCompletionQueue* completionQueue;
};
Run Code Online (Sandbox Code Playgroud)