假设我想创建一个类似聊天的应用程序。客户端可以向服务器发送文本,反之亦然。文本交换的顺序可以是任意的。
服务器依赖于另一个控制服务器响应流的流。GRPC 流作为 python 生成器公开。服务器现在如何同时等待客户端输入和另一个流上的输入?通常人们会使用 select() 之类的东西,但这里我们有生成器。
我有一些示例代码,它实现了想要的行为,但在客户端和服务器端需要一个额外的线程。如何在没有线程的情况下获得相同的结果?
原型:
syntax = 'proto3';
service Scenario {
rpc Chat(stream DPong) returns (stream DPong) {}
}
message DPong {
string name = 1;
}
Run Code Online (Sandbox Code Playgroud)
服务器:
import random
import string
import threading
import grpc
import scenario_pb2_grpc
import scenario_pb2
import time
from concurrent import futures
class Scenario(scenario_pb2_grpc.ScenarioServicer):
def Chat(self, request_iterator, context):
def stream():
while 1:
time.sleep(1)
yield random.choice(string.ascii_letters)
output_stream = stream()
def read_incoming():
while 1:
received = next(request_iterator)
print('received: {}'.format(received))
thread = threading.Thread(target=read_incoming)
thread.daemon = True
thread.start()
while 1:
yield scenario_pb2.DPong(name=next(output_stream))
if __name__ == '__main__':
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
scenario_pb2.add_ScenarioServicer_to_server(
Scenario(), server)
server.add_insecure_port('[::]:50052')
server.start()
print('listening ...')
while 1:
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
客户
import threading
import grpc
import time
import scenario_pb2_grpc, scenario_pb2
def run():
channel = grpc.insecure_channel('localhost:50052')
stub = scenario_pb2_grpc.ScenarioStub(channel)
print('client connected')
def stream():
while 1:
yield scenario_pb2.DPong(name=input('$ '))
input_stream = stub.Chat(stream())
def read_incoming():
while 1:
print('received: {}'.format(next(input_stream).name))
thread = threading.Thread(target=read_incoming)
thread.daemon = True
thread.start()
while 1:
time.sleep(1)
if __name__ == '__main__':
print('client starting ...')
run()
Run Code Online (Sandbox Code Playgroud)