小编Ram*_*ram的帖子

SparkSQL:如何处理用户定义函数中的空值?

给定表1,其中一列为"x",类型为String.我想创建表2,其中列为"y",它是"x"中给出的日期字符串的整数表示形式.

必不可少的是将null值保留在"y"列中.

表1(数据帧df1):

+----------+
|         x|
+----------+
|2015-09-12|
|2015-09-13|
|      null|
|      null|
+----------+
root
 |-- x: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

表2(数据帧df2):

+----------+--------+                                                                  
|         x|       y|
+----------+--------+
|      null|    null|
|      null|    null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
 |-- x: string (nullable = true)
 |-- y: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

用于将列"x"中的值转换为列"y"的用户定义函数(udf)为:

val extractDateAsInt = udf[Int, String] (
  (d:String) => d.substring(0, 10)
      .filterNot( "-".toSet)
      .toInt )
Run Code Online (Sandbox Code Playgroud)

并且工作,处理空值是不可能的.

尽管如此,我可以做类似的事情

val extractDateAsIntWithNull = udf[Int, String] (
  (d:String) => 
    if (d != …
Run Code Online (Sandbox Code Playgroud)

scala nullable user-defined-functions apache-spark apache-spark-sql

27
推荐指数
3
解决办法
4万
查看次数

如何找到RDD的大小

我有RDD[Row],需要持久保存到第三方存储库.但是这个第三方存储库在一次调用中最多接受5 MB.

所以我想根据RDD中存在的数据大小创建分区,而不是基于RDD中存在的行数.

如何找到a的大小RDD并根据它创建分区?

apache-spark apache-spark-sql

23
推荐指数
3
解决办法
4万
查看次数

在Scala选项类型isEmpty方法中检查无

我正在使用Option Type的isEmpty方法来检查是否没有值.我不想case match在我的情况下使用as,我只是想检查是否有,None因为我会向调用者抛出错误.但isEmpty即使值很大,该方法也会失败None.

这是我试过的!

val questionOption = Question.getQuestionForQuestionId(userExam.get.examId, currQuesId + 1)

if(questionOption.isEmpty) {
    Left(Failure(FailureCode.NO_DATA_FOUND, "Cannot get next exam question you tampered your cookie or cookie is lost.... >> TODO... modify the exception message"))
} 
Run Code Online (Sandbox Code Playgroud)

它没有进入if条件.我试着在questionOption上做一个println,然后打印出None.所以想知道为什么我没有进入if条件.

scala scala-option

22
推荐指数
2
解决办法
4万
查看次数

为什么Spark 1.6不使用Akka?

当我读取Master类的spark-1.6源代码时,receiveAndReply方法似乎没有使用Akka.[比照 在这里.]

为什么不使用Akka?他们用什么取代了Akka?

akka apache-spark

21
推荐指数
1
解决办法
4033
查看次数

在Spark SQL中编写SQL与使用Dataframe API

我是Spark SQL世界的新宠.我目前正在迁移我的应用程序的摄取代码,其中包括在舞台,HDFS中的Raw和Application层中摄取数据以及执行CDC(更改数据捕获),这当前在Hive查询中编写并通过Oozie执行.这需要迁移到Spark应用程序(当前版本1.6).代码的另一部分稍后将迁移.

在spark-SQL中,我可以直接从Hive中的表创建数据帧,只需执行查询(如sqlContext.sql("my hive hql")).另一种方法是使用数据帧API并以这种方式重写hql.

这两种方法有什么不同?

使用Dataframe API是否有任何性能提升?

有人建议,在直接使用"SQL"查询时会有一层额外的SQL引发核心引擎,这可能会在一定程度上影响性能,但我没有找到任何证实该声明的材料.我知道使用Datafrmae API的代码会更加紧凑,但是当我的hql查询非常方便时,将完整的代码编写到Dataframe API中真的值得吗?

谢谢.

hive hdfs apache-spark apache-spark-sql

18
推荐指数
3
解决办法
7003
查看次数

列点名称带点火花

我试图从a中取列DataFrame并将其转换为RDD[Vector].

问题是我的名称中有一个带"dot"的列作为以下数据集:

"col0.1","col1.2","col2.3","col3.4"
1,2,3,4
10,12,15,3
1,12,10,5
Run Code Online (Sandbox Code Playgroud)

这就是我正在做的事情:

val df = spark.read.format("csv").options(Map("header" -> "true", "inferSchema" -> "true")).load("C:/Users/mhattabi/Desktop/donnee/test.txt")
val column=df.columns.map(c=>s"`${c}`")
val rows = new VectorAssembler().setInputCols(column).setOutputCol("vs")
  .transform(df)
  .select("vs")
  .rdd
val data =rows.map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
  .map(org.apache.spark.mllib.linalg.Vectors.fromML)

val mat: RowMatrix = new RowMatrix(data)
//// Compute the top 5 singular values and corresponding singular vectors.
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(mat.numCols().toInt, computeU = true)
val U: RowMatrix = svd.U  // The U factor is a RowMatrix.
val s: Vector = svd.s  // The singular …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-ml apache-spark-mllib

16
推荐指数
2
解决办法
6450
查看次数

使用Kryo序列化时为什么Spark表现更差?

我为我的Spark作业启用了Kryo序列化,启用了设置以要求注册,并确保我的所有类型都已注册.

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)
Run Code Online (Sandbox Code Playgroud)

作业的Wallclock-time性能恶化了大约20%,并且洗牌的字节数增加了近400%.

鉴于Spark文档建议Kryo应该更好,这对我来说似乎真的很令人惊讶.

Kryo比Java序列化更快,更紧凑(通常高达10倍)

我手动调用serializeSpark的实例上的方法org.apache.spark.serializer.KryoSerializerorg.apache.spark.serializer.JavaSerializer我的数据示例.结果与Spark文档中的建议一致:Kryo生成了98个字节; Java产生了993个字节.这确实是10倍的改进.

一个可能混淆的因素是被序列化和混洗的对象实现了Avro GenericRecord接口.我尝试注册Avro架构SparkConf,但没有显示出任何改进.

我尝试制作新的类来改组简单的Scala数据case class,不包括任何Avro机器.它没有改善shuffle性能或交换的字节数.

Spark代码最终沸腾到以下:

case class A(
    f1: Long,
    f2: Option[Long],
    f3: Int,
    f4: Int,
    f5: Option[String],
    f6: Option[Int],
    f7: Option[String],
    f8: Option[Int],
    f9: Option[Int],
    f10: Option[Int],
    f11: Option[Int],
    f12: String,
    f13: Option[Double],
    f14: Option[Int],
    f15: Option[Double],
    f16: Option[Double],
    f17: List[String],
    f18: String) extends org.apache.avro.specific.SpecificRecordBase {
  def get(f: …
Run Code Online (Sandbox Code Playgroud)

performance scala avro apache-spark

15
推荐指数
1
解决办法
1604
查看次数

如何计算单个组中的总和和计数?

基于以下内容DataFrame:

val client = Seq((1,"A",10),(2,"A",5),(3,"B",56)).toDF("ID","Categ","Amnt")
+---+-----+----+
| ID|Categ|Amnt|
+---+-----+----+
|  1|    A|  10|
|  2|    A|   5|
|  3|    B|  56|
+---+-----+----+
Run Code Online (Sandbox Code Playgroud)

我想按类别获取ID和总金额:

+-----+-----+---------+
|Categ|count|sum(Amnt)|
+-----+-----+---------+
|    B|    1|       56|
|    A|    2|       15|
+-----+-----+---------+
Run Code Online (Sandbox Code Playgroud)

是否可以在不进行连接的情况下进行计数和总和?

client.groupBy("Categ").count
      .join(client.withColumnRenamed("Categ","cat")
           .groupBy("cat")
           .sum("Amnt"), 'Categ === 'cat)
      .drop("cat")
Run Code Online (Sandbox Code Playgroud)

也许是这样的:

client.createOrReplaceTempView("client")
spark.sql("SELECT Categ count(Categ) sum(Amnt) FROM client GROUP BY Categ").show()
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

14
推荐指数
2
解决办法
4万
查看次数

Apache spark处理case语句

我正在处理将SQL代码转换为PySpark代码并遇到一些SQL语句.我不知道如何处理pyspark中的案例陈述?我打算创建一个RDD然后使用rdd.map然后做一些逻辑检查.这是正确的方法吗?请帮忙!

基本上我需要遍历RDD或DF中的每一行,并根据我需要编辑其中一个列值的逻辑.

     case  
               when (e."a" Like 'a%' Or e."b" Like 'b%') 
                And e."aa"='BW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitA'

               when (e."a" Like 'b%' Or e."b" Like 'a%') 
                And e."aa"='AW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitB'

else

'CallitC'
Run Code Online (Sandbox Code Playgroud)

apache-spark rdd pyspark spark-dataframe pyspark-sql

13
推荐指数
2
解决办法
3万
查看次数

如何获得今天 - sparksql中的"1天"日期?

如何current_date - 1在sparksql中获得一天,就像cur_date()-1在mysql中一样.

python java scala apache-spark apache-spark-sql

13
推荐指数
4
解决办法
3万
查看次数