如何使用Flink实现不同数据源之间的流式连接?

Dav*_*son 4 apache-flink flink-streaming flink-sql

我的数据来自两个不同的 Kafka 主题,由不同的代理提供服务,每个主题具有不同数量的分区。一个流包含有关正在投放的广告的事件,另一个流包含点击事件:

\n
ad_serves: ad_id, ip, sTime\nad_clicks: ad_id, ip, cTime\n
Run Code Online (Sandbox Code Playgroud)\n

流程函数的文档包括有关使用 or 实现低级连接的部分,CoProcessFunctionKeyedCoProcessFunction不确定如何设置它。

\n

我还想知道这里是否可以使用Flink 的SQL Join之一。我对简单的连接感兴趣,比如

\n
ad_serves: ad_id, ip, sTime\nad_clicks: ad_id, ip, cTime\n
Run Code Online (Sandbox Code Playgroud)\n

以及基于投放后 5 秒内点击的广告的分析查询:

\n
SELECT s.ad_id, s.sTime, c.cTime\nFROM ad_serves s, ad_clicks c\nWHERE s.ad_id = c.ad_id\n
Run Code Online (Sandbox Code Playgroud)\n

Dav*_*son 7

一般来说,我建议使用 Flink SQL 来实现联接,因为它易于使用且优化良好。但无论您是使用 SQL/Table API,还是使用 DataStream API 自己实现联接,总体情况都大致相同。

  • 您将从不同的来源开始FlinkKafkaConsumer,每个主题都有一个来源。如果这些主题中的分区数量(及其数据量)非常不同,那么您可能会决定相应地扩展 Flink 源的实例数量。在下图中,我通过显示 2 个 ad_serve 实例和 1 个 ad_click 实例来建议这一点。

  • 实现联接时,无论是使用KeyedCoProcessFunctionSQL/Table API 还是使用 SQL/Table API,都必须对来自两个流的键具有相等约束。在这种情况下,我们可以通过 来设置两个流的键ad_id。这将具有将给定键的两个流中的所有事件汇集在一起​​的效果 - 例如,下图显示了广告 17 的 ad_serve 和 ad_click 事件,以及这些事件如何找到到达KeyedCoProcessFunction.

在此输入图像描述

  • 作为示例给出的两个查询在必须保留多少状态方面有非常不同的要求。对于无约束的常规连接,例如

    SELECT s.ad_id, s.sTime, c.cTime
    FROM ad_serves s, ad_clicks c
    WHERE s.ad_id = c.ad_id
    
    Run Code Online (Sandbox Code Playgroud)

    执行此查询的作业必须永远存储(在 Flink 的托管、键控状态下)来自两个流的所有事件。

    另一方面,第二个查询中提供的时间约束使得较旧的服务和单击事件的状态可能过期,无法再参与生成新的联接结果。(这里我假设所涉及的流是仅附加流,其中事件大致按时间顺序排列。)

这两个查询对于键控也有不同的需求。第一个查询连接于c.ad_id = s.ad_id; 第二个在s.ad_id = c.ad_id AND s.ip = c.ip. 如果您想对此进行设置,KeyedCoProcessFunction代码将如下所示:

DataStream<Serve> serves = ...
DataStream<Click> clicks = ...

serves
  .connect(clicks)
  .keyBy(s -> new Tuple2<>(s.ad_id, s.ip),
         c -> new Tuple2<>(c.ad_id, c.ip))
  .process(new MyJoinFunction())

Run Code Online (Sandbox Code Playgroud)

请注意,连接流上的 keyBy 需要两个键选择器函数,每个流一个,并且这些函数必须将两个流映射到同一键空间。在第二个连接的情况下,我们使用 的元组(ad_id, ip)作为键。