标签: apache-spark-dataset

Spark结构化流 - 将静态数据集与流数据集连接起来

我正在Spark structured streaming用来处理从中读取的记录Kafka.这是我想要实现的目标:

(a)每条记录都是一种Tuple2类型(Timestamp, DeviceId).

(b)我创建了一个静态Dataset[DeviceId],其中包含DeviceId预期在Kafka流中看到的所有有效设备ID(类型)的集合.

(c)我需要写一个Spark structured streaming查询

 (i) Groups records by their timestamp into 5-minute windows
 (ii) For each window, get the list of valid device IDs that were **not** seen in that window
Run Code Online (Sandbox Code Playgroud)

例如,假设所有有效设备ID的列表都是,[A,B,C,D,E]并且某个5分钟窗口中的kafka记录包含设备ID [A,B,E].然后,对于该窗口,我正在寻找的看不见的设备ID列表是[C,D].

  1. 如何在Spark结构化流中编写此查询?我尝试使用公开的方法except()join()方法Dataset.但是,他们都抛出了一个运行时异常,抱怨说这些操作都不受支持streaming Dataset.

这是我的代码片段:

val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L))) …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-dataset spark-structured-streaming

13
推荐指数
2
解决办法
3717
查看次数

Apache Spark 2.0:java.lang.UnsupportedOperationException:找不到java.time.LocalDate的编码器

我正在使用Apache Spark 2.0并创建case class提供架构DetaSet.当我试图根据如何在数据集中存储自定义对象来定义自定义编码器因为java.time.LocalDate我得到以下例外:

java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "callDate")
- root class: "FireService"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:598)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
............
Run Code Online (Sandbox Code Playgroud)

以下是代码:

case class FireService(callNumber: String, callDate: java.time.LocalDate)
implicit val localDateEncoder: org.apache.spark.sql.Encoder[java.time.LocalDate] = org.apache.spark.sql.Encoders.kryo[java.time.LocalDate]

val fireServiceDf = df.map(row => {
val dateFormatter = java.time.format.DateTimeFormatter.ofPattern("MM/dd /yyyy")
FireService(row.getAs[String](0),  java.time.LocalDate.parse(row.getAs[String](4), dateFormatter))
})
Run Code Online (Sandbox Code Playgroud)

我们如何定义第三方api的spark编码器?

更新

当我为整个案例类创建编码器时,df.map..将对象映射为二进制,如下所示:

implicit val …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

12
推荐指数
1
解决办法
6199
查看次数

为少数列创建具有空值的DataFrame

我正在尝试创建一个DataFrame使用RDD.

首先,我创建一个RDD使用下面的代码 -

val account = sc.parallelize(Seq(
                                 (1, null, 2,"F"), 
                                 (2, 2, 4, "F"),
                                 (3, 3, 6, "N"),
                                 (4,null,8,"F")))
Run Code Online (Sandbox Code Playgroud)

它工作正常 -

account:org.apache.spark.rdd.RDD [(Int,Any,Int,String)] = ParallelCollectionRDD [0]并行化:27

但是,当尝试创建DataFrameRDD使用下面的代码

account.toDF("ACCT_ID", "M_CD", "C_CD","IND")
Run Code Online (Sandbox Code Playgroud)

我收到了以下错误

java.lang.UnsupportedOperationException:不支持类型为Any的架构

我分析说,每当我把null值放进去的时候,Seq只有我得到了错误.

有没有办法添加空值?

scala apache-spark spark-dataframe apache-spark-dataset

12
推荐指数
2
解决办法
6986
查看次数

java.lang.UnsupportedOperationException:写入时spark出错

当我尝试将数据集写入镶木地板文件时,出现以下错误

18/11/05 06:25:43 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 84 in stage 1.0 failed 4 times, most recent failure: Lost task 84.3 in stage 1.0 (TID 989, ip-10-253-194-207.nonprd.aws.csp.net, executor 4): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
        at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

但是当我给出时dataset.show()我可以查看数据。不确定在哪里检查根本原因。

apache-spark apache-spark-dataset

12
推荐指数
2
解决办法
3万
查看次数

如何从SparkSQL DataFrame中的MapType列获取键和值

我有一个镶木地板文件中的数据有2个字段:object_id: Stringalpha: Map<>.

它被读入sparkSQL中的数据框,模式如下所示:

scala> alphaDF.printSchema()
root
 |-- object_id: string (nullable = true)
 |-- ALPHA: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
Run Code Online (Sandbox Code Playgroud)

我正在使用Spark 2.0,我正在尝试创建一个新的数据框,其中列需要是地图的object_id加号键,ALPHAobject_id, key1, key2, key2, ...

我是第一次尝试看看我是否至少可以像这样访问地图:

scala> alphaDF.map(a => a(0)).collect()
<console>:32: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing …
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql apache-spark-dataset

11
推荐指数
1
解决办法
1万
查看次数

Spark - RelationalGroupedDataset与KeyvalueGroupedDataset?我什么时候应该使用它们?

Dataset在Spark中进行分组时,有两种方法:groupBygroupByKey[K].

groupBy返回RelationalGroupedDataset,同时groupByKey[K]返回KeyvalueGroupedDataset.

它们之间有什么区别?

在什么情况下我应该选择一个而不是另一个?


为什么我的问题是关于"Dataset vs DataFrame"的问题的副本?我不明白.这显然是完全不同的事情!我的问题非常具体而非通用.

aggregation apache-spark apache-spark-dataset

11
推荐指数
0
解决办法
478
查看次数

使用typedcolumn选择Spark数据集

查看select()spark DataSet 上的函数,有各种生成的函数签名:

(c1: TypedColumn[MyClass, U1],c2: TypedColumn[MyClass, U2] ....)
Run Code Online (Sandbox Code Playgroud)

这似乎暗示我应该能够直接引用MyClass的成员并且类型安全,但我不确定如何...

ds.select("member")当然有效..似乎也ds.select(_.member)可能以某种方式工作?

scala apache-spark apache-spark-dataset

10
推荐指数
2
解决办法
8171
查看次数

将ADT /密封特征层次结构编码到Spark DataSet列中

如果我想在Spark DataSet列中存储代数数据类型(ADT)(即Scala密封特征层次结构),那么最佳编码策略是什么?

例如,如果我有一个ADT,其中叶子类型存储不同类型的数据:

sealed trait Occupation
case object SoftwareEngineer extends Occupation
case class Wizard(level: Int) extends Occupation
case class Other(description: String) extends Occupation
Run Code Online (Sandbox Code Playgroud)

什么是构建一个最好的方法:

org.apache.spark.sql.DataSet[Occupation]
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset apache-spark-encoders

10
推荐指数
1
解决办法
1520
查看次数

如何降低数据框列名的大小写而不是其值?

如何降低数据框列名的大小写而不是其值?使用 RAW Spark SQL 和 Dataframe 方法?

输入数据框(假设我有 100 个大写的这些列)

NAME | COUNTRY | SRC        | CITY       | DEBIT
---------------------------------------------
"foo"| "NZ"    | salary     | "Auckland" | 15.0
"bar"| "Aus"   | investment | "Melbourne"| 12.5
Run Code Online (Sandbox Code Playgroud)

目标数据框

name | country | src        | city       | debit
------------------------------------------------
"foo"| "NZ"    | salary     | "Auckland" | 15.0
"bar"| "Aus"   | investment | "Melbourne"| 12.5
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql apache-spark-dataset

10
推荐指数
3
解决办法
2万
查看次数

为什么谓词下推没有在类型化数据集API中使用(与非类型化数据框架API相比)?

我一直认为数据集/数据帧API是相同的......唯一的区别是数据集API将为您提供编译时安全性.对 ?

那么......我的案子非常简单:

 case class Player (playerID: String, birthYear: Int)

 val playersDs: Dataset[Player] = session.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv(PeopleCsv)
  .as[Player]

 // Let's try to find players born in 1999. 
 // This will work, you have compile time safety... but it will not use predicate pushdown!!!
 playersDs.filter(_.birthYear == 1999).explain()

 // This will work as expected and use predicate pushdown!!!
 // But you can't have compile time safety with this :(
 playersDs.filter('birthYear === 1999).explain()
Run Code Online (Sandbox Code Playgroud)

从第一个示例解释将显示它不执行谓词下推(注意空PushedFilters):

== Physical Plan == …
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark apache-spark-sql apache-spark-dataset

9
推荐指数
1
解决办法
1643
查看次数