标签: apache-spark-dataset

Spark 2.0 DataSets groupByKey并划分操作和类型安全

我非常满意Spark 2.0 DataSet,因为它的编译时类型安全.但是这里有几个我无法解决的问题,我也没有找到好的文档.

问题#1 - 在聚合列上划分操作 - 考虑下面的代码 - 我有一个DataSet [MyCaseClass],我想在c1,c2,c3和sum(c4)/ 8上groupByKey.如果我只是计算sum但它给出了除(8)的编译时错误.我想知道如何实现以下目标.

final case class MyClass (c1: String,
                          c2: String,
                          c3: String,
                          c4: Double)

    val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded

    import sparkSession.implicits._
    import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}

     myCaseClass.
       groupByKey(myCaseClass =>
          (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
          agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
          divide(8)). //this is breaking with exception
       show()
Run Code Online (Sandbox Code Playgroud)

如果我删除.divide(8)操作并运行上面的命令它会给我低于输出.

+-----------+-------------+
|        key|sum(c4)      |
+-----------+-------------+
| [A1,F2,S1]|         80.0|
| [A1,F1,S1]|         40.0|  
+-----------+-------------+
Run Code Online (Sandbox Code Playgroud)

问题#2 - 将groupedByKey结果转换为另一个Typed DataFrame - 现在问题的第二部分是我想再次输出一个类型化的DataSet.为此,我有另一个案例类(不确定是否需要),但我不确定如何映射分组结果 -

final case class AnotherClass(c1: String,
                          c2: …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-dataset

6
推荐指数
1
解决办法
5404
查看次数

Spark 2.0隐式编码器,当类型为Option [Seq [String]](scala)时处理缺少的列

当我们的数据源中缺少某些类型为Option [Seq [String]]的列时,我在编码数据时遇到了一些问题.理想情况下,我希望填充缺少的列数据None.

场景:

我们正在阅读的一些镶木地板文件中有column1而不是column2.

我们将这些镶木地板文件中的数据加载到a中Dataset,并将其转换为MyType.

case class MyType(column1: Option[String], column2: Option[Seq[String]])

sqlContext.read.parquet("dataSource.parquet").as[MyType]
Run Code Online (Sandbox Code Playgroud)

org.apache.spark.sql.AnalysisException:无法解析' column2'给定的输入列:[column1];

有没有办法用column2数据创建数据集None

scala apache-spark apache-spark-dataset

6
推荐指数
1
解决办法
1826
查看次数

如何在Spark Java中将具有值的列添加到新的数据集?

所以,我正在从Java Spark API创建一些数据集.这些数据集使用spark.sql()方法从hive表填充.

因此,在执行一些sql操作(如连接)之后,我有一个最终的数据集.我想要做的是,我想为最终数据集添加一个新列,数据集中的所有行的值为"1".因此,您可能会将其视为向数据集添加约束.

所以,例如我有这个数据集:

Dataset<Row> final = otherDataset.select(otherDataset.col("colA"), otherDataSet.col("colB"));
Run Code Online (Sandbox Code Playgroud)

我想在"最终"数据集中添加一个新列,就像这样

final.addNewColumn("colName", 1); //I know this doesn't work, but just to give you an idea.
Run Code Online (Sandbox Code Playgroud)

是否有可行的方法将新列添加到数据集的所有行,值为1?

java dataset bigdata apache-spark apache-spark-dataset

6
推荐指数
1
解决办法
9108
查看次数

为什么Spark爆炸功能比平面地图功能拆分数组慢得多?

我是Spark和Spark SQL的新手.我有一个2列的数据集,"col1"和"col2",而"col2"最初是一个长的Seq.我想将"col2"分成多行,这样每行只有一行.

我尝试使用爆炸功能与使用flatMap和我自己的映射器功能.他们似乎有显着的性能差异.其他一切都保持不变,"爆炸"功能似乎要慢得多flatMap(数量级取决于数据大小).为什么?

选项1:使用"爆炸"

val exploded = data.withColumn("col2", explode(col("col2")))
Run Code Online (Sandbox Code Playgroud)

选项2:使用手动flatMap

case class MyPair(col1: Long, col2: Long)

def longAndLongArrayMapper(colToKeep: Long, colToExplode: Seq[Long]) = {
    (for (val <- colToExplode) yield MyPair(val, colToKeep))
 }

val exploded = data.flatMap{ (x: Row) =>
      longAndLongArrayMapper(x.getAs[Long]("col1"), (x.getAs[Seq[Long]]("col2"))) }
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql apache-spark-dataset

6
推荐指数
0
解决办法
1393
查看次数

用于有效连接Spark数据帧/数据集的分区数据

我需要join基于一些共享键列来组合许多DataFrame.对于键值RDD,可以指定分区器,以便将具有相同键的数据点混洗到同一个执行器,因此加入更有效(如果在之前有一个shuffle相关操作join).可以在Spark DataFrames或DataSet上完成同样的事情吗?

partitioning apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

6
推荐指数
2
解决办法
1万
查看次数

如何使用火花滞后和超前分组和排序

我使用:

dataset.withColumn("lead",lead(dataset.col(start_date),1).over(orderBy(start_date)));
Run Code Online (Sandbox Code Playgroud)

`我只想按trackId添加组,因此可以像任何agg函数一样领导每个组的工作:

+----------+---------------------------------------------+
|  trackId |  start_time    |  end_time   |      lead    |
+-----+--------------------------------------------------+
|  1       | 12:00:00       |   12:04:00  |     12:05:00 |
+----------+---------------------------------------------+
|  1       | 12:05:00       |   12:08:00  |    12:20:00  |  
+----------+---------------------------------------------+
|  1       | 12:20:00       |   12:22:00  |     null     | 
+----------+---------------------------------------------+
|  2       | 13:00:00       |   13:04:00  |    13:05:00 |
+----------+---------------------------------------------+
|  2       | 13:05:00       |   13:08:00  |    13:20:00  |  
+----------+---------------------------------------------+
|  2       | 13:20:00       |   13:22:00  |     null     | 
+----------+---------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

有什么帮助吗?

apache-spark apache-spark-sql apache-spark-dataset

6
推荐指数
1
解决办法
2170
查看次数

如何使用java对象将两个spark数据集连接到一个?

我在spark中加入两个数据集有点问题,我有这个:

SparkConf conf = new SparkConf()
    .setAppName("MyFunnyApp")
    .setMaster("local[*]");

SparkSession spark = SparkSession
    .builder()
    .config(conf)
    .config("spark.debug.maxToStringFields", 150)
    .getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);

Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile1)
    .as(encoderObject1);

Dataset<MyOwnObject2> object2DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile2)
    .as(encoderObject2);
Run Code Online (Sandbox Code Playgroud)

我可以打印架构并正确显示它.

//Here start the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = 
    object1DS.join(object2DS, object1DS.col("column01")
    .equalTo(object2DS.col("column01")))
    .as(Encoders.tuple(MyOwnObject1,MyOwnObject2));
Run Code Online (Sandbox Code Playgroud)

最后一行无法连接并得到我这个错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed …
Run Code Online (Sandbox Code Playgroud)

java apache-spark apache-spark-dataset apache-spark-encoders

5
推荐指数
1
解决办法
2018
查看次数

如何使用 Java 和 Spark SQL 打印数据集中的行内容?

我想做一个简单的 Spark SQL 代码,读取一个名为 的文件u.data,其中包含电影评级,创建一个Datasetof Rows,然后打印数据集的第一行。

作为前提,我将文件读取到 a JavaRDD,并根据 a 映射 RDD ratingsObject(该对象有两个参数movieIDrating)。所以我只想打印这个数据集中的第一行。

我使用 Java 语言和 Spark SQL。

public static void main(String[] args){
    App obj = new App();
    SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").getOrCreate();

    Map<Integer,String> movieNames = obj.loadMovieNames();
    JavaRDD<String> lines = spark.read().textFile("hdfs:///ml-100k/u.data").javaRDD();

    JavaRDD<MovieRatings> movies = lines.map(line -> {
        String[] parts = line.split(" ");
        MovieRatings ratingsObject = new MovieRatings();
        ratingsObject.setMovieID(Integer.parseInt(parts[1].trim()));
        ratingsObject.setRating(Integer.parseInt(parts[2].trim()));
        return ratingsObject;
    });

    Dataset<Row> movieDataset = spark.createDataFrame(movies, …
Run Code Online (Sandbox Code Playgroud)

java apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
1万
查看次数

spark 使用带有选项字段的案例类将数据帧转换为数据集

我有以下案例类:

case class Person(name: String, lastname: Option[String] = None, age: BigInt) {}
Run Code Online (Sandbox Code Playgroud)

以及以下 json:

{ "name": "bemjamin", "age" : 1 }
Run Code Online (Sandbox Code Playgroud)

当我尝试将数据框转换为数据集时:

spark.read.json("example.json")
  .as[Person].show()
Run Code Online (Sandbox Code Playgroud)

它向我显示以下错误:

线程“main”org.apache.spark.sql.AnalysisException 中的异常:无法解析lastname给定的输入列“ ”:[年龄,名称];

我的问题是:如果我的架构是我的案例类并且它定义姓氏是可选的,那么 as() 不应该进行转换吗?

我可以使用 .map 轻松解决此问题,但我想知道是否有另一种更清洁的替代方法。

scala apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
4728
查看次数

Spark SQL 使用 foldLeft 和 withColumn 替代 groupby/pivot/agg/collect_list 以提高性能

我有一个由三列组成的 Spark DataFrame:

 id | col1 | col2 
-----------------
 x  |  p1  |  a1  
-----------------
 x  |  p2  |  b1
-----------------
 y  |  p2  |  b2
-----------------
 y  |  p2  |  b3
-----------------
 y  |  p3  |  c1
Run Code Online (Sandbox Code Playgroud)

申请后,df.groupBy("id").pivot("col1").agg(collect_list("col2"))我得到以下数据帧(aggDF):

+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x|[a1]|    [b1]|  []|
|  y|  []|[b2, b3]|[c1]|
+---+----+--------+----+
Run Code Online (Sandbox Code Playgroud)

然后我找到除了列之外的id列的名称。

val cols = aggDF.columns.filter(x => x != "id")
Run Code Online (Sandbox Code Playgroud)

之后我cols.foldLeft(aggDF)((df, x) => df.withColumn(x, when(size(col(x)) > 0, col(x)).otherwise(lit(null))))null. …

apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
1558
查看次数