小编sum*_*tkm的帖子

Apache Beam Python SDK ReadFromKafka 未收到数据

我正在尝试一个简单的示例,将 Kafka 主题的数据读取到 Apache Beam 中。这是相关的片段:

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092'},
            topics=['test'])
        | 'Print' >> beam.Map(print))
Run Code Online (Sandbox Code Playgroud)

使用上面的 Beam 管道片段,我没有看到任何消息传入。Kafka 在 Docker 容器中本地运行,我可以kafkacat从主机(容器外部)使用它来发布和订阅消息。所以,我想这方面没有问题。

看来 Beam 能够连接到 Kafka 并收到新消息的通知,因为我在发布数据时看到 Beam 日志中的偏移量发生了变化kafkacat

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka apache-beam apache-beam-io

11
推荐指数
0
解决办法
2992
查看次数

PyFlink 从 JSON 数组中提取嵌套字段

我正在尝试从 Kafka 收到的 JSON 数据中提取 PyFlink 中的一些嵌套字段。JSON 记录架构如下。基本上,每个记录都有一个Result对象,其中有一个名为 的对象数组data。我正在尝试value从第一个数组元素中提取字段,即data[0]

{
  'ID': 'some-id',
  'Result': {
    'data': [
      {
        'value': 65537,
        ...
        ...
      }
    ]
  }
}
Run Code Online (Sandbox Code Playgroud)

我正在使用 Table API 从 Kafka 主题读取数据并将提取的字段写入另一个主题。

源DDL如下:

source_ddl = """
    CREATE TABLE InTable (
        `ID` STRING,
        `Timestamp` TIMESTAMP(3),
        `Result` ROW(
            `data` ROW(`value` BIGINT) ARRAY),
        WATERMARK FOR `Timestamp` AS `Timestamp`
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'in-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group-id',
        'scan.startup.mode' = 'earliest-offset', …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-sql pyflink

5
推荐指数
0
解决办法
1102
查看次数