标签: apache-spark-dataset

Spark数据集API - 加入

我正在尝试使用Spark 数据集 API但我在进行简单连接时遇到了一些问题.

假设我有两个带有字段的数据集:date | value,然后在DataFrame我的连接的情况下看起来像:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )
Run Code Online (Sandbox Code Playgroud)

但是,Dataset.joinWith方法,但相同的方法不起作用:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )
Run Code Online (Sandbox Code Playgroud)

需要什么论点.joinWith

scala apache-spark apache-spark-sql apache-spark-dataset

20
推荐指数
2
解决办法
3万
查看次数

spark createOrReplaceTempView vs createGlobalTempView

Spark Dataset 2.0提供了两个功能createOrReplaceTempViewcreateGlobalTempView.我无法理解两种功能之间的基本区别.

根据API文件:

createOrReplaceTempView:此临时视图的生命周期与用于创建此数据集的[[SparkSession]]相关联.
所以,当我调用时sparkSession.close(),定义将被销毁.这是真的吗?

createGlobalTempView:此临时视图的生命周期与此Spark应用程序绑定.

何时会破坏这种类型的视图?任何例子.喜欢sparkSession.close()?

apache-spark apache-spark-dataset

20
推荐指数
2
解决办法
2万
查看次数

如何在Spark 2.X数据集中创建自定义编码器?

Encoder对于Pojo /原语,Spark数据集从Row转移到了's'.该Catalyst引擎使用ExpressionEncoder的列转换成SQL表达式.但是,似乎没有其他子类Encoder可用作我们自己实现的模板.

下面是一个代码,它在Spark 1.X/DataFrames中很高兴,它不能在新系统中编译:

//mapping each row to RDD tuple
df.map(row => {
    var id: String = if (!has_id) "" else row.getAs[String]("id")
    var label: String = row.getAs[String]("label")
    val channels  : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
    val height  : Int = if (!has_height) 0 else row.getAs[Int]("height")
    val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
    val data : Array[Byte] = row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[Byte@unchecked] …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset apache-spark-encoders

18
推荐指数
1
解决办法
2万
查看次数

EMR上Spark中的S3 SlowDown错误

我在写一个镶木地板文件时遇到这个错误,这已经开始发生了

com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 2CA496E2AB87DC16), S3 Extended Request ID: 1dBrcqVGJU9VgoT79NAVGyN0fsbj9+6bipC7op97ZmP+zSFIuH72lN03ZtYabNIA2KaSj18a8ho=
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:1777)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:7)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.deleteObjects(AmazonS3LiteClient.java:125)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.deleteAll(Jets3tNativeFileSystemStore.java:355)
    at sun.reflect.GeneratedMethodAccessor121.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy28.deleteAll(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.doSingleThreadedBatchDelete(S3NativeFileSystem.java:1331)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.delete(S3NativeFileSystem.java:663)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.delete(EmrFileSystem.java:296)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:463)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.abortJob(FileOutputCommitter.java:482)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:134)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:146) …
Run Code Online (Sandbox Code Playgroud)

scala amazon-s3 amazon-emr apache-spark apache-spark-dataset

17
推荐指数
1
解决办法
3494
查看次数

如何在Java中的Apache Spark中将DataFrame转换为Dataset?

我可以很容易地将Scala中的DataFrame转换为Dataset:

case class Person(name:String, age:Long)
val df = ctx.read.json("/tmp/persons.json")
val ds = df.as[Person]
ds.printSchema
Run Code Online (Sandbox Code Playgroud)

但在Java版本中我不知道如何将Dataframe转换为Dataset?任何的想法?

我的努力是:

DataFrame df = ctx.read().json(logFile);
Encoder<Person> encoder = new Encoder<>();
Dataset<Person> ds = new Dataset<Person>(ctx,df.logicalPlan(),encoder);
ds.printSchema();
Run Code Online (Sandbox Code Playgroud)

但是编译器说:

Error:(23, 27) java: org.apache.spark.sql.Encoder is abstract; cannot be instantiated
Run Code Online (Sandbox Code Playgroud)

编辑(解决方案):

基于@Leet-Falcon答案的解决方案:

DataFrame df = ctx.read().json(logFile);
Encoder<Person> encoder = Encoders.bean(Person.class);
Dataset<Person> ds = new Dataset<Person>(ctx, df.logicalPlan(), encoder);
Run Code Online (Sandbox Code Playgroud)

java apache-spark spark-dataframe apache-spark-dataset

15
推荐指数
2
解决办法
2万
查看次数

使用案例类编码JSON时,为什么错误"无法找到存储在数据集中的类型的编码器"?

我写过火花工作:

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val ctx = new org.apache.spark.sql.SQLContext(sc)
    import ctx.implicits._

    case class Person(age: Long, city: String, id: String, lname: String, name: String, sex: String)
    case class Person2(name: String, age: Long, city: String)

    val persons = ctx.read.json("/tmp/persons.json").as[Person]
    persons.printSchema()
  }
}
Run Code Online (Sandbox Code Playgroud)

在IDE中运行main函数时,发生2错误:

Error:(15, 67) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset apache-spark-encoders

15
推荐指数
1
解决办法
2万
查看次数

如何命名聚合列?

我在Scala中使用Spark,我的聚合列是匿名的.有没有一种方便的方法来重命名数据集中的多个列?我想到了征收模式与as更关键的列是一个结构(由于groupBy操作),我不能找出如何定义case classStructType它.

我尝试按如下方式定义模式:

val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
                                                             StructField("dst", IntegerType), true)), 
                              StructField("count", LongType, true))
edge_count.as[returnSchema]
Run Code Online (Sandbox Code Playgroud)

但是我遇到了编译错误:

Message: <console>:74: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, Boolean)
       val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset

15
推荐指数
2
解决办法
1万
查看次数

仅覆盖分区的spark数据集中的某些分区

我们如何覆盖分区数据集,但只覆盖我们要更改的分区?例如,重新计算上周的日常工作,并且只覆盖上周的数据.

默认的Spark行为是覆盖整个表,即使只写一些分区.

hive apache-spark apache-spark-dataset

15
推荐指数
2
解决办法
5097
查看次数

如何从RDD创建Spark数据集

RDD[LabeledPoint]打算在机器学习管道中使用.我们如何将其转换RDDDataSet?请注意,较新的 spark.mlapis需要Dataset格式的输入.

scala dataset apache-spark apache-spark-dataset

14
推荐指数
1
解决办法
2万
查看次数

如何将整列的大小写改为小写?

我想在Spark数据集中将整列的大小写更改为小写

        Desired Input
        +------+--------------------+
        |ItemID|       Category name|
        +------+--------------------+
        |   ABC|BRUSH & BROOM HAN...|
        |   XYZ|WHEEL BRUSH PARTS...|
        +------+--------------------+

        Desired Output
        +------+--------------------+
        |ItemID|       Category name|
        +------+--------------------+
        |   ABC|brush & broom han...|
        |   XYZ|wheel brush parts...|
        +------+--------------------+
Run Code Online (Sandbox Code Playgroud)

我尝试使用collectAsList()和toString(),这对于非常大的数据集来说是一个缓慢而复杂的过程.

我还发现了一种方法'较低',但没有知道如何让它在dasaset中工作请建议我一个简单或有效的方法来做到这一点.提前致谢

apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

14
推荐指数
2
解决办法
3万
查看次数