I'm trying out the Spark Dataset API, but I'm having some trouble performing a simple join.
Suppose I have two datasets with the fields date | value. With DataFrames, my join would look like this:
val dfA : DataFrame
val dfB : DataFrame
dfA.join(dfB, dfB("date") === dfA("date") )
However, for Datasets there is the .joinWith method, and the same approach does not work:
val dfA : Dataset
val dfB : Dataset
dfA.joinWith(dfB, ? )
What arguments does .joinWith require?
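For reference, a minimal sketch (Spark 2.x) of what joinWith expects, assuming both sides are typed with a hypothetical case class Record(date: String, value: Long): it takes the other Dataset plus a Column condition, just like join, but returns a Dataset of pairs instead of a flattened row.

import org.apache.spark.sql.Dataset

case class Record(date: String, value: Long)

val dsA: Dataset[Record] = ???   // hypothetical typed Datasets
val dsB: Dataset[Record] = ???

// Same condition syntax as join, but the result is Dataset[(Record, Record)]
val joined: Dataset[(Record, Record)] = dsA.joinWith(dsB, dsA("date") === dsB("date"))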
Spark Dataset 2.0 provides two functions, createOrReplaceTempView and createGlobalTempView. I can't understand the basic difference between the two.
According to the API docs:
createOrReplaceTempView: The lifetime of this temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
So when I call sparkSession.close(), the view definition will be destroyed. Is that true?
createGlobalTempView: The lifetime of this temporary view is tied to this Spark application.
When does this type of view get destroyed? Any example, something like sparkSession.close()?
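A minimal sketch of the practical difference, assuming spark is the SparkSession and df an arbitrary Dataset: a normal temp view is visible only to the session that created it, while a global temp view lives in the global_temp database and stays visible to every session of the same application until the application stops or the view is dropped.

df.createOrReplaceTempView("people")                    // session-scoped
spark.sql("SELECT * FROM people")                       // works only in this session

df.createGlobalTempView("people_global")                // application-scoped
spark.sql("SELECT * FROM global_temp.people_global")    // must be qualified with global_temp
spark.newSession().sql("SELECT * FROM global_temp.people_global")  // visible from another session too

spark.catalog.dropGlobalTempView("people_global")       // otherwise it lives until the application ends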
Spark Datasets moved away from Row to Encoder for POJOs/primitives. The Catalyst engine uses an ExpressionEncoder to convert columns in a SQL expression. However, there do not appear to be any other subclasses of Encoder available to use as a template for our own implementations.
Below is code that worked happily in Spark 1.X / DataFrames but does not compile under the new regime:
//mapping each row to RDD tuple
df.map(row => {
var id: String = if (!has_id) "" else row.getAs[String]("id")
var label: String = row.getAs[String]("label")
val channels : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
val height : Int = if (!has_height) 0 else row.getAs[Int]("height")
val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
val data : Array[Byte] = row.getAs[Any]("data") match {
case str: String => str.getBytes
case arr: Array[Byte@unchecked] …
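Not an answer to the custom-Encoder question, but a hedged sketch of one common workaround: map each Row into a case class and let the built-in product encoders do the work (or Encoders.kryo for anything they cannot handle). The case class name and the spark SparkSession below are assumptions; has_id etc. are the flags from the snippet above.

import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Hypothetical target type for the mapped rows
case class ImageRecord(id: String, label: String, channels: Int,
                       height: Int, width: Int, data: Array[Byte])

import spark.implicits._   // supplies Encoders for case classes (Product types)

val records: Dataset[ImageRecord] = df.map { row =>
  ImageRecord(
    id       = if (!has_id) "" else row.getAs[String]("id"),
    label    = row.getAs[String]("label"),
    channels = if (!has_channels) 0 else row.getAs[Int]("channels"),
    height   = if (!has_height) 0 else row.getAs[Int]("height"),
    width    = if (!has_width) 0 else row.getAs[Int]("width"),
    data     = row.getAs[Any]("data") match {
      case str: String                 => str.getBytes
      case arr: Array[Byte @unchecked] => arr
    })
}

// Escape hatch for types without a built-in encoder:
// implicit val someEncoder: Encoder[SomeCustomType] = Encoders.kryo[SomeCustomType]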
I'm getting this error when writing a Parquet file; it has only just started happening:
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 2CA496E2AB87DC16), S3 Extended Request ID: 1dBrcqVGJU9VgoT79NAVGyN0fsbj9+6bipC7op97ZmP+zSFIuH72lN03ZtYabNIA2KaSj18a8ho=
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:1777)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:22)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:7)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.deleteObjects(AmazonS3LiteClient.java:125)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.deleteAll(Jets3tNativeFileSystemStore.java:355)
at sun.reflect.GeneratedMethodAccessor121.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy28.deleteAll(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.doSingleThreadedBatchDelete(S3NativeFileSystem.java:1331)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.delete(S3NativeFileSystem.java:663)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.delete(EmrFileSystem.java:296)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:463)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.abortJob(FileOutputCommitter.java:482)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:134)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:146) …
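Not a definitive fix for this particular trace (the 503 happens during a batch delete in job cleanup), but a commonly suggested mitigation is to lower the S3 request rate by writing fewer, larger files. A sketch, with an assumed DataFrame df and an example output path:

// Fewer output files generally means fewer S3 requests per second.
df.coalesce(64)            // partition count is illustrative; size it to the data volume
  .write
  .mode("overwrite")
  .parquet("s3://my-bucket/output/")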
I can easily convert a DataFrame to a Dataset in Scala:
case class Person(name:String, age:Long)
val df = ctx.read.json("/tmp/persons.json")
val ds = df.as[Person]
ds.printSchema
But in the Java version I don't know how to convert a DataFrame to a Dataset. Any ideas?
My attempt is:
DataFrame df = ctx.read().json(logFile);
Encoder<Person> encoder = new Encoder<>();
Dataset<Person> ds = new Dataset<Person>(ctx,df.logicalPlan(),encoder);
ds.printSchema();
But the compiler says:
Error:(23, 27) java: org.apache.spark.sql.Encoder is abstract; cannot be instantiated
Solution based on @Leet-Falcon's answer:
DataFrame df = ctx.read().json(logFile);
Encoder<Person> encoder = Encoders.bean(Person.class);
Dataset<Person> ds = new Dataset<Person>(ctx, df.logicalPlan(), encoder);
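For what it's worth, a shorter equivalent sketch (it assumes Person is a plain Java bean with a no-arg constructor and getters/setters) is to call .as(...) on the DataFrame directly instead of constructing the Dataset by hand:

DataFrame df = ctx.read().json(logFile);
Dataset<Person> ds = df.as(Encoders.bean(Person.class));
ds.printSchema();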
I wrote a Spark job:
object SimpleApp {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
val sc = new SparkContext(conf)
val ctx = new org.apache.spark.sql.SQLContext(sc)
import ctx.implicits._
case class Person(age: Long, city: String, id: String, lname: String, name: String, sex: String)
case class Person2(name: String, age: Long, city: String)
val persons = ctx.read.json("/tmp/persons.json").as[Person]
persons.printSchema()
}
}
When running the main function in the IDE, two errors occur:
Error:(15, 67) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported …
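The usual explanation for this error (hedged, since the rest of the message is cut off above) is that the case classes are declared inside main, where the implicit Encoder derivation pulled in by import ctx.implicits._ cannot see them. A sketch of the customary rearrangement, moving them to the top level:

import org.apache.spark.{SparkConf, SparkContext}

// Case classes at top level (outside any method) so implicit Encoders can be derived for them
case class Person(age: Long, city: String, id: String, lname: String, name: String, sex: String)

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val ctx = new org.apache.spark.sql.SQLContext(sc)
    import ctx.implicits._

    val persons = ctx.read.json("/tmp/persons.json").as[Person]
    persons.printSchema()
  }
}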
I'm using Spark with Scala, and my aggregated columns are anonymous. Is there a convenient way to rename multiple columns in a Dataset? I thought about imposing a schema with as, but the key column is a struct (due to the groupBy operation), and I can't figure out how to define a case class with a StructType in it.
I tried to define the schema as follows:
val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
StructField("dst", IntegerType), true)),
StructField("count", LongType, true))
edge_count.as[returnSchema]
But I ran into a compile error:
Message: <console>:74: error: overloaded method value apply with alternatives:
(fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, Boolean)
val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
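Two hedged observations on the snippet above (field names are just the ones it already uses): StructType's apply takes a Seq or Array of StructFields rather than bare varargs, and .as[...] needs a type with an Encoder, so a nested case class rather than a StructType value. A sketch, assuming spark.implicits._ is in scope:

import org.apache.spark.sql.types._

// 1. Wrap the fields in a Seq so the right StructType.apply overload is chosen
val returnSchema = StructType(Seq(
  StructField("edge", StructType(Seq(
    StructField("src", IntegerType, true),
    StructField("dst", IntegerType, true)))),
  StructField("count", LongType, true)))

// 2. For a typed Dataset, describe the same shape with nested case classes instead
case class Edge(src: Int, dst: Int)
case class EdgeCount(edge: Edge, count: Long)
val typed = edge_count.as[EdgeCount]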
How can we overwrite a partitioned dataset, but overwrite only the partitions we are going to change? For example, re-running last week's daily job and overwriting only last week's data.
The default Spark behavior is to overwrite the whole table, even when only some partitions are being written.
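A sketch of one way to get that behaviour on Spark 2.3+ (the DataFrame, path, and partition column below are examples): switching partitionOverwriteMode to dynamic makes an overwrite replace only the partitions present in the data being written.

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

lastWeek.write
  .mode("overwrite")
  .partitionBy("date")
  .parquet("s3://bucket/daily_table/")   // only the date= partitions present in lastWeek are replaced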
I have an RDD[LabeledPoint] that is intended to be used in a machine learning pipeline. How do we convert that RDD to a Dataset? Note that the newer spark.ml APIs require inputs in the Dataset format.
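A minimal sketch, assuming spark is the SparkSession and rdd is the RDD[LabeledPoint]: with the implicits imported, toDF() gives a DataFrame with label and features columns, and since DataFrame is just Dataset[Row], it already satisfies the Dataset[_] signatures used by spark.ml.

import spark.implicits._

val df = rdd.toDF()   // DataFrame = Dataset[Row], columns "label" and "features"
// df can be passed straight to spark.ml estimators and transformers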
I want to change an entire column to lowercase in a Spark Dataset.
Desired Input
+------+--------------------+
|ItemID| Category name|
+------+--------------------+
| ABC|BRUSH & BROOM HAN...|
| XYZ|WHEEL BRUSH PARTS...|
+------+--------------------+
Desired Output
+------+--------------------+
|ItemID| Category name|
+------+--------------------+
| ABC|brush & broom han...|
| XYZ|wheel brush parts...|
+------+--------------------+
I tried using collectAsList() and toString(), which is a slow and convoluted process for very large datasets.
I also found a method lower(), but I have no idea how to make it work on a Dataset. Please suggest a simple or efficient way to do this. Thanks in advance.
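A sketch using the built-in lower function from org.apache.spark.sql.functions, with the column name taken from the example above and ds as the assumed Dataset; withColumn with an existing column name replaces that column in place.

import org.apache.spark.sql.functions.{col, lower}

val lowered = ds.withColumn("Category name", lower(col("Category name")))
lowered.show()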