发布到 kafka 主题的 Faust 示例

mel*_*r55 11 python apache-kafka faust

我很好奇您应该如何表达您希望将消息快速传递到 Kafka 主题。他们自述文件中的示例似乎没有写入主题:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()
Run Code Online (Sandbox Code Playgroud)

我希望hello.send在上面的代码中向该主题发布一条消息,但似乎没有。

有很多从主题中读取的示例,以及许多使用 cli 推送 ad-hoc 消息的示例。梳理文档后,我没有看到任何明确的代码发布主题示例。我只是疯了,上面的代码应该可以工作吗?

BWS*_*rns 5

您可以使用sink来告诉 Faust 在哪里交付代理功能的结果。如果需要,您还可以一次使用多个主题作为接收器。

@app.agent(topic_to_read_from, sink=[destination_topic])
async def fetch(records):
    async for record in records:
        result = do_something(record)
        yield result
Run Code Online (Sandbox Code Playgroud)


dee*_*392 5

send()函数是为写入主题而调用的正确函数。您甚至可以指定一个特定的分区,就像等效的 Java API 调用一样。

以下是该send()方法的参考:

https://faust.readthedocs.io/en/latest/reference/faust.topics.html#faust.topics.Topic.send