这篇文章声称DataFrameSpark中的 a等同于 a Dataset[Row],但这篇博文表明 aDataFrame有一个架构。
以博客文章中将 RDD 转换为 a 的示例DataFrame:如果DataFrame与 相同Dataset[Row],那么将 an 转换RDD为 aDataFrame应该很简单
val rddToDF = rdd.map(value => Row(value))
Run Code Online (Sandbox Code Playgroud)
但相反它表明它是这个
val rddStringToRowRDD = rdd.map(value => Row(value))
val dfschema = StructType(Array(StructField("value",StringType)))
val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD,dfschema)
val rDDToDataSet = rddToDF.as[String]
Run Code Online (Sandbox Code Playgroud)
显然,数据框实际上是行和模式的数据集。
关于Spark数据集,什么是强类型API和无类型API?
数据集如何与数据帧相似/不同?
对于自定义Estimator的transformSchema方法,我需要能够将输入数据帧的模式与案例类中定义的模式进行比较.通常,这可以像从案例类生成Spark StructType/Schema一样执行,如下所述.但是,使用了错误的可空性:
推断出的df的真实模式spark.read.csv().as[MyClass]可能如下所示:
root
|-- CUSTOMER_ID: integer (nullable = false)
Run Code Online (Sandbox Code Playgroud)
案例类:
case class MySchema(CUSTOMER_ID: Int)
Run Code Online (Sandbox Code Playgroud)
比较我使用:
val rawSchema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType]
if (!rawSchema.equals(rawDf.schema))
Run Code Online (Sandbox Code Playgroud)
不幸的是,这总是产生false,因为从case类手动推断的新模式设置为可为空true(因为ja java.Integer实际上可能为null)
root
|-- CUSTOMER_ID: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)
如何nullable = false在创建架构时指定?
apache-spark apache-spark-sql spark-csv apache-spark-ml apache-spark-dataset
常规scala集合有一个漂亮的collect方法,它允许我filter-map使用部分函数在一个传递中执行操作.火花上有相同的操作Dataset吗?
我想它有两个原因:
filter-map样式操作减少到一次通过(尽管在火花中我猜测有优化可以为你发现这些东西)这是一个显示我的意思的例子.假设我有一系列选项,我想提取并加倍定义的整数(a中的那些Some):
val input = Seq(Some(3), None, Some(-1), None, Some(4), Some(5))
Run Code Online (Sandbox Code Playgroud)
方法1 - collect
input.collect {
case Some(value) => value * 2
}
// List(6, -2, 8, 10)
Run Code Online (Sandbox Code Playgroud)
这collect使得这个语法非常简洁,并且一次通过.
方法2 - filter-map
input.filter(_.isDefined).map(_.get * 2)
Run Code Online (Sandbox Code Playgroud)
我可以将这种模式带到火花上,因为数据集和数据框有类似的方法.
但是,我不喜欢这个这么多,因为isDefined和get看起来像代码异味给我.有一个隐含的假设,即地图只接收Somes.编译器无法验证这一点.在一个更大的例子中,开发人员更难发现这种假设,开发人员可能会交换过滤器并映射,例如,不会出现语法错误.
方法3 - fold*操作
input.foldRight[List[Int]](Nil) {
case (nextOpt, acc) => nextOpt match {
case Some(next) => next*2 :: acc
case None => …Run Code Online (Sandbox Code Playgroud) 我有以下内容DataSet,具有以下结构。
case class Person(age: Int, gender: String, salary: Double)
Run Code Online (Sandbox Code Playgroud)
我想通过和来确定平均工资,因此我将这两个关键字归为一组。我遇到了两个主要问题,一个是两个键混合在一个键中,但我想将它们放在两个不同的列中,另一个是该列的名称很傻,我不知道该怎么做。使用将该文件重命名(显然,将不起作用)。genderageDSaggregatedasaliasDS API
val df = sc.parallelize(List(Person(100000.00, "male", 27),
Person(120000.00, "male", 27),
Person(95000, "male", 26),
Person(89000, "female", 31),
Person(250000, "female", 51),
Person(120000, "female", 51)
)).toDF.as[Person]
df.groupByKey(p => (p.gender, p.age)).agg(typed.avg(_.salary)).show()
+-----------+------------------------------------------------------------------------------------------------+
| key| TypedAverage(line2503618a50834b67a4b132d1b8d2310b12.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Person)|
+-----------+------------------------------------------------------------------------------------------------+
|[female,31]| 89000.0...
|[female,51]| 185000.0...
| [male,27]| 110000.0...
| [male,26]| 95000.0...
+-----------+------------------------------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud) 我正在尝试编写一个 spark UDF,用 0.0 替换 Double 字段的空值。我正在使用数据集 API。这是UDF:
val coalesceToZero=udf((rate: Double) => if(Option(rate).isDefined) rate else 0.0)
Run Code Online (Sandbox Code Playgroud)
这是基于我测试可以正常工作的以下功能:
def cz(value: Double): Double = if(Option(value).isDefined) value else 0.0
cz(null.asInstanceOf[Double])
cz: (value: Double)Double
res15: Double = 0.0
Run Code Online (Sandbox Code Playgroud)
但是当我以下列方式在 Spark 中使用它时,UDF 不起作用。
myDS.filter($"rate".isNull)
.select($"rate", coalesceToZero($"rate")).show
+----+---------+
|rate|UDF(rate)|
+----+---------+
|null| null|
|null| null|
|null| null|
|null| null|
|null| null|
|null| null|
+----+---------+
Run Code Online (Sandbox Code Playgroud)
但是,以下工作:
val coalesceToZero=udf((rate: Any) => if(rate == null) 0.0 else rate.asInstanceOf[Double])
Run Code Online (Sandbox Code Playgroud)
所以我想知道 Spark 是否有一些特殊的方式来处理 null Double 值。
我的 Spark 作业在运行时遇到了奇怪的错误。我没有看到MyBean类有任何问题,知道下面的驱动程序代码可能有什么问题吗?谢谢
Maven 依赖关系-
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.1.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
司机-
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
spark.createDataset(Arrays.asList(new MyBean(10),new MyBean(20)),
Encoders.bean(MyBean.class)).show();
Run Code Online (Sandbox Code Playgroud)
……
class MyBean implements Serializable {
int i;
public MyBean(){}
public MyBean(int i){this.i=i;}
public int getI() {return i;}
public void setI(int i) {this.i = i;}
}
Run Code Online (Sandbox Code Playgroud)
运行时异常-
错误 org.codehaus.commons.compiler.CompileException:org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator org.codehaus.commons.commons.compiler.CompileException:文件“ generated.java”,第 43 行,第 21 列:否找到零实际参数的适用构造函数/方法;候选者是: org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11004) 处的“public int com.ts.spark.datasets.MyBean.getI()” org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler. java:8307) 在 org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8169) 在 org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8071)
在以下代码段中,该tryParquet函数尝试从Parquet文件加载数据集(如果存在).如果没有,它会计算,持久并返回提供的数据集计划:
import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
sealed trait CustomRow
case class MyRow(
id: Int,
name: String
) extends CustomRow
val ds: Dataset[MyRow] =
Seq((1, "foo"),
(2, "bar"),
(3, "baz")).toDF("id", "name").as[MyRow]
def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
Try(session.read.parquet(path)) match {
case Success(df) => df.as[T] // <---- compile error here
case Failure(_) => {
target.write.parquet(path)
target
}
}
val readyDS: Dataset[MyRow] =
tryParquet(spark, "/path/to/file.parq", ds)
Run Code Online (Sandbox Code Playgroud)
但是,这会产生编译错误df.as[T]:
无法找到存储在数据集中的类型的编码器.导入spark.implicits._支持原始类型(Int,String等)和产品类型(case类).
将来的版本中将添加对序列化其他类型的支持.
案例成功(df)=> df.as …
scala apache-spark apache-spark-dataset apache-spark-encoders
以下是代表员工in_date和out_date的样本数据集。我必须获取所有员工的最后in_time。
Spark在4节点独立群集上运行。
初始数据集:
员工ID -----入职日期-----离职日期
1111111 2017-04-20 2017-09-14
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2016-01-07 2016-01-20
3333333 2017-10-25 null
Run Code Online (Sandbox Code Playgroud)
之后的数据集df.sort(col(in_date).desc()):
员工编号-in_date ----- out_date
1111111 2017-11-02 null
1111111 2017-04-20 2017-09-14
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2017-10-25 null
3333333 2016-01-07 2016-01-20
Run Code Online (Sandbox Code Playgroud)
df.dropDup(EmployeeID):
Run Code Online (Sandbox Code Playgroud)
输出:
员工ID -----入职日期-----离职日期
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
3333333 2016-01-07 2016-01-20
Run Code Online (Sandbox Code Playgroud)
预期数据集:
员工ID -----入职日期-----离职日期
1111111 2017-11-02 null
2222222 2017-11-28 null
3333333 2017-10-25 null
Run Code Online (Sandbox Code Playgroud)
但是,当我使用进行初始数据集排序sortWithInPartitions并进行重复数据删除时,我得到了预期的数据集。我在这里错过了大大小小的东西吗?任何帮助表示赞赏。
附加信息:
当在本地模式下用Spark执行df.sort时,实现了上述预期输出。
我没有做任何分区,重新分区。初始数据集是从基础Cassandra数据库获得的。
apache-spark apache-spark-sql spark-cassandra-connector apache-spark-dataset
让我们假设我有以下内容Dataset:
+-----------+----------+
|productCode| amount|
+-----------+----------+
| XX-13| 300|
| XX-1| 250|
| XX-2| 410|
| XX-9| 50|
| XX-10| 35|
| XX-100| 870|
+-----------+----------+
Run Code Online (Sandbox Code Playgroud)
其中productCode是String类型,amount是Int。
如果有人尝试按productCode结果排序(并且由于String比较的性质,这是预期的):
def orderProducts(product: Dataset[Product]): Dataset[Product] = {
product.orderBy("productCode")
}
// Output:
+-----------+----------+
|productCode| amount|
+-----------+----------+
| XX-1| 250|
| XX-10| 35|
| XX-100| 870|
| XX-13| 300|
| XX-2| 410|
| XX-9| 50|
+-----------+----------+
Run Code Online (Sandbox Code Playgroud)
考虑到API,如何获得按以下Integer部分排序的输出?productCodeDataset
+-----------+----------+
|productCode| …Run Code Online (Sandbox Code Playgroud)