我正在尝试使用 robinhood / faust 但没有成功!
我已经创建了一个生产者,它成功地插入到我的 confluence-kafka localhost 实例中的原始主题中!
但 faust 无法连接到本地主机。
我的应用程序.py:
import faust
import base64
import random
from datetime import datetime
SOURCE_TOPIC="input_msgs"
TARGET_TOPIC="output_msgs"
app = faust.App("messages-stream", 
    broker="kafka://"+'localhost:9092',
    topic_partitions=1,
    store="memory://")
class OriginalMessage(faust.Record):
    msg: str
class TransformedMessage(faust.Record):
    msg_id: int
    msg_data: str
    msg_base64: str
    created_at: float 
    source_topic: str
    target_topic: str
    deleted: bool
topic = app.topic(SOURCE_TOPIC, value_type=OriginalMessage)
out_topic = app.topic(TARGET_TOPIC, partitions=1)
table = app.Table(
    "output_msgs",
    default=TransformedMessage,
    partitions=1,
    changelog_topic=out_topic,
)
print('Initializing Thread Processor...')
@app.agent(topic)
async def transformedmessage(messageevents):
    async for transfmessage in messageevents:
        try: …