假设我在键控过程后得到一个流。
DataStream<T> stream= sourceStream.keyBy(key).window(window).apply(function);
Run Code Online (Sandbox Code Playgroud)
生成的流是否仍被键入?我可以在该流中使用某些 Keyed 状态吗?
我们的项目中有flink-conf.yaml本地运行。我们希望能够在本地运行 flink 进行测试。我们团队的一部分使用 Mac,另一部分使用 PC。我们想设置state.checkpoint.dir一些普遍接受的路径,最好在用户的主目录下。问题是,是否可以根据我们运行的操作系统以编程方式设置它,如果没有,在此设置中当前用户的主目录是否有一个普遍有效的快捷方式,例如 *NIX 上的“~”?
我们正在构建一个流处理管道来处理/摄取 Kafka 消息。我们使用的是 Flink v1.12.2。在定义源水印策略时,在官方文档中,我遇到了两种开箱即用的水印策略;对于有界无序和对于单调时间戳。我确实浏览了 javadoc,但没有完全理解何时以及为什么应该使用一种策略而不是另一种策略。时间戳基于事件时间。谢谢。
我有一个配置有 Kafka 连接器的 Flink 管道。
我使用以下命令将水印生成频率设置为 2 秒:
env.getConfig().setAutoWatermarkInterval(2000);
Run Code Online (Sandbox Code Playgroud)
现在,对于流窗口,我的滚动窗口为 60 秒,我们在其中进行一些聚合,并且根据数据字段之一的时间戳进行基于事件时间的处理。
我尚未为我的水印策略或我的流配置 allowedLateness。
final ConnectorConfig topicConfig = config.forTopic("mytopic");
final FlinkKafkaConsumer<MyPojo> myEvents = new FlinkKafkaConsumer<>(
topicConfig.name(),
AvroDeserializationSchema.forSpecific(MyPojo.class),
topicConfig.forConsumer()
);
myEvents.setStartFromLatest();
myEvents.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyPojo>forBoundedOutOfOrderness(
Duration.ofSeconds(30))
.withIdleness(Duration.ofSeconds(120))
.withTimestampAssigner((evt, timestamp) -> evt.event_timestamp_field));
Run Code Online (Sandbox Code Playgroud)
Q.1 根据我所读到的内容,我的时间 0-60 的窗口将在 90 秒后计算,30-90 在 120 秒后计算,依此类推。然而,由于我们正在做翻滚窗口,即没有重叠,我的猜测是没有 30-90 窗口,0-60 之后的下一个窗口是 60-120,在 150 秒标记处触发,我是对的吗?
Q.2 如果没有 allowedLateness,所有迟到的数据都将被丢弃,例如。90 秒后到达的时间戳为 45 的事件被认为是无序的,将超出第一个窗口,即 0-60。对于窗口 60-120,事件时间戳不匹配,因此它将被丢弃并且不包含在窗口在 150 秒处触发,对吗?
Q.3. 对于源空闲持续时间,我选择 120,表示如果该主题的任何 Kakfa 分区不活动且有数据,则在 2 分钟后将其标记为空闲,然后发送其他活动分区的水印。我的问题是选择这个数字,即 2 分钟,以及它是否与窗口持续时间(60 秒)或无序(30 秒)有关。如果是这样,我应该在这里记住什么来进行适当的选择,这样我就不会因为空闲分区导致的非前进水印而导致数据滞留?
或者 120 的等待时间太长,我可能会丢失数据,因此我应该将其设置为远小于 OutOfOrderness 持续时间以确保 …
与 Flink一起使用的一个明显优势EmbeddedRocksDBStateBackend是,当内存不足时,它可以溢出到磁盘。但是,如果我准备给它足够的内存,以便它永远不需要使用磁盘,那么这与使用 有何不同HashMapStateBackend?
在 flink 的第一步说明中,它说您可以通过本地主机链接连接到 Web UI ,我一直在寻找一种方法,使其在 Windows 10 上运行时在 wsl2 中运行。我按照链接的“第一步”页面中的所有步骤进行操作,但每次连接都被拒绝。
我正在使用不同的窗口大小计算一些记录的简单平均值。使用1 小时和1 周的窗口没有问题,并且结果计算正确。
var keyed = src
.filter(event -> event.getSensor_id() < 10000)
.keyBy(Event::getSensor_id);
var hourResult = keyed
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new AvgQ1(Config.HOUR))
.setParallelism(5);
var weekResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(7)))
.aggregate(new AvgQ1(Config.WEEK))
.setParallelism(5);
Run Code Online (Sandbox Code Playgroud)
相反,使用1 个月(31 天)的窗口,窗口被分成两半,flink 给出两个结果作为输出,一个用于 05-1 到 05-14 的记录,另一个用于从 05-15 到 05-14 的记录。 31.
SingleOutputOperator<OutputQuery> monthResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(31)))
.aggregate(new AvgQ1(Config.MONTH))
.setParallelism(5);
Run Code Online (Sandbox Code Playgroud)
使用大小为 30 天的窗口,结果会分为 (05-1;05-27) 和 (05-28;05-31)。
SingleOutputOperator<OutputQuery> monthResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(30)))
.aggregate(new AvgQ1(Config.MONTH))
.setParallelism(5);
Run Code Online (Sandbox Code Playgroud)
这是AggregateFunction.
public class AvgQ1 implements AggregateFunction<Event, AccumulatorQ1, OutputQuery> { …Run Code Online (Sandbox Code Playgroud) 我的拓扑结构如下:( kafka(p:6)->reduce(p:6)->db writer(p:12)其中p:是并行性).
taskmanager.numberOfTaskSlots: 30 当我观察这个工作(通过flink UI)约1分钟时,这些是我看到的值:
(其他部分的发送/接收值之间存在较小的差异,但我可以将这些差异归因于测量误差)
问题:
1.其余记录在哪里?
2.运行时,此机器上的负载永远不会超过1.5.还有其他一些限制因素吗?
3.我误读了UI中的值吗?
Java 8
Flink 1.0(最新github)
机器:32核/ 96 Gb RAM
1这可以通过汇总过程来解释.
2此值与写入数据库的内容对齐.
./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Run Code Online (Sandbox Code Playgroud)
根据官方的QuickStart直接运行示例程序,请按照以下步骤登录。原因似乎是java.net.ConnectException。我确定未使用端口并且防火墙已关闭。
root@maple-PC:/home/maple/Downloads/flink-1.4.2# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: b371f7847302f8930115f093c7e32d3d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-198847679] with leader session id 00000000-0000-0000-0000-000000000000.
12/15/2018 01:06:20 Job execution switched to status RUNNING.
12/15/2018 01:06:20 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
12/15/2018 01:06:20 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@9cfdeb73, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@71687585}, ProcessingTimeTrigger(), …Run Code Online (Sandbox Code Playgroud)