小编vde*_*dep的帖子

在Spark 2.0中使用SparkSession时的parallelize()方法

我看到SparkSession没有.parallelize()方法,我们需要SparkContext再次使用来创建RDD吗?如果是这样,既创造SparkSessionSparkContext在一个程序中可取?

apache-spark

16
推荐指数
1
解决办法
1万
查看次数

替换spark Dataframe中所有列名称中的空格

我有火花数据帧,其中一些列名称中有空格,必须用下划线替换.

我知道可以使用withColumnRenamed()sparkSQL重命名单个列,但要重命名'n'个列,这个函数必须链接'n'次(据我所知).

要自动执行此操作,我尝试过:

val old_names = df.columns()        // contains array of old column names

val new_names = old_names.map { x => 
   if(x.contains(" ") == true) 
      x.replaceAll("\\s","_") 
   else x 
}                    // array of new column names with removed whitespace.
Run Code Online (Sandbox Code Playgroud)

现在,如何用.替换df的标头 new_names

scala apache-spark apache-spark-sql spark-dataframe

9
推荐指数
4
解决办法
1万
查看次数

如何在不转换DataFrame并访问数据集的情况下向数据集添加列?

我知道使用.withColumn()和a 向Spark DataSet添加新列的方法UDF,它返回一个DataFrame.我也知道,我们可以将生成的DataFrame转换为DataSet.

我的问题是:

  1. 如果我们仍然遵循传统的DF方法(即将列名称作为UDF输入的字符串传递),DataSet的类型安全性如何在这里发挥作用
  2. 是否存在访问列的"面向对象的方式"(不将列名作为字符串传递),就像我们以前用RDD一样,用于追加新列.
  3. 如何在正常操作中访问新列,如map,filter等?

例如:

    scala> case class Temp(a : Int, b : String)    //creating case class
    scala> val df = Seq((1,"1str"),(2,"2str),(3,"3str")).toDS    // creating DS
    scala> val appendUDF = udf( (b : String) => b + "ing")      // sample UDF

    scala> df.withColumn("c",df("b"))   // adding a new column
    res5: org.apache.spark.sql.DataFrame = [a: int, b: string ... 1 more field]

    scala> res5.as[Temp]   // converting to DS
    res6: org.apache.spark.sql.Dataset[Temp] = [a: int, b: string ... 1 …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

8
推荐指数
1
解决办法
6810
查看次数

UDF的vs Spark sql vs列表达式性能优化

我知道UDFs是Spark的完整黑盒子,不会尝试优化它.但是Column在(https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Column)中列出的类型及其功能的使用是否会
成为函数"符合条件" Catalyst Optimizer

例如,UDF通过添加1到现有列来创建新列

val addOne = udf( (num: Int) => num + 1 )
df.withColumn("col2", addOne($"col1"))
Run Code Online (Sandbox Code Playgroud)

相同的功能,使用Column类型:

def addOne(col1: Column) = col1.plus(1)
df.withColumn("col2", addOne($"col1"))
Run Code Online (Sandbox Code Playgroud)

要么

spark.sql("select *, col1 + 1 from df")
Run Code Online (Sandbox Code Playgroud)

他们之间的表现会有什么不同吗?

scala apache-spark apache-spark-sql spark-dataframe

8
推荐指数
1
解决办法
1650
查看次数

使用mongo-scala-driver从Scala中的Mongodb打印查询结果

我试图在Scala中打印MongoDB查询的结果

val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("tableScala")
val collection: MongoCollection[Document] = database.getCollection("tableScala")

collection.find().printResults()
Run Code Online (Sandbox Code Playgroud)

抛出的错误是:Cannot resolve symbol printResults.建议使用mongo-scala-driver版本的一些其他问题的答案1.2,因为版本1.1及以下没有实现printResults()

SBT档案:

name := "scalaMongoDriver"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.0-beta1"
Run Code Online (Sandbox Code Playgroud)

尝试使用以下方式手动打印:

collection.find().subscribe(
      (user: Document) => println(user.toJson()),                         // onNext
      (error: Throwable) => println(s"Query failed: ${error.getMessage}"), // onError
      () => println("Done")                                               // onComplete
    ) 
Run Code Online (Sandbox Code Playgroud)

得到以下信息:

信息:ReadPreferenceServerSelector {readPreference = primary}从集群描述ClusterDescription {type = UNKNOWN,connectionMode = SINGLE,serverDescriptions = [ServerDescription …

scala mongodb mongodb-scala mongodb.driver

5
推荐指数
1
解决办法
1540
查看次数

在Java Spark中将RDD转换为Dataset

我有一个RDD,我需要将其转换为数据集,我试过:

Dataset<Person> personDS =  sqlContext.createDataset(personRDD, Encoders.bean(Person.class));
Run Code Online (Sandbox Code Playgroud)

以上行抛出错误,

无法解析方法createDataset(org.apache.spark.api.java.JavaRDD Main.Person,org.apache.spark.sql.Encoder T)

但是,我可以转换为Dataset转换为Dataframe.以下代码有效:

Dataset<Row> personDF = sqlContext.createDataFrame(personRDD, Person.class);
Dataset<Person> personDS = personDF.as(Encoders.bean(Person.class));
Run Code Online (Sandbox Code Playgroud)

java apache-spark

5
推荐指数
1
解决办法
1万
查看次数

如何将List [Double]转换为列?

我有List[Double],如何将其转换为org.apache.spark.sql.Column.我试图将其作为列插入.withColumn()到现有的DataFrame.

scala dataframe apache-spark apache-spark-sql

4
推荐指数
1
解决办法
4164
查看次数

Spark 将新的拟合阶段添加到现有 PipelineModel 中,无需再次拟合

我有一个保存的 PipelineModel:

pipe_model = pipe.fit(df_train)
pipe_model.write().overwrite().save("/user/pipe_text_2")
Run Code Online (Sandbox Code Playgroud)

现在我想向此管道添加一个新的已安装的 PipelineModel:

pipe_model = PipelineModel.load("/user/pipe_text_2")
df2 = pipe_model.transform(df1)

kmeans = KMeans(k=20)
pipe2 = Pipeline(stages=[kmeans])
pipe_model2 = pipe2.fit(df2)
Run Code Online (Sandbox Code Playgroud)

不用重新装也可以吗?为了获得一个新的PipelineModel但不是一个新的Pipeline。理想的情况如下:

pipe_model_new = pipe_model + pipe_model2
TypeError: unsupported operand type(s) for +: 'PipelineModel' and 'PipelineModel'
Run Code Online (Sandbox Code Playgroud)

我发现将两个 Spark mllib 管道连接在一起,但是使用此解决方案,您需要再次安装整个管道。这就是我试图避免的。

pipeline apache-spark pyspark apache-spark-mllib

4
推荐指数
1
解决办法
2190
查看次数

在Spark Job Server中保留/共享RDD

我希望持久化来自spark作业的RDD,以便所有后续作业都可以使用Spark Job Server.这是我尝试过的:

工作1:

package spark.jobserver

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try

object FirstJob extends SparkJob with NamedRddSupport {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[4]").setAppName("FirstJob")
    val sc = new SparkContext(conf)
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any = {

    // the below variable is to be accessed by other jobs: …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-jobserver

1
推荐指数
1
解决办法
1006
查看次数

Scala Trait Mixin订单

object sandbox {

  class Numbers {
    def price() : List[Int] = List(1,3,5,7)
    def printIt(): Unit = {
      price.foreach(x => print(x+ " ") )
    }
  }

  trait DoubleIt extends Numbers {
    override def price() : List[Int] ={
      println("doubling")
      super.price.map(x => x*2)
    }
  }

  trait AddIt extends Numbers {
    override def price() : List[Int] = {
      println("adding")
      super.price.map( x => x+2)
    }
  }

  def main(args :Array[String]): Unit = {
    val obj = new Numbers with DoubleIt with AddIt
    obj.printIt() 
  }

}
//output …
Run Code Online (Sandbox Code Playgroud)

scala

0
推荐指数
1
解决办法
88
查看次数