标签: apache-flink

从检查点恢复 Apache Flink 作业

我正在使用 Apache Flink + RabbitMQ 堆栈。我知道有机会手动触发保存点并从中恢复作业,但问题是 Flink 在成功检查点后确认消息,如果您想创建保存点和恢复状态,您将丢失上次成功保存点和上次成功检查点之间的所有数据. 有没有办法从检查点恢复工作?这将解决在不可重播数据源(如rabbitmq)的情况下丢失数据的问题。顺便说一句,如果我们有检查点及其所有开销,为什么不让用户使用它们?

rabbitmq apache-flink flink-streaming

3
推荐指数
1
解决办法
4011
查看次数

Apache Flink(如何唯一标记作业)

是否可以使用唯一名称标记作业,以便我可以在以后停止它们?。我真的不想 grep 和持久化作业 ID。

简而言之,作为部署的一部分,我想停止一项工作并部署新的工作。

apache-flink flink-streaming

3
推荐指数
1
解决办法
2599
查看次数

如何在DataStream程序中配置用户功能?

我创建了一个流媒体环境的配置,并试图在访问此配置open()的方法RichMapFunction

例子:

    Configuration conf = new Configuration();
    conf.setBoolean("a", true);
    StreamExecutionEnvironment env = 
        StreamExecutionEnvironment.createLocalEnvironment(8, conf);

    DataStreamSource<Integer> source = env.fromElements(5,5,5,5,5);
    source.map(new RichMapFunction<Integer, Integer>() {

        @Override
        public void open(Configuration parameters) throws Exception {
            boolean a = parameters.getBoolean("a", false);
            super.open(parameters);
        }

        @Override
        public Integer map(Integer value) throws Exception {
            return value;
        }
    }).print();

    env.execute();
Run Code Online (Sandbox Code Playgroud)

但是,在调试该open()方法时,我发现配置为空。

我究竟做错了什么?如何RichFunction在流媒体环境中将配置正确传递给 a ?

apache-flink

3
推荐指数
1
解决办法
1292
查看次数

分配处理时通常有多少开销?

对于不耐烦的读者:这是一项正在进行的工作,我在此过程中寻求帮助。请不要根据我的临时数据来判断这些工具,因为在我尝试获得更好的结果时它们可能会发生变化。

我们正处于架构决策过程的中间,该工具用于分析协同仿真的输出。

作为该过程的一部分,我被要求编写一个基准测试工具,并获取有关多个分布式处理框架速度的数据。

我测试的框架是:Apache Spark、Apache Flink、Hazelcast Jet。并作为比较基准纯 Java。

我使用的测试用例是一个简单的“这是一个 Pojo 列表,pojo 中的一个字段是双精度值。找到最小的(最小)值”。

简单,直接,希望具有高度可比性。

四分之三的测试使用一个简单的比较器,第四个(flink)使用与比较器基本相同的减速器。分析函数如下所示:

Java: double min = logs.stream().min(new LogPojo.Comp()).get().getValue();

Spark: JavaRDD<LogPojo> logData = sc.parallelize(logs, num_partitions);
double min = logData.min(new LogPojo.Comp()).getValue();

Hazel: IStreamList<LogPojo> iLogs = jet.getList("logs");
iLogs.addAll(logs);
double min = iLogs.stream().min(new LogPojo.Comp()).get().getValue();

Flink: DataSet<LogPojo> logSet = env.fromCollection(logs);
double min = logSet.reduce(new LogReducer()).collect().get(0).getValue();
Run Code Online (Sandbox Code Playgroud)

我对此进行了广泛的测试,改变了测试列表的大小以及分配的资源。结果让我大吃一惊。最佳结果如下(所有数字以毫秒为单位,1 个 mio pojo,每个 10 个测试):

  • 实例:声明和启动框架实例需要多长时间
  • 列表:解析/传输列表到框架“列表”需要多长时间
  • process:处理数据需要多长时间才能检索到最小值
  • 总体:从每个测试的开始到结束

结果:

java:
Instances: 
List: 
Process: 37, 24, 16, 17, 16, 16, 16, 16, 16, 16, 
Overall: 111, 24, …
Run Code Online (Sandbox Code Playgroud)

java performance apache-spark apache-flink hazelcast-jet

3
推荐指数
1
解决办法
424
查看次数

Apache Flink:如何并行执行但保持消息顺序?

我有几个关于flink的并行性的问题。这是我的设置:

我有 1 个主节点和 2 个从节点。在 flink 中,我创建了 3 个 kafka 消费者,每个消费者都来自不同的主题。
由于元素的顺序对我来说很重要,每个主题只有一个分区,我有 flink 设置来使用事件时间。

然后我在每个数据流上运行以下管道(以伪代码):

source
.map(deserialize)
.window
.apply
.map(serialize)
.writeTo(sink)
Run Code Online (Sandbox Code Playgroud)

到目前为止,我以-p 2假设这将允许我使用我的两个节点的参数启动我的 flink 程序。结果不是我所希望的,因为我的输出顺序有时会混乱。

在阅读了 flink 文档并试图更好地理解它之后,有人可以确认我的以下“学习”吗?

1.) Passing-p 2仅配置任务并行度,即一个任务(例如map(deserialize))将被拆分成的最大并行实例数。如果我想在整个管道中保持订单,我必须使用-p 1.

2.) 这对我来说似乎矛盾/令人困惑:即使并行度设置为 1,不同的任务仍然可以并行(同时)运行。因此,如果我通过 ,我的 3 个管道也将并行运行-p 1

作为一个后续问题:有没有办法找出哪些任务映射到哪个任务槽,以便我可以自己确认并行执行?

我将不胜感激任何输入!

更新

下面是 flink 的执行计划-p 2

parallel-processing apache-kafka apache-flink

3
推荐指数
1
解决办法
3735
查看次数

我们可以在 Flink 中结合计数和处理时间触发器吗?

我想让 Windows 在滚动处理时间达到 100 或每 5 秒后完成?也就是说当元素达到100时,触发Windows计算,​​但如果元素没有达到100,但时间过去了5秒,也会触发Windows计算,​​就像下面两个触发器的组合:

.countWindow(100)

.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

apache-flink flink-streaming

3
推荐指数
1
解决办法
1889
查看次数

Flink IOException:网络缓冲区数量不足

我正在使用Flink v1.4.0. 我正在使用DataSet API(虽然这个,我认为无关紧要)。

我正在 12 核 VM 上运行一些重型转换。我正在使用 2 个内核Flink job,其中我将一些数据存储到一个内核中,并使用剩余的 10 个内核Flink Queryable State运行另一个Flink作业。

当我用 10 个内核运行第二个作业时,我似乎收到以下错误:

java.io.IOException: Insufficient number of network buffers: required 10, but only 9 available. The total number of network buffers is currently set to 4096 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
            at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
            at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:199)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:618)
            at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

如果我确实用 8 …

java apache-flink

3
推荐指数
1
解决办法
4295
查看次数

测试kafka和flink集成流程

我想测试卡夫卡/弗林克集成FlinkKafkaConsumer011FlinkKafkaProducer011例如。

该过程将是:

  1. 使用 Flink 从 kafka 主题中读取
  2. 使用 Flink 进行一些操作
  3. 用 Flink 写入另一个 kafka topic

使用字符串示例,从输入主题中读取字符串,转换为大写,写入新主题。

问题是如何测试流量?

当我说测试时,这是单元/集成测试。

谢谢!

integration-testing scala apache-kafka apache-flink

3
推荐指数
1
解决办法
2530
查看次数

Flink WaterMark 和触发器 - 未在事件时间丢弃后期元素?

我对 Flink 在事件时间加水印时如何处理后期元素感到有些困惑。

我的理解是,当 Flink 读取数据流时,水印时间会在看到任何事件时间比当前水印事件时间大的数据时进行。然后,任何覆盖时间严格小于水印的窗口都会被触发驱逐(假设没有延迟允许。

但是,以这个最小的例子为例:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}

object EventTimeExample {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  case class ExampleType(time: Long, value: Long)

  def main(args: Array[String]) {

    // Set up environment
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Example S3 path
    val simple = env.fromCollection(Seq(
      ExampleType(1525132800000L, 1),
      ExampleType(1525132800000L, 2) ,
      ExampleType(1525132920000L, 3),
      ExampleType(1525132800000L, 4)
    ))
      .assignAscendingTimestamps(_.time)

    val windows = simple
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
      .apply{
       (window, iter, collector: Collector[(Long, Long, String)]) => {
        collector.collect(window.getStart, …
Run Code Online (Sandbox Code Playgroud)

watermark apache-flink flink-streaming

3
推荐指数
1
解决办法
2212
查看次数

Apache Flink 中的 SlotSharingGroup 是什么?

参考:https : //ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.html

定义:“一个槽共享单元定义了可以在一个槽内一起部署哪些不同的任务(来自不同的工作顶点)。”

有人可以详细说明一下吗?

apache-flink

3
推荐指数
1
解决办法
1677
查看次数