我一直在尝试将 RDD 转换为数据帧。为此,需要定义类型而不是 Any。我正在使用 spark MLLib PrefixSpan,这就是 freqSequence.sequence 的来源。我从一个包含 Session_ID、视图和购买作为字符串数组的数据框开始:
viewsPurchasesGrouped: org.apache.spark.sql.DataFrame =
[session_id: decimal(29,0), view_product_ids: array[string], purchase_product_ids: array[string]]
Run Code Online (Sandbox Code Playgroud)
然后我计算频繁模式并在数据框中需要它们,以便我可以将它们写入 Hive 表。
val viewsPurchasesRddString = viewsPurchasesGrouped.map( row => Array(Array(row(1)), Array(row(2)) ))
val prefixSpan = new PrefixSpan()
.setMinSupport(0.001)
.setMaxPatternLength(2)
val model = prefixSpan.run(viewsPurchasesRddString)
val freqSequencesRdd = sc.parallelize(model.freqSequences.collect())
case class FreqSequences(views: Array[String], purchases: Array[String], support: Long)
val viewsPurchasesDf = freqSequencesRdd.map( fs =>
{
val views = fs.sequence(0)(0)
val purchases = fs.sequence(1)(0)
val freq = fs.freq
FreqSequences(views, purchases, freq)
}
)
viewsPurchasesDf.toDF() // optional
Run Code Online (Sandbox Code Playgroud)
当我尝试运行它时,视图和购买是“Any”而不是“Array[String]”。我拼命尝试转换它们,但我得到的最好的是 Array[Any]。我想我需要将内容映射到一个字符串,我已经尝试过例如:How to get an element in WrappedArray: result of Dataset.select("x").collect()?和这个:如何在 spark (scala)和数千个其他 Stackoverflow 问题中将 WrappedArray[WrappedArray[Float]] 转换为 Array[Array[Float]] ...
我真的不知道如何解决这个问题。我想我已经将初始数据帧/RDD 转换为很多,但无法理解在哪里。
我解决了这个问题。作为参考,这有效:
val viewsPurchasesRddString = viewsPurchasesGrouped.map( row =>
Array(
row.getSeq[Long](1).toArray,
row.getSeq[Long](2).toArray
)
)
val prefixSpan = new PrefixSpan()
.setMinSupport(0.001)
.setMaxPatternLength(2)
val model = prefixSpan.run(viewsPurchasesRddString)
case class FreqSequences(views: Long, purchases: Long, frequence: Long)
val ps_frequences = model.freqSequences.filter(fs => fs.sequence.length > 1).map( fs =>
{
val views = fs.sequence(0)(0)
val purchases = fs.sequence(1)(0)
val freq = fs.freq
FreqSequences(views, purchases, freq)
}
)
ps_frequences.toDF()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
12025 次 |
| 最近记录: |