标签: apache-spark-dataset

没有找到对应于带有可序列化和 Base 的 Product 的 Java 类

我已经写two case class了扩展 Base abstract class。我有每个班级的两个列表(listAlistB)。当我想合并这两个列表时,我无法将最终列表转换为 Apache Spark 1.6.1 数据集。

abstract class Base

case class A(name: String) extends Base
case class B(age: Int) extends Base

val listA: List[A] = A("foo")::A("bar")::Nil
val listB: List[B] = B(10)::B(20)::Nil
val list: List[Base with Product with Serializable] = listA ++ listB

val result: RDD[Base with Product with Serializable] = sc.parallelize(list).toDS()
Run Code Online (Sandbox Code Playgroud)

Apache Spark 将引发此异常:

A needed class was not found. This could be due to an error in your …
Run Code Online (Sandbox Code Playgroud)

java scala apache-spark rdd apache-spark-dataset

5
推荐指数
1
解决办法
4656
查看次数

在 Spark 2.0.0 中使用 stat.bloomFilter 过滤另一个数据帧

我有两个大型数据框 [a] 一个,其中包含由 id [b] 标识的所有事件。我想使用 spark 2.0.0 中的 stat.bloomFilter 实现基于 [b] 中的 id 过滤 [a]

但是我在数据集 API 中没有看到任何将布隆过滤器连接到数据框的操作 [a]

val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5))
val df1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3")

val in2 = spark.sparkContext.parallelize(List(0, 1, 2))
val df2 = in2.map(x => (x)).toDF("c1")

val expectedNumItems: Long = 1000
val fpp: Double = 0.005

val sbf = df.stat.bloomFilter($"c1", expectedNumItems, fpp)
val sbf2 = df2.stat.bloomFilter($"c1", expectedNumItems, fpp)
Run Code Online (Sandbox Code Playgroud)

根据 df2 中的值过滤“df1”的最佳方法是什么?

谢谢!

scala bloom-filter apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
3704
查看次数

Spark-SQL连接具有相同列名的两个数据框/数据集

我有以下两个数据集

controlSetDF : has columns loan_id, merchant_id, loan_type, created_date, as_of_date
accountDF : has columns merchant_id, id, name, status, merchant_risk_status
Run Code Online (Sandbox Code Playgroud)

我正在使用Java Spark API来加入它们,我只需要最终数据集中的特定列

private String[] control_set_columns = {"loan_id", "merchant_id", "loan_type"};
private String[] sf_account_columns = {"id as account_id", "name as account_name", "merchant_risk_status"};

controlSetDF.selectExpr(control_set_columns)                                               
.join(accountDF.selectExpr(sf_account_columns),controlSetDF.col("merchant_id").equalTo(accountDF.col("merchant_id")), 
"left_outer"); 
Run Code Online (Sandbox Code Playgroud)

但我得到以下错误

org.apache.spark.sql.AnalysisException: resolved attribute(s) merchant_id#3L missing from account_name#131,loan_type#105,account_id#130,merchant_id#104L,loan_id#103,merchant_risk_status#2 in operator !Join LeftOuter, (merchant_id#104L = merchant_id#3L);;!Join LeftOuter, (merchant_id#104L = merchant_id#3L)
Run Code Online (Sandbox Code Playgroud)

似乎存在问题,因为两个数据帧都具有merchant_id列。

注意:如果我不使用.selectExpr(),它将正常工作。但是它将显示第一和第二数据集的所有列。

java apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
3899
查看次数

用于火花箱类的scala通用编码器

我怎样才能编译这个方法.奇怪的是,隐含的火花已经被导入了.

def loadDsFromHive[T <: Product](tableName: String, spark: SparkSession): Dataset[T] = {
    import spark.implicits._
    spark.sql(s"SELECT * FROM $tableName").as[T]
  }
Run Code Online (Sandbox Code Playgroud)

这是错误:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
[error]     spark.sql(s"SELECT * FROM $tableName").as[T]
Run Code Online (Sandbox Code Playgroud)

generics scala apache-spark apache-spark-dataset apache-spark-encoders

5
推荐指数
1
解决办法
3433
查看次数

Spark SQL的Scala API - TimestampType - 找不到org.apache.spark.sql.types.TimestampType的编码器

我在Databricks笔记本上使用Spark 2.1和Scala 2.11

什么是TimestampType?

我们知道,从SparkSQL的文档那是官方的时间戳类型是TimestampType,这显然是对的java.sql.Timestamp一个别名:

TimestampType可以在SparkSQL的Scala API中找到

使用模式和数据集API时,我们有所不同

从Databricks的Scala Structured Streaming示例解析时{"time":1469501297,"action":"Open"}

使用Json架构 - > OK(我更喜欢使用优雅的Dataset API):

val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)

val staticInputDF = 
  spark
    .read
    .schema(jsonSchema)
    .json(inputPath)
Run Code Online (Sandbox Code Playgroud)

使用数据集API - > KO:找不到TimestampType的编码器

创建Event类

import org.apache.spark.sql.types._
case class Event(action: String, time: TimestampType)
--> defined class Event
Run Code Online (Sandbox Code Playgroud)

在databricks上从DBFS读取事件时出错.

注意:java.sql.Timestamp作为"时间"类型使用时,我们不会收到错误

val path = "/databricks-datasets/structured-streaming/events/"
val events = spark.read.json(path).as[Event]
Run Code Online (Sandbox Code Playgroud)

错误信息

java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.TimestampType
- field (class: "org.apache.spark.sql.types.TimestampType", name: "time")
- …
Run Code Online (Sandbox Code Playgroud)

timestamp scala apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
3514
查看次数

您可以从命名管道加载 Apache Spark 数据集吗?

我目前使用 XUbuntu 16.04、Apache Spark 2.1.1、IntelliJ 和 Scala 2.11.8

我试图将一些 CSV 格式的简单文本数据加载到 Apache Spark 数据集中,但我没有使用常规文本文件,而是将数据转储到命名管道中,然后我想将该数据直接读入数据集中。如果数据是常规文件,它可以完美地工作,但如果它来自命名管道,则完全相同的数据不起作用。我的 Scala 代码非常简单,如下所示:

import org.apache.spark.sql.SparkSession

object PipeTest {

  def main(args: Array[String]): Unit = {

  val spark = SparkSession
    .builder()
    .appName("PipeTest")
    .master("local")
    .getOrCreate()

     // Read data in from a text file and input to a DataSet    
    var dataFromTxt = spark.read.csv("csvData.txt")
    dataFromTxt.show()


     // Read data in from a pipe and input to a DataSet
    var dataFromPipe = spark.read.csv("csvData.pipe")
    dataFromPipe.show()
  }
}
Run Code Online (Sandbox Code Playgroud)

第一个代码部分从常规文件加载 csv 数据并且工作正常。第二个代码部分失败并出现以下错误:

线程“main”中的异常 java.io.IOException:访问文件时出错:/home/andersonlab/test/csvData.pipe

任何人都知道您将如何将命名管道与 Spark Datasets …

apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
0
解决办法
189
查看次数

如何创建地图数据集?

我使用的Spark 2.2,并试图打电话的时候,我遇到了麻烦spark.createDatasetSeqMap.

我的Spark Shell会话的代码和输出如下:

// createDataSet on Seq[T] where T = Int works
scala> spark.createDataset(Seq(1, 2, 3)).collect
res0: Array[Int] = Array(1, 2, 3)

scala> spark.createDataset(Seq(Map(1 -> 2))).collect
<console>:24: error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._
Support for serializing other types will be added in future releases.
       spark.createDataset(Seq(Map(1 -> 2))).collect
                          ^

// createDataSet on a custom …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

5
推荐指数
1
解决办法
1319
查看次数

使用递归案例类进行Spark

我有一个递归的数据结构。Spark给出了这个错误:

Exception in thread "main" java.lang.UnsupportedOperationException: cannot have circular references in class, but got the circular reference of class BulletPoint
Run Code Online (Sandbox Code Playgroud)

作为示例,我做了以下代码:

case class BulletPoint(item: String, children: List[BulletPoint])

object TestApp extends App {
  val sparkSession = SparkSession
    .builder()
    .appName("spark app")
    .master(s"local")
    .getOrCreate()

  import sparkSession.implicits._

  sparkSession.createDataset(List(BulletPoint("1", Nil), BulletPoint("2", Nil)))
}
Run Code Online (Sandbox Code Playgroud)

有人知道如何解决这个问题吗?

scala apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
89
查看次数

需要基于1列的值在数据集中的列中设置值

Dataset<Row>在Java中。我需要读取1列的值(它是JSON字符串),对其进行解析,然后根据已解析的JSON值设置其他几列的值。

我的数据集如下所示:

|json                     | name|  age |
======================================== 
| "{'a':'john', 'b': 23}" | null| null |
----------------------------------------
| "{'a':'joe', 'b': 25}"  | null| null |
----------------------------------------
| "{'a':'zack'}"          | null| null |
----------------------------------------
Run Code Online (Sandbox Code Playgroud)

我需要这样:

|json                     | name  |  age |
======================================== 
| "{'a':'john', 'b': 23}" | 'john'| 23 |
----------------------------------------
| "{'a':'joe', 'b': 25}"  | 'joe' | 25 |
----------------------------------------
| "{'a':'zack'}"          | 'zack'|null|
----------------------------------------
Run Code Online (Sandbox Code Playgroud)

我无法找到一种方法。请帮助代码。

java apache-spark apache-spark-dataset

5
推荐指数
1
解决办法
75
查看次数

是否应该在数据集上同时使用缓存和检查点?如果是这样,它如何在后台运行?

我正在一个Spark ML管道上工作,在该管道上我们会在较大的数据集上看到OOM错误。在训练之前我们正在使用cache(); 我换了一下checkpoint(),我们的内存需求大大下降了。然而,在文档进行RDDcheckpoint(),它说:

强烈建议将该RDD保留在内存中,否则将其保存在文件中将需要重新计算。

DataSet我正在使用的检查点未提供相同的指导。无论如何,遵循以上建议,我发现cache()单独使用内存的需求实际上有所增加。

我的期望是当我们这样做时

...
ds.cache()
ds.checkpoint()
...
Run Code Online (Sandbox Code Playgroud)

对检查点的调用会强制对进行评估,该评估会DataSet在被检查点之前同时缓存。之后,任何对的引用都ds将引用缓存的分区,并且如果需要更多的内存并且将分区撤离,将使用检查点分区,而不是重新评估它们。这是真的吗,还是在幕后发生了什么变化?理想情况下,如果可能的话,我希望将DataSet保留在内存中,但是从内存的角度来看,使用缓存和检查点方法似乎没有任何好处。

apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
124
查看次数