标签: flink-sql

使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段

我正在尝试使用 Apache Flink 1.11 创建一个源表,我可以在其中访问 JSON 消息中的嵌套属性。我可以从根属性中提取值,但我不确定如何访问嵌套对象。

文档建议它应该是一种MAP类型,但是当我设置它时,出现以下错误

: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
Run Code Online (Sandbox Code Playgroud)

这是我的 SQL

: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
Run Code Online (Sandbox Code Playgroud)

我的 JSON 看起来像这样:

        CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-sql pyflink

8
推荐指数
2
解决办法
4642
查看次数

AWS Glue计划注册表是否支持用作Flink SQL目录?

AWS Schema Registry 是否支持在 Flink SQLK 应用程序中用作 SQL 目录?例如,文档显示了使用 Hive 目录的示例:

CREATE CATALOG hive WITH (
'type'='hive',
'property-version'='1',
'hive-version'='2.3.6',
'hive-conf-dir'='/opt/hive-conf'
);
Run Code Online (Sandbox Code Playgroud)

然后,可以在应用程序中使用该目录中定义的表,而无需声明CREATE TABLE,例如我可以这样做:

SELECT * FROM hive.`default`.my_table
Run Code Online (Sandbox Code Playgroud)

我想做的是这样的:

CREATE CATALOG awsglue WITH (
...  some config here
);
Run Code Online (Sandbox Code Playgroud)

这是否受支持?是否有任何示例或文档显示其用法?

amazon-web-services apache-flink aws-glue flink-sql

6
推荐指数
1
解决办法
652
查看次数

Apache Flink:如何为动态表启用“upsert 模式”?

我在 Flink 文档和官方 Flink 博客中看到多次提到基于唯一键的动态表的“更新插入模式”。但是,我没有看到有关如何在动态表上启用此模式的任何示例/文档。

例子:

  • 博客文章

    通过更新模式在流上定义动态表时,我们可以在表上指定唯一的键属性。在这种情况下,对键属性执行更新和删除操作。该更新模式是在如下图显示。

  • 文件

    转换为upsert 流的动态表需要一个(可能是复合的)唯一键

所以我的问题是:

  • 如何在 Flink 中的动态表上指定唯一键属性?
  • 如何将动态表置于更新/更新插入/“替换”模式,而不是追加模式?

apache-flink flink-streaming flink-sql

5
推荐指数
2
解决办法
2913
查看次数

Apache Flink 错误 java.lang.ClassNotFoundException:org.apache.flink.table.sources.TableSource?

我正在 Apache Flink 中编写一个流服务。我基本上是使用 org.apache.flink.table.sources.CsvTableSource 从 CSV 文件中选取数据。下面是相同的代码:

 StreamTableEnvironment streamTableEnvironment = TableEnvironment
                .getTableEnvironment(streamExecutionEnvironment);

    CsvTableSource csvTableSource = CsvTableSource.builder().path(pathToCsvFile)
            .field("XXX0", Types.SQL_TIMESTAMP).field("XXX1", Types.INT)
            .field("XXX2", Types.DECIMAL).field("XXX3", Types.INT).field("XXX4", Types.INT)
            .field("XXX9", Types.DECIMAL).field("XXX5", Types.STRING)
            .field("XXX6", Types.STRING).field("XXX7", Types.STRING).fieldDelimiter(",").lineDelimiter("\n")
            .ignoreFirstLine().ignoreParseErrors().build();

    streamTableEnvironment.registerTableSource("metrics_table", csvTableSource);

    Table selectedMetricTable = streamTableEnvironment.sqlQuery(getSQLQuery(metricsType, metricsGroupingLevel));

    DataStream<Tuple2<Boolean, MetricsTimeSeriesData>> metricStream = streamTableEnvironment
            .toRetractStream(selectedMetricTable, MetricsTimeSeriesData.class);
Run Code Online (Sandbox Code Playgroud)

但它给出以下错误:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.sources.TableSource
Run Code Online (Sandbox Code Playgroud)

以下是 Maven 依赖项:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency> …
Run Code Online (Sandbox Code Playgroud)

java scala apache-flink flink-streaming flink-sql

5
推荐指数
1
解决办法
6721
查看次数

有没有办法确定总作业并行度或运行 Flink 作业所需的插槽数量(在运行之前)

有没有一种方法可以确定从执行计划或其他方式运行作业所需的任务槽总数,而不必先实际启动作业。

根据此文档:https ://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html

“Flink 集群需要的任务槽数量与作业中使用的最高并行度完全相同。无需计算程序总共包含多少个任务(具有不同的并行度)。”

如果我从 StreamExecutionEnvironment 获取执行计划(设置后但没有实际执行作业)并从执行计划 json 中的节点列表中获取任何节点的最大并行度,这是否足以确定执行任务所需的任务槽数运行作业。

是否存在这种情况不再出现的情况?或者有什么注意事项需要牢记吗?

apache-flink flink-streaming flink-cep flink-sql

5
推荐指数
1
解决办法
833
查看次数

Apache Flink 资源规划最佳实践

我正在寻找建议/最佳实践,以确定在 Flink 集群上部署流作业所需的最佳资源。

资源有

  1. 每个 TaskManager 的任务槽数
  2. TaskManager 的最佳内存分配
  3. 最大并行度

apache-flink flink-streaming flink-cep flink-sql

5
推荐指数
1
解决办法
745
查看次数

PyFlink 从 JSON 数组中提取嵌套字段

我正在尝试从 Kafka 收到的 JSON 数据中提取 PyFlink 中的一些嵌套字段。JSON 记录架构如下。基本上,每个记录都有一个Result对象,其中有一个名为 的对象数组data。我正在尝试value从第一个数组元素中提取字段,即data[0]

{
  'ID': 'some-id',
  'Result': {
    'data': [
      {
        'value': 65537,
        ...
        ...
      }
    ]
  }
}
Run Code Online (Sandbox Code Playgroud)

我正在使用 Table API 从 Kafka 主题读取数据并将提取的字段写入另一个主题。

源DDL如下:

source_ddl = """
    CREATE TABLE InTable (
        `ID` STRING,
        `Timestamp` TIMESTAMP(3),
        `Result` ROW(
            `data` ROW(`value` BIGINT) ARRAY),
        WATERMARK FOR `Timestamp` AS `Timestamp`
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'in-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group-id',
        'scan.startup.mode' = 'earliest-offset', …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-sql pyflink

5
推荐指数
0
解决办法
1102
查看次数

我们如何使用 Flink SQL API 定义嵌套的 json 属性(包括数组)?

我们在使用 Flink SQL 时遇到以下问题:我们已经配置了 Kafka Twitter 连接器以将推文添加到 Kafka,并且我们希望使用 Flink SQL 从表中的 Kafka 读取推文。

我们如何使用 Flink SQL API 定义嵌套的 json 属性(包括数组)?

我们尝试了以下方法,但不起作用(返回的值为空):

CREATE TABLE kafka_tweets(
  payload ROW(`HashtagEntities` ARRAY[VARCHAR])
) WITH (
  'connector' = 'kafka',
  'topic' = 'twitter_status',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)
Run Code Online (Sandbox Code Playgroud)

在 Twitter 响应中,HashtagEntities 是一个对象数组。

apache-flink flink-streaming flink-sql

5
推荐指数
1
解决办法
895
查看次数

Flink 与 Iceberg Catalog 和 Hive Metastore:找不到 org.apache.hadoop.fs.s3a.S3AFileSystem

我正在尝试使用 Apache Iceberg 目录和 Hive Metastore 设置 Flink SQL,但没有成功。以下是我在干净的 Flink 1.18.1 安装中采取的步骤,以及我得到的错误。

\n

设置组件

\n

运行 Hive MetaStore:

\n
docker run --rm --detach --name hms-standalone \\\n           --publish 9083:9083 \\\n           ghcr.io/recap-build/hive-metastore-standalone:latest \n
Run Code Online (Sandbox Code Playgroud)\n

使用 Docker 运行 MinIO:

\n
docker run --rm --detach --name minio \\\n            -p 9001:9001 -p 9000:9000 \\\n            -e "MINIO_ROOT_USER=admin" \\\n            -e "MINIO_ROOT_PASSWORD=password" \\\n            minio/minio server /data --console-address ":9001"\n
Run Code Online (Sandbox Code Playgroud)\n

配置一个存储桶:

\n
docker exec minio \\\n    mc config host add minio http://localhost:9000 admin password\ndocker exec minio \\\n    mc mb …
Run Code Online (Sandbox Code Playgroud)

apache-flink hive-metastore flink-sql apache-iceberg

5
推荐指数
1
解决办法
314
查看次数

如何使用Flink实现不同数据源之间的流式连接?

我的数据来自两个不同的 Kafka 主题,由不同的代理提供服务,每个主题具有不同数量的分区。一个流包含有关正在投放的广告的事件,另一个流包含点击事件:

\n
ad_serves: ad_id, ip, sTime\nad_clicks: ad_id, ip, cTime\n
Run Code Online (Sandbox Code Playgroud)\n

流程函数的文档包括有关使用 or 实现低级连接的部分,CoProcessFunctionKeyedCoProcessFunction不确定如何设置它。

\n

我还想知道这里是否可以使用Flink 的SQL Join之一。我对简单的连接感兴趣,比如

\n
ad_serves: ad_id, ip, sTime\nad_clicks: ad_id, ip, cTime\n
Run Code Online (Sandbox Code Playgroud)\n

以及基于投放后 5 秒内点击的广告的分析查询:

\n
SELECT s.ad_id, s.sTime, c.cTime\nFROM ad_serves s, ad_clicks c\nWHERE s.ad_id = c.ad_id\n
Run Code Online (Sandbox Code Playgroud)\n

apache-flink flink-streaming flink-sql

4
推荐指数
1
解决办法
1627
查看次数