我目前正在与Akka Stream Kafka合作与kafka互动,我很惊讶与Kafka Streams有什么不同.
我知道基于Akka的方法实现了反应性规范并处理了kafka流似乎缺乏的背压和功能.
使用kafka流比akka溪流kafka有什么好处?
scala stream-processing typesafe akka-stream apache-kafka-streams
我最近在工作中遇到过这个问题.虽然我不确定这是一个好主意,但我不明白编译器如何处理静态块.
这是一个例子:
考虑你有类A和B:
public class A {
public final static List<Integer> list;
static {
list = new ArrayList<>();
}
}
public class B {
public final static int dependsOnA;
static {
dependsOnA = A.list.size();
}
}
Run Code Online (Sandbox Code Playgroud)
还有一个只读的主要课程B.dependsOnA.
静态块B依赖于in A,因为它使用list静态变量.
现在,代码正确执行,并且NullPointerException在运行时不会引发.但是什么是确保list在其他地方潜在使用之前进行初始化的机制是什么?
我目前正在编写一个 Spark 流应用程序,它从 Kafka 读取数据并尝试在应用一些转换之前对其进行解码。
当前的代码结构如下所示:
val stream = KafkaUtils.createDirectStream[String, String](...)
.map(record => decode(record.value())
.filter(...)
.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
...
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
Run Code Online (Sandbox Code Playgroud)
失败的解码和过滤发生在DStream上,偏移量管理在内部完成foreachRDD,这意味着我只会提交成功的记录。
要提交失败的记录,我可以将所有内容移动到foreachRDD循环中:
val stream = KafkaUtils.createDirectStream[String, String](...)
.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
...
// Decoding and filtering here
...
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
Run Code Online (Sandbox Code Playgroud)
但是,我想知道是否有另一种方法可以提交失败的记录。也许不提交失败的记录是可以接受的?