sum*_*tkm 5 apache-flink flink-sql pyflink
我正在尝试从 Kafka 收到的 JSON 数据中提取 PyFlink 中的一些嵌套字段。JSON 记录架构如下。基本上,每个记录都有一个Result对象,其中有一个名为 的对象数组data。我正在尝试value从第一个数组元素中提取字段,即data[0]。
{
'ID': 'some-id',
'Result': {
'data': [
{
'value': 65537,
...
...
}
]
}
}
Run Code Online (Sandbox Code Playgroud)
我正在使用 Table API 从 Kafka 主题读取数据并将提取的字段写入另一个主题。
源DDL如下:
source_ddl = """
CREATE TABLE InTable (
`ID` STRING,
`Timestamp` TIMESTAMP(3),
`Result` ROW(
`data` ROW(`value` BIGINT) ARRAY),
WATERMARK FOR `Timestamp` AS `Timestamp`
) WITH (
'connector' = 'kafka',
'topic' = 'in-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'my-group-id',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
"""
Run Code Online (Sandbox Code Playgroud)
对应的接收器DDL是:
sink_ddl = """
CREATE TABLE OutTable (
`ID` STRING,
`value` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'out-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'my-group-id',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
"""
Run Code Online (Sandbox Code Playgroud)
value以下是从数组的第一个元素中提取字段的代码片段:
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
table = t_env.from_path('InTable')
table \
.select(
table.ID,
table.Result.data.at(1).value) \
.execute_insert('OutTable') \
.wait()
Run Code Online (Sandbox Code Playgroud)
execute_insert当我执行此步骤时,我在该步骤中看到以下错误。
py4j.protocol.Py4JJavaError: An error occurred while calling o57.executeInsert.
: scala.MatchError: ITEM($9.data, 1) (of class org.apache.calcite.rex.RexCall)
Run Code Online (Sandbox Code Playgroud)
但是,如果我不提取嵌入的内容value而是提取数组的整行(即)table.Result.data.at(1)并进行适当修改sink_ddl,我就能够正确获取整行。
知道吗,我错过了什么?感谢您的指点!
编辑:这可能是 Flink 中的一个错误,它正在由https://issues.apache.org/jira/browse/FLINK-22082跟踪。
| 归档时间: |
|
| 查看次数: |
1102 次 |
| 最近记录: |