GRPC 流选择(python)

use*_*323 6 python grpc

假设我想创建一个类似聊天的应用程序。客户端可以向服务器发送文本,反之亦然。文本交换的顺序可以是任意的。

服务器依赖于另一个控制服务器响应流的流。GRPC 流作为 python 生成器公开。服务器现在如何同时等待客户端输入和另一个流上的输入?通常人们会使用 select() 之类的东西,但这里我们有生成器。

我有一些示例代码,它实现了想要的行为,但在客户端和服务器端需要一个额外的线程。如何在没有线程的情况下获得相同的结果?

原型:

syntax = 'proto3';

service Scenario {
    rpc Chat(stream DPong) returns (stream DPong) {}
}

message DPong {
    string name = 1;
}
Run Code Online (Sandbox Code Playgroud)

服务器:

import random
import string
import threading

import grpc

import scenario_pb2_grpc
import scenario_pb2
import time
from concurrent import futures

class Scenario(scenario_pb2_grpc.ScenarioServicer):

    def Chat(self, request_iterator, context):
        def stream():
            while 1:
                time.sleep(1)
                yield random.choice(string.ascii_letters)

        output_stream = stream()

        def read_incoming():
            while 1:
                received = next(request_iterator)
                print('received: {}'.format(received))

        thread = threading.Thread(target=read_incoming)
        thread.daemon = True
        thread.start()

        while 1:
            yield scenario_pb2.DPong(name=next(output_stream))


if __name__ == '__main__':
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    scenario_pb2.add_ScenarioServicer_to_server(
        Scenario(), server)

    server.add_insecure_port('[::]:50052')
    server.start()
    print('listening ...')
    while 1:
        time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

客户

import threading

import grpc
import time

import scenario_pb2_grpc, scenario_pb2


def run():
    channel = grpc.insecure_channel('localhost:50052')
    stub = scenario_pb2_grpc.ScenarioStub(channel)
    print('client connected')

    def stream():
        while 1:
            yield scenario_pb2.DPong(name=input('$ '))

    input_stream = stub.Chat(stream())

    def read_incoming():
        while 1:
            print('received: {}'.format(next(input_stream).name))

    thread = threading.Thread(target=read_incoming)
    thread.daemon = True
    thread.start()

    while 1:
        time.sleep(1)

if __name__ == '__main__':
    print('client starting ...')
    run()
Run Code Online (Sandbox Code Playgroud)

小智 1

目前无法在不花费您所花费的线程的情况下执行此操作。我们正在考虑实施增强功能,以允许实现避免占用另一个线程,但这最早需要几个月的时间。