According to Introducing Spark Datasets:
As we look forward to Spark 2.0, we plan some exciting improvements to Datasets, specifically: ... Custom encoders – while we currently autogenerate encoders for a wide variety of types, we'd like to open up an API for custom objects.
and attempts to store a custom type in a Dataset lead to the following error:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases
or:
java.lang.UnsupportedOperationException: No Encoder found for ....
Are there any existing workarounds?
Please note this question exists only as an entry point for a Community Wiki answer. Feel free to update/improve both the question and the answer.
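For context, the workaround most often cited in the answers (a minimal sketch, assuming Spark 2.x; MyObj is a hypothetical stand-in that does not appear in the question) is to fall back to a binary kryo encoder:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Hypothetical class with no built-in encoder.
class MyObj(val i: Int)

object KryoWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("kryo-workaround").getOrCreate()
    import spark.implicits._

    // Encoders.kryo serializes the whole object into a single binary column,
    // which satisfies the Encoder requirement but loses the columnar schema.
    implicit val myObjEncoder: Encoder[MyObj] = Encoders.kryo[MyObj]

    val ds = spark.createDataset(Seq(new MyObj(1), new MyObj(2)))
    ds.map(_.i).show() // mapping back to Int uses the built-in implicit encoder
  }
}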
scala apache-spark apache-spark-dataset apache-spark-encoders
Spark 2.0 (final) with Scala 2.11.8. The following super-simple code yields the compilation error Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    val dataset = sparkSession.createDataset(dataList)
  }
}
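The usual fix (a sketch, not part of the question itself) is to bring the session's implicit encoders into scope before createDataset is called:

def main(args: Array[String]): Unit = {
  val sparkSession = SparkSession.builder
    .master("local")
    .appName("example")
    .getOrCreate()

  // This import supplies the implicit Encoder[SimpleTuple] derived
  // for Product types (case classes), which resolves the compile error.
  import sparkSession.implicits._

  val dataset = sparkSession.createDataset(dataList)
}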
scala apache-spark apache-spark-dataset apache-spark-encoders
When I try to do the same thing in my code, as mentioned below,
dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
})
I took the above reference from here: Scala: How can I replace values in DataFrames using Scala. But I am getting the encoder error:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
Note: I am using Spark 2.0!
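A commonly suggested remedy (a hedged sketch, assuming Spark 2.x and the same dataframe as above; the RowEncoder import is not in the original post) is to supply an explicit encoder built from the frame's schema, since Row itself has no implicit encoder:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// The mapped rows keep the same shape as the input, so its schema can be reused.
implicit val rowEncoder = RowEncoder(dataframe.schema)

val updated = dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
})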
scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders
I would like to write an encoder for a Row type in a Dataset, for a map operation that I am doing. Essentially, I do not understand how to write encoders.
Below is an example of the map operation:
In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>
Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
    @Override
    public Iterator<String> call(Row row) throws Exception {
        ArrayList<String> obj = // some map operation
        return obj.iterator();
    }
}, Encoders.STRING());
I understand that the encoder needs to be written like the code below:
Encoder<Row> encoder = new Encoder<Row>() {
    @Override
    public StructType schema() {
        return join.schema();
        // return null;
    }

    @Override
    public ClassTag<Row> clsTag() {
        return null;
    }
};
However, I do not understand the clsTag() in the encoder, and I am trying to find a running example that demonstrates something similar (i.e. an encoder for a Row type).
Edit - This is not a duplicate of the question mentioned: Encoder error while trying to map dataframe row to updated row, as that answer talks about using Spark 1.x with Spark 2.x (which I am not doing). Also, I am looking for an encoder for the Row class rather than just resolving the error. Finally, I am looking for a Java solution, not a Scala one.
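For the record, rather than implementing Encoder by hand, the route usually pointed to (a sketch in Java to match the question; dataset1 is taken from the question's context, and reusing its schema for the output is an assumption of this sketch) is RowEncoder.apply with a StructType describing the produced rows:

import java.util.Collections;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.StructType;

// schema must describe the rows the flatMap produces; here we assume
// they keep the input schema.
StructType schema = dataset1.schema();

Dataset<Row> output = dataset1.flatMap(new FlatMapFunction<Row, Row>() {
    @Override
    public Iterator<Row> call(Row row) throws Exception {
        // some map operation; the identity is used as a placeholder
        return Collections.singletonList(row).iterator();
    }
}, RowEncoder.apply(schema));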
java apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders
Spark Datasets move away from Row to Encoder for Pojos/primitives. The Catalyst engine uses an ExpressionEncoder to convert columns in a SQL expression. However, there do not appear to be other subclasses of Encoder available to use as a template for our own implementations.
Below is code that worked happily in Spark 1.X / DataFrames but does not compile in the new regime:
// mapping each row to RDD tuple
df.map(row => {
  var id: String = if (!has_id) "" else row.getAs[String]("id")
  var label: String = row.getAs[String]("label")
  val channels: Int = if (!has_channels) 0 else row.getAs[Int]("channels")
  val height: Int = if (!has_height) 0 else row.getAs[Int]("height")
  val width: Int = if (!has_width) 0 else row.getAs[Int]("width")
  val data: Array[Byte] = row.getAs[Any]("data") match {
    case str: String => str.getBytes
    case arr: Array[Byte @unchecked] => …
scala apache-spark apache-spark-dataset apache-spark-encoders
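One hedged alternative (not from the original post; ImageRecord and the field names are illustrative, `spark` is an assumed SparkSession, and the case class must be declared at the top level): return a case class instead of a tuple of Row pieces, and let the implicits derive the encoder:

case class ImageRecord(id: String, label: String, channels: Int,
                       height: Int, width: Int, data: Array[Byte])

import spark.implicits._

// Returning a case class (a Product type) lets the implicits derive the
// ExpressionEncoder, so no hand-written Encoder subclass is needed.
val ds = df.map { row =>
  ImageRecord(
    row.getAs[String]("id"),
    row.getAs[String]("label"),
    row.getAs[Int]("channels"),
    row.getAs[Int]("height"),
    row.getAs[Int]("width"),
    row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[Byte @unchecked] => arr
    }
  )
}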
I have written a Spark job:
object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val ctx = new org.apache.spark.sql.SQLContext(sc)
    import ctx.implicits._

    case class Person(age: Long, city: String, id: String, lname: String, name: String, sex: String)
    case class Person2(name: String, age: Long, city: String)

    val persons = ctx.read.json("/tmp/persons.json").as[Person]
    persons.printSchema()
  }
}
When running the main function in the IDE, 2 errors occur:
Error:(15, 67) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported …
scala apache-spark apache-spark-dataset apache-spark-encoders
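A frequently noted cause here (a hedged aside, not from the original post): the case classes are declared inside main, so they are local classes and ScalaReflection cannot derive encoders for them. Declaring them at the top level typically resolves the errors:

import org.apache.spark.{SparkConf, SparkContext}

// Moved outside main so the implicit Encoder[Person] can be derived.
case class Person(age: Long, city: String, id: String, lname: String, name: String, sex: String)

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val ctx = new org.apache.spark.sql.SQLContext(sc)
    import ctx.implicits._

    val persons = ctx.read.json("/tmp/persons.json").as[Person]
    persons.printSchema()
  }
}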
I am using Apache Spark 2.0 and have created a case class to provide the schema for a DataSet. When I try to define a custom encoder for java.time.LocalDate, following How to store custom objects in Dataset?, I get the following exception:
java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "callDate")
- root class: "FireService"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:598)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
............
Below is the code:
case class FireService(callNumber: String, callDate: java.time.LocalDate)

implicit val localDateEncoder: org.apache.spark.sql.Encoder[java.time.LocalDate] =
  org.apache.spark.sql.Encoders.kryo[java.time.LocalDate]

val fireServiceDf = df.map(row => {
  val dateFormatter = java.time.format.DateTimeFormatter.ofPattern("MM/dd/yyyy")
  FireService(row.getAs[String](0), java.time.LocalDate.parse(row.getAs[String](4), dateFormatter))
})
How can we define a Spark encoder for a third-party API type?
Update
When I create an encoder for the whole case class, df.map.. maps the objects to binary, as below:
implicit val …
scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders
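A hedged alternative that keeps the columnar schema (rather than kryo's single binary column): represent the date as java.sql.Date, which Spark supports natively, and convert at the boundary. The "MM/dd/yyyy" pattern and column positions below are carried over from the question; `spark` is an assumed SparkSession:

// java.sql.Date has a built-in encoder, so the case class stays columnar.
case class FireService(callNumber: String, callDate: java.sql.Date)

import spark.implicits._

val dateFormatter = java.time.format.DateTimeFormatter.ofPattern("MM/dd/yyyy")

val fireServiceDs = df.map { row =>
  val local = java.time.LocalDate.parse(row.getAs[String](4), dateFormatter)
  FireService(row.getAs[String](0), java.sql.Date.valueOf(local))
}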
I need to convert my dataframe to a dataset, and I used the following code:
val final_df = Dataframe.withColumn(
  "features",
  toVec4(
    // casting into Timestamp to parse the string, and then into Int
    $"time_stamp_0".cast(TimestampType).cast(IntegerType),
    $"count",
    $"sender_ip_1",
    $"receiver_ip_2"
  )
).withColumn("label", (Dataframe("count"))).select("features", "label")

final_df.show()

val trainingTest = final_df.randomSplit(Array(0.3, 0.7))
val TrainingDF = trainingTest(0)
val TestingDF = trainingTest(1)
TrainingDF.show()
TestingDF.show()

// let's create our linear regression
val lir = new LinearRegression()
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setMaxIter(100)
  .setTol(1E-6)

case class df_ds(features: Vector, label: Integer)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

val Training_ds = TrainingDF.as[df_ds]
My problem is that I get the following error:
Error:(96, 36) Unable to find encoder for type stored in a Dataset. Primitive types (Int, …
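A hedged note on the usual resolution (the names below mirror the question; moving the declaration is the change): a case class defined inside a method cannot get a derived encoder, which is what the addOuterScope call was attempting to patch over. Declaring df_ds at the top level and importing the session implicits is normally enough, provided features is an org.apache.spark.ml.linalg.Vector:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Top-level declaration, so Encoder[df_ds] can be derived without OuterScopes.
case class df_ds(features: Vector, label: Integer)

object TrainingJob {
  // TrainingDF is assumed to carry "features" (ml Vector) and "label" columns.
  def toTypedDataset(spark: SparkSession, TrainingDF: DataFrame): Dataset[df_ds] = {
    import spark.implicits._
    TrainingDF.as[df_ds]
  }
}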
If I wanted to store an Algebraic Data Type (ADT) (i.e. a Scala sealed trait hierarchy) within a Spark DataSet column, what is the best encoding strategy?
For example, if I have an ADT where the leaf types store different kinds of data:
sealed trait Occupation
case object SoftwareEngineer extends Occupation
case class Wizard(level: Int) extends Occupation
case class Other(description: String) extends Occupation
What would be the best way to construct a:
org.apache.spark.sql.DataSet[Occupation]
scala apache-spark apache-spark-dataset apache-spark-encoders
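One approach the answers usually reach for first (a sketch, assuming a SparkSession named spark, which the question does not show) is a kryo-backed encoder for the sealed trait, at the cost of an opaque binary column:

import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// A single kryo encoder covers the whole hierarchy, since every leaf
// type is a subtype of Occupation.
implicit val occupationEncoder: Encoder[Occupation] = Encoders.kryo[Occupation]

val occupations: Dataset[Occupation] =
  spark.createDataset(Seq[Occupation](SoftwareEngineer, Wizard(9), Other("bard")))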
I am new to Scala. I am trying to convert a Scala list (which holds the results of some calculated data on a source DataFrame) to a DataFrame or a Dataset. I have not found any direct method to do that. However, I have tried the following process to convert my list to a Dataset, and it does not seem to work. I am providing the 3 situations below.
Can someone please provide me some hope on how to do this conversion? Thanks.
import org.apache.spark.sql.{DataFrame, Row, SQLContext, DataFrameReader}
import java.sql.{Connection, DriverManager, ResultSet, Timestamp}
import scala.collection._

case class TestPerson(name: String, age: Long, salary: Double)

var tom = new TestPerson("Tom Hanks", 37, 35.5)
var sam = new TestPerson("Sam Smith", 40, 40.5)

val PersonList = mutable.MutableList[TestPerson]()

// Adding data to the list
PersonList += tom
PersonList += sam

// Situation 1: Trying to create a dataset from a List of objects - Result: Error
// Throwing error
var personDS = Seq(PersonList).toDS()
/*
ERROR:
error: Unable to find encoder for type stored in a Dataset. Primitive types …
scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders
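For what it's worth, a hedged sketch of the usual fix: call toDS() on the collection itself (Seq(PersonList) wraps the whole mutable list as a single element) after importing the implicits from the SQLContext, and keep TestPerson defined at the top level:

import sqlContext.implicits._ // or spark.implicits._ with a SparkSession

// toDS() on the list itself; Seq(PersonList).toDS() would try to encode
// a Dataset[MutableList[TestPerson]] instead of a Dataset[TestPerson].
val personDS = PersonList.toList.toDS()
personDS.show()

// A DataFrame is available the same way:
val personDF = PersonList.toList.toDF()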