给定表1,其中一列为"x",类型为String.我想创建表2,其中列为"y",它是"x"中给出的日期字符串的整数表示形式.
必不可少的是将null值保留在"y"列中.
表1(数据帧df1):
+----------+
| x|
+----------+
|2015-09-12|
|2015-09-13|
| null|
| null|
+----------+
root
|-- x: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
表2(数据帧df2):
+----------+--------+
| x| y|
+----------+--------+
| null| null|
| null| null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
|-- x: string (nullable = true)
|-- y: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)
用于将列"x"中的值转换为列"y"的用户定义函数(udf)为:
val extractDateAsInt = udf[Int, String] (
(d:String) => d.substring(0, 10)
.filterNot( "-".toSet)
.toInt )
Run Code Online (Sandbox Code Playgroud)
并且工作,处理空值是不可能的.
尽管如此,我可以做类似的事情
val extractDateAsIntWithNull = udf[Int, String] (
(d:String) =>
if (d != …Run Code Online (Sandbox Code Playgroud) scala nullable user-defined-functions apache-spark apache-spark-sql
我有RDD[Row],需要持久保存到第三方存储库.但是这个第三方存储库在一次调用中最多接受5 MB.
所以我想根据RDD中存在的数据大小创建分区,而不是基于RDD中存在的行数.
如何找到a的大小RDD并根据它创建分区?
我正在使用Option Type的isEmpty方法来检查是否没有值.我不想case match在我的情况下使用as,我只是想检查是否有,None因为我会向调用者抛出错误.但isEmpty即使值很大,该方法也会失败None.
这是我试过的!
val questionOption = Question.getQuestionForQuestionId(userExam.get.examId, currQuesId + 1)
if(questionOption.isEmpty) {
Left(Failure(FailureCode.NO_DATA_FOUND, "Cannot get next exam question you tampered your cookie or cookie is lost.... >> TODO... modify the exception message"))
}
Run Code Online (Sandbox Code Playgroud)
它没有进入if条件.我试着在questionOption上做一个println,然后打印出None.所以想知道为什么我没有进入if条件.
我是Spark SQL世界的新宠.我目前正在迁移我的应用程序的摄取代码,其中包括在舞台,HDFS中的Raw和Application层中摄取数据以及执行CDC(更改数据捕获),这当前在Hive查询中编写并通过Oozie执行.这需要迁移到Spark应用程序(当前版本1.6).代码的另一部分稍后将迁移.
在spark-SQL中,我可以直接从Hive中的表创建数据帧,只需执行查询(如sqlContext.sql("my hive hql")).另一种方法是使用数据帧API并以这种方式重写hql.
这两种方法有什么不同?
使用Dataframe API是否有任何性能提升?
有人建议,在直接使用"SQL"查询时会有一层额外的SQL引发核心引擎,这可能会在一定程度上影响性能,但我没有找到任何证实该声明的材料.我知道使用Datafrmae API的代码会更加紧凑,但是当我的hql查询非常方便时,将完整的代码编写到Dataframe API中真的值得吗?
谢谢.
我试图从a中取列DataFrame并将其转换为RDD[Vector].
问题是我的名称中有一个带"dot"的列作为以下数据集:
"col0.1","col1.2","col2.3","col3.4"
1,2,3,4
10,12,15,3
1,12,10,5
Run Code Online (Sandbox Code Playgroud)
这就是我正在做的事情:
val df = spark.read.format("csv").options(Map("header" -> "true", "inferSchema" -> "true")).load("C:/Users/mhattabi/Desktop/donnee/test.txt")
val column=df.columns.map(c=>s"`${c}`")
val rows = new VectorAssembler().setInputCols(column).setOutputCol("vs")
.transform(df)
.select("vs")
.rdd
val data =rows.map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
.map(org.apache.spark.mllib.linalg.Vectors.fromML)
val mat: RowMatrix = new RowMatrix(data)
//// Compute the top 5 singular values and corresponding singular vectors.
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(mat.numCols().toInt, computeU = true)
val U: RowMatrix = svd.U // The U factor is a RowMatrix.
val s: Vector = svd.s // The singular …Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-sql apache-spark-ml apache-spark-mllib
我为我的Spark作业启用了Kryo序列化,启用了设置以要求注册,并确保我的所有类型都已注册.
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)
Run Code Online (Sandbox Code Playgroud)
作业的Wallclock-time性能恶化了大约20%,并且洗牌的字节数增加了近400%.
鉴于Spark文档建议Kryo应该更好,这对我来说似乎真的很令人惊讶.
Kryo比Java序列化更快,更紧凑(通常高达10倍)
我手动调用serializeSpark的实例上的方法org.apache.spark.serializer.KryoSerializer和org.apache.spark.serializer.JavaSerializer我的数据示例.结果与Spark文档中的建议一致:Kryo生成了98个字节; Java产生了993个字节.这确实是10倍的改进.
一个可能混淆的因素是被序列化和混洗的对象实现了Avro GenericRecord接口.我尝试注册Avro架构SparkConf,但没有显示出任何改进.
我尝试制作新的类来改组简单的Scala数据case class,不包括任何Avro机器.它没有改善shuffle性能或交换的字节数.
Spark代码最终沸腾到以下:
case class A(
f1: Long,
f2: Option[Long],
f3: Int,
f4: Int,
f5: Option[String],
f6: Option[Int],
f7: Option[String],
f8: Option[Int],
f9: Option[Int],
f10: Option[Int],
f11: Option[Int],
f12: String,
f13: Option[Double],
f14: Option[Int],
f15: Option[Double],
f16: Option[Double],
f17: List[String],
f18: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(f: …Run Code Online (Sandbox Code Playgroud) 基于以下内容DataFrame:
val client = Seq((1,"A",10),(2,"A",5),(3,"B",56)).toDF("ID","Categ","Amnt")
+---+-----+----+
| ID|Categ|Amnt|
+---+-----+----+
| 1| A| 10|
| 2| A| 5|
| 3| B| 56|
+---+-----+----+
Run Code Online (Sandbox Code Playgroud)
我想按类别获取ID和总金额:
+-----+-----+---------+
|Categ|count|sum(Amnt)|
+-----+-----+---------+
| B| 1| 56|
| A| 2| 15|
+-----+-----+---------+
Run Code Online (Sandbox Code Playgroud)
是否可以在不进行连接的情况下进行计数和总和?
client.groupBy("Categ").count
.join(client.withColumnRenamed("Categ","cat")
.groupBy("cat")
.sum("Amnt"), 'Categ === 'cat)
.drop("cat")
Run Code Online (Sandbox Code Playgroud)
也许是这样的:
client.createOrReplaceTempView("client")
spark.sql("SELECT Categ count(Categ) sum(Amnt) FROM client GROUP BY Categ").show()
Run Code Online (Sandbox Code Playgroud) 我正在处理将SQL代码转换为PySpark代码并遇到一些SQL语句.我不知道如何处理pyspark中的案例陈述?我打算创建一个RDD然后使用rdd.map然后做一些逻辑检查.这是正确的方法吗?请帮忙!
基本上我需要遍历RDD或DF中的每一行,并根据我需要编辑其中一个列值的逻辑.
case
when (e."a" Like 'a%' Or e."b" Like 'b%')
And e."aa"='BW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitA'
when (e."a" Like 'b%' Or e."b" Like 'a%')
And e."aa"='AW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitB'
else
'CallitC'
Run Code Online (Sandbox Code Playgroud) 如何current_date - 1在sparksql中获得一天,就像cur_date()-1在mysql中一样.
apache-spark ×9
scala ×6
akka ×1
avro ×1
hdfs ×1
hive ×1
java ×1
nullable ×1
performance ×1
pyspark ×1
pyspark-sql ×1
python ×1
rdd ×1
scala-option ×1