I am new to Spark and the Scala programming language. My input is a CSV file, and I need to build an inverted index over the values in the file, as in the example below.
Input: file.csv
attr1, attr2, attr3
1, AAA, 23
2, BBB, 23
3, AAA, 27
output format: value -> (rowid, columnid) pairs
for example: AAA -> ((1,2),(3,2))
27 -> (3,3)
I have started with the following code but am stuck after this point. Please help.
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Invert Me!").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val txtFilePath = "/home/person/Desktop/sample.csv"
    val txtFile = sc.textFile(txtFilePath)
    val nRows = txtFile.count()

    // Split each line on commas and trim whitespace around every cell
    val data = txtFile.map(line => line.split(",").map(elem => elem.trim()))
    val nCols = data.collect()(0).length
  }
}
Code that keeps your style would look like this:
import org.apache.spark.rdd.RDD

// Broadcast the header row so tasks can label cells by column name
val header = sc.broadcast(data.first())
// Skip the header (row index 0); emit (cellValue, (columnName, rowIndex))
val cells = data.zipWithIndex().filter(_._2 > 0).flatMap { case (row, index) =>
  row.zip(header.value).map { case (value, column) => value -> (column, index) }
}
// Collect all (columnName, rowIndex) pairs per distinct cell value
val index: RDD[(String, Vector[(String, Long)])] =
  cells.aggregateByKey(Vector.empty[(String, Long)])(_ :+ _, _ ++ _)
Here the value of index should contain the desired mapping from each cell value to its (ColumnName, RowIndex) pairs.
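For the sample file above, inspecting the result should print pairs along these lines (a sketch; the order of entries and of pairs inside each Vector depends on partitioning):

// Collect locally for inspection -- only sensible for small test data
index.collect().foreach(println)
// (AAA,Vector((attr2,1), (attr2,3)))
// (23,Vector((attr3,1), (attr3,2)))
// (27,Vector((attr3,3)))
// ...and one entry for each remaining cell value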
The underscores in the snippet above are just shorthand lambdas; the same thing can be written more explicitly:
val cellsVerbose = data.zipWithIndex().flatMap {
  case (_, 0L) => IndexedSeq.empty // skip the header row (index 0)
  case (row, index) => row.zip(header.value).map {
    case (value, column) => value -> (column, index)
  }
}
val indexVerbose: RDD[(String, Vector[(String, Long)])] =
  cellsVerbose.aggregateByKey(zeroValue = Vector.empty[(String, Long)])(
    seqOp = (keys, key) => keys :+ key,        // append within a partition
    combOp = (keysA, keysB) => keysA ++ keysB) // merge across partitions
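Note that the question's expected output uses numeric (rowid, columnid) pairs rather than column names. A minimal sketch of that variant, assuming 1-based column ids as in the example (rows are already 1-based because the header occupies index 0):

// Emit (cellValue, (rowId, columnId)) using cell positions instead of header names
val cellsNumeric = data.zipWithIndex().filter(_._2 > 0).flatMap {
  case (row, rowIndex) =>
    row.zipWithIndex.map { case (value, colIndex) =>
      value -> (rowIndex, colIndex + 1L) // shift columns to 1-based
    }
}
val numericIndex: RDD[(String, Vector[(Long, Long)])] =
  cellsNumeric.aggregateByKey(Vector.empty[(Long, Long)])(_ :+ _, _ ++ _)
// e.g. AAA -> Vector((1,2), (3,2)) and 27 -> Vector((3,3))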