标签: apache-spark-dataset

Spark中的DataFrame,Dataset和RDD之间的区别

我只是想知道Apache Spark中的RDDDataFrame (Spark 2.0.0 DataFrame只是一个类型别名Dataset[Row])有什么区别?

你能把一个转换成另一个吗?

apache-spark rdd apache-spark-sql apache-spark-dataset

228
推荐指数
10
解决办法
10万
查看次数

如何在Dataset中存储自定义对象?

根据Spark数据集介绍:

正如我们期待着星火2.0,我们计划将数据集的一些令人兴奋的改进,具体包括:...自定义编码器 - 而我们对各种各样的类型目前自动生成编码器,我们想开了一个API,用于自定义对象.

并尝试将自定义类型存储为Dataset导致以下错误:

无法找到存储在数据集中的类型的编码器.导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类).将来版本中将添加对序列化其他类型的支持

要么:

Java.lang.UnsupportedOperationException:找不到针对....的编码器

有没有现成的解决方法?


请注意,此问题仅作为社区Wiki答案的入口点存在.随意更新/改进问题和答案.

scala apache-spark apache-spark-dataset apache-spark-encoders

133
推荐指数
4
解决办法
6万
查看次数

为什么在创建自定义案例类的数据集时"无法找到存储在数据集中的类型的编码器"?

使用Scala 2.11.8的Spark 2.0(最终版).以下超级简单代码会产生编译错误Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.
      master("local")
      .appName("example")
      .getOrCreate()

    val dataset = sparkSession.createDataset(dataList)
  }
}
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset apache-spark-encoders

53
推荐指数
2
解决办法
5万
查看次数

DataSet API和DataFrame API之间的区别

有没有人可以通过示例帮助我理解DataSet API和DataFrame API之间的区别?为什么需要在Spark中引入DataSet API?

apache-spark rdd apache-spark-sql apache-spark-dataset

49
推荐指数
0
解决办法
3万
查看次数

SparkContext,JavaSparkContext,SQLContext和SparkSession之间的区别?

  1. SparkContext, JavaSparkContext, SQLContext和之间有什么区别SparkSession
  2. 是否有任何方法可以使用SparkSession?转换或创建Context ?
  3. 我可以使用一个条目完全替换所有上下文SparkSession吗?
  4. 在所有的功能SQLContext,SparkContextJavaSparkContextSparkSession
  5. 有些功能parallelizeSparkContext和中有不同的行为JavaSparkContext.他们是如何表现的SparkSession
  6. 如何使用SparkSession?创建以下内容?

    • RDD
    • JavaRDD
    • JavaPairRDD
    • Dataset

有没有一种方法可以将a JavaPairRDD转换为a DatasetDataseta JavaPairRDD

java scala apache-spark rdd apache-spark-dataset

34
推荐指数
3
解决办法
1万
查看次数

尝试将数据帧行映射到更新行时出现编码器错误

当我试图在我的代码中做同样的事情,如下所述

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})
Run Code Online (Sandbox Code Playgroud)

我从这里采取了上述参考: Scala:如何使用scala替换Dataframs中的值 但是我收到编码器错误

无法找到存储在数据集中的类型的编码器.导入spark.im plicits支持原始类型(Int,S tring等)和产品类型(case类)._将在以后的版本中添加对序列化其他类型的支持.

注意:我正在使用spark 2.0!

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

33
推荐指数
2
解决办法
3万
查看次数

DataFrame/Dataset groupBy行为/优化

假设我们有DataFrame,df包含以下列:

名称,姓氏,大小,宽度,长度,重量

现在我们想要执行几个操作,例如我们想要创建一些包含Size和Width数据的DataFrame.

val df1 = df.groupBy("surname").agg( sum("size") )
val df2 = df.groupBy("surname").agg( sum("width") )
Run Code Online (Sandbox Code Playgroud)

正如您所注意到的,其他列(如Length)不会在任何地方使用.Spark是否足够聪明,可以在洗牌阶段之前丢弃多余的列,还是随身携带?威尔跑:

val dfBasic = df.select("surname", "size", "width")
Run Code Online (Sandbox Code Playgroud)

在分组之前以某种方式影响性能?

performance dataframe apache-spark apache-spark-sql apache-spark-dataset

28
推荐指数
1
解决办法
1万
查看次数

使用Spark数据集在Scala中执行类型化连接

我喜欢Spark数据集,因为它们在编译时给我分析错误和语法错误,并且允许我使用getter而不是硬编码的名称/数字.大多数计算都可以使用Dataset的高级API完成.例如,通过访问数据集类型对象而不是使用RDD行的数据字段来执行agg,select,sum,avg,map,filter或groupBy操作要简单得多.

但是,由于缺少连接操作,我读到我可以像这样进行连接

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")
Run Code Online (Sandbox Code Playgroud)

但这不是我想要的,因为我更喜欢通过case类接口来做,所以更像这样的东西

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")
Run Code Online (Sandbox Code Playgroud)

现在最好的选择似乎是在case类旁边创建一个对象,并给这个函数提供正确的列名作为String.所以我会使用第一行代码但是放置一个函数而不是硬编码的列名.但那感觉不够优雅..

有人可以告诉我其他选项吗?目标是从实际的列名中抽象出来,最好通过case类的getter工作.

我正在使用Spark 1.6.1和Scala 2.10

scala join apache-spark apache-spark-sql apache-spark-dataset

28
推荐指数
1
解决办法
8694
查看次数

用于行类型Spark数据集的编码器

我想在DataSet中为Row类型编写一个编码器,用于我正在进行的地图操作.基本上,我不明白如何编写编码器.

以下是地图操作的示例:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
            @Override
            public Iterator<String> call(Row row) throws Exception {

                ArrayList<String> obj = //some map operation
                return obj.iterator();
            }
        },Encoders.STRING());
Run Code Online (Sandbox Code Playgroud)

我明白,编码器需要编写如下代码:

    Encoder<Row> encoder = new Encoder<Row>() {
        @Override
        public StructType schema() {
            return join.schema();
            //return null;
        }

        @Override
        public ClassTag<Row> clsTag() {
            return null;
        }
    };
Run Code Online (Sandbox Code Playgroud)

但是,我不理解编码器中的clsTag(),我试图找到一个可以演示相似内容的运行示例(即行类型的编码器)

编辑 - 这不是所提问题的副本:尝试将数据帧行映射到更新行时编码器错误,因为答案谈到在Spark 2.x中使用Spark 1.x(我不是这样做),我也在寻找用于Row类的编码器而不是解决错误.最后,我一直在寻找Java解决方案,而不是Scala.

java apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

24
推荐指数
2
解决办法
2万
查看次数

Spark 2.0 Dataset vs DataFrame

从spark 2.0.1开始我有一些问题.我阅读了很多文档,但到目前为止找不到足够的答案:

  • 有什么区别
    • df.select("foo")
    • df.select($"foo")
  • 我能正确理解吗
    • myDataSet.map(foo.someVal)是类型安全的,不会转换为RDD但保留在DataSet表示/没有额外的开销(2.0.0的性能明智)
  • 所有其他命令,例如select,..只是语法糖.它们不是类型安全的,可以使用地图代替.如果df.select("foo")没有地图声明,我怎么能输入?
    • 为什么我应该使用UDF/UADF而不是地图(假设地图保留在数据集表示中)?

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

23
推荐指数
1
解决办法
4687
查看次数