我非常满意Spark 2.0 DataSet,因为它的编译时类型安全.但是这里有几个我无法解决的问题,我也没有找到好的文档.
问题#1 - 在聚合列上划分操作 - 考虑下面的代码 - 我有一个DataSet [MyCaseClass],我想在c1,c2,c3和sum(c4)/ 8上groupByKey.如果我只是计算sum但它给出了除(8)的编译时错误.我想知道如何实现以下目标.
final case class MyClass (c1: String,
c2: String,
c3: String,
c4: Double)
val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded
import sparkSession.implicits._
import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}
myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
divide(8)). //this is breaking with exception
show()
Run Code Online (Sandbox Code Playgroud)
如果我删除.divide(8)操作并运行上面的命令它会给我低于输出.
+-----------+-------------+
| key|sum(c4) |
+-----------+-------------+
| [A1,F2,S1]| 80.0|
| [A1,F1,S1]| 40.0|
+-----------+-------------+
Run Code Online (Sandbox Code Playgroud)
问题#2 - 将groupedByKey结果转换为另一个Typed DataFrame - 现在问题的第二部分是我想再次输出一个类型化的DataSet.为此,我有另一个案例类(不确定是否需要),但我不确定如何映射分组结果 -
final case class AnotherClass(c1: String,
c2: …Run Code Online (Sandbox Code Playgroud) 当我们的数据源中缺少某些类型为Option [Seq [String]]的列时,我在编码数据时遇到了一些问题.理想情况下,我希望填充缺少的列数据None.
场景:
我们正在阅读的一些镶木地板文件中有column1而不是column2.
我们将这些镶木地板文件中的数据加载到a中Dataset,并将其转换为MyType.
case class MyType(column1: Option[String], column2: Option[Seq[String]])
sqlContext.read.parquet("dataSource.parquet").as[MyType]
Run Code Online (Sandbox Code Playgroud)
org.apache.spark.sql.AnalysisException:无法解析'
column2'给定的输入列:[column1];
有没有办法用column2数据创建数据集None?
所以,我正在从Java Spark API创建一些数据集.这些数据集使用spark.sql()方法从hive表填充.
因此,在执行一些sql操作(如连接)之后,我有一个最终的数据集.我想要做的是,我想为最终数据集添加一个新列,数据集中的所有行的值为"1".因此,您可能会将其视为向数据集添加约束.
所以,例如我有这个数据集:
Dataset<Row> final = otherDataset.select(otherDataset.col("colA"), otherDataSet.col("colB"));
Run Code Online (Sandbox Code Playgroud)
我想在"最终"数据集中添加一个新列,就像这样
final.addNewColumn("colName", 1); //I know this doesn't work, but just to give you an idea.
Run Code Online (Sandbox Code Playgroud)
是否有可行的方法将新列添加到数据集的所有行,值为1?
我是Spark和Spark SQL的新手.我有一个2列的数据集,"col1"和"col2",而"col2"最初是一个长的Seq.我想将"col2"分成多行,这样每行只有一行.
我尝试使用爆炸功能与使用flatMap和我自己的映射器功能.他们似乎有显着的性能差异.其他一切都保持不变,"爆炸"功能似乎要慢得多flatMap(数量级取决于数据大小).为什么?
选项1:使用"爆炸"
val exploded = data.withColumn("col2", explode(col("col2")))
Run Code Online (Sandbox Code Playgroud)
选项2:使用手动flatMap
case class MyPair(col1: Long, col2: Long)
def longAndLongArrayMapper(colToKeep: Long, colToExplode: Seq[Long]) = {
(for (val <- colToExplode) yield MyPair(val, colToKeep))
}
val exploded = data.flatMap{ (x: Row) =>
longAndLongArrayMapper(x.getAs[Long]("col1"), (x.getAs[Seq[Long]]("col2"))) }
Run Code Online (Sandbox Code Playgroud) 我需要join基于一些共享键列来组合许多DataFrame.对于键值RDD,可以指定分区器,以便将具有相同键的数据点混洗到同一个执行器,因此加入更有效(如果在之前有一个shuffle相关操作join).可以在Spark DataFrames或DataSet上完成同样的事情吗?
partitioning apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
我使用:
dataset.withColumn("lead",lead(dataset.col(start_date),1).over(orderBy(start_date)));
Run Code Online (Sandbox Code Playgroud)
`我只想按trackId添加组,因此可以像任何agg函数一样领导每个组的工作:
+----------+---------------------------------------------+
| trackId | start_time | end_time | lead |
+-----+--------------------------------------------------+
| 1 | 12:00:00 | 12:04:00 | 12:05:00 |
+----------+---------------------------------------------+
| 1 | 12:05:00 | 12:08:00 | 12:20:00 |
+----------+---------------------------------------------+
| 1 | 12:20:00 | 12:22:00 | null |
+----------+---------------------------------------------+
| 2 | 13:00:00 | 13:04:00 | 13:05:00 |
+----------+---------------------------------------------+
| 2 | 13:05:00 | 13:08:00 | 13:20:00 |
+----------+---------------------------------------------+
| 2 | 13:20:00 | 13:22:00 | null |
+----------+---------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
有什么帮助吗?
我在spark中加入两个数据集有点问题,我有这个:
SparkConf conf = new SparkConf()
.setAppName("MyFunnyApp")
.setMaster("local[*]");
SparkSession spark = SparkSession
.builder()
.config(conf)
.config("spark.debug.maxToStringFields", 150)
.getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile1)
.as(encoderObject1);
Dataset<MyOwnObject2> object2DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile2)
.as(encoderObject2);
Run Code Online (Sandbox Code Playgroud)
我可以打印架构并正确显示它.
//Here start the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS =
object1DS.join(object2DS, object1DS.col("column01")
.equalTo(object2DS.col("column01")))
.as(Encoders.tuple(MyOwnObject1,MyOwnObject2));
Run Code Online (Sandbox Code Playgroud)
最后一行无法连接并得到我这个错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed …Run Code Online (Sandbox Code Playgroud) java apache-spark apache-spark-dataset apache-spark-encoders
我想做一个简单的 Spark SQL 代码,读取一个名为 的文件u.data,其中包含电影评级,创建一个Datasetof Rows,然后打印数据集的第一行。
作为前提,我将文件读取到 a JavaRDD,并根据 a 映射 RDD ratingsObject(该对象有两个参数movieID和rating)。所以我只想打印这个数据集中的第一行。
我使用 Java 语言和 Spark SQL。
public static void main(String[] args){
App obj = new App();
SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").getOrCreate();
Map<Integer,String> movieNames = obj.loadMovieNames();
JavaRDD<String> lines = spark.read().textFile("hdfs:///ml-100k/u.data").javaRDD();
JavaRDD<MovieRatings> movies = lines.map(line -> {
String[] parts = line.split(" ");
MovieRatings ratingsObject = new MovieRatings();
ratingsObject.setMovieID(Integer.parseInt(parts[1].trim()));
ratingsObject.setRating(Integer.parseInt(parts[2].trim()));
return ratingsObject;
});
Dataset<Row> movieDataset = spark.createDataFrame(movies, …Run Code Online (Sandbox Code Playgroud) 我有以下案例类:
case class Person(name: String, lastname: Option[String] = None, age: BigInt) {}
Run Code Online (Sandbox Code Playgroud)
以及以下 json:
{ "name": "bemjamin", "age" : 1 }
Run Code Online (Sandbox Code Playgroud)
当我尝试将数据框转换为数据集时:
spark.read.json("example.json")
.as[Person].show()
Run Code Online (Sandbox Code Playgroud)
它向我显示以下错误:
线程“main”org.apache.spark.sql.AnalysisException 中的异常:无法解析
lastname给定的输入列“ ”:[年龄,名称];
我的问题是:如果我的架构是我的案例类并且它定义姓氏是可选的,那么 as() 不应该进行转换吗?
我可以使用 .map 轻松解决此问题,但我想知道是否有另一种更清洁的替代方法。
我有一个由三列组成的 Spark DataFrame:
id | col1 | col2
-----------------
x | p1 | a1
-----------------
x | p2 | b1
-----------------
y | p2 | b2
-----------------
y | p2 | b3
-----------------
y | p3 | c1
Run Code Online (Sandbox Code Playgroud)
申请后,df.groupBy("id").pivot("col1").agg(collect_list("col2"))我得到以下数据帧(aggDF):
+---+----+--------+----+
| id| p1| p2| p3|
+---+----+--------+----+
| x|[a1]| [b1]| []|
| y| []|[b2, b3]|[c1]|
+---+----+--------+----+
Run Code Online (Sandbox Code Playgroud)
然后我找到除了列之外的id列的名称。
val cols = aggDF.columns.filter(x => x != "id")
Run Code Online (Sandbox Code Playgroud)
之后我cols.foldLeft(aggDF)((df, x) => df.withColumn(x, when(size(col(x)) > 0, col(x)).otherwise(lit(null))))用null. …