小编Luk*_*asM的帖子

如何从 faust 应用程序向 Websocket 发送数据

我目前正在使用 Kafka 和 robinhood 的 faust 处理一个用例来处理来自 Kafka 的数据。我已经成功地进行了计算,并且我需要的结果正在打印到我的 faust 工人正在运行的控制台上。

现在我想找到一种方法不仅可以在控制台中而且可以在 HTML 页面中看到我的结果。我查看了 websockets 库,但我无法将其与 faust 结合使用。我得到的错误是Crashed reason=RuntimeError('This event loop is already running')我认为这是因为代码是为正在处理的每条消息执行的。

任何帮助都受到高度赞赏

这是我正在使用的代码:

    import faust, datetime, websockets, asyncio

app = faust.App(
    'UseCase',
    broker='kafka://localhost:29092',
)

usecase_topic = app.topic('usecase',partitions=8)

usecase_table = app.Table('usecase', default=int)

checkfailure = {}

@app.agent(usecase_topic)
async def process_record(records):
    async for record in records:
        #count records for each Sensor
        print(record)
        sensor = record['ext_id']
        usecase_table[sensor] += 1
        #print(f'Records for Sensor {sensor}: {usecase_table[sensor]}')

        #write current timestamp of record …
Run Code Online (Sandbox Code Playgroud)

websocket python-3.x apache-kafka faust

2
推荐指数
1
解决办法
1216
查看次数

标签 统计

apache-kafka ×1

faust ×1

python-3.x ×1

websocket ×1