PyFlink 从 JSON 数组中提取嵌套字段

sum*_*tkm 5 apache-flink flink-sql pyflink

我正在尝试从 Kafka 收到的 JSON 数据中提取 PyFlink 中的一些嵌套字段。JSON 记录架构如下。基本上,每个记录都有一个Result对象,其中有一个名为 的对象数组data。我正在尝试value从第一个数组元素中提取字段,即data[0]

{
  'ID': 'some-id',
  'Result': {
    'data': [
      {
        'value': 65537,
        ...
        ...
      }
    ]
  }
}
Run Code Online (Sandbox Code Playgroud)

我正在使用 Table API 从 Kafka 主题读取数据并将提取的字段写入另一个主题。

源DDL如下:

source_ddl = """
    CREATE TABLE InTable (
        `ID` STRING,
        `Timestamp` TIMESTAMP(3),
        `Result` ROW(
            `data` ROW(`value` BIGINT) ARRAY),
        WATERMARK FOR `Timestamp` AS `Timestamp`
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'in-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group-id',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
"""
Run Code Online (Sandbox Code Playgroud)

对应的接收器DDL是:

sink_ddl = """
    CREATE TABLE OutTable (
        `ID` STRING,
        `value` BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'out-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group-id',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
"""
Run Code Online (Sandbox Code Playgroud)

value以下是从数组的第一个元素中提取字段的代码片段:

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

table = t_env.from_path('InTable')
table \
    .select(
        table.ID,
        table.Result.data.at(1).value) \
    .execute_insert('OutTable') \
    .wait()
Run Code Online (Sandbox Code Playgroud)

execute_insert当我执行此步骤时,我在该步骤中看到以下错误。

py4j.protocol.Py4JJavaError: An error occurred while calling o57.executeInsert.
: scala.MatchError: ITEM($9.data, 1) (of class org.apache.calcite.rex.RexCall)
Run Code Online (Sandbox Code Playgroud)

但是,如果我不提取嵌入的内容value而是提取数组的整行(即)table.Result.data.at(1)并进行适当修改sink_ddl,我就能够正确获取整行。

知道吗,我错过了什么?感谢您的指点!

编辑:这可能是 Flink 中的一个错误,它正在由https://issues.apache.org/jira/browse/FLINK-22082跟踪。