Spark SQL DataFrame/Dataset执行引擎具有几个非常有效的时间和空间优化(例如InternalRow和表达式代码).根据许多文档,对于大多数分布式算法来说,它似乎比RDD更好.
但是,我做了一些源代码研究,但仍然不相信.我毫不怀疑InternalRow更紧凑,可以节省大量内存.但是执行算法可能不会更快地保存预定义表达式.也就是说,在源代码中表明 org.apache.spark.sql.catalyst.expressions.ScalaUDF,每个用户定义的函数都做3件事:
显然,这比直接在RDD上应用函数而不进行任何转换要慢.任何人都可以通过一些实例分析和代码分析来确认或否认我的推测吗?
非常感谢您的任何建议或见解.
apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
我是Scala的新手.我正在尝试将scala列表(将源数据帧上的某些计算数据的结果保存)转换为Dataframe或Dataset.我没有找到任何直接的方法来做到这一点.但是,我尝试了以下过程将我的列表转换为DataSet,但它似乎无法正常工作.我提供以下3种情况.
有人可以请给我一些希望,如何进行这种转换?谢谢.
import org.apache.spark.sql.{DataFrame, Row, SQLContext, DataFrameReader}
import java.sql.{Connection, DriverManager, ResultSet, Timestamp}
import scala.collection._
case class TestPerson(name: String, age: Long, salary: Double)
var tom = new TestPerson("Tom Hanks",37,35.5)
var sam = new TestPerson("Sam Smith",40,40.5)
val PersonList = mutable.MutableList[TestPerson]()
//Adding data in list
PersonList += tom
PersonList += sam
//Situation 1: Trying to create dataset from List of objects:- Result:Error
//Throwing error
var personDS = Seq(PersonList).toDS()
/*
ERROR:
error: Unable to find encoder for type stored in a Dataset. Primitive types …Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders
新的火花世界和尝试用我在网上找到的scala编写的数据集示例
在通过SBT运行时,我继续收到以下错误
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class
知道我在俯瞰什么
也可以随意指出编写相同数据集示例的更好方法
谢谢
> sbt> runMain DatasetExample
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/25 01:06:39 INFO Remoting: Starting remoting
16/10/25 01:06:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.150.130:50555]
[error] (run-main-6) org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner …Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders
我面临的问题是如何拆分多值列,即List[String]分成不同的行.
初始数据集具有以下类型: Dataset[(Integer, String, Double, scala.List[String])]
+---+--------------------+-------+--------------------+
| id| text | value | properties |
+---+--------------------+-------+--------------------+
| 0|Lorem ipsum dolor...| 1.0|[prp1, prp2, prp3..]|
| 1|Lorem ipsum dolor...| 2.0|[prp4, prp5, prp6..]|
| 2|Lorem ipsum dolor...| 3.0|[prp7, prp8, prp9..]|
Run Code Online (Sandbox Code Playgroud)
生成的数据集应具有以下类型:
Dataset[(Integer, String, Double, String)]
Run Code Online (Sandbox Code Playgroud)
而properties应拆分这样的:
+---+--------------------+-------+--------------------+
| id| text | value | property |
+---+--------------------+-------+--------------------+
| 0|Lorem ipsum dolor...| 1.0| prp1 |
| 0|Lorem ipsum dolor...| 1.0| prp2 |
| 0|Lorem ipsum dolor...| 1.0| prp3 |
| 1|Lorem …Run Code Online (Sandbox Code Playgroud) 假设我创建一个实木复合地板文件,如下所示:
case class A (i:Int,j:Double,s:String)
var l1 = List(A(1,2.0,"s1"),A(2,3.0,"S2"))
val ds = spark.createDataset(l1)
ds.write.parquet("/tmp/test.parquet")
Run Code Online (Sandbox Code Playgroud)
是否可以将其读入具有不同架构的类型的数据集中,其中唯一的区别是很少有其他字段?
例如:
case class B (i:Int,j:Double,s:String,d:Double=1.0) // d is extra and has a default value
Run Code Online (Sandbox Code Playgroud)
有什么办法可以使我工作吗?:
val ds2 = spark.read.parquet("/tmp/test.parquet").as[B]
Run Code Online (Sandbox Code Playgroud) apache-spark parquet apache-spark-sql apache-spark-dataset apache-spark-2.0
给出以下DataSet值inputData:
column0 column1 column2 column3
A 88 text 99
Z 12 test 200
T 120 foo 12
Run Code Online (Sandbox Code Playgroud)
在星火,什么是计算一个新的有效途径hash列,并将它添加到一个新的DataSet,hashedData其中hash被定义为应用MurmurHash3过的每一行的值inputData.
具体来说,hashedData如下:
column0 column1 column2 column3 hash
A 88 text 99 MurmurHash3.arrayHash(Array("A", 88, "text", 99))
Z 12 test 200 MurmurHash3.arrayHash(Array("Z", 12, "test", 200))
T 120 foo 12 MurmurHash3.arrayHash(Array("T", 120, "foo", 12))
Run Code Online (Sandbox Code Playgroud)
如果需要更多细节,请告诉我.
任何帮助表示赞赏.谢谢!
当我们的数据源中缺少某些类型为Option [Seq [String]]的列时,我在编码数据时遇到了一些问题.理想情况下,我希望填充缺少的列数据None.
场景:
我们正在阅读的一些镶木地板文件中有column1而不是column2.
我们将这些镶木地板文件中的数据加载到a中Dataset,并将其转换为MyType.
case class MyType(column1: Option[String], column2: Option[Seq[String]])
sqlContext.read.parquet("dataSource.parquet").as[MyType]
Run Code Online (Sandbox Code Playgroud)
org.apache.spark.sql.AnalysisException:无法解析'
column2'给定的输入列:[column1];
有没有办法用column2数据创建数据集None?
Spark Scala API 有一个Dataset#transform方法可以轻松链接自定义 DataFrame 转换,如下所示:
val weirdDf = df
.transform(myFirstCustomTransformation)
.transform(anotherCustomTransformation)
Run Code Online (Sandbox Code Playgroud)
我在文档中没有看到pyspark的等效transform方法。
是否有 PySpark 方式来链接自定义转换?
如果没有,如何对pyspark.sql.DataFrame类进行猴子修补以添加transform方法?
更新
从PySpark 3.0 开始,transform 方法被添加到 PySpark中。
我想为我的数据集行分配唯一的ID.我知道有两种实现选择:
第一种选择:
import org.apache.spark.sql.expressions.Window;
ds.withColumn("id",row_number().over(Window.orderBy("a column")))
Run Code Online (Sandbox Code Playgroud)第二种选择:
df.withColumn("id", monotonically_increasing_id())
Run Code Online (Sandbox Code Playgroud)第二个选项不是顺序ID,它并不重要.
我想弄清楚是否存在这些实现的任何性能问题.也就是说,如果其中一个选项与另一个相比非常慢.更有意义的是:"monotonically_increasing_id比row_number快得多,因为它不是顺序的......"
我有多个大数据帧(大约 30GB)称为 as 和 bs,一个相对较小的数据帧(大约 500MB ~ 1GB)称为 spp。我试图将 spp 缓存到内存中,以避免多次从数据库或文件中读取数据。
但是我发现如果我缓存 spp,物理计划显示它不会使用广播连接,即使 spp 被广播功能包围。但是,如果我取消持久化 spp,计划会显示它使用广播连接。
有熟悉这个的吗?
scala> spp.cache
res38: spp.type = [id: bigint, idPartner: int ... 41 more fields]
scala> val as = acs.join(broadcast(spp), $"idsegment" === $"idAdnetProductSegment")
as: org.apache.spark.sql.DataFrame = [idsegmentpartner: bigint, ssegmentsource: string ... 44 more fields]
scala> as.explain
== Physical Plan ==
*SortMergeJoin [idsegment#286L], [idAdnetProductSegment#91L], Inner
:- *Sort [idsegment#286L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(idsegment#286L, 200)
: +- *Filter isnotnull(idsegment#286L)
: +- HiveTableScan [idsegmentpartner#282L, …Run Code Online (Sandbox Code Playgroud) apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0