小编Chr*_*ris的帖子

合并排序连接在 Spark 中如何工作以及为什么会抛出 OOM?

我想深入了解Spark中合并排序连接的概念。我理解总体思路:这与合并排序算法的方法相同:采用 2 个排序数据集,比较第一行,写入最小的行,重复。我也了解如何实现分布式合并排序。

但我不明白它是如何在 Spark 中实现分区和执行器概念的。

这是我的看法。

  1. 鉴于我需要连接 2 个表 A 和 B。表是通过 Spark SQL 从 Hive 读取的,如果这很重要的话。
  2. 默认情况下 Spark 使用 200 个分区。
  3. 然后 Spark 将计算连接键范围(从 minKey(A,B) 到 maxKey(A,B) )并将其分成 200 个部分。两个数据集均按关键范围分为 200 个部分:A 分区和 B 分区。
  4. 与相同键相关的每个 A 分区和每个 B 分区都被发送到同一个执行器,并在那里彼此分开排序。
  5. 现在,200 个执行程序可以加入 200 个 A 分区和 200 个 B 分区,并保证它们共享相同的密钥范围。
  6. 连接通过合并排序算法进行:从 A 分区中获取最小键,与 B 分区中的最小键进行比较,写入匹配或迭代。
  7. 最后,我有 200 个已连接的数据分区。

是否有意义?

问题: 按键倾斜。如果某个键范围包含数据集键的 50%,则某些执行器将受到影响,因为太多行将进入同一分区。当尝试对内存中太大的 A 分区或 B 分区进行排序时,它甚至可能因 OOM 而失败(我不明白为什么 Spark 无法像 Hadoop 那样对磁盘溢出进行排序?..)或者它可能会失败,因为它尝试读取两个分区都放入内存中进行连接?

所以,这是我的猜测。您能纠正我并帮助理解 Spark 的工作方式吗?

apache-spark

6
推荐指数
1
解决办法
2099
查看次数

Python假设:如何组成相互依赖的生成器?

我有一个使用 python 假设的生成器,如下所示:

@st.composite
def generate_network_fault_only(draw):
    fault = {
        "impaired": st.just(True),   # need to detect if all faults are None to switch this back.
        "limit": draw(NetworkFaultGen.generate_limit()),
        "delay": draw(NetworkFaultGen.generate_delay()),
        "loss_random": draw(NetworkFaultGen.generate_loss_random()),
        "corrupt": draw(NetworkFaultGen.generate_corrupt()),
        "duplicate": draw(NetworkFaultGen.generate_duplicate()),
        "reorder": draw(NetworkFaultGen.generate_reorder()),
        "rate": draw(NetworkFaultGen.generate_rate())
    }

    return draw(st.fixed_dictionaries(fault))
Run Code Online (Sandbox Code Playgroud)

上面的每个函数都会返回类似以下内容的内容:

@st.composite
def generate_reorder(draw):
    """
    This must only happen if delay is applied
    """
    return st.one_of(st.fixed_dictionaries(
              {"percent": st.integers(min_value=0, max_value=100),
               "correlation": st.integers(min_value=0, max_value=100),
               "distance": st.integers(min_value=0)}),st.none())
Run Code Online (Sandbox Code Playgroud)

reorder我的值和重新排序中的值之间存在依赖关系,delay只有在延迟不是时才能指定None

我不知道如何实现这一点。过滤似乎遇到了性能问题。此时代码中的delay值还不是具体值。

python python-hypothesis

5
推荐指数
1
解决办法
1287
查看次数

Hadoop YARN:如何强制将节点标记为“LOST”而不是“SHUTDOWN”?

我正在对节点丢失时发生的 YARN 应用程序故障进行故障排除,因此我正在尝试重新创建此场景。但我只能强制节点关闭而不是丢失。我正在使用 AWS EMR,并且尝试过:

  • 登录节点并执行 shutdown -h now
  • 登录节点并执行sudo stop hadoop-yarn-nodemanagersudo stop hadoop-hdfs-datanode
  • 用一个杀死 NodeManager kill -9 <pid>

那些导致 SHUTDOWN 节点但不是 LOST 节点。

如何在 AWS EMR 中创建 LOST 节点?

hadoop amazon-emr hadoop-yarn

5
推荐指数
1
解决办法
152
查看次数

我可以通过索引访问 Parquet 文件而不将整个文件读入内存吗?

我刚刚读到 HDF5 允许您访问数据查找,而无需将整个文件读入内存。

这种寻找行为在没有 Java 的 Parquet 文件中是否可能(非 pyspark 解决方案)?我使用 Parquet 是因为它有强大的 dtype 支持。

import h5py

f = h5py.File('my_file.hdf5', 'w')
dset = f.create_dataset('coords', data=my_ndarray)
f.close()

f = h5py.File('my_file.hdf5', 'r')
dset = f['coords']
my_array = dset[-2:]
Run Code Online (Sandbox Code Playgroud)

https://arrow.apache.org/docs/python/parquet.html#inspecting-the-parquet-file-metadata

我在这里看到 Parquet 元数据具有num_row_groups: 1 (or more). 但我不确定这如何帮助我获取行 [23, 42, 117, 99293184]。

parquet fastparquet pyarrow

5
推荐指数
1
解决办法
4670
查看次数

为什么 Pandas UDF 没有被并行化?

我有来自许多物联网传感器的数据。对于每个特定的传感器,数据帧中只有大约 100 行:数据没有倾斜。我正在为每个传感器训练一个单独的机器学习模型。

我正在pandas udf成功地使用并行训练和记录不同模型的 mlflow 指标(据说),如这里所教。

在 Azure 上使用 Databricks 和单节点集群(Standard_DS3_v2 - 14GB 内存 - 4 核)我能够在大约 23分钟内完成所有训练。

因为pandas udf,据说,为每个组并行计算,我认为我可以通过使用具有更多内核的单节点集群或使用具有更多工作人员的集群更快地完成训练。所以我尝试运行相同的笔记本:

  1. 一组计算机:1 个 master + 3 个 worker,全部(Standard_DS3_v2 - 14GB 内存 - 4 核)
  2. 具有(Standard_DS5_v2 - 56GB 内存 - 16 核)的单节点集群

对于我惊讶的是,训练时间没有减少:23分钟的选项1,然后26.5min选项2

我尝试使用较新的 applyInPandas,但结果大致相同。

注意:@Chris 回答后,查看 Web UI 上的 Stage Detail 页面(对于具有 1 个 master + 3 个 worker 的集群),我看到我只有一个 stage 负责 udf pandas 培训。花了 20 …

python apache-spark pyspark databricks azure-databricks

5
推荐指数
1
解决办法
344
查看次数

col 函数如何知道我们引用的是哪个 DataFrame?

我一直对 Spark 中的函数有点困惑col,无论是 Python 还是 Scala 中的函数。看起来:

df.col("zipcode")在 Scala 中相当于

df["zipcode"]spark.sql.functions.col("zipcode")Python中。

使用https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.col的最后一个用法让我感到困惑。如何spark.sql.functions.col知道我们指的是哪个Python DataFrame?我们只是传递列的名称。

python scala apache-spark apache-spark-sql pyspark

2
推荐指数
1
解决办法
1237
查看次数