我已经写two case class了扩展 Base abstract class。我有每个班级的两个列表(listA和listB)。当我想合并这两个列表时,我无法将最终列表转换为 Apache Spark 1.6.1 数据集。
abstract class Base
case class A(name: String) extends Base
case class B(age: Int) extends Base
val listA: List[A] = A("foo")::A("bar")::Nil
val listB: List[B] = B(10)::B(20)::Nil
val list: List[Base with Product with Serializable] = listA ++ listB
val result: RDD[Base with Product with Serializable] = sc.parallelize(list).toDS()
Run Code Online (Sandbox Code Playgroud)
Apache Spark 将引发此异常:
A needed class was not found. This could be due to an error in your …Run Code Online (Sandbox Code Playgroud) 我有两个大型数据框 [a] 一个,其中包含由 id [b] 标识的所有事件。我想使用 spark 2.0.0 中的 stat.bloomFilter 实现基于 [b] 中的 id 过滤 [a]
但是我在数据集 API 中没有看到任何将布隆过滤器连接到数据框的操作 [a]
val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5))
val df1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3")
val in2 = spark.sparkContext.parallelize(List(0, 1, 2))
val df2 = in2.map(x => (x)).toDF("c1")
val expectedNumItems: Long = 1000
val fpp: Double = 0.005
val sbf = df.stat.bloomFilter($"c1", expectedNumItems, fpp)
val sbf2 = df2.stat.bloomFilter($"c1", expectedNumItems, fpp)
Run Code Online (Sandbox Code Playgroud)
根据 df2 中的值过滤“df1”的最佳方法是什么?
谢谢!
scala bloom-filter apache-spark apache-spark-sql apache-spark-dataset
我有以下两个数据集
controlSetDF : has columns loan_id, merchant_id, loan_type, created_date, as_of_date
accountDF : has columns merchant_id, id, name, status, merchant_risk_status
Run Code Online (Sandbox Code Playgroud)
我正在使用Java Spark API来加入它们,我只需要最终数据集中的特定列
private String[] control_set_columns = {"loan_id", "merchant_id", "loan_type"};
private String[] sf_account_columns = {"id as account_id", "name as account_name", "merchant_risk_status"};
controlSetDF.selectExpr(control_set_columns)
.join(accountDF.selectExpr(sf_account_columns),controlSetDF.col("merchant_id").equalTo(accountDF.col("merchant_id")),
"left_outer");
Run Code Online (Sandbox Code Playgroud)
但我得到以下错误
org.apache.spark.sql.AnalysisException: resolved attribute(s) merchant_id#3L missing from account_name#131,loan_type#105,account_id#130,merchant_id#104L,loan_id#103,merchant_risk_status#2 in operator !Join LeftOuter, (merchant_id#104L = merchant_id#3L);;!Join LeftOuter, (merchant_id#104L = merchant_id#3L)
Run Code Online (Sandbox Code Playgroud)
似乎存在问题,因为两个数据帧都具有merchant_id列。
注意:如果我不使用.selectExpr(),它将正常工作。但是它将显示第一和第二数据集的所有列。
我怎样才能编译这个方法.奇怪的是,隐含的火花已经被导入了.
def loadDsFromHive[T <: Product](tableName: String, spark: SparkSession): Dataset[T] = {
import spark.implicits._
spark.sql(s"SELECT * FROM $tableName").as[T]
}
Run Code Online (Sandbox Code Playgroud)
这是错误:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
[error] spark.sql(s"SELECT * FROM $tableName").as[T]
Run Code Online (Sandbox Code Playgroud) generics scala apache-spark apache-spark-dataset apache-spark-encoders
我在Databricks笔记本上使用Spark 2.1和Scala 2.11
什么是TimestampType?
我们知道,从SparkSQL的文档那是官方的时间戳类型是TimestampType,这显然是对的java.sql.Timestamp一个别名:
TimestampType可以在SparkSQL的Scala API中找到
使用模式和数据集API时,我们有所不同
从Databricks的Scala Structured Streaming示例解析时{"time":1469501297,"action":"Open"}
使用Json架构 - > OK(我更喜欢使用优雅的Dataset API):
val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)
val staticInputDF =
spark
.read
.schema(jsonSchema)
.json(inputPath)
Run Code Online (Sandbox Code Playgroud)
使用数据集API - > KO:找不到TimestampType的编码器
创建Event类
import org.apache.spark.sql.types._
case class Event(action: String, time: TimestampType)
--> defined class Event
Run Code Online (Sandbox Code Playgroud)
在databricks上从DBFS读取事件时出错.
注意:java.sql.Timestamp作为"时间"类型使用时,我们不会收到错误
val path = "/databricks-datasets/structured-streaming/events/"
val events = spark.read.json(path).as[Event]
Run Code Online (Sandbox Code Playgroud)
错误信息
java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.TimestampType
- field (class: "org.apache.spark.sql.types.TimestampType", name: "time")
- …Run Code Online (Sandbox Code Playgroud) timestamp scala apache-spark apache-spark-sql apache-spark-dataset
我目前使用 XUbuntu 16.04、Apache Spark 2.1.1、IntelliJ 和 Scala 2.11.8
我试图将一些 CSV 格式的简单文本数据加载到 Apache Spark 数据集中,但我没有使用常规文本文件,而是将数据转储到命名管道中,然后我想将该数据直接读入数据集中。如果数据是常规文件,它可以完美地工作,但如果它来自命名管道,则完全相同的数据不起作用。我的 Scala 代码非常简单,如下所示:
import org.apache.spark.sql.SparkSession
object PipeTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("PipeTest")
.master("local")
.getOrCreate()
// Read data in from a text file and input to a DataSet
var dataFromTxt = spark.read.csv("csvData.txt")
dataFromTxt.show()
// Read data in from a pipe and input to a DataSet
var dataFromPipe = spark.read.csv("csvData.pipe")
dataFromPipe.show()
}
}
Run Code Online (Sandbox Code Playgroud)
第一个代码部分从常规文件加载 csv 数据并且工作正常。第二个代码部分失败并出现以下错误:
线程“main”中的异常 java.io.IOException:访问文件时出错:/home/andersonlab/test/csvData.pipe
任何人都知道您将如何将命名管道与 Spark Datasets …
我使用的Spark 2.2,并试图打电话的时候,我遇到了麻烦spark.createDataset上Seq的Map.
我的Spark Shell会话的代码和输出如下:
// createDataSet on Seq[T] where T = Int works
scala> spark.createDataset(Seq(1, 2, 3)).collect
res0: Array[Int] = Array(1, 2, 3)
scala> spark.createDataset(Seq(Map(1 -> 2))).collect
<console>:24: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._
Support for serializing other types will be added in future releases.
spark.createDataset(Seq(Map(1 -> 2))).collect
^
// createDataSet on a custom …Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders
我有一个递归的数据结构。Spark给出了这个错误:
Exception in thread "main" java.lang.UnsupportedOperationException: cannot have circular references in class, but got the circular reference of class BulletPoint
Run Code Online (Sandbox Code Playgroud)
作为示例,我做了以下代码:
case class BulletPoint(item: String, children: List[BulletPoint])
object TestApp extends App {
val sparkSession = SparkSession
.builder()
.appName("spark app")
.master(s"local")
.getOrCreate()
import sparkSession.implicits._
sparkSession.createDataset(List(BulletPoint("1", Nil), BulletPoint("2", Nil)))
}
Run Code Online (Sandbox Code Playgroud)
有人知道如何解决这个问题吗?
我Dataset<Row>在Java中。我需要读取1列的值(它是JSON字符串),对其进行解析,然后根据已解析的JSON值设置其他几列的值。
我的数据集如下所示:
|json | name| age |
========================================
| "{'a':'john', 'b': 23}" | null| null |
----------------------------------------
| "{'a':'joe', 'b': 25}" | null| null |
----------------------------------------
| "{'a':'zack'}" | null| null |
----------------------------------------
Run Code Online (Sandbox Code Playgroud)
我需要这样:
|json | name | age |
========================================
| "{'a':'john', 'b': 23}" | 'john'| 23 |
----------------------------------------
| "{'a':'joe', 'b': 25}" | 'joe' | 25 |
----------------------------------------
| "{'a':'zack'}" | 'zack'|null|
----------------------------------------
Run Code Online (Sandbox Code Playgroud)
我无法找到一种方法。请帮助代码。
我正在一个Spark ML管道上工作,在该管道上我们会在较大的数据集上看到OOM错误。在训练之前我们正在使用cache(); 我换了一下checkpoint(),我们的内存需求大大下降了。然而,在文档进行RDD的checkpoint(),它说:
强烈建议将该RDD保留在内存中,否则将其保存在文件中将需要重新计算。
DataSet我正在使用的检查点未提供相同的指导。无论如何,遵循以上建议,我发现cache()单独使用内存的需求实际上有所增加。
我的期望是当我们这样做时
...
ds.cache()
ds.checkpoint()
...
Run Code Online (Sandbox Code Playgroud)
对检查点的调用会强制对进行评估,该评估会DataSet在被检查点之前同时缓存。之后,任何对的引用都ds将引用缓存的分区,并且如果需要更多的内存并且将分区撤离,将使用检查点分区,而不是重新评估它们。这是真的吗,还是在幕后发生了什么变化?理想情况下,如果可能的话,我希望将DataSet保留在内存中,但是从内存的角度来看,使用缓存和检查点方法似乎没有任何好处。