标签: apache-spark-encoders

如何在Dataset中存储自定义对象?

根据Spark数据集介绍:

正如我们期待着星火2.0,我们计划将数据集的一些令人兴奋的改进,具体包括:...自定义编码器 - 而我们对各种各样的类型目前自动生成编码器,我们想开了一个API,用于自定义对象.

并尝试将自定义类型存储为Dataset导致以下错误:

无法找到存储在数据集中的类型的编码器.导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类).将来版本中将添加对序列化其他类型的支持

要么:

Java.lang.UnsupportedOperationException:找不到针对....的编码器

有没有现成的解决方法?


请注意,此问题仅作为社区Wiki答案的入口点存在.随意更新/改进问题和答案.

scala apache-spark apache-spark-dataset apache-spark-encoders

133
推荐指数
4
解决办法
6万
查看次数

为什么在创建自定义案例类的数据集时"无法找到存储在数据集中的类型的编码器"?

使用Scala 2.11.8的Spark 2.0(最终版).以下超级简单代码会产生编译错误Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.
      master("local")
      .appName("example")
      .getOrCreate()

    val dataset = sparkSession.createDataset(dataList)
  }
}
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset apache-spark-encoders

53
推荐指数
2
解决办法
5万
查看次数

尝试将数据帧行映射到更新行时出现编码器错误

当我试图在我的代码中做同样的事情,如下所述

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})
Run Code Online (Sandbox Code Playgroud)

我从这里采取了上述参考: Scala:如何使用scala替换Dataframs中的值 但是我收到编码器错误

无法找到存储在数据集中的类型的编码器.导入spark.im plicits支持原始类型(Int,S tring等)和产品类型(case类)._将在以后的版本中添加对序列化其他类型的支持.

注意:我正在使用spark 2.0!

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

33
推荐指数
2
解决办法
3万
查看次数

用于行类型Spark数据集的编码器

我想在DataSet中为Row类型编写一个编码器,用于我正在进行的地图操作.基本上,我不明白如何编写编码器.

以下是地图操作的示例:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
            @Override
            public Iterator<String> call(Row row) throws Exception {

                ArrayList<String> obj = //some map operation
                return obj.iterator();
            }
        },Encoders.STRING());
Run Code Online (Sandbox Code Playgroud)

我明白,编码器需要编写如下代码:

    Encoder<Row> encoder = new Encoder<Row>() {
        @Override
        public StructType schema() {
            return join.schema();
            //return null;
        }

        @Override
        public ClassTag<Row> clsTag() {
            return null;
        }
    };
Run Code Online (Sandbox Code Playgroud)

但是,我不理解编码器中的clsTag(),我试图找到一个可以演示相似内容的运行示例(即行类型的编码器)

编辑 - 这不是所提问题的副本:尝试将数据帧行映射到更新行时编码器错误,因为答案谈到在Spark 2.x中使用Spark 1.x(我不是这样做),我也在寻找用于Row类的编码器而不是解决错误.最后,我一直在寻找Java解决方案,而不是Scala.

java apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

24
推荐指数
2
解决办法
2万
查看次数

如何在Spark 2.X数据集中创建自定义编码器?

Encoder对于Pojo /原语,Spark数据集从Row转移到了's'.该Catalyst引擎使用ExpressionEncoder的列转换成SQL表达式.但是,似乎没有其他子类Encoder可用作我们自己实现的模板.

下面是一个代码,它在Spark 1.X/DataFrames中很高兴,它不能在新系统中编译:

//mapping each row to RDD tuple
df.map(row => {
    var id: String = if (!has_id) "" else row.getAs[String]("id")
    var label: String = row.getAs[String]("label")
    val channels  : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
    val height  : Int = if (!has_height) 0 else row.getAs[Int]("height")
    val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
    val data : Array[Byte] = row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[Byte@unchecked] …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset apache-spark-encoders

18
推荐指数
1
解决办法
2万
查看次数

使用案例类编码JSON时,为什么错误"无法找到存储在数据集中的类型的编码器"?

我写过火花工作:

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val ctx = new org.apache.spark.sql.SQLContext(sc)
    import ctx.implicits._

    case class Person(age: Long, city: String, id: String, lname: String, name: String, sex: String)
    case class Person2(name: String, age: Long, city: String)

    val persons = ctx.read.json("/tmp/persons.json").as[Person]
    persons.printSchema()
  }
}
Run Code Online (Sandbox Code Playgroud)

在IDE中运行main函数时,发生2错误:

Error:(15, 67) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset apache-spark-encoders

15
推荐指数
1
解决办法
2万
查看次数

Apache Spark 2.0:java.lang.UnsupportedOperationException:找不到java.time.LocalDate的编码器

我正在使用Apache Spark 2.0并创建case class提供架构DetaSet.当我试图根据如何在数据集中存储自定义对象来定义自定义编码器因为java.time.LocalDate我得到以下例外:

java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "callDate")
- root class: "FireService"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:598)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
............
Run Code Online (Sandbox Code Playgroud)

以下是代码:

case class FireService(callNumber: String, callDate: java.time.LocalDate)
implicit val localDateEncoder: org.apache.spark.sql.Encoder[java.time.LocalDate] = org.apache.spark.sql.Encoders.kryo[java.time.LocalDate]

val fireServiceDf = df.map(row => {
val dateFormatter = java.time.format.DateTimeFormatter.ofPattern("MM/dd /yyyy")
FireService(row.getAs[String](0),  java.time.LocalDate.parse(row.getAs[String](4), dateFormatter))
})
Run Code Online (Sandbox Code Playgroud)

我们如何定义第三方api的spark编码器?

更新

当我为整个案例类创建编码器时,df.map..将对象映射为二进制,如下所示:

implicit val …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

12
推荐指数
1
解决办法
6199
查看次数

如何在Scala中的Apache Spark中将数据帧转换为数据集?

我需要将我的数据帧转换为数据集,并使用以下代码:

    val final_df = Dataframe.withColumn(
      "features",
      toVec4(
        // casting into Timestamp to parse the string, and then into Int
        $"time_stamp_0".cast(TimestampType).cast(IntegerType),
        $"count",
        $"sender_ip_1",
        $"receiver_ip_2"
      )
    ).withColumn("label", (Dataframe("count"))).select("features", "label")

    final_df.show()

    val trainingTest = final_df.randomSplit(Array(0.3, 0.7))
    val TrainingDF = trainingTest(0)
    val TestingDF=trainingTest(1)
    TrainingDF.show()
    TestingDF.show()

    ///lets create our liner regression
    val lir= new LinearRegression()
    .setRegParam(0.3)
    .setElasticNetParam(0.8)
    .setMaxIter(100)
    .setTol(1E-6)

    case class df_ds(features:Vector, label:Integer)
    org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

    val Training_ds = TrainingDF.as[df_ds]
Run Code Online (Sandbox Code Playgroud)

我的问题是,我收到以下错误:

Error:(96, 36) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-encoders

12
推荐指数
2
解决办法
3万
查看次数

将ADT /密封特征层次结构编码到Spark DataSet列中

如果我想在Spark DataSet列中存储代数数据类型(ADT)(即Scala密封特征层次结构),那么最佳编码策略是什么?

例如,如果我有一个ADT,其中叶子类型存储不同类型的数据:

sealed trait Occupation
case object SoftwareEngineer extends Occupation
case class Wizard(level: Int) extends Occupation
case class Other(description: String) extends Occupation
Run Code Online (Sandbox Code Playgroud)

什么是构建一个最好的方法:

org.apache.spark.sql.DataSet[Occupation]
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset apache-spark-encoders

10
推荐指数
1
解决办法
1520
查看次数

将scala列表转换为DataFrame或DataSet

我是Scala的新手.我正在尝试将scala列表(将源数据帧上的某些计算数据的结果保存)转换为Dataframe或Dataset.我没有找到任何直接的方法来做到这一点.但是,我尝试了以下过程将我的列表转换为DataSet,但它似乎无法正常工作.我提供以下3种情况.

有人可以请给我一些希望,如何进行这种转换?谢谢.

import org.apache.spark.sql.{DataFrame, Row, SQLContext, DataFrameReader}
import java.sql.{Connection, DriverManager, ResultSet, Timestamp}
import scala.collection._

case class TestPerson(name: String, age: Long, salary: Double)
var tom = new TestPerson("Tom Hanks",37,35.5)
var sam = new TestPerson("Sam Smith",40,40.5)

val PersonList = mutable.MutableList[TestPerson]()

//Adding data in list
PersonList += tom
PersonList += sam

//Situation 1: Trying to create dataset from List of objects:- Result:Error
//Throwing error
var personDS = Seq(PersonList).toDS()
/*
ERROR:
error: Unable to find encoder for type stored in a Dataset.  Primitive types …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

7
推荐指数
1
解决办法
2万
查看次数