sum*_*tkm 11 python apache-kafka apache-beam apache-beam-io
我正在尝试一个简单的示例,将 Kafka 主题的数据读取到 Apache Beam 中。这是相关的片段:
with beam.Pipeline(options=pipeline_options) as pipeline:
_ = (
pipeline
| 'Read from Kafka' >> ReadFromKafka(
consumer_config={'bootstrap.servers': 'localhost:29092'},
topics=['test'])
| 'Print' >> beam.Map(print))
Run Code Online (Sandbox Code Playgroud)
使用上面的 Beam 管道片段,我没有看到任何消息传入。Kafka 在 Docker 容器中本地运行,我可以kafkacat从主机(容器外部)使用它来发布和订阅消息。所以,我想这方面没有问题。
看来 Beam 能够连接到 Kafka 并收到新消息的通知,因为我在发布数据时看到 Beam 日志中的偏移量发生了变化kafkacat:
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"
Run Code Online (Sandbox Code Playgroud)
这就是我使用以下方式发布数据的方式kafkacat:
$ kafkacat -P -b localhost:29092 -t test -K:
1:foo
1:bar
Run Code Online (Sandbox Code Playgroud)
我可以再次使用以下命令确认已收到它kafkacat:
$ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
Key: 1 Value: foo
Key: 1 Value: bar
Run Code Online (Sandbox Code Playgroud)
但尽管如此,我并没有看到 Beam 按我的预期打印出实际的消息。任何指出这里缺少的内容的指示都会受到赞赏。我怀疑这可能是 Beam 管道端的解码问题,但可能是不正确的。
编辑(2021 年 3 月 17 日):
ReadFromKafka在与 Beam Python Kafka 连接器开发人员讨论此问题后,Python未按预期运行的根本原因是可移植 Flink 运行程序无法执行无界可分割 DoFns (SDF),因为它只支持自检查点。便携式流媒体 Flink 不会定期向 SDK 发出检查点请求。这就是为什么所有 Kafka 记录都会在第一ReadFromKafka阶段进行缓冲。跟踪此问题的 Jira 是https://issues.apache.org/jira/browse/BEAM-11991。此外,还有另一个 Jira 正在跟踪功能请求以支持此功能: https: //issues.apache.org/jira/browse/BEAM-11998。希望这可以帮助!
| 归档时间: |
|
| 查看次数: |
2992 次 |
| 最近记录: |