小编JSR*_*R29的帖子

如何将字节从Kafka转换为其原始对象?

我正在从Kafka提取数据,然后Array[Byte]使用默认的解码器反序列化,然后我的RDD元素如下所示(null,[B@406fa9b2)(null,[B@21a9fe0)但是我想要具有模式的原始数据,那么如何实现呢?

我以Avro格式序列化邮件。

apache-kafka apache-spark spark-streaming spark-avro

5
推荐指数
1
解决办法
2833
查看次数

Int 和 Long 的 Scala 中值函数

我是 scala 语言的新手,正在尝试实现应该适用于 Int 和 Long 的中值函数,这就是我尝试做的:

 def getMedian[T: Numeric](seq: Seq[T]): T = {
      val sortedSeq = seq.sortWith(_ < _)
      if (seq.size % 2 == 1) sortedSeq(sortedSeq.size / 2)  else {
            val (up, down) = sortedSeq.splitAt(seq.size / 2)
            (up.last + down.head) / 2
       }
  }
Run Code Online (Sandbox Code Playgroud)

但比较运算符对于数字类无效。我能做些什么来实现这个目标。

generics scala numbers

3
推荐指数
1
解决办法
1279
查看次数

如何在不覆盖的情况下将 Spark Streaming 输出写入 HDFS

经过一些处理后,我有一个 DStream[String , ArrayList[String]] ,所以当我使用 saveAsTextFile 将它写入 hdfs 并在每批后覆盖数据时,如何通过附加到以前的结果来写入新结果

output.foreachRDD(r => {
  r.saveAsTextFile(path)
})
Run Code Online (Sandbox Code Playgroud)

编辑 ::如果有人可以帮助我将输出转换为 avro 格式,然后附加到 HDFS

apache-kafka spark-streaming

2
推荐指数
1
解决办法
9124
查看次数