我正在尝试使用 Apache Flink 1.11 创建一个源表,我可以在其中访问 JSON 消息中的嵌套属性。我可以从根属性中提取值,但我不确定如何访问嵌套对象。
文档建议它应该是一种MAP类型,但是当我设置它时,出现以下错误
: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
Run Code Online (Sandbox Code Playgroud)
这是我的 SQL
: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
Run Code Online (Sandbox Code Playgroud)
我的 JSON 看起来像这样:
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
Run Code Online (Sandbox Code Playgroud) AWS Schema Registry 是否支持在 Flink SQLK 应用程序中用作 SQL 目录?例如,文档显示了使用 Hive 目录的示例:
CREATE CATALOG hive WITH (
'type'='hive',
'property-version'='1',
'hive-version'='2.3.6',
'hive-conf-dir'='/opt/hive-conf'
);
Run Code Online (Sandbox Code Playgroud)
然后,可以在应用程序中使用该目录中定义的表,而无需声明CREATE TABLE,例如我可以这样做:
SELECT * FROM hive.`default`.my_table
Run Code Online (Sandbox Code Playgroud)
我想做的是这样的:
CREATE CATALOG awsglue WITH (
... some config here
);
Run Code Online (Sandbox Code Playgroud)
这是否受支持?是否有任何示例或文档显示其用法?
我正在 Apache Flink 中编写一个流服务。我基本上是使用 org.apache.flink.table.sources.CsvTableSource 从 CSV 文件中选取数据。下面是相同的代码:
StreamTableEnvironment streamTableEnvironment = TableEnvironment
.getTableEnvironment(streamExecutionEnvironment);
CsvTableSource csvTableSource = CsvTableSource.builder().path(pathToCsvFile)
.field("XXX0", Types.SQL_TIMESTAMP).field("XXX1", Types.INT)
.field("XXX2", Types.DECIMAL).field("XXX3", Types.INT).field("XXX4", Types.INT)
.field("XXX9", Types.DECIMAL).field("XXX5", Types.STRING)
.field("XXX6", Types.STRING).field("XXX7", Types.STRING).fieldDelimiter(",").lineDelimiter("\n")
.ignoreFirstLine().ignoreParseErrors().build();
streamTableEnvironment.registerTableSource("metrics_table", csvTableSource);
Table selectedMetricTable = streamTableEnvironment.sqlQuery(getSQLQuery(metricsType, metricsGroupingLevel));
DataStream<Tuple2<Boolean, MetricsTimeSeriesData>> metricStream = streamTableEnvironment
.toRetractStream(selectedMetricTable, MetricsTimeSeriesData.class);
Run Code Online (Sandbox Code Playgroud)
但它给出以下错误:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.sources.TableSource
Run Code Online (Sandbox Code Playgroud)
以下是 Maven 依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency> …Run Code Online (Sandbox Code Playgroud) 有没有一种方法可以确定从执行计划或其他方式运行作业所需的任务槽总数,而不必先实际启动作业。
根据此文档:https ://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html
“Flink 集群需要的任务槽数量与作业中使用的最高并行度完全相同。无需计算程序总共包含多少个任务(具有不同的并行度)。”
如果我从 StreamExecutionEnvironment 获取执行计划(设置后但没有实际执行作业)并从执行计划 json 中的节点列表中获取任何节点的最大并行度,这是否足以确定执行任务所需的任务槽数运行作业。
是否存在这种情况不再出现的情况?或者有什么注意事项需要牢记吗?
我正在寻找建议/最佳实践,以确定在 Flink 集群上部署流作业所需的最佳资源。
资源有
我正在尝试从 Kafka 收到的 JSON 数据中提取 PyFlink 中的一些嵌套字段。JSON 记录架构如下。基本上,每个记录都有一个Result对象,其中有一个名为 的对象数组data。我正在尝试value从第一个数组元素中提取字段,即data[0]。
{
'ID': 'some-id',
'Result': {
'data': [
{
'value': 65537,
...
...
}
]
}
}
Run Code Online (Sandbox Code Playgroud)
我正在使用 Table API 从 Kafka 主题读取数据并将提取的字段写入另一个主题。
源DDL如下:
source_ddl = """
CREATE TABLE InTable (
`ID` STRING,
`Timestamp` TIMESTAMP(3),
`Result` ROW(
`data` ROW(`value` BIGINT) ARRAY),
WATERMARK FOR `Timestamp` AS `Timestamp`
) WITH (
'connector' = 'kafka',
'topic' = 'in-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'my-group-id',
'scan.startup.mode' = 'earliest-offset', …Run Code Online (Sandbox Code Playgroud) 我们在使用 Flink SQL 时遇到以下问题:我们已经配置了 Kafka Twitter 连接器以将推文添加到 Kafka,并且我们希望使用 Flink SQL 从表中的 Kafka 读取推文。
我们如何使用 Flink SQL API 定义嵌套的 json 属性(包括数组)?
我们尝试了以下方法,但不起作用(返回的值为空):
CREATE TABLE kafka_tweets(
payload ROW(`HashtagEntities` ARRAY[VARCHAR])
) WITH (
'connector' = 'kafka',
'topic' = 'twitter_status',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
Run Code Online (Sandbox Code Playgroud)
在 Twitter 响应中,HashtagEntities 是一个对象数组。
我正在尝试使用 Apache Iceberg 目录和 Hive Metastore 设置 Flink SQL,但没有成功。以下是我在干净的 Flink 1.18.1 安装中采取的步骤,以及我得到的错误。
\n运行 Hive MetaStore:
\ndocker run --rm --detach --name hms-standalone \\\n --publish 9083:9083 \\\n ghcr.io/recap-build/hive-metastore-standalone:latest \nRun Code Online (Sandbox Code Playgroud)\n使用 Docker 运行 MinIO:
\ndocker run --rm --detach --name minio \\\n -p 9001:9001 -p 9000:9000 \\\n -e "MINIO_ROOT_USER=admin" \\\n -e "MINIO_ROOT_PASSWORD=password" \\\n minio/minio server /data --console-address ":9001"\nRun Code Online (Sandbox Code Playgroud)\n配置一个存储桶:
\ndocker exec minio \\\n mc config host add minio http://localhost:9000 admin password\ndocker exec minio \\\n mc mb …Run Code Online (Sandbox Code Playgroud) 我的数据来自两个不同的 Kafka 主题,由不同的代理提供服务,每个主题具有不同数量的分区。一个流包含有关正在投放的广告的事件,另一个流包含点击事件:
\nad_serves: ad_id, ip, sTime\nad_clicks: ad_id, ip, cTime\nRun Code Online (Sandbox Code Playgroud)\n流程函数的文档包括有关使用 or 实现低级连接的部分,但CoProcessFunction我KeyedCoProcessFunction不确定如何设置它。
我还想知道这里是否可以使用Flink 的SQL Join之一。我对简单的连接感兴趣,比如
\nad_serves: ad_id, ip, sTime\nad_clicks: ad_id, ip, cTime\nRun Code Online (Sandbox Code Playgroud)\n以及基于投放后 5 秒内点击的广告的分析查询:
\nSELECT s.ad_id, s.sTime, c.cTime\nFROM ad_serves s, ad_clicks c\nWHERE s.ad_id = c.ad_id\nRun Code Online (Sandbox Code Playgroud)\n