小编Vin*_*ini的帖子

如何在调用更新后端状态的函数时从 python (fastapi) 发送服务器端事件

我有以下问题:给定一个运行 fastapi 的后端,它有一个流端点,用于更新前端,我想在每次调用更新后端状态的函数时发送这些更新(可以是计划的作业或已命中并导致状态更新的不同端点)。

我想要实现的一个简单版本是:

from fastapi import FastAPI
from starlette.responses import StreamingResponse

class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        # HERE: notify waiting stream endpoint

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            # HERE lies the question: wait for state to be update
            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(), media_type="text/event-stream")
Run Code Online (Sandbox Code Playgroud)

我希望它永远运行下去。每次状态更新时,都会event_stream解除阻塞并发送消息。

我看过线程和异步,但我有一种感觉,我缺少一些关于如何在 python 中执行此操作的简单概念。

python python-multithreading server-sent-events python-asyncio fastapi

6
推荐指数
2
解决办法
3691
查看次数

如何在烧瓶响应中返回分块的二进制数据?

我有一个处理 get 请求的烧瓶端点,我有兴趣在响应中返回一个对象列表(序列化),但以“分块”的方式。我的意思是,当我向该端点发出 get 请求时,我希望能够迭代响应(就好像我正在获取二进制数据列表)并将每个“块”反序列化为一个对象。

我能够实现与我的需求类似的结果,但是使用字符串。例如:

服务器端:

from flask import stream_with_context, request, Response

from flask import Flask

app = Flask(__name__)


@app.route('/stream')
def streamed_get():
    @stream_with_context
    def generate():
        yield 'Hello \n'
        yield "there\n"
        yield '!\n'

    return Response(generate())
Run Code Online (Sandbox Code Playgroud)

客户端:

import requests

response = requests.get("http://127.0.0.1:5000/stream", stream=True)
for i in response.iter_lines():
    print(i)
Run Code Online (Sandbox Code Playgroud)

这将打印:

Hello 
there
!
Run Code Online (Sandbox Code Playgroud)

但这似乎很明显,因为我正在使用response.iter_lines().

因此,为了进一步实验,我尝试向服务器发送一个发布请求,如下所示:

客户端:

Hello 
there
!
Run Code Online (Sandbox Code Playgroud)

服务器端:

def gen():
    yield 'hi\n'
    yield 'there'


requests.post("http://127.0.0.1:5000/stream_post", data=gen())
Run Code Online (Sandbox Code Playgroud)

在服务器控制台上打印:

('hi\n', 0)
('there', 1)
Run Code Online (Sandbox Code Playgroud)

例如,我不知道如何针对序列化对象执行类似的操作。我觉得我的知识存在根本差距,因为我似乎正在寻找一种返回块编码响应的方法?或者至少是某种带有其大小的响应,这样我就可以在客户端迭代它,如果这有意义的话。

python chunked-encoding flask

5
推荐指数
0
解决办法
2464
查看次数