标签: apache-spark-dataset

Spark:如果 DataFrame 有架构,DataFrame 如何成为 Dataset[Row]

这篇文章声称DataFrameSpark中的 a等同于 a Dataset[Row],但这篇博文表明 aDataFrame有一个架构。

以博客文章中将 RDD 转换为 a 的示例DataFrame:如果DataFrame与 相同Dataset[Row],那么将 an 转换RDD为 aDataFrame应该很简单

val rddToDF = rdd.map(value => Row(value))
Run Code Online (Sandbox Code Playgroud)

但相反它表明它是这个

val rddStringToRowRDD = rdd.map(value => Row(value))
val dfschema = StructType(Array(StructField("value",StringType)))
val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD,dfschema)
val rDDToDataSet = rddToDF.as[String]
Run Code Online (Sandbox Code Playgroud)

显然,数据框实际上是行和模式的数据集。

scala apache-spark apache-spark-sql apache-spark-dataset

4
推荐指数
1
解决办法
9040
查看次数

Spark数据集-强大的输入

关于Spark数据集,什么是强类型API和无类型API?

数据集如何与数据帧相似/不同?

dataset apache-spark apache-spark-dataset

4
推荐指数
1
解决办法
1777
查看次数

来自案例类的Spark模式具有正确的可空性

对于自定义Estimator的transformSchema方法,我需要能够将输入数据帧的模式与案例类中定义的模式进行比较.通常,这可以像从案例类生成Spark StructType/Schema一样执行,如下所述.但是,使用了错误的可空性:

推断出的df的真实模式spark.read.csv().as[MyClass]可能如下所示:

root
 |-- CUSTOMER_ID: integer (nullable = false)
Run Code Online (Sandbox Code Playgroud)

案例类:

case class MySchema(CUSTOMER_ID: Int)
Run Code Online (Sandbox Code Playgroud)

比较我使用:

val rawSchema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType]
  if (!rawSchema.equals(rawDf.schema))
Run Code Online (Sandbox Code Playgroud)

不幸的是,这总是产生false,因为从case类手动推断的新模式设置为可为空true(因为ja java.Integer实际上可能为null)

root
 |-- CUSTOMER_ID: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

如何nullable = false在创建架构时指定?

apache-spark apache-spark-sql spark-csv apache-spark-ml apache-spark-dataset

4
推荐指数
1
解决办法
4110
查看次数

Spark数据集相当于scala的"collect",它具有部分功能

常规scala集合有一个漂亮的collect方法,它允许我filter-map使用部分函数在一个传递中执行操作.火花上有相同的操作Dataset吗?

我想它有两个原因:

  • 句法简洁
  • 它将filter-map样式操作减少到一次通过(尽管在火花中我猜测有优化可以为你发现这些东西)

这是一个显示我的意思的例子.假设我有一系列选项,我想提取并加倍定义的整数(a中的那些Some):

val input = Seq(Some(3), None, Some(-1), None, Some(4), Some(5)) 
Run Code Online (Sandbox Code Playgroud)

方法1 - collect

input.collect {
  case Some(value) => value * 2
} 
// List(6, -2, 8, 10)
Run Code Online (Sandbox Code Playgroud)

collect使得这个语法非常简洁,并且一次通过.

方法2 - filter-map

input.filter(_.isDefined).map(_.get * 2)
Run Code Online (Sandbox Code Playgroud)

我可以将这种模式带到火花上,因为数据集和数据框有类似的方法.

但是,我不喜欢这个这么多,因为isDefinedget看起来像代码异味给我.有一个隐含的假设,即地图只接收Somes.编译器无法验证这一点.在一个更大的例子中,开发人员更难发现这种假设,开发人员可能会交换过滤器并映射,例如,不会出现语法错误.

方法3 - fold*操作

input.foldRight[List[Int]](Nil) {
  case (nextOpt, acc) => nextOpt match {
    case Some(next) => next*2 :: acc
    case None => …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset

4
推荐指数
2
解决办法
2871
查看次数

如何解压缩Spark DataSet中的多个键

我有以下内容DataSet,具有以下结构。

case class Person(age: Int, gender: String, salary: Double)
Run Code Online (Sandbox Code Playgroud)

我想通过和来确定平均工资,因此我将这两个关键字归为一组。我遇到了两个主要问题,一个是两个键混合在一个键中,但我想将它们放在两个不同的列中,另一个是该列的名称很傻,我不知道该怎么做。使用将该文件重命名(显然,将不起作用)。genderageDSaggregatedasaliasDS API

val df = sc.parallelize(List(Person(100000.00, "male", 27), 
  Person(120000.00, "male", 27), 
  Person(95000, "male", 26),
  Person(89000, "female", 31),
  Person(250000, "female", 51),
  Person(120000, "female", 51)
)).toDF.as[Person]

df.groupByKey(p => (p.gender, p.age)).agg(typed.avg(_.salary)).show()

+-----------+------------------------------------------------------------------------------------------------+
|        key| TypedAverage(line2503618a50834b67a4b132d1b8d2310b12.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Person)|          
+-----------+------------------------------------------------------------------------------------------------+ 
|[female,31]|  89000.0... 
|[female,51]| 185000.0...
|  [male,27]| 110000.0...
|  [male,26]|  95000.0...
+-----------+------------------------------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset

4
推荐指数
2
解决办法
1849
查看次数

Spark UDF 不适用于 Double 字段中的空值

我正在尝试编写一个 spark UDF,用 0.0 替换 Double 字段的空值。我正在使用数据集 API。这是UDF:

val coalesceToZero=udf((rate: Double) =>  if(Option(rate).isDefined) rate else 0.0)
Run Code Online (Sandbox Code Playgroud)

这是基于我测试可以正常工作的以下功能:

def cz(value: Double): Double = if(Option(value).isDefined) value else 0.0

cz(null.asInstanceOf[Double])
cz: (value: Double)Double
res15: Double = 0.0
Run Code Online (Sandbox Code Playgroud)

但是当我以下列方式在 Spark 中使用它时,UDF 不起作用。

myDS.filter($"rate".isNull)
    .select($"rate", coalesceToZero($"rate")).show

+----+---------+
|rate|UDF(rate)|
+----+---------+
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
+----+---------+
Run Code Online (Sandbox Code Playgroud)

但是,以下工作:

val coalesceToZero=udf((rate: Any) =>  if(rate == null) 0.0 else rate.asInstanceOf[Double])
Run Code Online (Sandbox Code Playgroud)

所以我想知道 Spark 是否有一些特殊的方式来处理 null Double 值。

scala apache-spark apache-spark-dataset

4
推荐指数
1
解决办法
3814
查看次数

找不到零实际参数的适用构造函数/方法 - Apache Spark Java

我的 Spark 作业在运行时遇到了奇怪的错误。我没有看到MyBean类有任何问题,知道下面的驱动程序代码可能有什么问题吗?谢谢

Maven 依赖关系-

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

司机-

SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
spark.createDataset(Arrays.asList(new MyBean(10),new MyBean(20)),
      Encoders.bean(MyBean.class)).show();
Run Code Online (Sandbox Code Playgroud)

……

class MyBean implements Serializable {
    int i;
    public MyBean(){}
    public MyBean(int i){this.i=i;}
    public int getI() {return i;}
    public void setI(int i) {this.i = i;}
}
Run Code Online (Sandbox Code Playgroud)

运行时异常-

错误 org.codehaus.commons.compiler.CompileException:org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator org.codehaus.commons.commons.compiler.CompileException:文件“ generated.java”,第 43 行,第 21 列:否找到零实际参数的适用构造函数/方法;候选者是: org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11004) 处的“public int com.ts.spark.datasets.MyBean.getI()” org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler. java:8307) 在 org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8169) 在 org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8071)

java apache-spark apache-spark-sql apache-spark-dataset

4
推荐指数
1
解决办法
6588
查看次数

Generic T as Spark Dataset [T]构造函数

在以下代码段中,该tryParquet函数尝试从Parquet文件加载数据集(如果存在).如果没有,它会计算,持久并返回提供的数据集计划:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset

sealed trait CustomRow

case class MyRow(
  id: Int,
  name: String
) extends CustomRow

val ds: Dataset[MyRow] =
  Seq((1, "foo"),
      (2, "bar"),
      (3, "baz")).toDF("id", "name").as[MyRow]

def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
    Try(session.read.parquet(path)) match {
      case Success(df) => df.as[T] // <---- compile error here
      case Failure(_)  => {
        target.write.parquet(path)
        target
      }
    }

val readyDS: Dataset[MyRow] =
    tryParquet(spark, "/path/to/file.parq", ds)
Run Code Online (Sandbox Code Playgroud)

但是,这会产生编译错误df.as[T]:

无法找到存储在数据集中的类型的编码器.导入spark.implicits._支持原始类型(Int,String等)和产品类型(case类).

将来的版本中将添加对序列化其他类型的支持.

案例成功(df)=> df.as …

scala apache-spark apache-spark-dataset apache-spark-encoders

4
推荐指数
1
解决办法
1798
查看次数

Spark-sortWithInPartitions超过排序

以下是代表员工in_date和out_date的样本数据集。我必须获取所有员工的最后in_time。

Spark在4节点独立群集上运行。

初始数据集:

员工ID -----入职日期-----离职日期

1111111     2017-04-20  2017-09-14 
1111111     2017-11-02  null 
2222222     2017-09-26  2017-09-26 
2222222     2017-11-28  null 
3333333     2016-01-07  2016-01-20 
3333333     2017-10-25  null 
Run Code Online (Sandbox Code Playgroud)

之后的数据集df.sort(col(in_date).desc())

员工编号-in_date ----- out_date

1111111   2017-11-02   null 
1111111   2017-04-20   2017-09-14 
2222222   2017-09-26   2017-09-26 
2222222   2017-11-28   null 
3333333   2017-10-25   null 
3333333   2016-01-07   2016-01-20 
Run Code Online (Sandbox Code Playgroud)
df.dropDup(EmployeeID):  
Run Code Online (Sandbox Code Playgroud)

输出

员工ID -----入职日期-----离职日期

1111111    2017-11-02    null 
2222222    2017-09-26    2017-09-26 
3333333    2016-01-07    2016-01-20 
Run Code Online (Sandbox Code Playgroud)

预期数据集:

员工ID -----入职日期-----离职日期

1111111    2017-11-02   null 
2222222    2017-11-28   null 
3333333    2017-10-25   null 
Run Code Online (Sandbox Code Playgroud)

但是,当我使用进行初始数据集排序sortWithInPartitions并进行重复数据删除时,我得到了预期的数据集。我在这里错过了大大小小的东西吗?任何帮助表示赞赏。

附加信息: 当在本地模式下用Spark执行df.sort时,实现了上述预期输出。
我没有做任何分区,重新分区。初始数据集是从基础Cassandra数据库获得的。

apache-spark apache-spark-sql spark-cassandra-connector apache-spark-dataset

4
推荐指数
1
解决办法
3399
查看次数

在 Spark 数据集中对数字字符串进行排序

让我们假设我有以下内容Dataset

+-----------+----------+
|productCode|    amount|
+-----------+----------+
|      XX-13|       300|
|       XX-1|       250|
|       XX-2|       410|
|       XX-9|        50|
|      XX-10|        35|
|     XX-100|       870|
+-----------+----------+
Run Code Online (Sandbox Code Playgroud)

其中productCodeString类型,amountInt

如果有人尝试按productCode结果排序(并且由于String比较的性质,这是预期的):

def orderProducts(product: Dataset[Product]): Dataset[Product] = {
    product.orderBy("productCode")
}

// Output:
+-----------+----------+
|productCode|    amount|
+-----------+----------+
|       XX-1|       250|
|      XX-10|        35|
|     XX-100|       870|
|      XX-13|       300|
|       XX-2|       410|
|       XX-9|        50|
+-----------+----------+
Run Code Online (Sandbox Code Playgroud)

考虑到API,如何获得按以下Integer部分排序的输出?productCodeDataset

+-----------+----------+
|productCode| …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset

4
推荐指数
1
解决办法
1885
查看次数