Whi*_*ord 7 c++ asynchronous c++11 microservices grpc
我的团队正在设计一个具有微服务架构的可扩展解决方案,并计划将gRPC用作层之间的传输通信.我们决定使用async grpc模型.如果我扩展RPC方法的数量,示例(greeter_async_server.cc)提供的设计似乎不可行,因为那时我将不得不为每个RPC方法创建一个新类,并HandleRpcs()
像这样创建它们的对象.
Pastebin(简短示例代码).
void HandleRpcs() {
new CallDataForRPC1(&service_, cq_.get());
new CallDataForRPC2(&service_, cq_.get());
new CallDataForRPC3(&service, cq_.get());
// so on...
}
Run Code Online (Sandbox Code Playgroud)
它将被硬编码,所有的灵活性都将丢失.
我有大约300-400个RPC方法实现,并且当我必须处理超过100K RPC请求/秒时,具有300-400个类将是麻烦且低效的,并且该解决方案是非常糟糕的设计.我不能承受在每个请求上以这种方式创建对象的开销.有人可以为我提供一个解决方法.async grpc c++
可以像它的同步伴侣一样简单吗?
编辑:赞成使情况更加清晰,对于那些可能正在努力掌握这个异步示例流程的人,我写的是我到目前为止所理解的内容,如果在某处出错,请让我纠正.
在async grpc中,每次我们必须将unique-tag与completion-queue绑定,这样当我们轮询时,服务器可以在客户端命中特定RPC时将其返回给我们,并且我们从返回的内容中推断有关呼叫类型的唯一标记.
service_->RequestRPC2(&ctx_, &request_, &responder_, cq_, cq_,this);
这里我们使用当前对象的地址作为唯一标记.这就像在完成队列上注册我们的RPC调用一样.然后我们向下调查HandleRPCs()
以查看客户端是否命中RPC,如果是,则cq_->Next(&tag, &OK)
填充标记.轮询代码段:
while (true) {
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
Run Code Online (Sandbox Code Playgroud)
因为,我们在队列中注册的unique-tag是CallData对象的地址,所以我们可以调用Proceed()
.这对于一个内部逻辑的RPC来说很好Proceed()
.但是每次我们将所有这些都放在CallData中时,有了更多的RPC,那么在轮询时,我们将调用唯一一个Proceed()
包含逻辑的(例如)RPC1(postgres调用),RPC2(mongodb调用),.等等.这就像在一个函数中编写我的所有程序.因此,为了避免这种情况,我使用了一个GenericCallData
带有virtual void Proceed()
和派生类的类,每个RPC有一个类,它们自己的逻辑Proceed()
.这是一个有效的解决方案,但我想避免编写许多类.
我尝试的另一个解决方案是将所有RPC函数逻辑proceed()
保留在自己的函数中并保持全局std::map<long, std::function</*some params*/>>
.因此,每当我将具有unique-tag的RPC注册到队列中时,我将其相应的逻辑函数(我肯定会硬编码到语句中并绑定所需的所有参数),然后将unique-tag作为键存储.在轮询时,当我得到它时,我&tag
在地图中查找此键并调用相应的已保存函数.现在,还有一个障碍,我必须在函数逻辑中执行此操作:
// pseudo code
void function(reply, responder, context, service)
{
// register this RPC with another unique tag so to serve new incoming request of the same type on the completion queue
service_->RequestRPC1(/*params*/, new_unique_id);
// now again save this new_unique_id and current function into the map, so when tag will be returned we can do lookup
map.emplace(new_unique_id, function);
// now you're free to do your logic
// do your logic
}
Run Code Online (Sandbox Code Playgroud)
你看,代码已经扩展到另一个模块,它基于RPC.希望它能清除这种情况.我想如果有人能以更简单的方式实现这种类型的服务器.
这篇文章现在已经很老了,但我还没有看到任何关于这个的答案或例子,所以我将向任何其他读者展示我是如何解决它的。我有大约 30 个 RPC 调用,并且正在寻找一种在添加和删除 RPC 调用时减少占用空间的方法。我花了一些迭代才找到解决它的好方法。
因此,我从 (g)RPC 库获取 RPC 请求的接口是接收方需要实现的回调接口。界面如下所示:
class IRpcRequestHandler
{
public:
virtual ~IRpcRequestHandler() = default;
virtual void onZigbeeOpenNetworkRequest(const smarthome::ZigbeeOpenNetworkRequest& req,
smarthome::Response& res) = 0;
virtual void onZigbeeTouchlinkDeviceRequest(const smarthome::ZigbeeTouchlinkDeviceRequest& req,
smarthome::Response& res) = 0;
...
};
Run Code Online (Sandbox Code Playgroud)
以及一些用于在 gRPC 服务器启动后设置/注册每个 RPC 方法的代码:
void ready()
{
SETUP_SMARTHOME_CALL("ZigbeeOpenNetwork", // Alias that is used for debug messages
smarthome::Command::AsyncService::RequestZigbeeOpenNetwork, // Generated gRPC service method for async.
smarthome::ZigbeeOpenNetworkRequest, // Generated gRPC service request message
smarthome::Response, // Generated gRPC service response message
IRpcRequestHandler::onZigbeeOpenNetworkRequest); // The callback method to call when request has arrived.
SETUP_SMARTHOME_CALL("ZigbeeTouchlinkDevice",
smarthome::Command::AsyncService::RequestZigbeeTouchlinkDevice,
smarthome::ZigbeeTouchlinkDeviceRequest,
smarthome::Response,
IRpcRequestHandler::onZigbeeTouchlinkDeviceRequest);
...
}
Run Code Online (Sandbox Code Playgroud)
这就是您在添加和删除 RPC 方法时需要关心的全部内容。
SETUP_SMARTHOME_CALL 是一个自制的宏,如下所示:
#define SETUP_SMARTHOME_CALL(ALIAS, SERVICE, REQ, RES, CALLBACK_FUNC) \
new ServerCallData<REQ, RES>( \
ALIAS, \
std::bind(&SERVICE, \
&mCommandService, \
std::placeholders::_1, \
std::placeholders::_2, \
std::placeholders::_3, \
std::placeholders::_4, \
std::placeholders::_5, \
std::placeholders::_6), \
mCompletionQueue.get(), \
std::bind(&CALLBACK_FUNC, requestHandler, std::placeholders::_1, std::placeholders::_2))
Run Code Online (Sandbox Code Playgroud)
我认为 ServerCallData 类看起来像 gRPCs 示例中的类,但做了一些修改。ServerCallData 派生自非模板类,具有void proceed(bool ok)
用于 CompletionQueue::Next() 处理的抽象函数。创建 ServerCallData 时,它将调用该SERVICE
方法在 CompletionQueue 上注册自己,并且在每次第一次proceed(ok)
调用时,它都会克隆自己,这将注册另一个实例。如果有人感兴趣,我也可以为此发布一些示例代码。
编辑:在下面添加了更多示例代码。
Grpc服务器
class GrpcServer
{
public:
explicit GrpcServer(std::vector<grpc::Service*> services);
virtual ~GrpcServer();
void run(const std::string& sslKey,
const std::string& sslCert,
const std::string& password,
const std::string& listenAddr,
uint32_t port,
uint32_t threads = 1);
private:
virtual void ready(); // Called after gRPC server is created and before polling CQ.
void handleRpcs(); // Function that polls from CQ, can be run by multiple threads. Casts object to CallData and calls CallData::proceed().
std::unique_ptr<ServerCompletionQueue> mCompletionQueue;
std::unique_ptr<Server> mServer;
std::vector<grpc::Service*> mServices;
std::list<std::shared_ptr<std::thread>> mThreads;
...
}
Run Code Online (Sandbox Code Playgroud)
和CallData
对象的主要部分:
template <typename TREQUEST, typename TREPLY>
class ServerCallData : public ServerCallMethod
{
public:
explicit ServerCallData(const std::string& methodName,
std::function<void(ServerContext*,
TREQUEST*,
::grpc::ServerAsyncResponseWriter<TREPLY>*,
::grpc::CompletionQueue*,
::grpc::ServerCompletionQueue*,
void*)> serviceFunc,
grpc::ServerCompletionQueue* completionQueue,
std::function<void(const TREQUEST&, TREPLY&)> callback,
bool first = false)
: ServerCallMethod(methodName),
mResponder(&mContext),
serviceFunc(serviceFunc),
completionQueue(completionQueue),
callback(callback)
{
requestNewCall();
}
void proceed(bool ok) override
{
if (!ok)
{
delete this;
return;
}
if (callStatus() == ServerCallMethod::PROCESS)
{
callStatus() = ServerCallMethod::FINISH;
new ServerCallData<TREQUEST, TREPLY>(callMethodName(), serviceFunc, completionQueue, callback);
try
{
callback(mRequest, mReply);
}
catch (const std::exception& e)
{
mResponder.Finish(mReply, Status::CANCELLED, this);
return;
}
mResponder.Finish(mReply, Status::OK, this);
}
else
{
delete this;
}
}
private:
void requestNewCall()
{
serviceFunc(
&mContext, &mRequest, &mResponder, completionQueue, completionQueue, this);
}
ServerContext mContext;
TREQUEST mRequest;
TREPLY mReply;
ServerAsyncResponseWriter<TREPLY> mResponder;
std::function<void(ServerContext*,
TREQUEST*,
::grpc::ServerAsyncResponseWriter<TREPLY>*,
::grpc::CompletionQueue*,
::grpc::ServerCompletionQueue*,
void*)>
serviceFunc;
std::function<void(const TREQUEST&, TREPLY&)> callback;
grpc::ServerCompletionQueue* completionQueue;
};
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
2553 次 |
最近记录: |