标签: apache-flink

AggregateFunction 中 merge 方法的含义

我试图理解 Flink 中的 AggregateFunction ,这里描述了这一点。它总共有四种方法,即

  1. 创建累加器
  2. 添加
  3. 获取结果
  4. 合并

据我了解,

createAccumulator当第一个元素进入新窗口时调用该方法,并且新创建的实例将被进一步使用

addcreateAccumulator调用方法以根据定义减少结果,这使用在方法中创建的实例

getResult当窗口关闭时调用方法并返回可用结果

我对上述方法的理解是否正确?最后,方法的用例是什么merge以及何时使用/调用它?这里的定义对我来说并不清楚。

apache-flink flink-streaming

2
推荐指数
1
解决办法
772
查看次数

在处理函数之后,带键的流是否变为不带键的?

假设我在键控过程后得到一个流。

DataStream<T> stream= sourceStream.keyBy(key).window(window).apply(function);
Run Code Online (Sandbox Code Playgroud)

生成的流是否仍被键入?我可以在该流中使用某些 Keyed 状态吗?

apache-flink flink-streaming

2
推荐指数
1
解决办法
1138
查看次数

是否可以通过编程方式设置 flink `state.checkpoint.dir` ?

我们的项目中有flink-conf.yaml本地运行。我们希望能够在本地运行 flink 进行测试。我们团队的一部分使用 Mac,另一部分使用 PC。我们想设置state.checkpoint.dir一些普遍接受的路径,最好在用户的主目录下。问题是,是否可以根据我们运行的操作系统以编程方式设置它,如果没有,在此设置中当前用户的主目录是否有一个普遍有效的快捷方式,例如 *NIX 上的“~”?

apache-flink flink-statefun

2
推荐指数
1
解决办法
1123
查看次数

Apache Flink 水印策略

我们正在构建一个流处理管道来处理/摄取 Kafka 消息。我们使用的是 Flink v1.12.2。在定义源水印策略时,在官方文档中,我遇到了两种开箱即用的水印策略;对于有界无序和对于单调时间戳。我确实浏览了 javadoc,但没有完全理解何时以及为什么应该使用一种策略而不是另一种策略。时间戳基于事件时间。谢谢。

apache-flink flink-streaming

2
推荐指数
1
解决办法
2246
查看次数

Apache flink 对水印空闲状态的理解以及与有限持续时间和窗口持续时间的关系

我有一个配置有 Kafka 连接器的 Flink 管道。

我使用以下命令将水印生成频率设置为 2 秒:

env.getConfig().setAutoWatermarkInterval(2000);
Run Code Online (Sandbox Code Playgroud)

现在,对于流窗口,我的滚动窗口为 60 秒,我们在其中进行一些聚合,并且根据数据字段之一的时间戳进行基于事件时间的处理。

我尚未为我的水印策略或我的流配置 allowedLateness。

final ConnectorConfig topicConfig = config.forTopic("mytopic");
final FlinkKafkaConsumer<MyPojo> myEvents = new FlinkKafkaConsumer<>(
        topicConfig.name(),
        AvroDeserializationSchema.forSpecific(MyPojo.class),
        topicConfig.forConsumer()
);
myEvents.setStartFromLatest();



myEvents.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<MyPojo>forBoundedOutOfOrderness(
                Duration.ofSeconds(30))
        .withIdleness(Duration.ofSeconds(120))
        .withTimestampAssigner((evt, timestamp) -> evt.event_timestamp_field));
Run Code Online (Sandbox Code Playgroud)

Q.1 根据我所读到的内容,我的时间 0-60 的窗口将在 90 秒后计算,30-90 在 120 秒后计算,依此类推。然而,由于我们正在做翻滚窗口,即没有重叠,我的猜测是没有 30-90 窗口,0-60 之后的下一个窗口是 60-120,在 150 秒标记处触发,我是对的吗?

Q.2 如果没有 allowedLateness,所有迟到的数据都将被丢弃,例如。90 秒后到达的时间戳为 45 的事件被认为是无序的,将超出第一个窗口,即 0-60。对于窗口 60-120,事件时间戳不匹配,因此它将被丢弃并且不包含在窗口在 150 秒处触发,对吗?

Q.3. 对于源空闲持续时间,我选择 120,表示如果该主题的任何 Kakfa 分区不活动且有数据,则在 2 分钟后将其标记为空闲,然后发送其他活动分区的水印。我的问题是选择这个数字,即 2 分钟,以及它是否与窗口持续时间(60 秒)或无序(30 秒)有关。如果是这样,我应该在这里记住什么来进行适当的选择,这样我就不会因为空闲分区导致的非前进水印而导致数据滞留?

或者 120 的等待时间太长,我可能会丢失数据,因此我应该将其设置为远小于 OutOfOrderness 持续时间以确保 …

java apache-kafka apache-flink flink-streaming

2
推荐指数
1
解决办法
2236
查看次数

如果我给 Flink 的 RocksDB 状态后端足够的内存,这与基于堆的状态后端有什么不同?

与 Flink一起使用的一个明显优势EmbeddedRocksDBStateBackend是,当内存不足时,它可以溢出到磁盘。但是,如果我准备给它足够的内存,以便它永远不需要使用磁盘,那么这与使用 有何不同HashMapStateBackend

apache-flink flink-streaming

2
推荐指数
1
解决办法
983
查看次数

在容器内运行时如何访问 flink Web UI (wsl2)

在 flink 的第一步说明中,它说您可以通过本地主机链接连接到 Web UI ,我一直在寻找一种方法,使其在 Windows 10 上运行时在 wsl2 中运行。我按照链接的“第一步”页面中的所有步骤进行操作,但每次连接都被拒绝。

windows apache-flink wsl-2

2
推荐指数
1
解决办法
1530
查看次数

TumblingWindow 中的 Flink AggregateFunction 自动分割为两个窗口以实现大窗口大小

我正在使用不同的窗口大小计算一些记录的简单平均值。使用1 小时1 周的窗口没有问题,并且结果计算正确。

var keyed = src
        .filter(event -> event.getSensor_id() < 10000)
        .keyBy(Event::getSensor_id);

var hourResult = keyed
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .aggregate(new AvgQ1(Config.HOUR))
        .setParallelism(5);

var weekResult = keyed
        .window(TumblingEventTimeWindows.of(Time.days(7)))
        .aggregate(new AvgQ1(Config.WEEK))
        .setParallelism(5);
Run Code Online (Sandbox Code Playgroud)

相反,使用1 个月(31 天)的窗口,窗口被分成两半,flink 给出两个结果作为输出,一个用于 05-1 到 05-14 的记录,另一个用于从 05-15 到 05-14 的记录。 31.

SingleOutputOperator<OutputQuery> monthResult = keyed
    .window(TumblingEventTimeWindows.of(Time.days(31)))
    .aggregate(new AvgQ1(Config.MONTH))
    .setParallelism(5);
Run Code Online (Sandbox Code Playgroud)

使用大小为 30 天的窗口,结果会分为 (05-1;05-27) 和 (05-28;05-31)。

SingleOutputOperator<OutputQuery> monthResult = keyed
    .window(TumblingEventTimeWindows.of(Time.days(30)))
    .aggregate(new AvgQ1(Config.MONTH))
    .setParallelism(5);
Run Code Online (Sandbox Code Playgroud)

这是AggregateFunction.

public class AvgQ1 implements AggregateFunction<Event, AccumulatorQ1, OutputQuery> { …
Run Code Online (Sandbox Code Playgroud)

java stream-processing apache-kafka docker apache-flink

2
推荐指数
1
解决办法
618
查看次数

flink:丢失记录了吗?

我的拓扑结构如下:( kafka(p:6)->reduce(p:6)->db writer(p:12)其中p:是并行性).

  • 我让它在单个节点"集群"上运行 taskmanager.numberOfTaskSlots: 30
  • 我知道我的kafka源正在产生~65M记录/分钟
  • kafka'读者'具有与kafka分区相同的并行度

当我观察这个工作(通过flink UI)约1分钟时,这些是我看到的值:

  • kafka - > reduce:~1.5M记录发送(关闭> 4x)
  • 减少(窗口聚合5秒) - > db写入~114K记录发送(关闭> 2x)1
  • db write - >收到的记录:~23K(关闭> 5x)2

(其他部分的发送/接收值之间存在较小的差异,但我可以将这些差异归因于测量误差)

问题:
1.其余记录在哪里?
2.运行时,此机器上的负载永远不会超过1.5.还有其他一些限制因素吗?
3.我误读了UI中的值吗?

Java 8
Flink 1.0(最新github)
机器:32核/ 96 Gb RAM

1这可以通过汇总过程来解释.
2此值与写入数据库的内容对齐.

java apache-kafka apache-flink

1
推荐指数
1
解决办法
233
查看次数

无法运行flink示例程序,连接被拒绝

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Run Code Online (Sandbox Code Playgroud)

根据官方的QuickStart直接运行示例程序,请按照以下步骤登录。原因似乎是java.net.ConnectException。我确定未使用端口并且防火墙已关闭。

root@maple-PC:/home/maple/Downloads/flink-1.4.2# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: b371f7847302f8930115f093c7e32d3d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-198847679] with leader session id 00000000-0000-0000-0000-000000000000.
12/15/2018 01:06:20 Job execution switched to status RUNNING.
12/15/2018 01:06:20 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED 
12/15/2018 01:06:20 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@9cfdeb73, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@71687585}, ProcessingTimeTrigger(), …
Run Code Online (Sandbox Code Playgroud)

java apache-flink

1
推荐指数
1
解决办法
1005
查看次数