我如何使用 ZeroMQ 为协议缓冲区编写自己的 RPC 实现

Vpa*_*ino 6 c++ rpc protocol-buffers zeromq

根据“定义服务”下的 Google Protocol Buffers 文档,他们说,

也可以在您自己的 RPC 实现中使用协议缓冲区。

据我了解,Protocol Buffers 本身并没有实现 RPC。相反,它们提供了一系列必须由用户实现的抽象接口(就是我!)。所以我想利用 ZeroMQ 实现这些抽象接口进行网络通信。

我正在尝试使用 ZeroMQ 创建一个 RPC 实现,因为我正在处理的项目已经实现了用于基本消息传递的 ZeroMQ(因此我使用 gRPC,正如文档所建议的那样)。

在通读 proto 文档后,我发现我必须为自己的实现实现抽象接口RpcChannelRpcController

我已经构建了一个最小化的例子,说明我目前的 RPC 实现

.proto 文件为简洁起见省略了 SearchRequest 和 SearchResponse 架构

service SearchService {
    rpc Search (SearchRequest) returns (SearchResponse);
}
Run Code Online (Sandbox Code Playgroud)

SearchServiceImpl.h :

class SearchServiceImpl : public SearchService {
 public:
  void Search(google::protobuf::RpcController *controller,
                    const SearchRequest *request,
                    SearchResponse *response,
                    google::protobuf::Closure *done) override {
    // Static function that processes the request and gets the result
    SearchResponse res = GetSearchResult(request);

    // Call the callback function
    if (done != NULL) {
    done->Run();
    }
    }
  }
};
Run Code Online (Sandbox Code Playgroud)

MyRPCController.h :

class MyRPCController : public google::protobuf::RpcController {
 public:
    MyRPCController();

    void Reset() override;

    bool Failed() const override;

    std::string ErrorText() const override;

    void StartCancel() override;

    void SetFailed(const std::string &reason) override;

    bool IsCanceled() const override;

    void NotifyOnCancel(google::protobuf::Closure *callback) override;
 private:
  bool failed_;
  std::string message_;
};
Run Code Online (Sandbox Code Playgroud)

MyRPCController.cpp - 基于

void MyRPCController::Reset() {  failed_ = false; }

bool MyRPCController::Failed() const { return failed_; }

std::string MyRPCController::ErrorText() const { return message_; }

void MyRPCController::StartCancel() { }

void MyRPCController::SetFailed(const std::string &reason) {
  failed_ = true;
  message_ = reason;
}

bool MyRPCController::IsCanceled() const { return false; }

void MyRPCController::NotifyOnCancel(google::protobuf::Closure *callback) { }

MyRPCController::ChiRpcController() : RpcController() { Reset(); }
Run Code Online (Sandbox Code Playgroud)

MyRpcChannel.h :

class MyRPCChannel: public google::protobuf::RpcChannel {
 public:
    void CallMethod(const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller,
                    const google::protobuf::Message *request, google::protobuf::Message *response,
                    google::protobuf::Closure *done) override;
};
Run Code Online (Sandbox Code Playgroud)

到目前为止我对我的例子的问题:

  • 我在哪里适合 ZeroMQ?
    • 似乎它应该进入 RPCChannel,因为在我看到的示例中(请参阅此处的第 3 个代码块),它们传递了一个具有要绑定到的端口的字符串(即MyRpcChannel channel("rpc:hostname:1234/myservice");
  • 我关心的是我的 RPCController 实现,它看起来太简单了。应该更多的去这里吗?
  • 我如何实现 RPCChannel,它似乎与 SearchServiceImpl 非常相似。这些类中的 1 虚函数具有非常相似的方法签名,只是它是通用的。

这是我遇到的其他一些 Stack Overflow 问题,其中包含有关该主题的一些有用信息:

  1. Protobuf-Net:实现服务器、rpc 控制器和 rpc 通道- 这是我找到 RPCController 实现示例的地方。
  2. 使用协议缓冲区在 ZeroMQ 中实现 RPC - 这个答案很有趣,因为在最佳答案中,似乎他们建议不要使用 .proto 文件的 RPC 格式内置的 Protobufs。
    • 我也在这个文件中注意到了同样的概念,在一个名为libpbrpc的存储库中,它似乎是示例代码的好来源
  3. 我可以/应该使用现有的实现,例如RPCZ吗?

感谢您的帮助。我希望我提供了足够的信息并且清楚我在寻找什么。如果有什么不清楚或缺乏信息,请告诉我。我很乐意相应地编辑问题。

Fla*_*viu 4

  • ZeroMQ 提供了一个低级 API,用于基于可包含任何数据的消息的网络通信。
  • ProtoBuffers 是一个将结构化数据编码为压缩二进制数据并对此类数据进行解码的库。
  • gRPC 是一个 RPC 框架,它为基于网络通信的 RPC 服务生成代码,并具有将数据交换为 ProtoBuffers 数据的功能。

ZeroMQ 和 gRPC 都提供对网络通信的支持,但方式不同。您必须选择 ZeroMQ 或 gRPC 进行网络通信。如果您选择 ZeroMQ,则可以使用交换二进制结构化数据的 ProtoBuffers 对消息进行编码。

要点是 ProtoBuffers 库允许对变体记录(类似于 C/C++ 联合)进行编码和解码,可以完全模拟具有交换 ProtoBuffers 消息功能的 RPC 服务提供的功能。

所以选项是:

  1. 将 ZeroMQ 与发送和接收原语以及 ProtoBuffers 编码的变体消息一起使用,这些消息可以包含各种子消息,例如
union Request
{
  byte msgType;
  MessageType1 msg1;
  MessageType2 msg2;
  MessageType3 msg3;
}

union Response
{
  byte msgType;
  MessageType3 msg1;
  MessageType4 msg2;
  MessageType5 msg3;
}

send(Request request);
receive(Response response);
Run Code Online (Sandbox Code Playgroud)
  1. 使用 gRPC 生成具有功能的服务,例如
service MyService 
{
  rpc function1(MessageType1) returns (Response);
  rpc function2(MessageType2) returns (Response);
  rpc function3(MessageType3) returns (Response);

  rpc functionN(MessageType3) returns (MessageType5);
}
Run Code Online (Sandbox Code Playgroud)

(这里可以使用很多组合)

  1. 仅使用单功能 gRPC 服务,例如
service MyService 
{
    rpc function(Request) returns (Response);
}
Run Code Online (Sandbox Code Playgroud)

该选项可能取决于

  • 客户端的首选目标:基于 ZeroMQ 或 gRPC 的客户端
  • 比较 ZeroMQ 与基于 gRPC 的服务的性能原因
  • 特定功能,例如在 ZeroMQ 与基于 gRPC 的服务和客户端中如何使用/处理订阅(请参阅如何在 grpc 中正确设计发布-订阅模式?

对于第一个选项,与第二个选项相比,您必须做很多事情。您必须将发送的消息类型与预期接收的消息类型相匹配。

如果其他人将开发客户端,第二个选项将允许更容易/更快地理解所提供的服务的功能。

为了在 ZeroMQ 上开发 RPC 服务,我将定义这样的 .proto 文件,指定函数、参数(所有可能的输入和输出参数)和错误,如下所示:

enum Function 
{
    F1 = 0;
    F2 = 1;
    F3 = 2;
}

enum Error 
{
    E1 = 0;
    E2 = 1;
    E3 = 2;
}

message Request
{ 
    required Function function = 1;
    repeated Input data = 2;
}

message Response
{ 
    required Function function = 1;
    required Error error = 2;
    repeated Output data = 3;
}

message Input
{ 
    optional Input1 data1 = 1;
    optional Input2 data2 = 2;
    ...
    optional InputN dataN = n;
}

message Output
{ 
    optional Output1 data1 = 1;
    optional Output2 data2 = 2;
    ...
    optional OutputN dataN = n;
}

message Message
{
   repeated Request requests;
   repeated Response responses;
}
Run Code Online (Sandbox Code Playgroud)

并且根据函数 ID,在运行时必须检查参数的数量和类型。