car*_*iem 5 python arrays count apache-spark
我有一个这样的数据框:
df = spark.createDataFrame([(0, ["B","C","D","E"]),(1,["E","A","C"]),(2, ["F","A","E","B"]),(3,["E","G","A"]),(4,["A","C","E","B","D"])], ["id","items"])
Run Code Online (Sandbox Code Playgroud)
它创建了一个df像这样的数据框:
+---+-----------------+
| 0| [B, C, D, E]|
| 1| [E, A, C]|
| 2| [F, A, E, B]|
| 3| [E, G, A]|
| 4| [A, C, E, B, D]|
+---+-----------------+
Run Code Online (Sandbox Code Playgroud)
我想得到这样的结果:
+---+-----+
|all|count|
+---+-----+
| F| 1|
| E| 5|
| B| 3|
| D| 2|
| C| 3|
| A| 4|
| G| 1|
+---+-----+
Run Code Online (Sandbox Code Playgroud)
这基本上只是找到所有不同的元素df["items"]并计算它们的频率。如果我的数据更易于管理,我会这样做:
all_items = df.select(explode("items").alias("all"))
result = all_items.groupby(all_items.all).count().distinct()
result.show()
Run Code Online (Sandbox Code Playgroud)
但是因为我的数据在每个列表中有数百万行和数千个元素,所以这不是一个选项。我正在考虑逐行执行此操作,以便一次只处理 2 个列表。因为大多数元素经常在多行中重复(但每行中的列表是一个集合),这种方法应该可以解决我的问题。但问题是,我真的不知道如何在 Spark 中做到这一点,因为我才刚刚开始学习。请问有人可以帮忙吗?
您需要做的是减少进入爆炸的分区的大小。有 2 个选项可以执行此操作。首先,如果您的输入数据是可拆分的,您可以减小其大小,spark.sql.files.maxPartitionBytes以便 Spark 读取较小的拆分。另一种选择是在爆炸之前重新分区。
该默认值的maxPartitionBytes为128MB,这样的Spark将尝试在128MB块读取数据。如果数据不可拆分,那么它会将整个文件读入单个分区,在这种情况下,您需要改为执行repartition。
在你的情况下,因为你正在做一个爆炸,假设它增加了 100 倍,每个分区有 128MB,你最终每个分区有 12GB+ 出来!
您可能需要考虑的另一件事是您的随机分区,因为您正在进行聚合。同样,您可能需要通过设置spark.sql.shuffle.partitions为比默认值 200 更高的值来增加爆炸后聚合的分区。您可以使用 Spark UI 查看您的 shuffle 阶段并查看每个任务正在读取多少数据以及相应调整。
我在刚刚在欧洲 Spark 峰会上发表的演讲中讨论了这个和其他调整建议。
| 归档时间: |
|
| 查看次数: |
2811 次 |
| 最近记录: |