标签: apache-spark-dataset

何时使用Spark DataFrame/Dataset API以及何时使用普通RDD?

Spark SQL DataFrame/Dataset执行引擎具有几个非常有效的时间和空间优化(例如InternalRow和表达式代码).根据许多文档,对于大多数分布式算法来说,它似乎比RDD更好.

但是,我做了一些源代码研究,但仍然不相信.我毫不怀疑InternalRow更紧凑,可以节省大量内存.但是执行算法可能不会更快地保存预定义表达式.也就是说,在源代码中表明 org.apache.spark.sql.catalyst.expressions.ScalaUDF,每个用户定义的函数都做3件事:

  1. 将催化剂类型(在InternalRow中使用)转换为scala类型(在GenericRow中使用).
  2. 应用该功能
  3. 将结果从scala类型转换回催化剂类型

显然,这比直接在RDD上应用函数而不进行任何转换要慢.任何人都可以通过一些实例分析和代码分析来确认或否认我的推测吗?

非常感谢您的任何建议或见解.

apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

7
推荐指数
1
解决办法
1305
查看次数

将scala列表转换为DataFrame或DataSet

我是Scala的新手.我正在尝试将scala列表(将源数据帧上的某些计算数据的结果保存)转换为Dataframe或Dataset.我没有找到任何直接的方法来做到这一点.但是,我尝试了以下过程将我的列表转换为DataSet,但它似乎无法正常工作.我提供以下3种情况.

有人可以请给我一些希望,如何进行这种转换?谢谢.

import org.apache.spark.sql.{DataFrame, Row, SQLContext, DataFrameReader}
import java.sql.{Connection, DriverManager, ResultSet, Timestamp}
import scala.collection._

case class TestPerson(name: String, age: Long, salary: Double)
var tom = new TestPerson("Tom Hanks",37,35.5)
var sam = new TestPerson("Sam Smith",40,40.5)

val PersonList = mutable.MutableList[TestPerson]()

//Adding data in list
PersonList += tom
PersonList += sam

//Situation 1: Trying to create dataset from List of objects:- Result:Error
//Throwing error
var personDS = Seq(PersonList).toDS()
/*
ERROR:
error: Unable to find encoder for type stored in a Dataset.  Primitive types …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

7
推荐指数
1
解决办法
2万
查看次数

Spark数据集:示例:无法生成编码器问题

新的火花世界和尝试用我在网上找到的scala编写的数据集示例

在通过SBT运行时,我继续收到以下错误

org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class

知道我在俯瞰什么

也可以随意指出编写相同数据集示例的更好方法

谢谢

> sbt>  runMain DatasetExample

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/25 01:06:39 INFO Remoting: Starting remoting
16/10/25 01:06:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.150.130:50555]
[error] (run-main-6) org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

7
推荐指数
1
解决办法
3471
查看次数

如何使用类型化数据集将多值列拆分为单独的行?

我面临的问题是如何拆分多值列,即List[String]分成不同的行.

初始数据集具有以下类型: Dataset[(Integer, String, Double, scala.List[String])]

+---+--------------------+-------+--------------------+
| id|       text         | value |    properties      |
+---+--------------------+-------+--------------------+
|  0|Lorem ipsum dolor...|    1.0|[prp1, prp2, prp3..]|
|  1|Lorem ipsum dolor...|    2.0|[prp4, prp5, prp6..]|
|  2|Lorem ipsum dolor...|    3.0|[prp7, prp8, prp9..]|
Run Code Online (Sandbox Code Playgroud)

生成的数据集应具有以下类型:

Dataset[(Integer, String, Double, String)]
Run Code Online (Sandbox Code Playgroud)

properties应拆分这样的:

+---+--------------------+-------+--------------------+
| id|       text         | value |    property        |
+---+--------------------+-------+--------------------+
|  0|Lorem ipsum dolor...|    1.0|        prp1        |
|  0|Lorem ipsum dolor...|    1.0|        prp2        |
|  0|Lorem ipsum dolor...|    1.0|        prp3        |
|  1|Lorem …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset

7
推荐指数
1
解决办法
3192
查看次数

将镶木地板读入Spark数据集而忽略缺少的字段

假设我创建一个实木复合地板文件,如下所示:

case class A (i:Int,j:Double,s:String)

var l1 = List(A(1,2.0,"s1"),A(2,3.0,"S2"))

val ds = spark.createDataset(l1)
ds.write.parquet("/tmp/test.parquet")
Run Code Online (Sandbox Code Playgroud)

是否可以将其读入具有不同架构的类型的数据集中,其中唯一的区别是很少有其他字段?

例如:

case class B (i:Int,j:Double,s:String,d:Double=1.0)  // d is extra and has a default value 
Run Code Online (Sandbox Code Playgroud)

有什么办法可以使我工作吗?:

val ds2 = spark.read.parquet("/tmp/test.parquet").as[B]
Run Code Online (Sandbox Code Playgroud)

apache-spark parquet apache-spark-sql apache-spark-dataset apache-spark-2.0

7
推荐指数
1
解决办法
1677
查看次数

将Spark DataSet行值映射到新的哈希列

给出以下DataSetinputData:

column0 column1 column2 column3
A       88      text    99
Z       12      test    200
T       120     foo     12
Run Code Online (Sandbox Code Playgroud)

在星火,什么是计算一个新的有效途径hash列,并将它添加到一个新的DataSet,hashedData其中hash被定义为应用MurmurHash3过的每一行的值inputData.

具体来说,hashedData如下:

column0 column1 column2 column3 hash
A       88      text    99      MurmurHash3.arrayHash(Array("A", 88, "text", 99))
Z       12      test    200     MurmurHash3.arrayHash(Array("Z", 12, "test", 200))
T       120     foo     12      MurmurHash3.arrayHash(Array("T", 120, "foo", 12))
Run Code Online (Sandbox Code Playgroud)

如果需要更多细节,请告诉我.

任何帮助表示赞赏.谢谢!

scala apache-spark spark-dataframe apache-spark-dataset

7
推荐指数
2
解决办法
6411
查看次数

Spark 2.0隐式编码器,当类型为Option [Seq [String]](scala)时处理缺少的列

当我们的数据源中缺少某些类型为Option [Seq [String]]的列时,我在编码数据时遇到了一些问题.理想情况下,我希望填充缺少的列数据None.

场景:

我们正在阅读的一些镶木地板文件中有column1而不是column2.

我们将这些镶木地板文件中的数据加载到a中Dataset,并将其转换为MyType.

case class MyType(column1: Option[String], column2: Option[Seq[String]])

sqlContext.read.parquet("dataSource.parquet").as[MyType]
Run Code Online (Sandbox Code Playgroud)

org.apache.spark.sql.AnalysisException:无法解析' column2'给定的输入列:[column1];

有没有办法用column2数据创建数据集None

scala apache-spark apache-spark-dataset

6
推荐指数
1
解决办法
1826
查看次数

相当于 Scala Dataset#transform 方法的 Pyspark 变换方法

Spark Scala API 有一个Dataset#transform方法可以轻松链接自定义 DataFrame 转换,如下所示:

val weirdDf = df
  .transform(myFirstCustomTransformation)
  .transform(anotherCustomTransformation)
Run Code Online (Sandbox Code Playgroud)

在文档中没有看到pyspark的等效transform方法。

是否有 PySpark 方式来链接自定义转换?

如果没有,如何对pyspark.sql.DataFrame类进行猴子修补以添加transform方法?

更新

PySpark 3.0 开始,transform 方法被添加到 PySpark中。

apache-spark apache-spark-sql pyspark apache-spark-dataset

6
推荐指数
1
解决办法
4605
查看次数

Spark Dataset唯一ID性能 - row_number vs monotonically_increasing_id

我想为我的数据集行分配唯一的ID.我知道有两种实现选择:

  1. 第一种选择:

    import org.apache.spark.sql.expressions.Window;
    ds.withColumn("id",row_number().over(Window.orderBy("a column")))
    
    Run Code Online (Sandbox Code Playgroud)
  2. 第二种选择:

    df.withColumn("id", monotonically_increasing_id())
    
    Run Code Online (Sandbox Code Playgroud)

第二个选项不是顺序ID,它并不重要.

我想弄清楚是否存在这些实现的任何性能问题.也就是说,如果其中一个选项与另一个相比非常慢.更有意义的是:"monotonically_increasing_id比row_number快得多,因为它不是顺序的......"

scala apache-spark apache-spark-sql apache-spark-dataset

6
推荐指数
2
解决办法
3069
查看次数

Apache Spark 2.2:当您已经缓存要广播的数据帧时,广播连接不起作用

我有多个大数据帧(大约 30GB)称为 as 和 bs,一个相对较小的数据帧(大约 500MB ~ 1GB)称为 spp。我试图将 spp 缓存到内存中,以避免多次从数据库或文件中读取数据。

但是我发现如果我缓存 spp,物理计划显示它不会使用广播连接,即使 spp 被广播功能包围。但是,如果我取消持久化 spp,计划会显示它使用广播连接。

有熟悉这个的吗?

scala> spp.cache
res38: spp.type = [id: bigint, idPartner: int ... 41 more fields]

scala> val as = acs.join(broadcast(spp), $"idsegment" === $"idAdnetProductSegment")
as: org.apache.spark.sql.DataFrame = [idsegmentpartner: bigint, ssegmentsource: string ... 44 more fields]

scala> as.explain
== Physical Plan ==
*SortMergeJoin [idsegment#286L], [idAdnetProductSegment#91L], Inner
:- *Sort [idsegment#286L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(idsegment#286L, 200)
:     +- *Filter isnotnull(idsegment#286L)
:        +- HiveTableScan [idsegmentpartner#282L, …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

6
推荐指数
1
解决办法
1637
查看次数