我正在从Kafka提取数据,然后Array[Byte]使用默认的解码器反序列化,然后我的RDD元素如下所示(null,[B@406fa9b2):(null,[B@21a9fe0)但是我想要具有模式的原始数据,那么如何实现呢?
我以Avro格式序列化邮件。
我是 scala 语言的新手,正在尝试实现应该适用于 Int 和 Long 的中值函数,这就是我尝试做的:
def getMedian[T: Numeric](seq: Seq[T]): T = {
val sortedSeq = seq.sortWith(_ < _)
if (seq.size % 2 == 1) sortedSeq(sortedSeq.size / 2) else {
val (up, down) = sortedSeq.splitAt(seq.size / 2)
(up.last + down.head) / 2
}
}
Run Code Online (Sandbox Code Playgroud)
但比较运算符对于数字类无效。我能做些什么来实现这个目标。
经过一些处理后,我有一个 DStream[String , ArrayList[String]] ,所以当我使用 saveAsTextFile 将它写入 hdfs 并在每批后覆盖数据时,如何通过附加到以前的结果来写入新结果
output.foreachRDD(r => {
r.saveAsTextFile(path)
})
Run Code Online (Sandbox Code Playgroud)
编辑 ::如果有人可以帮助我将输出转换为 avro 格式,然后附加到 HDFS