有谁知道 AWS MSK(Kafka 托管流)是否支持 KSQL?我在 Confluence 平台上看过很多有关 KSQL 的视频和文档,但没有看过 AWS MSK 的视频和文档。
请告诉我它是否支持,并告诉我是否有关于 AWS MSK 中的 KSQL 设置的任何文档。
谢谢,巴拉
我正在尝试按其属性之一对事件进行分组,并随着时间的推移使用 KSQL 窗口聚合,特别是会话窗口。
我有一个STREAM
由卡夫卡主题制成的TIMESTAMP
属性已明确指定。
当我尝试STREAM
使用以下查询创建会话窗口时:
CREATE STREAM SESSION_STREAM AS
SELECT ...
FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
GROUP BY ...;
Run Code Online (Sandbox Code Playgroud)
我总是收到错误:
您的 SELECT 查询会生成一个 TABLE。请改用 CREATE TABLE AS SELECT 语句。
STREAM
是否可以使用窗口聚合创建一个?
当我尝试按照建议创建一个包含所有会话启动事件的TABLE
a 时STREAM
,查询如下:
CREATE STREAM SESSION_START_STREAM AS
SELECT *
FROM SESSION_TABLE
WHERE WINDOWSTART=WINDOWEND;
Run Code Online (Sandbox Code Playgroud)
KSQL 告诉我:
KSQL 不支持窗口表上的持久查询
如何STREAM
在 KSQL 中创建启动会话窗口的事件?
我目前正在 .NET 中使用 Kafka 消费者处理大量 Kafka 消息。
我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息。
我不想首先处理(特别是不下载)那些不需要的消息。
看起来 kSql 查询(写为推送查询)可以有效地过滤出我需要处理的消息。
我如何通过 .NET 使用这些?我看到一些文档提到了 REST API,但我怀疑这是一个好主意,我需要在一天的高峰时间每分钟处理超过 100 000 条记录。(如果我可以选择性地下载和处理消息,我只会正在处理当前体积的大约三分之一。)
不幸的是,我无法控制发布者,因此我无法更改消息的发布内容/方式。
我们有一个“微服务”平台,我们正在使用 Debezium 从这些平台上的数据库捕获变更数据,效果很好。
现在,我们希望能够轻松地加入这些主题并将结果流式传输到一个可由多个服务使用的新主题。
免责声明:这假设 ksqldb 和 cli v0.11(似乎其中大部分内容在旧版本中可能不起作用)
来自两个数据库实例的两个表流入 Kafka 主题的示例:
-- source identity microservice (postgres)
CREATE TABLE public.user_entity (
id varchar(36) NOT NULL,
first_name varchar(255) NULL,
PRIMARY KEY (id)
);
-- ksql stream
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');
-- source organization microservice (postgres)
CREATE TABLE public.user_info (
id varchar(36) NOT NULL,
user_entity_id varchar(36) NOT NULL,
business_unit varchar(255) NOT NULL,
cost_center varchar(255) NOT NULL,
PRIMARY KEY (id)
);
-- ksql stream
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');
Run Code Online (Sandbox Code Playgroud)
选项 …
我是 ksql 的新手。我只是将 kafka 主题读到流中,效果很好。
此外,尝试从 kafka 主题创建表并失败。意识到我需要在 kafka 主题中设置一个键,该键被视为 ksql 表中的主键。所以我尝试从流创建表,但也失败了。查询/脚本:
CREATE TABLE DETAILS_TABLE AS SELECT SEQ, Server1, ServerId, NumberUri, SERVERID2, SERVER2 FROM details_stream WINDOW TUMBLING (SIZE 1 MINUTES);
Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.
Run Code Online (Sandbox Code Playgroud)
有人可以解释一下是否可能吗?如果是,我哪里出错了?谢谢。
假设我有一个关于温度预测数据的主题,如下:
2018-10-25,Melbourne,21
2018-10-26,Melbourne,17
2018-10-27,Melbourne,21
2018-10-25,Sydney,22
2018-10-26,Sydney,20
2018-10-27,Sydney,23
2018-10-26,Melbourne,18
2018-10-27,Melbourne,22
2018-10-26,Sydney,21
2018-10-27,Sydney,24
Run Code Online (Sandbox Code Playgroud)
每个条目都包含一个日期、一个城市和一个预报温度,并代表该日期对该城市预报的更新。我可以将其描述为这样的 KSQL 流:
CREATE STREAM forecasts_csv ( \
date VARCHAR, \
city VARCHAR, \
temperature INTEGER \
) WITH (kafka_topic='forecasts-csv', value_format='DELIMITED');
Run Code Online (Sandbox Code Playgroud)
现在,我想要一个表示每个城市当前(即最新)预测温度的表格,以及该预测随时间变化的最小值和最大值。所需输出的示例是:
{ date='2018-10-27', city='Melbourne', latest=22, min=21, max=22 }
Run Code Online (Sandbox Code Playgroud)
我怎样才能做到这一点?
我设法获得如下聚合(最小/最大):
CREATE STREAM forecasts_keyed \
WITH (partitions=4, value_format='JSON') \
AS SELECT date + '/' + city AS forecast_key, * \
FROM forecasts_csv \
PARTITION BY forecast_key;
CREATE TABLE forecasts_minmax \
WITH (partitions=4, value_format='JSON') \
AS …
Run Code Online (Sandbox Code Playgroud) 我知道,在 KSQL 中我们可以将偏移量设置为最早或最晚但是我们可以获取特定时间段的数据,即我需要从 2020 年 5 月 6 日起将数据插入到主题中?
我正在尝试创建一个Debezium MySQL连接器,并进行转换以提取密钥。
关键转换之前:
create source connector mysql with(
"connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
"database.hostname" = 'mysql',
"tasks.max" = '1',
"database.port" = '3306',
"database.user" = 'debezium',
"database.password" = 'dbz',
"database.server.id" = '42',
"database.server.name" = 'before',
"table.whitelist" = 'deepprices.deepprices',
"database.history.kafka.bootstrap.servers" = 'kafka:29092',
"database.history.kafka.topic" = 'dbz.deepprices',
"include.schema.changes" = 'true',
"transforms" = 'unwrap',
"transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope');
Run Code Online (Sandbox Code Playgroud)
主题结果是:
> rowtime: 2020/05/20 16:47:23.354 Z, key: [St@5778462697648631933/8247607644536792125], value: {"id": "P195910", "price": "1511.64"}
Run Code Online (Sandbox Code Playgroud)
当 key.converter 设置为JSON时,Key 变为{"id": "P195910"}
所以,我想从 key 中提取 id 并将其设为字符串键:
预期成绩 : …
有没有办法使用 KSQL 将一条消息拆分为多条消息并发布到新主题。需要明确的是,我并不是在寻找基于 Java 的侦听器并将其迭代/流式传输到新主题;而是在寻找基于 Java 的侦听器并将其迭代/流式传输到新主题。相反,我正在寻找一个可以为我做到这一点的 KSQL。
例如:
比方说,我需要将invoice
主题中的消息拆分为item_inventory_delta
消息
键:销售支票号码
消息示例:
{
"total": 12.33,
"salecounter": 1,
"items": [
{
"itemId": 123,
"quantity": 1
},
{
"itemId": 345,
"quantity": 5
}
]
}
Run Code Online (Sandbox Code Playgroud)
key : saleschecknumber_itemID
消息示例
{
"itemId": 123,
"quantity": 1
}
Run Code Online (Sandbox Code Playgroud)
2.
{
"itemId": 345,
"quantity": 5
}
Run Code Online (Sandbox Code Playgroud) 我在无头模式下运行 Kafka 和 KSQLDB 服务器。在 KSQLDB 服务器上,我只部署了几个查询来进行试验:
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews-ksql', PARTITIONS=1, REPLICAS=3, value_format='DELIMITED');
CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH (kafka_topic='users-ksql', PARTITIONS=1, REPLICAS=3, value_format='JSON', key = 'userid');
CREATE STREAM pageviews_enriched AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid;
Run Code Online (Sandbox Code Playgroud)
我的问题是,KSQLDB 服务器现在不断记录此 INFO 消息:
“发现分区 _confluence-ksql-ksql-01query_CSAS_PAGEVIEWS_ENRICHED_0-Join-repartition-0 没有提交的偏移量”。
它每秒向日志发送大约 10 次此消息。对应的主题为空。
这是什么意思以及如何解决它?