I am using Spark Structured Streaming to process records read from Kafka. Here is what I want to achieve:
(a) Each record is a Tuple2 of type (Timestamp, DeviceId).
(b) I have created a static Dataset[DeviceId] that contains the set of all valid device IDs (of type DeviceId) expected to appear in the Kafka stream.
(c) I need to write a Spark Structured Streaming query that:
(i) Groups records by their timestamp into 5-minute windows
(ii) For each window, get the list of valid device IDs that were **not** seen in that window
For example, suppose the list of all valid device IDs is [A, B, C, D, E], and the Kafka records in a particular 5-minute window contain the device IDs [A, B, E]. Then, for that window, the list of unseen device IDs I am looking for is [C, D].
Question
I tried the except() and join() methods of Dataset. However, both throw a runtime exception complaining that these operations are not supported on a streaming Dataset. Here is my code snippet:
val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L))) …

scala apache-spark apache-spark-sql apache-spark-dataset spark-structured-streaming
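Since except()/join() are rejected on the streaming side here, one hedged workaround (a sketch only; kafkaRecords, allDeviceIds and the column names are assumptions, not taken from the original post) is to aggregate the device IDs actually seen per 5-minute window with collect_set and subtract them from a broadcast set of valid IDs in a UDF:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_set, udf, window}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Hypothetical in-memory set of all valid device IDs.
val allDeviceIds: Set[String] = Set("A", "B", "C", "D", "E")
val allIdsBc = spark.sparkContext.broadcast(allDeviceIds)

// Returns the valid IDs missing from the set of IDs seen in a window.
val missingIds = udf { (seen: Seq[String]) => (allIdsBc.value -- seen).toSeq }

// kafkaRecords is assumed to be the streaming Dataset with (timestamp, deviceId) columns.
val unseenPerWindow = kafkaRecords
  .withWatermark("timestamp", "5 minutes")
  .groupBy(window($"timestamp", "5 minutes"))
  .agg(collect_set($"deviceId").as("seenIds"))
  .withColumn("unseenIds", missingIds($"seenIds"))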
I am using Apache Spark 2.0 and a case class to provide the schema for a Dataset. When I try to define a custom encoder for java.time.LocalDate following "How to store custom objects in Dataset?", I get the following exception:
java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "callDate")
- root class: "FireService"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:598)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
............
Here is the code:
case class FireService(callNumber: String, callDate: java.time.LocalDate)
implicit val localDateEncoder: org.apache.spark.sql.Encoder[java.time.LocalDate] = org.apache.spark.sql.Encoders.kryo[java.time.LocalDate]
val fireServiceDf = df.map(row => {
val dateFormatter = java.time.format.DateTimeFormatter.ofPattern("MM/dd/yyyy")
FireService(row.getAs[String](0), java.time.LocalDate.parse(row.getAs[String](4), dateFormatter))
})
How do we define a Spark encoder for a third-party API type?
Update
When I create an encoder for the entire case class, df.map maps the object to binary, as shown below:
implicit val …

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders
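A hedged sketch of the workaround usually suggested for Spark 2.x (before built-in java.time support): keep the field as java.sql.Date, which Spark encodes natively, and convert from LocalDate while parsing. The renamed case class FireServiceRow is an illustrative assumption; df is the source DataFrame from the question.

import java.sql.Date
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Same shape as FireService, but with a date type Spark can encode out of the box.
case class FireServiceRow(callNumber: String, callDate: Date)

import spark.implicits._ // provides the Product encoder for FireServiceRow

val fireServiceDs = df.map { row =>
  // Formatter created inside the lambda, as in the question, so nothing
  // non-serializable is captured by the closure.
  val fmt = DateTimeFormatter.ofPattern("MM/dd/yyyy")
  val parsed = LocalDate.parse(row.getAs[String](4), fmt)
  FireServiceRow(row.getAs[String](0), Date.valueOf(parsed))
}

By contrast, registering Encoders.kryo for the whole case class (as in the update above) stores each row as a single binary column, which is why df.map then shows opaque bytes.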
I am trying to create a DataFrame from an RDD.
First, I create an RDD using the code below:
val account = sc.parallelize(Seq(
(1, null, 2,"F"),
(2, 2, 4, "F"),
(3, 3, 6, "N"),
(4,null,8,"F")))
It works fine:
account: org.apache.spark.rdd.RDD[(Int, Any, Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27
But when I try to create a DataFrame from the RDD using the code below,
account.toDF("ACCT_ID", "M_CD", "C_CD","IND")
I get the following error:
java.lang.UnsupportedOperationException: Schema for type Any is not supported
I have noticed that I only get the error when I put null values into the Seq.
Is there any way to include null values?
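A minimal sketch of one common fix (offered as a sketch, not necessarily the answer the poster received): wrap the nullable element in Option[Int] so Scala does not widen that tuple element to Any, which is what toDF cannot map to a schema.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Option[Int] is encoded as a nullable integer column instead of an unsupported Any.
val account = spark.sparkContext.parallelize(Seq(
  (1, None,    2, "F"),
  (2, Some(2), 4, "F"),
  (3, Some(3), 6, "N"),
  (4, None,    8, "F")))

val accountDF = account.toDF("ACCT_ID", "M_CD", "C_CD", "IND")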
When I try to write a Dataset to a Parquet file, I get the following error:
18/11/05 06:25:43 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 84 in stage 1.0 failed 4 times, most recent failure: Lost task 84.3 in stage 1.0 (TID 989, ip-10-253-194-207.nonprd.aws.csp.net, executor 4): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
But when I run dataset.show() I can view the data. I am not sure where to look for the root cause.
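The stack trace shows the failure happening while the write job scans its Parquet input (OnHeapColumnVector.getInt on a binary dictionary page), which usually points to a column whose declared type does not match what some of the input files actually contain. A hedged first diagnostic step, assuming the input path is inputPath (it is not given in the question):

// Compare the schema the Dataset carries with what Spark infers from the files.
dataset.printSchema()
spark.read.parquet(inputPath).printSchema()

// Re-running with the vectorized Parquet reader disabled can confirm a type
// mismatch (and sometimes works around it, at a performance cost).
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")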
I have data in a Parquet file with 2 fields: object_id: String and alpha: Map<>.
It is read into a DataFrame in Spark SQL, and the schema looks like this:
scala> alphaDF.printSchema()
root
|-- object_id: string (nullable = true)
|-- ALPHA: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
I am using Spark 2.0 and I am trying to create a new DataFrame in which the columns need to be object_id plus the keys of the ALPHA map, as in object_id, key1, key2, key3, ...
As a first attempt, I wanted to see whether I could at least access the map like this:
scala> alphaDF.map(a => a(0)).collect()
<console>:32: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._ Support for serializing …

scala dataframe apache-spark apache-spark-sql apache-spark-dataset
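A hedged sketch of one way to get object_id plus one column per map key (a sketch only, assuming the set of distinct keys is small enough to collect to the driver; column names follow the question):

import org.apache.spark.sql.functions.explode
import spark.implicits._

// explode turns the ALPHA map into (key, value) rows, so the distinct keys can be collected.
val keys: Array[String] = alphaDF
  .select(explode($"ALPHA"))
  .select($"key")
  .as[String]
  .distinct()
  .collect()

// Project each key as its own column next to object_id.
val wideDF = alphaDF.select(
  ($"object_id" +: keys.map(k => $"ALPHA".getItem(k).as(k))): _*)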
When grouping a Dataset in Spark, there are two methods: groupBy and groupByKey[K].
groupBy returns a RelationalGroupedDataset, while groupByKey[K] returns a KeyValueGroupedDataset.
What is the difference between them?
In what situations should I choose one over the other?
Why has my question been marked as a duplicate of the "Dataset vs DataFrame" question? I do not understand; these are clearly completely different things! My question is very specific, not generic.
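For illustration, a minimal sketch contrasting the two (the Sale case class and the data are made up for the example; an active SparkSession named spark is assumed):

import org.apache.spark.sql.functions.sum
import spark.implicits._

case class Sale(shop: String, amount: Double)
val sales = Seq(Sale("a", 1.0), Sale("a", 2.0), Sale("b", 5.0)).toDS()

// groupBy is untyped and Column-based: it returns a RelationalGroupedDataset,
// you aggregate with SQL-style functions, and the result is a DataFrame.
val byColumn = sales.groupBy($"shop").agg(sum($"amount"))

// groupByKey is typed and function-based: it returns a
// KeyValueGroupedDataset[String, Sale], so you can run arbitrary Scala code
// per group (mapGroups, flatMapGroups, reduceGroups, ...).
val byKey = sales.groupByKey(_.shop).mapGroups { (shop, rows) =>
  (shop, rows.map(_.amount).sum)
}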
Looking at the select() function on a Spark Dataset, there are various generated function signatures:
(c1: TypedColumn[MyClass, U1],c2: TypedColumn[MyClass, U2] ....)
This seems to imply that I should be able to reference members of MyClass directly and in a type-safe way, but I am not sure how...
ds.select("member") works, of course... and it also seems like ds.select(_.member) might somehow work?
If I want to store an algebraic data type (ADT), i.e. a Scala sealed trait hierarchy, in a Spark Dataset column, what is the best encoding strategy?
For example, if I have an ADT whose leaf types store different kinds of data:
sealed trait Occupation
case object SoftwareEngineer extends Occupation
case class Wizard(level: Int) extends Occupation
case class Other(description: String) extends Occupation
What is the best way to build an:
org.apache.spark.sql.DataSet[Occupation]
scala apache-spark apache-spark-dataset apache-spark-encoders
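One hedged option (a sketch, not necessarily the best strategy): register a Kryo-based encoder for the sealed trait. This stores each value as an opaque binary column, so it cannot be queried with SQL functions; the usual alternative is to encode the ADT by hand as a struct of nullable fields. An active SparkSession named spark is assumed, and the sample values are made up.

import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Binary (Kryo) encoder covering the whole Occupation hierarchy.
implicit val occupationEncoder: Encoder[Occupation] = Encoders.kryo[Occupation]

val occupations: Dataset[Occupation] =
  spark.createDataset(Seq[Occupation](SoftwareEngineer, Wizard(9), Other("retired")))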
How can I lowercase a DataFrame's column names, but not its values, using raw Spark SQL and the DataFrame API?
Input DataFrame (assume I have 100 of these columns, all uppercase):
NAME | COUNTRY | SRC | CITY | DEBIT
---------------------------------------------
"foo"| "NZ" | salary | "Auckland" | 15.0
"bar"| "Aus" | investment | "Melbourne"| 12.5
Target DataFrame:
name | country | src | city | debit
------------------------------------------------
"foo"| "NZ" | salary | "Auckland" | 15.0
"bar"| "Aus" | investment | "Melbourne"| 12.5
I had always thought that the Dataset and DataFrame APIs were the same... with the only difference being that the Dataset API gives you compile-time safety. Right?
So... my case is very simple:
case class Player (playerID: String, birthYear: Int)
val playersDs: Dataset[Player] = session.read
.option("header", "true")
.option("delimiter", ",")
.option("inferSchema", "true")
.csv(PeopleCsv)
.as[Player]
// Let's try to find players born in 1999.
// This will work, you have compile time safety... but it will not use predicate pushdown!!!
playersDs.filter(_.birthYear == 1999).explain()
// This will work as expected and use predicate pushdown!!!
// But you can't have compile time safety with this :(
playersDs.filter('birthYear === 1999).explain()
The explain output for the first example shows that it does not perform predicate pushdown (note the empty PushedFilters):
== Physical Plan == …

dataframe apache-spark apache-spark-sql apache-spark-dataset
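For completeness, a hedged sketch of a common middle ground (not from the original post): keep the Column-based filter so the data source can push the predicate down, but confine the unchecked column-name string to a single definition so a renamed field only needs updating in one place.

import org.apache.spark.sql.functions.col

object PlayerColumns {
  // The one string that must stay in sync with Player.birthYear.
  val birthYear = col("birthYear")
}

// Pushdown-friendly, and the magic string lives in exactly one place.
playersDs.filter(PlayerColumns.birthYear === 1999).explain()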