我看到SparkSession没有.parallelize()方法,我们需要SparkContext再次使用来创建RDD吗?如果是这样,既创造SparkSession和SparkContext在一个程序中可取?
我有火花数据帧,其中一些列名称中有空格,必须用下划线替换.
我知道可以使用withColumnRenamed()sparkSQL重命名单个列,但要重命名'n'个列,这个函数必须链接'n'次(据我所知).
要自动执行此操作,我尝试过:
val old_names = df.columns() // contains array of old column names
val new_names = old_names.map { x =>
if(x.contains(" ") == true)
x.replaceAll("\\s","_")
else x
} // array of new column names with removed whitespace.
Run Code Online (Sandbox Code Playgroud)
现在,如何用.替换df的标头 new_names
我知道使用.withColumn()和a 向Spark DataSet添加新列的方法UDF,它返回一个DataFrame.我也知道,我们可以将生成的DataFrame转换为DataSet.
我的问题是:
例如:
scala> case class Temp(a : Int, b : String) //creating case class
scala> val df = Seq((1,"1str"),(2,"2str),(3,"3str")).toDS // creating DS
scala> val appendUDF = udf( (b : String) => b + "ing") // sample UDF
scala> df.withColumn("c",df("b")) // adding a new column
res5: org.apache.spark.sql.DataFrame = [a: int, b: string ... 1 more field]
scala> res5.as[Temp] // converting to DS
res6: org.apache.spark.sql.Dataset[Temp] = [a: int, b: string ... 1 …Run Code Online (Sandbox Code Playgroud) 我知道UDFs是Spark的完整黑盒子,不会尝试优化它.但是Column在(https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Column)中列出的类型及其功能的使用是否会
成为函数"符合条件" Catalyst Optimizer?
例如,UDF通过添加1到现有列来创建新列
val addOne = udf( (num: Int) => num + 1 )
df.withColumn("col2", addOne($"col1"))
Run Code Online (Sandbox Code Playgroud)
相同的功能,使用Column类型:
def addOne(col1: Column) = col1.plus(1)
df.withColumn("col2", addOne($"col1"))
Run Code Online (Sandbox Code Playgroud)
要么
spark.sql("select *, col1 + 1 from df")
Run Code Online (Sandbox Code Playgroud)
他们之间的表现会有什么不同吗?
我试图在Scala中打印MongoDB查询的结果
val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("tableScala")
val collection: MongoCollection[Document] = database.getCollection("tableScala")
collection.find().printResults()
Run Code Online (Sandbox Code Playgroud)
抛出的错误是:Cannot resolve symbol printResults.建议使用mongo-scala-driver版本的一些其他问题的答案1.2,因为版本1.1及以下没有实现printResults()
SBT档案:
name := "scalaMongoDriver"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.0-beta1"
Run Code Online (Sandbox Code Playgroud)
尝试使用以下方式手动打印:
collection.find().subscribe(
(user: Document) => println(user.toJson()), // onNext
(error: Throwable) => println(s"Query failed: ${error.getMessage}"), // onError
() => println("Done") // onComplete
)
Run Code Online (Sandbox Code Playgroud)
得到以下信息:
信息:ReadPreferenceServerSelector {readPreference = primary}从集群描述ClusterDescription {type = UNKNOWN,connectionMode = SINGLE,serverDescriptions = [ServerDescription …
我有一个RDD,我需要将其转换为数据集,我试过:
Dataset<Person> personDS = sqlContext.createDataset(personRDD, Encoders.bean(Person.class));
Run Code Online (Sandbox Code Playgroud)
以上行抛出错误,
无法解析方法createDataset(org.apache.spark.api.java.JavaRDD Main.Person,org.apache.spark.sql.Encoder T)
但是,我可以转换为Dataset转换为Dataframe.以下代码有效:
Dataset<Row> personDF = sqlContext.createDataFrame(personRDD, Person.class);
Dataset<Person> personDS = personDF.as(Encoders.bean(Person.class));
Run Code Online (Sandbox Code Playgroud) 我有List[Double],如何将其转换为org.apache.spark.sql.Column.我试图将其作为列插入.withColumn()到现有的DataFrame.
我有一个保存的 PipelineModel:
pipe_model = pipe.fit(df_train)
pipe_model.write().overwrite().save("/user/pipe_text_2")
Run Code Online (Sandbox Code Playgroud)
现在我想向此管道添加一个新的已安装的 PipelineModel:
pipe_model = PipelineModel.load("/user/pipe_text_2")
df2 = pipe_model.transform(df1)
kmeans = KMeans(k=20)
pipe2 = Pipeline(stages=[kmeans])
pipe_model2 = pipe2.fit(df2)
Run Code Online (Sandbox Code Playgroud)
不用重新装也可以吗?为了获得一个新的PipelineModel但不是一个新的Pipeline。理想的情况如下:
pipe_model_new = pipe_model + pipe_model2
TypeError: unsupported operand type(s) for +: 'PipelineModel' and 'PipelineModel'
Run Code Online (Sandbox Code Playgroud)
我发现将两个 Spark mllib 管道连接在一起,但是使用此解决方案,您需要再次安装整个管道。这就是我试图避免的。
我希望持久化来自spark作业的RDD,以便所有后续作业都可以使用Spark Job Server.这是我尝试过的:
工作1:
package spark.jobserver
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try
object FirstJob extends SparkJob with NamedRddSupport {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]").setAppName("FirstJob")
val sc = new SparkContext(conf)
val config = ConfigFactory.parseString("")
val results = runJob(sc, config)
println("Result is " + results)
}
override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid
override def runJob(sc: SparkContext, config: Config): Any = {
// the below variable is to be accessed by other jobs: …Run Code Online (Sandbox Code Playgroud) object sandbox {
class Numbers {
def price() : List[Int] = List(1,3,5,7)
def printIt(): Unit = {
price.foreach(x => print(x+ " ") )
}
}
trait DoubleIt extends Numbers {
override def price() : List[Int] ={
println("doubling")
super.price.map(x => x*2)
}
}
trait AddIt extends Numbers {
override def price() : List[Int] = {
println("adding")
super.price.map( x => x+2)
}
}
def main(args :Array[String]): Unit = {
val obj = new Numbers with DoubleIt with AddIt
obj.printIt()
}
}
//output …Run Code Online (Sandbox Code Playgroud)