有没有人有关于测试Flask Content Streaming资源的经验/指针?我的应用程序使用Redis Pub/Sub,当在通道中接收消息时,它会传输文本'data:{"value":42}'实现在Flask docs:docs之后,我的单元测试也在Flask docs之后完成.发往Redis pub/sub的消息由第二个资源(POST)发送.
我正在创建一个线程来监听流,同时我在主应用程序上发布到发布到Redis的资源.
虽然连接断言通过(我收到OK 200和mimetype'text/event-stream'),但数据对象是空的.
我的单元测试是这样的:
def test_04_receiving_events(self):
headers = [('Content-Type', 'application/json')]
data = json.dumps({"value": 42})
headers.append(('Content-Length', len(data)))
def get_stream():
rv_stream = self.app.get('stream/data')
rv_stream_object = json.loads(rv_stream.data) #error (empty)
self.assertEqual(rv_stream.status_code, 200)
self.assertEqual(rv_stream.mimetype, 'text/event-stream')
self.assertEqual(rv_stream_object, "data: {'value': 42}")
t.stop()
threads = []
t = Thread(target=get_stream)
threads.append(t)
t.start()
time.sleep(1)
rv_post = self.app.post('/sensor', headers=headers, data=data)
threads_done = False
while not threads_done:
threads_done = True
for t in threads:
if t.is_alive():
threads_done = False
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
应用资源是:
@app.route('/stream/data')
def …Run Code Online (Sandbox Code Playgroud)