Apache Flink:如何为动态表启用“upsert 模式”?

Aus*_*ork 5 apache-flink flink-streaming flink-sql

我在 Flink 文档和官方 Flink 博客中看到多次提到基于唯一键的动态表的“更新插入模式”。但是,我没有看到有关如何在动态表上启用此模式的任何示例/文档。

例子:

  • 博客文章

    通过更新模式在流上定义动态表时,我们可以在表上指定唯一的键属性。在这种情况下,对键属性执行更新和删除操作。该更新模式是在如下图显示。

  • 文件

    转换为upsert 流的动态表需要一个(可能是复合的)唯一键

所以我的问题是:

  • 如何在 Flink 中的动态表上指定唯一键属性?
  • 如何将动态表置于更新/更新插入/“替换”模式,而不是追加模式?

Fab*_*ske 8

链接的资源描述了两种不同的场景。

  • 博客文章讨论了UPSERTDataStream -> Table转换。
  • 文档描述了反向 upsertTable -> DataStream转换。

以下讨论基于 Flink 1.4.0(2018 年 1 月)。

向上插入DataStream -> Table转换

通过对键的 upsert将 aDataStream转换为 aTable不是本机支持的,但在路线图上。同时,您可以使用附加Table和带有用户定义聚合函数的查询来模拟此行为。

如果您有跟踪用户登录Table Logins的架构的附加(user, loginTime, ip),您可以将其转换为使用以下查询Table键入的 upsert user

SELECT user, LAST_VAL(loginTime), LAST_VAL(ip) FROM Logins GROUP BY user
Run Code Online (Sandbox Code Playgroud)

LAST_VAL聚合函数是一个用户定义的聚合函数,它总是返回最新的附加值。

DataStream -> Table尽管提供了更简洁的 API,但对 upsert转换的本机支持的工作方式基本相同。

向上插入Table -> DataStream转换

不支持将 aTable转换为 upsert DataStream。这也正确反映在文档中:

请注意,在将动态表转换为 DataStream 时,仅支持追加和收回流。

我们故意选择不支持 upsertTable -> DataStream转换,因为DataStream只有在关键属性已知的情况下才能处理upsert 。这些取决于查询,并不总是直接识别。确保正确解释关键属性是开发人员的责任。否则会导致程序出错。为避免出现问题,我们决定不提供 upsertTable -> DataStream转换。

相反,用户可以将 aTable转换为缩回DataStream。此外,我们支持UpsertTableSink将更新插入写入DataStream外部系统,例如数据库或键值存储。


Sve*_*end 3

更新:从 Flink 1.9 开始,如果我们使用 Blink planner(这是 Flink 1.11 以来的默认设置),则它LAST_VALUE内置聚合函数的一部分。

假设上面 Fabian Hueske 的回复中提到的表存在Logins,我们现在可以将其转换为 upsert 表,如下所示:

SELECT 
  user, 
  LAST_VALUE(loginTime), 
  LAST_VALUE(ip) 
FROM Logins 
GROUP BY user
Run Code Online (Sandbox Code Playgroud)