我正在使用 Apache Flink + RabbitMQ 堆栈。我知道有机会手动触发保存点并从中恢复作业,但问题是 Flink 在成功检查点后确认消息,如果您想创建保存点和恢复状态,您将丢失上次成功保存点和上次成功检查点之间的所有数据. 有没有办法从检查点恢复工作?这将解决在不可重播数据源(如rabbitmq)的情况下丢失数据的问题。顺便说一句,如果我们有检查点及其所有开销,为什么不让用户使用它们?
是否可以使用唯一名称标记作业,以便我可以在以后停止它们?。我真的不想 grep 和持久化作业 ID。
简而言之,作为部署的一部分,我想停止一项工作并部署新的工作。
我创建了一个流媒体环境的配置,并试图在访问此配置open()的方法RichMapFunction。
例子:
Configuration conf = new Configuration();
conf.setBoolean("a", true);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(8, conf);
DataStreamSource<Integer> source = env.fromElements(5,5,5,5,5);
source.map(new RichMapFunction<Integer, Integer>() {
@Override
public void open(Configuration parameters) throws Exception {
boolean a = parameters.getBoolean("a", false);
super.open(parameters);
}
@Override
public Integer map(Integer value) throws Exception {
return value;
}
}).print();
env.execute();
Run Code Online (Sandbox Code Playgroud)
但是,在调试该open()方法时,我发现配置为空。
我究竟做错了什么?如何RichFunction在流媒体环境中将配置正确传递给 a ?
对于不耐烦的读者:这是一项正在进行的工作,我在此过程中寻求帮助。请不要根据我的临时数据来判断这些工具,因为在我尝试获得更好的结果时它们可能会发生变化。
我们正处于架构决策过程的中间,该工具用于分析协同仿真的输出。
作为该过程的一部分,我被要求编写一个基准测试工具,并获取有关多个分布式处理框架速度的数据。
我测试的框架是:Apache Spark、Apache Flink、Hazelcast Jet。并作为比较基准纯 Java。
我使用的测试用例是一个简单的“这是一个 Pojo 列表,pojo 中的一个字段是双精度值。找到最小的(最小)值”。
简单,直接,希望具有高度可比性。
四分之三的测试使用一个简单的比较器,第四个(flink)使用与比较器基本相同的减速器。分析函数如下所示:
Java: double min = logs.stream().min(new LogPojo.Comp()).get().getValue();
Spark: JavaRDD<LogPojo> logData = sc.parallelize(logs, num_partitions);
double min = logData.min(new LogPojo.Comp()).getValue();
Hazel: IStreamList<LogPojo> iLogs = jet.getList("logs");
iLogs.addAll(logs);
double min = iLogs.stream().min(new LogPojo.Comp()).get().getValue();
Flink: DataSet<LogPojo> logSet = env.fromCollection(logs);
double min = logSet.reduce(new LogReducer()).collect().get(0).getValue();
Run Code Online (Sandbox Code Playgroud)
我对此进行了广泛的测试,改变了测试列表的大小以及分配的资源。结果让我大吃一惊。最佳结果如下(所有数字以毫秒为单位,1 个 mio pojo,每个 10 个测试):
结果:
java:
Instances:
List:
Process: 37, 24, 16, 17, 16, 16, 16, 16, 16, 16,
Overall: 111, 24, …Run Code Online (Sandbox Code Playgroud) 我有几个关于flink的并行性的问题。这是我的设置:
我有 1 个主节点和 2 个从节点。在 flink 中,我创建了 3 个 kafka 消费者,每个消费者都来自不同的主题。
由于元素的顺序对我来说很重要,每个主题只有一个分区,我有 flink 设置来使用事件时间。
然后我在每个数据流上运行以下管道(以伪代码):
source
.map(deserialize)
.window
.apply
.map(serialize)
.writeTo(sink)
Run Code Online (Sandbox Code Playgroud)
到目前为止,我以-p 2假设这将允许我使用我的两个节点的参数启动我的 flink 程序。结果不是我所希望的,因为我的输出顺序有时会混乱。
在阅读了 flink 文档并试图更好地理解它之后,有人可以确认我的以下“学习”吗?
1.) Passing-p 2仅配置任务并行度,即一个任务(例如map(deserialize))将被拆分成的最大并行实例数。如果我想在整个管道中保持订单,我必须使用-p 1.
2.) 这对我来说似乎矛盾/令人困惑:即使并行度设置为 1,不同的任务仍然可以并行(同时)运行。因此,如果我通过 ,我的 3 个管道也将并行运行-p 1。
作为一个后续问题:有没有办法找出哪些任务映射到哪个任务槽,以便我可以自己确认并行执行?
我将不胜感激任何输入!
更新
下面是 flink 的执行计划-p 2。
我想让 Windows 在滚动处理时间达到 100 或每 5 秒后完成?也就是说当元素达到100时,触发Windows计算,但如果元素没有达到100,但时间过去了5秒,也会触发Windows计算,就像下面两个触发器的组合:
.countWindow(100)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
我正在使用Flink v1.4.0. 我正在使用DataSet API(虽然这个,我认为无关紧要)。
我正在 12 核 VM 上运行一些重型转换。我正在使用 2 个内核Flink job,其中我将一些数据存储到一个内核中,并使用剩余的 10 个内核Flink Queryable State运行另一个Flink作业。
当我用 10 个内核运行第二个作业时,我似乎收到以下错误:
java.io.IOException: Insufficient number of network buffers: required 10, but only 9 available. The total number of network buffers is currently set to 4096 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:199)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:618)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
如果我确实用 8 …
我想测试卡夫卡/弗林克集成FlinkKafkaConsumer011和FlinkKafkaProducer011例如。
该过程将是:
使用字符串示例,从输入主题中读取字符串,转换为大写,写入新主题。
问题是如何测试流量?
当我说测试时,这是单元/集成测试。
谢谢!
我对 Flink 在事件时间加水印时如何处理后期元素感到有些困惑。
我的理解是,当 Flink 读取数据流时,水印时间会在看到任何事件时间比当前水印事件时间大的数据时进行。然后,任何覆盖时间严格小于水印的窗口都会被触发驱逐(假设没有延迟允许。
但是,以这个最小的例子为例:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}
object EventTimeExample {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
case class ExampleType(time: Long, value: Long)
def main(args: Array[String]) {
// Set up environment
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Example S3 path
val simple = env.fromCollection(Seq(
ExampleType(1525132800000L, 1),
ExampleType(1525132800000L, 2) ,
ExampleType(1525132920000L, 3),
ExampleType(1525132800000L, 4)
))
.assignAscendingTimestamps(_.time)
val windows = simple
.windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
.apply{
(window, iter, collector: Collector[(Long, Long, String)]) => {
collector.collect(window.getStart, …Run Code Online (Sandbox Code Playgroud) 参考:https : //ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.html
定义:“一个槽共享单元定义了可以在一个槽内一起部署哪些不同的任务(来自不同的工作顶点)。”
有人可以详细说明一下吗?
apache-flink ×10
apache-kafka ×2
java ×2
apache-spark ×1
performance ×1
rabbitmq ×1
scala ×1
watermark ×1