标签: apache-spark-sql

Spark RDD groupByKey + join 与 join 性能对比

我正在与其他用户共享的集群上使用 Spark。所以仅仅根据运行时间来判断我的哪一个代码运行效率更高是不可靠的。因为当我运行更高效的代码时,其他人可能会运行大量数据,并使我的代码执行更长时间。

那么我可以在这里问两个问题吗:

  1. 我正在使用joinfunction 来 join 2RDDs并且我尝试groupByKey()在 using 之前使用join,如下所示:

    rdd1.groupByKey().join(rdd2)
    
    Run Code Online (Sandbox Code Playgroud)

    似乎花了更长的时间,但是我记得当我使用 Hadoop Hive 时,group by 让我的查询运行得更快。由于 Spark 使用惰性求值,我想知道groupByKeybefore是否join会让事情变得更快

  2. 我注意到Spark有一个SQL模块,到目前为止我真的没有时间尝试它,但是我可以问一下SQL模块和RDD SQL类似功能之间有什么区别吗?

apache-spark rdd apache-spark-sql pyspark

0
推荐指数
1
解决办法
4321
查看次数

Spark RDD 上的非持久化操作的成本有多大?

我想知道,rdd.unpersist()spark RDD 上的操作成本有多大?存储级别设置是否会影响此操作的性能?任何基准(结果/技术)都会非常有帮助。

apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
1169
查看次数

spark sql 字符串到时间戳缺少毫秒

为什么是:

import spark.implicits._
  val content = Seq(("2019", "09", "11","17","16","54","762000000")).toDF("year", "month", "day", "hour", "minute", "second", "nano")
  content.printSchema
  content.show
  content.withColumn("event_time_utc", to_timestamp(concat('year, 'month, 'day, 'hour, 'minute, 'second), "yyyyMMddHHmmss"))
    .withColumn("event_time_utc_millis", to_timestamp(concat('year, 'month, 'day, 'hour, 'minute, 'second, substring('nano, 0, 3)), "yyyyMMddHHmmssSSS"))
    .select('year, 'month, 'day, 'hour, 'minute, 'second, 'nano,substring('nano, 0, 3), 'event_time_utc, 'event_time_utc_millis)
    .show
Run Code Online (Sandbox Code Playgroud)

缺少毫秒?

+----+-----+---+----+------+------+---------+---------------------+-------------------+---------------------+
|year|month|day|hour|minute|second|     nano|substring(nano, 0, 3)|     event_time_utc|event_time_utc_millis|
+----+-----+---+----+------+------+---------+---------------------+-------------------+---------------------+
|2019|   09| 11|  17|    16|    54|762000000|                  762|2019-09-11 17:16:54|  2019-09-11 17:16:54|
+----+-----+---+----+------+------+---------+---------------------+-------------------+---------------------+
Run Code Online (Sandbox Code Playgroud)

对于格式字符串:如果我没记错的话yyyyMMddHHmmssSSS,它应该包括毫秒SSS

timestamp milliseconds format-string apache-spark apache-spark-sql

0
推荐指数
1
解决办法
4561
查看次数

使用 Apache Spark DataFrame 的部门的第二高价值

如果我们将使用 SQL 查询来实现每个部门的第二大值的输出,那么我现在有一个表,department并且value现在可用,那么我们可以这样写:

select * from (SELECT department, asset_value, DENSE_RANK() over (partition by DEPARTMENT order by ASSET_VALUE) as x from [User1].[dbo].[UsersRecord]) y where x = 2;
Run Code Online (Sandbox Code Playgroud)

但是,在 Apache Spark 中,如果我们有不使用 SparkSQL 的限制,并且必须仅使用 DataFrame 实现输出,那么我们应该如何编写 Scala 代码?

我通过文档的帮助从我的角度进行了尝试,但无法弄清楚。

我的代码:

package com.tg.testpack1

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object class2
{
 def main(args: Array[String]) 
 {
    val sparksessionobject = SparkSession.builder()
      .master("local")
      .config("spark.sql.warehouse.dir", "C:/Users/ox/spark/spark/spark-warehouse")
      .getOrCreate()
    sparksessionobject.conf.set("spark.sql.shuffle.partitions", 4)
    sparksessionobject.conf.set("spark.executor.memory", "2g")
    import sparksessionobject.sqlContext.implicits._

val …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

0
推荐指数
1
解决办法
4312
查看次数

如何在不使用 SQL 插入的情况下向 Scala 中的 DataFrame 添加/追加新行?

我有一个按以下方式创建的 DataFrame。

val someDF = Seq((8, "bat"),(64, "mouse"),(-27, "horse")).toDF("number", "word")
someDF.printSchema
root
 |-- number: integer (nullable = false)
 |-- word: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

使用 SQL API,可以通过创建临时表并运行插入查询向其中插入一行。有什么方法可以使用 DataFrame API 的方法追加/添加新行?

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1062
查看次数

如何将 Scala 数据框中的所有十进制列转换为双精度类型?

我有一个十进制和字符串类型的数据框。我想将所有十进制列转换为 double 而不命名它们。我试过这个没有成功。有点新的火花。

>df.printSchema

root

 |-- var1: decimal(38,10) (nullable = true)
 |-- var2: decimal(38,10) (nullable = true)
 |-- var3: decimal(38,10) (nullable = true)
…
150 more decimal and string columns
Run Code Online (Sandbox Code Playgroud)

我尝试:

import org.apache.spark.sql.types._

val cols = df.columns.map(x => {
    if (x.dataType == DecimalType(38,0)) col(x).cast(DoubleType) 
    else col(x)
})
Run Code Online (Sandbox Code Playgroud)

我得到

<console>:30: error: value dataType is not a member of String
           if (x.dataType == DecimalType(38,0)) col(x).cast(DoubleType)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
3028
查看次数

用pyspark中对应的数字替换数组中的元素

我有一个如下所示的数据框:

   +----------+--------------------------------+
   | Index    |           flagArray            |
   +----------+--------------------------------+
   |    1     | ['A','S','A','E','Z','S','S']  | 
   +----------+--------------------------------+
   |    2     | ['A','Z','Z','E','Z','S','S']  |
   +--------- +--------------------------------+
Run Code Online (Sandbox Code Playgroud)

我想用其相应的数值来表示数组元素。

     A - 0
     F - 1
     S - 2
     E - 3
     Z - 4
Run Code Online (Sandbox Code Playgroud)

所以我的输出数据帧应该看起来像

   +----------+--------------------------------+--------------------------------+
   | Index    |           flagArray            |           finalArray           |
   +----------+--------------------------------+--------------------------------+
   |    1     | ['A','S','A','E','Z','S','S']  | [0, 2, 0, 3, 4, 2, 2]          | 
   +----------+--------------------------------+--------------------------------+
   |    2     | ['A','Z','Z','E','Z','S','S']  | [0, 4, 4, 3, 4, 2, 2]          |
   +--------- +--------------------------------+--------------------------------+
Run Code Online (Sandbox Code Playgroud)

我在 pyspark 中编写了一个 udf,我通过编写一些 …

python-3.x apache-spark apache-spark-sql pyspark pyspark-dataframes

0
推荐指数
1
解决办法
1435
查看次数

Scala Spark Dataframe 创建一个新列,其中包含另一列的前一值和当前值的最大值

我有一个只有列category和 A列的数据框,如下所示。我想填充 B 列,以便它比较 A 的当前值和 B 的先前值并存储每个类别的最大值。尝试使用 Windows 函数、滞后、类别最大值等,但我面临的最大挑战是如何在比较两个值时记住较早的最大值。

 +---+--------+--+--+
 id |  category | A | B |
 +---+--------+--+--+
  1  Fruit   1   1
  2  Fruit   5   5
  3  Fruit   3   5 
  4  Fruit   4   5 
  1  Dessert 4   4
  2  Dessert 2   4
  1  Veggies 11  11
  2  Veggies 7   11
  3  Veggies 12  12
  4  Veggies 3   12
  ---+------+---+----+-
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
154
查看次数

写入 hdfs 路径时出现错误 java.io.IOException: Failed to rename

我使用的是使用 hadoop-2.6.5.jar 版本的 spark-sql-2.4.1v。我需要先将数据保存在 hdfs 上,然后再转移到 cassandra。因此,我试图将数据保存在 hdfs 上,如下所示:

String hdfsPath = "/user/order_items/";
cleanedDs.createTempViewOrTable("source_tab");

givenItemList.parallelStream().forEach( item -> {   
    String query = "select $item  as itemCol , avg($item) as mean groupBy year";
    Dataset<Row> resultDs = sparkSession.sql(query);

    saveDsToHdfs(hdfsPath, resultDs );   
});


public static void saveDsToHdfs(String parquet_file, Dataset<Row> df) {
    df.write()                                 
      .format("parquet")
      .mode("append")
      .save(parquet_file);
    logger.info(" Saved parquet file :   " + parquet_file + "successfully");
}
Run Code Online (Sandbox Code Playgroud)

当我在集群上运行我的工作时,它无法抛出此错误:

java.io.IOException: Failed to rename FileStatus{path=hdfs:/user/order_items/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.parquet; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to hdfs:/user/order_items/part-00007.parquet …
Run Code Online (Sandbox Code Playgroud)

hadoop hdfs apache-spark hadoop2 apache-spark-sql

0
推荐指数
1
解决办法
1744
查看次数

spark udf 没有被调用

给出以下示例:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

val testUdf: UserDefinedFunction = udf((a: String, b: String, c: Int) => { 
  val out = s"test1: $a $b $c"
  println(out)
  out
})

val testUdf2: UserDefinedFunction = udf((a: String, b: String, c: String) => { 
  val out = s"test2: $a $b $c"
  println(out)
  out
})

Seq(("hello", "world", null))
.toDF("a", "b", "c")
.withColumn("c", $"c" cast "Int")
.withColumn("test1", testUdf($"a", $"b", $"c"))
.withColumn("test2", testUdf2($"a", $"b", $"c"))
.show
Run Code Online (Sandbox Code Playgroud)

testUdf似乎没有被调用。没有错误,没有警告,它只是返回 null。

有没有办法检测这些静默故障?另外,这里发生了什么?

火花 2.4.4 斯卡拉 2.11

scala user-defined-functions apache-spark apache-spark-sql

0
推荐指数
1
解决办法
216
查看次数