jos*_*ele 1 hbase scala apache-spark
我正在使用Cloudera的SparkOnHBase模块来从HBase获取数据.
我以这种方式得到一个RDD:
var getRdd = hbaseContext.hbaseRDD("kbdp:detalle_feedback", scan)
Run Code Online (Sandbox Code Playgroud)
基于此,我得到的是类型的对象
RDD[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])]
Run Code Online (Sandbox Code Playgroud)
它对应于行键和值列表.所有这些都由字节数组表示.
如果我将getRDD保存到文件中,我看到的是:
([B@f7e2590,[([B@22d418e2,[B@12adaf4b,[B@48cf6e81), ([B@2a5ffc7f,[B@3ba0b95,[B@2b4e651c), ([B@27d0277a,[B@52cfcf01,[B@491f7520), ([B@3042ad61,[B@6984d407,[B@f7c4db0), ([B@29d065c1,[B@30c87759,[B@39138d14), ([B@32933952,[B@5f98506e,[B@8c896ca), ([B@2923ac47,[B@65037e6a,[B@486094f5), ([B@3cd385f2,[B@62fef210,[B@4fc62b36), ([B@5b3f0f24,[B@8fb3349,[B@23e4023a), ([B@4e4e403e,[B@735bce9b,[B@10595d48), ([B@5afb2a5a,[B@1f99a960,[B@213eedd5), ([B@2a704c00,[B@328da9c4,[B@72849cc9), ([B@60518adb,[B@9736144,[B@75f6bc34)])
Run Code Online (Sandbox Code Playgroud)
对于每个记录(rowKey和列)
但我需要的是获取所有和每个键和值的String表示.或者至少是价值观.为了将它保存到文件中,看到类似的东西
key1,(value1,value2...)
Run Code Online (Sandbox Code Playgroud)
或类似的东西
key1,value1,value2...
Run Code Online (Sandbox Code Playgroud)
我对火花和斯卡拉来说是全新的,而且很难得到一些东西.
你能帮帮我吗?
首先让我们创建一些示例数据:
scala> val d = List( ("ab" -> List(("qw", "er", "ty")) ), ("cd" -> List(("ac", "bn", "afad")) ) )
d: List[(String, List[(String, String, String)])] = List((ab,List((qw,er,ty))), (cd,List((ac,bn,afad))))
Run Code Online (Sandbox Code Playgroud)
这是数据的方式:
scala> d foreach println
(ab,List((qw,er,ty)))
(cd,List((ac,bn,afad)))
Run Code Online (Sandbox Code Playgroud)
将其转换为Array[Byte]格式
scala> val arrData = d.map { case (k,v) => k.getBytes() -> v.map { case (a,b,c) => (a.getBytes(), b.getBytes(), c.getBytes()) } }
arrData: List[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])] = List((Array(97, 98),List((Array(113, 119),Array(101, 114),Array(116, 121)))), (Array(99, 100),List((Array(97, 99),Array(98, 110),Array(97, 102, 97, 100)))))
Run Code Online (Sandbox Code Playgroud)
从这些数据中创建一个RDD
scala> val rdd1 = sc.parallelize(arrData)
rdd1: org.apache.spark.rdd.RDD[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])] = ParallelCollectionRDD[0] at parallelize at <console>:25
Run Code Online (Sandbox Code Playgroud)
创建一个转换功能Array[Byte]来String:
scala> def b2s(a: Array[Byte]): String = new String(a)
b2s: (a: Array[Byte])String
Run Code Online (Sandbox Code Playgroud)
执行我们的最终转换:
scala> val rdd2 = rdd1.map { case (k,v) => b2s(k) -> v.map{ case (a,b,c) => (b2s(a), b2s(b), b2s(c)) } }
rdd2: org.apache.spark.rdd.RDD[(String, List[(String, String, String)])] = MapPartitionsRDD[1] at map at <console>:29
scala> rdd2.collect()
res2: Array[(String, List[(String, String, String)])] = Array((ab,List((qw,er,ty))), (cd,List((ac,bn,afad))))
Run Code Online (Sandbox Code Playgroud)