Nil*_*rak 5 python-3.x apache-kafka faust
我试图在一段时间间隔后将 faust 表的数据(计数)发布到 kafka 主题。当我发布一些简单的字符串时,计时器正在工作,但它无法以某种方式访问表的数据。下面是定时器的代码:
@app.timer(interval=10.0)
async def publish_to_anomaly_topic():
await anomaly_topic.send(
value=str(page_views['total'].value())
)
@app.agent(page_view_topic)
async def count_page_views(views):
async for view in views.group_by(PageView.id):
total=0
page_views[view.id]+=1
for everykey in list(page_views.keys()):
if everykey != 'total':
total+=page_views[everykey].value()
page_views['total'] = total
Run Code Online (Sandbox Code Playgroud)
代理工作正常。我能够正确地看到这些值。
经过大量实验,结果发现您无法与应用程序计时器一起访问表的值(即使您在创建表时指定relative_field选项)。此问题的解决方法是创建另一个表来维护消息的时间戳并在业务逻辑中使用它们。
if view.timestamp-page_views_timer[view.id+'_first_timestamp'] > 60:
await anomaly_topic.send(value={//the data to be sent})
Run Code Online (Sandbox Code Playgroud)
其中 page_views_timer 是创建的新表。
| 归档时间: |
|
| 查看次数: |
1855 次 |
| 最近记录: |