我想在Apache Spark连接中包含空值.Spark默认情况下不包含null的行.
这是默认的Spark行为.
val numbersDf = Seq(
("123"),
("456"),
(null),
("")
).toDF("numbers")
val lettersDf = Seq(
("123", "abc"),
("456", "def"),
(null, "zzz"),
("", "hhh")
).toDF("numbers", "letters")
val joinedDf = numbersDf.join(lettersDf, Seq("numbers"))
Run Code Online (Sandbox Code Playgroud)
这是输出joinedDf.show():
+-------+-------+
|numbers|letters|
+-------+-------+
| 123| abc|
| 456| def|
| | hhh|
+-------+-------+
Run Code Online (Sandbox Code Playgroud)
这是我想要的输出:
+-------+-------+
|numbers|letters|
+-------+-------+
| 123| abc|
| 456| def|
| | hhh|
| null| zzz|
+-------+-------+
Run Code Online (Sandbox Code Playgroud) 我有一个largeDataFrame(多列和数十亿行)和一个smallDataFrame(单列和10,000行).
我想所有的行从过滤largeDataFrame每当some_identifier列在largeDataFrame比赛中的行之一smallDataFrame.
这是一个例子:
largeDataFrame
some_idenfitier,first_name
111,bob
123,phil
222,mary
456,sue
Run Code Online (Sandbox Code Playgroud)
smallDataFrame
some_identifier
123
456
Run Code Online (Sandbox Code Playgroud)
desiredOutput
111,bob
222,mary
Run Code Online (Sandbox Code Playgroud)
这是我丑陋的解决方案.
val smallDataFrame2 = smallDataFrame.withColumn("is_bad", lit("bad_row"))
val desiredOutput = largeDataFrame.join(broadcast(smallDataFrame2), Seq("some_identifier"), "left").filter($"is_bad".isNull).drop("is_bad")
Run Code Online (Sandbox Code Playgroud)
有更清洁的解决方案吗?
我正在使用的数据湖(df)有2 TB的数据和20,000个文件.我想将数据集压缩为2,000个1 GB文件.
如果您运行df.coalesce(2000)并写入磁盘,则数据湖包含1.9 TB的数据.
如果您运行df.repartition(2000)并写入磁盘,则数据湖包含2.6 TB的数据.
repartition()数据湖中的每个文件都比预期的大0.3 GB(它们都是1.3 GB文件而不是1 GB文件).
为什么该repartition()方法会增加整个数据湖的大小?
有一个相关问题讨论了在聚合运行后数据湖大小增加的原因.答案是:
一般来说,像Parquet这样的柱状存储格式在数据分发(数据组织)和各列的基数方面非常敏感.数据越有条理,基数越低,存储效率越高.
coalesce()算法是否提供更有条理的数据......我不这么认为......
我不认为另一个问题回答了我的问题.
spark-daria项目上传到Spark Packages,我正在使用sbt-spark-package插件访问另一个SBT项目中的spark-daria代码.
我可以在文件中sbt assembly使用以下代码生成的胖JAR文件中包含spark-daria build.sbt.
spDependencies += "mrpowers/spark-daria:0.3.0"
val requiredJars = List("spark-daria-0.3.0.jar")
assemblyExcludedJars in assembly := {
val cp = (fullClasspath in assembly).value
cp filter { f =>
!requiredJars.contains(f.data.getName)
}
}
Run Code Online (Sandbox Code Playgroud)
这段代码感觉就像一个黑客.有没有更好的方法在fat JAR文件中包含spark-daria?
NB我想在这里建立一个半胖的JAR文件.我希望spark-daria包含在JAR文件中,但我不希望JAR文件中包含所有Spark!
有一个CSV文件的数据湖,全天更新.我正在尝试使用此博客文章中概述的Trigger.Once功能创建Spark结构化流工作,以定期写入已写入Parquet数据湖中CSV数据湖的新数据.
这就是我所拥有的:
val df = spark
.readStream
.schema(s)
.csv("s3a://csv-data-lake-files")
Run Code Online (Sandbox Code Playgroud)
以下命令将所有数据写入Parquet湖,但在写完所有数据后没有停止(我必须手动取消作业).
processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
Run Code Online (Sandbox Code Playgroud)
以下工作也有效,但在写完所有数据后都没有停止(我不得不手动取消工作):
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
以下命令在写入任何数据之前停止查询.
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.stop()
Run Code Online (Sandbox Code Playgroud)
如何配置writeStream查询以等待所有增量数据写入Parquet文件然后停止?
我想从一个简单的CSV文件创建一个Spark数据集.以下是CSV文件的内容:
name,state,number_of_people,coolness_index
trenton,nj,"10","4.5"
bedford,ny,"20","3.3"
patterson,nj,"30","2.2"
camden,nj,"40","8.8"
Run Code Online (Sandbox Code Playgroud)
以下是制作数据集的代码:
var location = "s3a://path_to_csv"
case class City(name: String, state: String, number_of_people: Long)
val cities = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.csv(location)
.as[City]
Run Code Online (Sandbox Code Playgroud)
以下是错误消息:"无法number_of_people向字符串转换为bigint,因为它可能会截断"
Databricks讨论了如何在此博客文章中创建数据集和此特定错误消息.
编码器急切地检查您的数据是否与预期的架构匹配,在您尝试错误处理TB数据之前提供有用的错误消息.例如,如果我们尝试使用太小的数据类型,那么转换为对象将导致截断(即numStudents大于一个字节,其最大值为255),Analyzer将发出AnalysisException.
我正在使用该Long类型,所以我没想到会看到此错误消息.
我已经使用get方法创建了一个Config对象,以根据PROJECT_ENV环境变量返回不同的字符串.如果PROJECT_ENV=test,则Config.get("somePath")返回some/path.csv,否则返回s3a://some_bucket/a_file.csv.
object Config {
def test(): Map[String,String] = {
Map(
"somePath" -> "some/path.csv"
)
}
def default(): Map[String,String] = {
Map(
"somePath" -> "s3a://some_bucket/a_file.csv"
)
}
def get(key: String, env: Option[String] = sys.env.get("PROJECT_ENV")): String = {
val lookupMap = if (env == Some("test")) {
List(default(), test()).flatten.toMap
} else {
default()
}
lookupMap(key)
}
}
Run Code Online (Sandbox Code Playgroud)
使用Ruby/RSpec,我在spec_helper.rb文件中设置环境变量ENV['PROJECT_ENV'] = 'test'.
什么是spec_helper.rb文件的最新等价物?如何在Scala中设置环境变量? 这个答案是不够的.
如果我运行$ PROJECT_ENV=test sbt test,我的测试套件会成功运行,但我想简单地运行$ sbt …
如果DataFrame在被保存为Parquet文件之前进行排序,数据提取会更快地运行.
假设我们有以下peopleDfDataFrame(假装这是一个样本而真正的那个有200亿行):
+-----+----------------+
| age | favorite_color |
+-----+----------------+
| 54 | blue |
| 10 | black |
| 13 | blue |
| 19 | red |
| 89 | blue |
+-----+----------------+
Run Code Online (Sandbox Code Playgroud)
让我们将这个DataFrame的已排序和未排序版本写出到Parquet文件.
peopleDf.write.parquet("s3a://some-bucket/unsorted/")
peopleDf.sort($"favorite_color").write.parquet("s3a://some-bucket/sorted/")
Run Code Online (Sandbox Code Playgroud)
在读取排序数据并基于数据提取时,是否有任何性能提升favorite_color?
val pBlue1 = spark.read.parquet("s3a://some-bucket/unsorted/").filter($"favorite_color" === "blue")
// is this faster?
val pBlue2 = spark.read.parquet("s3a://some-bucket/sorted/").filter($"favorite_color" === "blue")
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Spark将较大的分区数据集写到磁盘上,并且该partitionBy算法在我尝试过的两种方法中都遇到了麻烦。
分区严重偏斜-有些分区很大,有些很小。
问题1:
当我之前使用repartition时repartitionBy,Spark将所有分区写为单个文件,即使是大文件也是如此
val df = spark.read.parquet("some_data_lake")
df
.repartition('some_col).write.partitionBy("some_col")
.parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)
这需要永远执行,因为Spark不会并行编写大型分区。如果其中一个分区具有1TB的数据,Spark将尝试将整个1TB的数据作为单个文件写入。
问题2:
当我不使用时repartition,Spark会写出太多文件。
此代码将写出疯狂的文件。
df.write.partitionBy("some_col").parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)
我在一个很小的8 GB数据子集上运行了此操作,Spark写入了85,000+个文件!
当我尝试在生产数据集上运行此文件时,一个包含1.3 GB数据的分区被写为3,100个文件。
我想要什么
我希望每个分区都写成1 GB文件。因此,具有7 GB数据的分区将作为7个文件被写出,而具有0.3 GB数据的分区将作为单个文件被写出。
我最好的前进道路是什么?
这个问题讨论如何链接自定义 PySpark 2 转换。
DataFrame #transform 方法已添加到 PySpark 3 API 中。
此代码片段显示了一个不带参数且按预期工作的自定义转换,以及另一个带参数但不工作的自定义转换。
from pyspark.sql.functions import col, lit
df = spark.createDataFrame([(1, 1.0), (2, 2.)], ["int", "float"])
def with_funny(word):
def inner(df):
return df.withColumn("funny", lit(word))
return inner
def cast_all_to_int(input_df):
return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])
df.transform(with_funny("bumfuzzle")).transform(cast_all_to_int).show()
Run Code Online (Sandbox Code Playgroud)
这是输出的内容:
+---+-----+-----+
|int|float|funny|
+---+-----+-----+
| 1| 1| null|
| 2| 2| null|
+---+-----+-----+
Run Code Online (Sandbox Code Playgroud)
应如何with_funny()定义该方法来输出 PySpark 3 API 的值?
apache-spark ×8
scala ×4
sbt ×2
dataframe ×1
join ×1
parquet ×1
partitioning ×1
pyspark ×1
sbt-assembly ×1
scalatest ×1
sorting ×1
sql ×1