我正在尝试一个简单的示例,将 Kafka 主题的数据读取到 Apache Beam 中。这是相关的片段:
with beam.Pipeline(options=pipeline_options) as pipeline:
_ = (
pipeline
| 'Read from Kafka' >> ReadFromKafka(
consumer_config={'bootstrap.servers': 'localhost:29092'},
topics=['test'])
| 'Print' >> beam.Map(print))
Run Code Online (Sandbox Code Playgroud)
使用上面的 Beam 管道片段,我没有看到任何消息传入。Kafka 在 Docker 容器中本地运行,我可以kafkacat从主机(容器外部)使用它来发布和订阅消息。所以,我想这方面没有问题。
看来 Beam 能够连接到 Kafka 并收到新消息的通知,因为我在发布数据时看到 Beam 日志中的偏移量发生了变化kafkacat:
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, …Run Code Online (Sandbox Code Playgroud) 我正在尝试从 Kafka 收到的 JSON 数据中提取 PyFlink 中的一些嵌套字段。JSON 记录架构如下。基本上,每个记录都有一个Result对象,其中有一个名为 的对象数组data。我正在尝试value从第一个数组元素中提取字段,即data[0]。
{
'ID': 'some-id',
'Result': {
'data': [
{
'value': 65537,
...
...
}
]
}
}
Run Code Online (Sandbox Code Playgroud)
我正在使用 Table API 从 Kafka 主题读取数据并将提取的字段写入另一个主题。
源DDL如下:
source_ddl = """
CREATE TABLE InTable (
`ID` STRING,
`Timestamp` TIMESTAMP(3),
`Result` ROW(
`data` ROW(`value` BIGINT) ARRAY),
WATERMARK FOR `Timestamp` AS `Timestamp`
) WITH (
'connector' = 'kafka',
'topic' = 'in-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'my-group-id',
'scan.startup.mode' = 'earliest-offset', …Run Code Online (Sandbox Code Playgroud)