Fil*_*ale 7 apache-kafka ksqldb
我正在尝试按其属性之一对事件进行分组,并随着时间的推移使用 KSQL 窗口聚合,特别是会话窗口。
我有一个STREAM由卡夫卡主题制成的TIMESTAMP属性已明确指定。
当我尝试STREAM使用以下查询创建会话窗口时:
CREATE STREAM SESSION_STREAM AS
SELECT ...
FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
GROUP BY ...;
Run Code Online (Sandbox Code Playgroud)
我总是收到错误:
您的 SELECT 查询会生成一个 TABLE。请改用 CREATE TABLE AS SELECT 语句。
STREAM是否可以使用窗口聚合创建一个?
当我尝试按照建议创建一个包含所有会话启动事件的TABLEa 时STREAM,查询如下:
CREATE STREAM SESSION_START_STREAM AS
SELECT *
FROM SESSION_TABLE
WHERE WINDOWSTART=WINDOWEND;
Run Code Online (Sandbox Code Playgroud)
KSQL 告诉我:
KSQL 不支持窗口表上的持久查询
如何STREAM在 KSQL 中创建启动会话窗口的事件?
如果切换到创建表语句,您的创建流语句将创建一个不断更新的表。接收器主题SESSION_STREAM将包含表的更改流,即其更改日志。
ksqlDB 将其建模为 TABLE,因为它具有 TABLE 语义,即表中只能存在具有任何特定键的单行。但是,更改日志将包含已应用于表的更改流。
如果您想要一个包含所有会话的主题,那么将创建这样的内容:
-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT)
WITH (kafka_topic='data', value_format='json');
-- create a table that tracks user interactions per session:
CREATE TABLE SESSION AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
FROM DATA
WINDOW SESSION (5 SECONDS)
GROUP BY USER_ID;
Run Code Online (Sandbox Code Playgroud)
这将创建一个SESSIONS包含表更改的主题SESSIONS:即其更改日志。
如果您想将其转换为会话启动事件流,那么不幸的是 ksqlDB还不允许您直接从表中更改创建流,但您可以在表的更改日志上创建流:
-- Create a stream over the existing `SESSIONS` topic.
-- Note it states the window_type is 'Session'.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS
SELECT * FROM SESSION_STREAM
WHERE WINDOWSTART = WINDOWEND;
Run Code Online (Sandbox Code Playgroud)
请注意,在即将发布的 0.10 版本中,您将能够正确命名键列SESSION_STREAM:
CREATE STREAM SESSION_STREAM (USER_ID INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2754 次 |
| 最近记录: |