小编Pow*_*ers的帖子

在Apache Spark Join中包含空值

我想在Apache Spark连接中包含空值.Spark默认情况下不包含null的行.

这是默认的Spark行为.

val numbersDf = Seq(
  ("123"),
  ("456"),
  (null),
  ("")
).toDF("numbers")

val lettersDf = Seq(
  ("123", "abc"),
  ("456", "def"),
  (null, "zzz"),
  ("", "hhh")
).toDF("numbers", "letters")

val joinedDf = numbersDf.join(lettersDf, Seq("numbers"))
Run Code Online (Sandbox Code Playgroud)

这是输出joinedDf.show():

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
+-------+-------+
Run Code Online (Sandbox Code Playgroud)

这是我想要的输出:

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
|   null|    zzz|
+-------+-------+
Run Code Online (Sandbox Code Playgroud)

sql scala join apache-spark apache-spark-sql

38
推荐指数
3
解决办法
2万
查看次数

根据指定黑名单标准的另一个DataFrame过滤Spark DataFrame

我有一个largeDataFrame(多列和数十亿行)和一个smallDataFrame(单列和10,000行).

我想所有的行从过滤largeDataFrame每当some_identifier列在largeDataFrame比赛中的行之一smallDataFrame.

这是一个例子:

largeDataFrame

some_idenfitier,first_name
111,bob
123,phil
222,mary
456,sue
Run Code Online (Sandbox Code Playgroud)

smallDataFrame

some_identifier
123
456
Run Code Online (Sandbox Code Playgroud)

desiredOutput

111,bob
222,mary
Run Code Online (Sandbox Code Playgroud)

这是我丑陋的解决方案.

val smallDataFrame2 = smallDataFrame.withColumn("is_bad", lit("bad_row"))
val desiredOutput = largeDataFrame.join(broadcast(smallDataFrame2), Seq("some_identifier"), "left").filter($"is_bad".isNull).drop("is_bad")
Run Code Online (Sandbox Code Playgroud)

有更清洁的解决方案吗?

dataframe apache-spark apache-spark-sql

24
推荐指数
2
解决办法
2万
查看次数

为什么repartition()方法会增加磁盘上的文件大小?

我正在使用的数据湖(df)有2 TB的数据和20,000个文件.我想将数据集压缩为2,000个1 GB文件.

如果您运行df.coalesce(2000)并写入磁盘,则数据湖包含1.9 TB的数据.

如果您运行df.repartition(2000)并写入磁盘,则数据湖包含2.6 TB的数据.

repartition()数据湖中的每个文件都比预期的大0.3 GB(它们都是1.3 GB文件而不是1 GB文件).

为什么该repartition()方法会增加整个数据湖的大小?

一个相关问题讨论了在聚合运行后数据湖大小增加的原因.答案是:

一般来说,像Parquet这样的柱状存储格式在数据分发(数据组织)和各列的基数方面非常敏感.数据越有条理,基数越低,存储效率越高.

coalesce()算法是否提供更有条理的数据......我不这么认为......

我不认为另一个问题回答了我的问题.

apache-spark

15
推荐指数
1
解决办法
852
查看次数

在SBT生成的胖JAR中包含Spark Package JAR文件

spark-daria项目上传到Spark Packages,我正在使用sbt-spark-package插件访问另一个SBT项目中的spark-daria代码.

我可以在文件中sbt assembly使用以下代码生成的胖JAR文件中包含spark-daria build.sbt.

spDependencies += "mrpowers/spark-daria:0.3.0"

val requiredJars = List("spark-daria-0.3.0.jar")
assemblyExcludedJars in assembly := {
  val cp = (fullClasspath in assembly).value
  cp filter { f =>
    !requiredJars.contains(f.data.getName)
  }
}
Run Code Online (Sandbox Code Playgroud)

这段代码感觉就像一个黑客.有没有更好的方法在fat JAR文件中包含spark-daria?

NB我想在这里建立一个半胖的JAR文件.我希望spark-daria包含在JAR文件中,但我不希望JAR文件中包含所有Spark!

scala sbt sbt-assembly apache-spark spark-packages

10
推荐指数
1
解决办法
533
查看次数

使用Spark Structured Streaming和Trigger.Once

有一个CSV文件的数据湖,全天更新.我正在尝试使用此博客文章中概述Trigger.Once功能创建Spark结构化流工作,以定期写入已写入Parquet数据湖中CSV数据湖的新数据.

这就是我所拥有的:

val df = spark
  .readStream
  .schema(s)
  .csv("s3a://csv-data-lake-files")
Run Code Online (Sandbox Code Playgroud)

以下命令将所有数据写入Parquet湖,但在写完所有数据后没有停止(我必须手动取消作业).

processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")
Run Code Online (Sandbox Code Playgroud)

以下工作也有效,但在写完所有数据后都没有停止(我不得不手动取消工作):

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

以下命令在写入任何数据之前停止查询.

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.stop()
Run Code Online (Sandbox Code Playgroud)

如何配置writeStream查询以等待所有增量数据写入Parquet文件然后停止?

scala apache-spark spark-structured-streaming

10
推荐指数
2
解决办法
2354
查看次数

从CSV文件创建Spark数据集

我想从一个简单的CSV文件创建一个Spark数据集.以下是CSV文件的内容:

name,state,number_of_people,coolness_index
trenton,nj,"10","4.5"
bedford,ny,"20","3.3"
patterson,nj,"30","2.2"
camden,nj,"40","8.8"
Run Code Online (Sandbox Code Playgroud)

以下是制作数据集的代码:

var location = "s3a://path_to_csv"

case class City(name: String, state: String, number_of_people: Long)

val cities = spark.read
  .option("header", "true")
  .option("charset", "UTF8")
  .option("delimiter",",")
  .csv(location)
  .as[City]
Run Code Online (Sandbox Code Playgroud)

以下是错误消息:"无法number_of_people向字符串转换为bigint,因为它可能会截断"

Databricks讨论了如何在此博客文章中创建数据集和此特定错误消息.

编码器急切地检查您的数据是否与预期的架构匹配,在您尝试错误处理TB数据之前提供有用的错误消息.例如,如果我们尝试使用太小的数据类型,那么转换为对象将导致截断(即numStudents大于一个字节,其最大值为255),Analyzer将发出AnalysisException.

我正在使用该Long类型,所以我没想到会看到此错误消息.

apache-spark apache-spark-dataset

9
推荐指数
1
解决办法
9130
查看次数

运行Scala/SBT测试套件时设置环境变量

我已经使用get方法创建了一个Config对象,以根据PROJECT_ENV环境变量返回不同的字符串.如果PROJECT_ENV=test,则Config.get("somePath")返回some/path.csv,否则返回s3a://some_bucket/a_file.csv.

object Config {

  def test(): Map[String,String] = {
    Map(
      "somePath" -> "some/path.csv"
    )
  }

  def default(): Map[String,String] = {
    Map(
      "somePath" -> "s3a://some_bucket/a_file.csv"
    )
  }

  def get(key: String, env: Option[String] = sys.env.get("PROJECT_ENV")): String = {
    val lookupMap = if (env == Some("test")) {
      List(default(), test()).flatten.toMap
    } else {
      default()
    }
    lookupMap(key)
  }

}
Run Code Online (Sandbox Code Playgroud)

使用Ruby/RSpec,我在spec_helper.rb文件中设置环境变量ENV['PROJECT_ENV'] = 'test'.

什么是spec_helper.rb文件的最新等价物?如何在Scala中设置环境变量? 这个答案是不够的.

如果我运行$ PROJECT_ENV=test sbt test,我的测试套件会成功运行,但我想简单地运行$ sbt …

scala sbt scalatest

7
推荐指数
2
解决办法
6490
查看次数

通过存储已排序的Parquet文件来增强Spark性能

如果DataFrame在被保存为Parquet文件之前进行排序,数据提取会更快地运行.

假设我们有以下peopleDfDataFrame(假装这是一个样本而真正的那个有200亿行):

+-----+----------------+
| age | favorite_color |
+-----+----------------+
|  54 | blue           |
|  10 | black          |
|  13 | blue           |
|  19 | red            |
|  89 | blue           |
+-----+----------------+
Run Code Online (Sandbox Code Playgroud)

让我们将这个DataFrame的已排序和未排序版本写出到Parquet文件.

peopleDf.write.parquet("s3a://some-bucket/unsorted/")
peopleDf.sort($"favorite_color").write.parquet("s3a://some-bucket/sorted/")
Run Code Online (Sandbox Code Playgroud)

在读取排序数据并基于数据提取时,是否有任何性能提升favorite_color

val pBlue1 = spark.read.parquet("s3a://some-bucket/unsorted/").filter($"favorite_color" === "blue")

// is this faster?

val pBlue2 = spark.read.parquet("s3a://some-bucket/sorted/").filter($"favorite_color" === "blue")
Run Code Online (Sandbox Code Playgroud)

sorting apache-spark parquet

6
推荐指数
1
解决办法
3063
查看次数

使用Spark的partitioningBy方法对S3中的大型偏斜数据集进行分区

我正在尝试使用Spark将较大的分区数据集写到磁盘上,并且该partitionBy算法在我尝试过的两种方法中都遇到了麻烦。

分区严重偏斜-有些分区很大,有些很小。

问题1

当我之前使用repartition时repartitionBy,Spark将所有分区写为单个文件,即使是大文件也是如此

val df = spark.read.parquet("some_data_lake")
df
  .repartition('some_col).write.partitionBy("some_col")
  .parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)

这需要永远执行,因为Spark不会并行编写大型分区。如果其中一个分区具有1TB的数据,Spark将尝试将整个1TB的数据作为单个文件写入。

问题2

当我不使用时repartition,Spark会写出太多文件。

此代码将写出疯狂的文件。

df.write.partitionBy("some_col").parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)

我在一个很小的8 GB数据子集上运行了此操作,Spark写入了85,000+个文件!

当我尝试在生产数据集上运行此文件时,一个包含1.3 GB数据的分区被写为3,100个文件。

我想要什么

我希望每个分区都写成1 GB文件。因此,具有7 GB数据的分区将作为7个文件被写出,而具有0.3 GB数据的分区将作为单个文件被写出。

我最好的前进道路是什么?

partitioning apache-spark apache-spark-sql

6
推荐指数
3
解决办法
631
查看次数

使用带参数的 PySpark 3 DataFrame#transform 方法

这个问题讨论如何链接自定义 PySpark 2 转换。

DataFrame #transform 方法已添加到 PySpark 3 API 中。

此代码片段显示了一个不带参数且按预期工作的自定义转换,以及另一个带参数但不工作的自定义转换。

from pyspark.sql.functions import col, lit

df = spark.createDataFrame([(1, 1.0), (2, 2.)], ["int", "float"])

def with_funny(word):
    def inner(df):
        return df.withColumn("funny", lit(word))
    return inner

def cast_all_to_int(input_df):
    return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])

df.transform(with_funny("bumfuzzle")).transform(cast_all_to_int).show()
Run Code Online (Sandbox Code Playgroud)

这是输出的内容:

+---+-----+-----+
|int|float|funny|
+---+-----+-----+
|  1|    1| null|
|  2|    2| null|
+---+-----+-----+
Run Code Online (Sandbox Code Playgroud)

应如何with_funny()定义该方法来输出 PySpark 3 API 的值?

pyspark

6
推荐指数
1
解决办法
3370
查看次数