我正在尝试优化我的 Spark 应用程序工作。
我试图理解这个问题的要点:如何在唯一键上加入 DataFrames 时避免洗牌?
我已确保必须进行连接操作的键分布在同一分区内(使用我的自定义分区器)。
我也无法进行广播连接,因为根据情况我的数据可能会变大。
在上述问题的答案中,重新分区仅优化连接,但我需要的是不进行混洗的连接。我对分区内键的帮助下的连接操作很满意。
是否可以?如果类似的功能不存在,我想实现类似 joinperpartition 的功能。
我正在使用 Spark 2.1 (scala 2.11)。
我想将具有定义模式的 json 格式字符串从一个数据帧加载到另一个数据帧中。我尝试了一些解决方案,但最便宜的是标准列函数 from_json 。我用这个函数尝试了一个例子(https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-collection.html#from_json),它给了我意想不到的结果。
val df = spark.read.text("testFile.txt")
df.show(false)
+----------------+
|value |
+----------------+
|{"a": 1, "b": 2}|
|{bad-record |
+----------------+
df.select(from_json(col("value"),
StructType(List(
StructField("a",IntegerType),
StructField("b",IntegerType)
))
)).show(false)
+-------------------+
|jsontostruct(value)|
+-------------------+
|[1,2] |
|null |
+-------------------+
Run Code Online (Sandbox Code Playgroud)
此行为类似于 mode:PERMISSIVE,它不是默认的。默认情况下,它设置为 FAILFAST 模式,这意味着只要输入数据和强制模式不匹配,它就应该抛出异常。
我尝试使用 DataFrameReader(JSON DataSource 和 FAILFAST 模式)加载 testFile.txt 并成功捕获异常。
spark.read.option("mode","FAILFAST").json("test.txt").show(false)
---
Caused by: org.apache.spark.sql.catalyst.json.SparkSQLJsonProcessingException: Malformed line in FAILFAST mode: {bad-record
---
Run Code Online (Sandbox Code Playgroud)
虽然两种情况下的解析模式相同,但为什么各自的输出如此不同?
我是 Spark 和 Scala 的新手。我正在阅读 Spark 的 distinct() 函数。但我找不到任何适当的细节。我有一些我无法解决的疑问,并已将它们写下来。
在 Spark 中如何实现 distinct() ?
我不太擅长使用 Spark 源代码来识别整个流程。当我检查执行计划时,我只能看到一个 ShuffleRDD
distinct 的时间复杂度是多少?
我还从 Google 搜索中发现,它还以某种方式使用了散列和排序。
所以,我想它是否使用与在 Hashset 的帮助下从数组中获取唯一元素相同的原理。如果它是一个系统,我会猜到时间复杂度是 O(nlogn) 。
但是它分布在许多分区中并被打乱,时间复杂度的顺序是什么?
有没有办法避免在特定情况下改组?
如果我确保按照我的用例正确分区我的数据,我可以避免改组吗?
即,例如,假设在具有唯一行的数据框中分解一个 ArrayType 列会创建新行,而其他列被复制。我将选择其他列。通过这种方式,我确保每个分区的重复项都是唯一的。因为我知道每个分区的重复项是唯一的,所以我可以避免洗牌,只是敏锐地删除该分区中的重复项
我还发现这是否 spark 的 distinct() 函数只对每个分区中的不同元组进行洗牌。
谢谢你的帮助 。如果我在任何地方错了,请纠正我。