小编Fel*_*ger的帖子

无法连接到 id 为 1 的节点:[Worker]:错误:ConnectionError('没有连接到 id 为节点')

我正在尝试使用 robinhood / faust 但没有成功!

我已经创建了一个生产者,它成功地插入到我的 confluence-kafka localhost 实例中的原始主题中!

但 faust 无法连接到本地主机。

我的应用程序.py:

import faust
import base64
import random
from datetime import datetime


SOURCE_TOPIC="input_msgs"
TARGET_TOPIC="output_msgs"

app = faust.App("messages-stream", 
    broker="kafka://"+'localhost:9092',
    topic_partitions=1,
    store="memory://")

class OriginalMessage(faust.Record):
    msg: str


class TransformedMessage(faust.Record):
    msg_id: int
    msg_data: str
    msg_base64: str
    created_at: float 
    source_topic: str
    target_topic: str
    deleted: bool

topic = app.topic(SOURCE_TOPIC, value_type=OriginalMessage)
out_topic = app.topic(TARGET_TOPIC, partitions=1)

table = app.Table(
    "output_msgs",
    default=TransformedMessage,
    partitions=1,
    changelog_topic=out_topic,
)

print('Initializing Thread Processor...')


@app.agent(topic)
async def transformedmessage(messageevents):
    async for transfmessage in messageevents:
        try: …
Run Code Online (Sandbox Code Playgroud)

python stream-processing faust

5
推荐指数
1
解决办法
5388
查看次数

标签 统计

faust ×1

python ×1

stream-processing ×1