小编Jim*_*cks的帖子

将Spark Dataframe转换为Scala Map集合

我正在尝试找到将整个Spark数据帧转换为scala Map集合的最佳解决方案.最好说明如下:

从这里开始(在Spark示例中):

val df = sqlContext.read.json("examples/src/main/resources/people.json")

df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
Run Code Online (Sandbox Code Playgroud)

Scala集合(Map of Maps)代表如下:

val people = Map(
Map("age" -> null, "name" -> "Michael"),
Map("age" -> 30, "name" -> "Andy"),
Map("age" -> 19, "name" -> "Justin")
)
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark apache-spark-sql

10
推荐指数
2
解决办法
1万
查看次数

将 VectorAssembler 添加到 Spark ML Pipeline 时出错

尝试将 VectorAssembler 添加到 GBT 管道示例并得到管道无法找到 features 字段的错误。我引入了一个示例文件而不是 libsvm,所以我需要转换功能集集。

错误:线程“main”中的异常 java.lang.IllegalArgumentException:字段“features”不存在。

 val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data/training_example.csv")

val sampleDF = df.sample(false,0.05,987897L)

val assembler = new VectorAssembler()
  .setInputCols(Array("val1","val2","val3",...,"valN"))
  .setOutputCol("features")

val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(sampleDF)

val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(sampleDF)

val Array(trainingData, testData) = sampleDF.randomSplit(Array(0.7, 0.3))

val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(3)
  .setMaxDepth(5)

val pipeline = new Pipeline()
  .setStages(Array(assembler,labelIndexer,featureIndexer,gbt))

val model = pipeline.fit(trainingData)

val predictions = model.transform(testData)

predictions.show(10)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-mllib

4
推荐指数
1
解决办法
2243
查看次数