小编nsa*_*lar的帖子

Akka Stream Kafka vs Kafka Streams

我目前正在与Akka Stream Kafka合作与kafka互动,我很惊讶与Kafka Streams有什么不同.

我知道基于Akka的方法实现了反应性规范并处理了kafka流似乎缺乏的背压和功能.

使用kafka流比akka溪流kafka有什么好处?

scala stream-processing typesafe akka-stream apache-kafka-streams

36
推荐指数
2
解决办法
1万
查看次数

如何解析静态块之间的对象依赖性?

我最近在工作中遇到过这个问题.虽然我不确定这是一个好主意,但我不明白编译器如何处理静态块.

这是一个例子:

考虑你有类AB:

public class A {

    public final static List<Integer> list;
    static {
        list = new ArrayList<>();
    }
}

public class B {

    public final static int dependsOnA;
    static {
        dependsOnA = A.list.size();
    }
}
Run Code Online (Sandbox Code Playgroud)

还有一个只读的主要课程B.dependsOnA.

静态块B依赖于in A,因为它使用list静态变量.

现在,代码正确执行,并且NullPointerException在运行时不会引发.但是什么是确保list在其他地方潜在使用之前进行初始化的机制是什么?

java jvm classloader

33
推荐指数
7
解决办法
1481
查看次数

Spark Streaming Kafka 中的 DStream 过滤和偏移管理

我目前正在编写一个 Spark 流应用程序,它从 Kafka 读取数据并尝试在应用一些转换之前对其进行解码。

当前的代码结构如下所示:

val stream = KafkaUtils.createDirectStream[String, String](...)
 .map(record => decode(record.value())
 .filter(...)
 .foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   ...
   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
 }
Run Code Online (Sandbox Code Playgroud)

失败的解码和过滤发生在DStream上,偏移量管理在内部完成foreachRDD,这意味着我只会提交成功的记录。

要提交失败的记录,我可以将所有内容移动到foreachRDD循环中:

val stream = KafkaUtils.createDirectStream[String, String](...)
 .foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   ...
   // Decoding and filtering here
   ...
   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
 }
Run Code Online (Sandbox Code Playgroud)

但是,我想知道是否有另一种方法可以提交失败的记录。也许不提交失败的记录是可以接受的?

scala apache-kafka spark-streaming

5
推荐指数
1
解决办法
563
查看次数