小编Joh*_*ter的帖子

Spark join *不*随机播放

我正在尝试优化我的 Spark 应用程序工作。

我试图理解这个问题的要点:如何在唯一键上加入 DataFrames 时避免洗牌?

  1. 我已确保必须进行连接操作的键分布在同一分区内(使用我的自定义分区器)。

  2. 我也无法进行广播连接,因为根据情况我的数据可能会变大。

  3. 在上述问题的答案中,重新分区仅优化连接,但我需要的是不进行混洗的连接。我对分区内键的帮助下的连接操作很满意。

是否可以?如果类似的功能不存在,我想实现类似 joinperpartition 的功能。

scala apache-spark

5
推荐指数
1
解决办法
7161
查看次数

Spark from_json 也不例外

我正在使用 Spark 2.1 (scala 2.11)。

我想将具有定义模式的 json 格式字符串从一个数据帧加载到另一个数据帧中。我尝试了一些解决方案,但最便宜的是标准列函数 from_json 。我用这个函数尝试了一个例子(https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-collection.html#from_json),它给了我意想不到的结果。

val df = spark.read.text("testFile.txt")

df.show(false)

+----------------+
|value           |
+----------------+
|{"a": 1, "b": 2}|
|{bad-record     |
+----------------+


df.select(from_json(col("value"),
      StructType(List(
                  StructField("a",IntegerType),
                  StructField("b",IntegerType)
                ))
    )).show(false)


+-------------------+
|jsontostruct(value)|
+-------------------+
|[1,2]              |
|null               |
+-------------------+
Run Code Online (Sandbox Code Playgroud)

此行为类似于 mode:PERMISSIVE,它不是默认的。默认情况下,它设置为 FAILFAST 模式,这意味着只要输入数据和强制模式不匹配,它就应该抛出异常。

我尝试使用 DataFrameReader(JSON DataSource 和 FAILFAST 模式)加载 testFile.txt 并成功捕获异常。

spark.read.option("mode","FAILFAST").json("test.txt").show(false)

---
Caused by: org.apache.spark.sql.catalyst.json.SparkSQLJsonProcessingException: Malformed line in FAILFAST mode: {bad-record
---
Run Code Online (Sandbox Code Playgroud)

虽然两种情况下的解析模式相同,但为什么各自的输出如此不同?

json scala apache-spark

5
推荐指数
1
解决办法
3167
查看次数

Spark distinct 的实现

我是 Spark 和 Scala 的新手。我正在阅读 Spark 的 distinct() 函数。但我找不到任何适当的细节。我有一些我无法解决的疑问,并已将它们写下来。

  1. 在 Spark 中如何实现 distinct() ?

    我不太擅长使用 Spark 源代码来识别整个流程。当我检查执行计划时,我只能看到一个 ShuffleRDD

  2. distinct 的时间复杂度是多少?

    我还从 Google 搜索中发现,它还以某种方式使用了散列和排序。

    所以,我想它是否使用与在 Hashset 的帮助下从数组中获取唯一元素相同的原理。如果它是一个系统,我会猜到时间复杂度是 O(nlogn) 。

    但是它分布在许多分区中并被打乱,时间复杂度的顺序是什么?

  3. 有没有办法避免在特定情况下改组?

    如果我确保按照我的用例正确分区我的数据,我可以避免改组吗?

    即,例如,假设在具有唯一行的数据框中分解一个 ArrayType 列会创建新行,而其他列被复制。我将选择其他列。通过这种方式,我确保每个分区的重复项都是唯一的。因为我知道每个分区的重复项是唯一的,所以我可以避免洗牌,只是敏锐地删除该分区中的重复项

我还发现这是否 spark 的 distinct() 函数只对每个分区中的不同元组进行洗牌

谢谢你的帮助 。如果我在任何地方错了,请纠正我。

sorting scala dataframe apache-spark apache-spark-sql

2
推荐指数
1
解决办法
1209
查看次数