Abh*_*hek 9 scala apache-spark apache-spark-sql pyspark apache-spark-ml
考虑一下这里给出的代码,
https://spark.apache.org/docs/1.2.0/ml-guide.html
import org.apache.spark.ml.classification.LogisticRegression
val training = sparkContext.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)
val model1 = lr.fit(training)
Run Code Online (Sandbox Code Playgroud)
假设我们使用sqlContext.read()将"training"作为数据框读取,我们是否应该做类似的事情
val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this
Run Code Online (Sandbox Code Playgroud)
或者,在传递dataFrame时,fit函数将自动处理并行计算/数据
问候,
zer*_*323 13
DataFrame是一种分布式数据结构.它既不是必需也不可能parallelize.SparkConext.parallelizemethod仅用于驻留在驱动程序内存中的分布式本地数据结构.您不应该习惯于分发大型数据集,更不用说重新分发RDDs或更高级别的数据结构(就像您在上一个问题中所做的那样)
sc.parallelize(trainingData.collect())
Run Code Online (Sandbox Code Playgroud)
如果要在RDD/ Dataframe(Dataset)之间进行转换,请使用旨在执行此操作的方法:
从DataFrame到RDD:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val df: DataFrame = Seq(("foo", 1), ("bar", 2)).toDF("k", "v")
val rdd: RDD[Row] = df.rdd
Run Code Online (Sandbox Code Playgroud)形成RDD于DataFrame:
val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2)))
val df1: DataFrame = rdd.toDF
// or
val df2: DataFrame = spark.createDataFrame(rdd) // From 1.x use sqlContext
Run Code Online (Sandbox Code Playgroud)您可能应该查看 RDD 和 DataFrame 之间的区别以及如何在两者之间进行转换:Difference between DataFrame and RDD in Spark
直接回答您的问题:DataFrame 已经针对并行执行进行了优化。你不需要做任何事情,你可以直接将它传递给任何 spark estimators fit() 方法。并行执行在后台处理。
| 归档时间: |
|
| 查看次数: |
18638 次 |
| 最近记录: |