小编Mar*_*nne的帖子

何时在Scala特征中使用val或def?

我正想通过有效的斯卡拉幻灯片,并提到在幻灯片10至从来不使用valtrait抽象成员和使用def来代替.幻灯片没有详细提及为什么val在a中使用抽象trait是一种反模式.如果有人可以解释在抽象方法的特性中使用val vs def的最佳实践,我将不胜感激

inheritance scala traits

84
推荐指数
2
解决办法
2万
查看次数

SparkSQL:如何处理用户定义函数中的空值?

给定表1,其中一列为"x",类型为String.我想创建表2,其中列为"y",它是"x"中给出的日期字符串的整数表示形式.

必不可少的是将null值保留在"y"列中.

表1(数据帧df1):

+----------+
|         x|
+----------+
|2015-09-12|
|2015-09-13|
|      null|
|      null|
+----------+
root
 |-- x: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

表2(数据帧df2):

+----------+--------+                                                                  
|         x|       y|
+----------+--------+
|      null|    null|
|      null|    null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
 |-- x: string (nullable = true)
 |-- y: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

用于将列"x"中的值转换为列"y"的用户定义函数(udf)为:

val extractDateAsInt = udf[Int, String] (
  (d:String) => d.substring(0, 10)
      .filterNot( "-".toSet)
      .toInt )
Run Code Online (Sandbox Code Playgroud)

并且工作,处理空值是不可能的.

尽管如此,我可以做类似的事情

val extractDateAsIntWithNull = udf[Int, String] (
  (d:String) => 
    if (d != …
Run Code Online (Sandbox Code Playgroud)

scala nullable user-defined-functions apache-spark apache-spark-sql

27
推荐指数
3
解决办法
4万
查看次数

如何在Spark SQL中定义自定义类型的模式?

以下示例代码尝试将一些案例对象放入数据框中.代码包括案例对象层次结构的定义和使用此特征的案例类:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String, t: Some)

object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
    df.show()
  }
}    
Run Code Online (Sandbox Code Playgroud)

执行代码时,我遗憾地遇到以下异常:

java.lang.UnsupportedOperationException: Schema for type …
Run Code Online (Sandbox Code Playgroud)

scala case-class apache-spark apache-spark-sql

26
推荐指数
1
解决办法
2万
查看次数

使用Spark`DataFrame`的`unionAll`出了什么问题?

使用Spark 1.5.0并给出以下代码,我希望unionAll DataFrame基于它们的列名进行联合.在代码中,我使用一些FunSuite传递SparkContext sc:

object Entities {

  case class A (a: Int, b: Int)
  case class B (b: Int, a: Int)

  val as = Seq(
    A(1,3),
    A(2,4)
  )

  val bs = Seq(
    B(5,3),
    B(6,4)
  )
}

class UnsortedTestSuite extends SparkFunSuite {

  configuredUnitTest("The truth test.") { sc =>
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val aDF = sc.parallelize(Entities.as, 4).toDF
    val bDF = sc.parallelize(Entities.bs, 4).toDF
    aDF.show()
    bDF.show()
    aDF.unionAll(bDF).show
  }
}
Run Code Online (Sandbox Code Playgroud)

输出:

+---+---+
|  a|  b|
+---+---+
|  1|  3| …
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql

20
推荐指数
1
解决办法
3万
查看次数

`map`和`reduce`方法如何在Spark RDD中工作?

以下代码来自Apache Spark的快速入门指南.有人可以解释一下"线"变量是什么以及它来自何处?

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
Run Code Online (Sandbox Code Playgroud)

另外,如何将值传递给a,b?

链接到QSG http://spark.apache.org/docs/latest/quick-start.html

closures scala apache-spark

18
推荐指数
2
解决办法
4万
查看次数

有没有办法为Spark数据帧添加额外的元数据?

是否可以向DataFrames 添加额外的元数据?

原因

我有Spark DataFrame,我需要保留额外的信息.示例:A DataFrame,我想要"记住"Integer id列中使用率最高的索引.

当前解决方案

我使用单独的DataFrame来存储这些信息.当然,单独保存这些信息是单调乏味且容易出错的.

有没有更好的解决方案来存储这样的额外信息DataFrame

scala apache-spark apache-spark-sql

11
推荐指数
3
解决办法
9791
查看次数

Spark和SparkSQL:如何模仿窗口功能?

描述

给定一个数据帧 df

id |       date
---------------
 1 | 2015-09-01
 2 | 2015-09-01
 1 | 2015-09-03
 1 | 2015-09-04
 2 | 2015-09-04
Run Code Online (Sandbox Code Playgroud)

我想创建一个运行计数器或索引,

  • 按相同的ID分组
  • 按该组中的日期排序,

从而

id |       date |  counter
--------------------------
 1 | 2015-09-01 |        1
 1 | 2015-09-03 |        2
 1 | 2015-09-04 |        3
 2 | 2015-09-01 |        1
 2 | 2015-09-04 |        2
Run Code Online (Sandbox Code Playgroud)

这是我可以通过窗口功能实现的,例如

val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )
Run Code Online (Sandbox Code Playgroud)

不幸的是,Spark 1.4.1不支持常规数据帧的窗口函数:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, …
Run Code Online (Sandbox Code Playgroud)

scala window-functions apache-spark apache-spark-sql

10
推荐指数
2
解决办法
1万
查看次数

如何将Dataframe列名称与Scala案例类属性相匹配?

来自spark-sql的这个例子中的列名来自case class Person.

case class Person(name: String, age: Int)

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
Run Code Online (Sandbox Code Playgroud)

https://spark.apache.org/docs/1.1.0/sql-programming-guide.html

但是,在许多情况下,参数名称可能会更改.如果文件尚未更新以反映更改,则会导致找不到列.

如何指定适当的映射?

我想的是:

  val schema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false)
  ))


  val ps: Seq[Person] = ???

  val personRDD = sc.parallelize(ps)

  // Apply the schema to the …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark parquet apache-spark-sql

10
推荐指数
1
解决办法
2万
查看次数

在DataFrame上定义自定义方法的最佳方法是什么?

我需要在DataFrame上定义自定义方法.有什么更好的方法呢?解决方案应该是可扩展的,因为我打算定义大量的自定义方法.

我当前的方法是MyClass使用DataFrameas参数创建一个类(比如说),在其中定义我的自定义方法(比如说customMethod)并定义一个转换DataFrame为的隐式方法MyClass.

implicit def dataFrametoMyClass(df: DataFrame): MyClass = new MyClass(df)
Run Code Online (Sandbox Code Playgroud)

因此,我可以打电话:

dataFrame.customMethod()
Run Code Online (Sandbox Code Playgroud)

这是正确的方法吗?公开征求意见.

scala apache-spark apache-spark-sql

10
推荐指数
2
解决办法
2859
查看次数

如何将Spark中`Dataframe`的两列合并为一个2-Tuple?

我有一个DataFrame df有五列的Spark .我想添加另一列,其值为第一列和第二列的元组.当使用withColumn()方法时,我得到不匹配错误,因为输入不是列类型,而是(列,列).我想知道在这种情况下是否有一个解决方案旁边的行循环运行?

var dfCol=(col1:Column,col2:Column)=>(col1,col2)
val vv = df.withColumn( "NewColumn", dfCol( df(df.schema.fieldNames(1)) , df(df.schema.fieldNames(2)) ) )
Run Code Online (Sandbox Code Playgroud)

scala apache-spark-sql spark-dataframe

9
推荐指数
3
解决办法
2万
查看次数