我想深入了解Spark中合并排序连接的概念。我理解总体思路:这与合并排序算法的方法相同:采用 2 个排序数据集,比较第一行,写入最小的行,重复。我也了解如何实现分布式合并排序。
但我不明白它是如何在 Spark 中实现分区和执行器概念的。
这是我的看法。
是否有意义?
问题: 按键倾斜。如果某个键范围包含数据集键的 50%,则某些执行器将受到影响,因为太多行将进入同一分区。当尝试对内存中太大的 A 分区或 B 分区进行排序时,它甚至可能因 OOM 而失败(我不明白为什么 Spark 无法像 Hadoop 那样对磁盘溢出进行排序?..)或者它可能会失败,因为它尝试读取两个分区都放入内存中进行连接?
所以,这是我的猜测。您能纠正我并帮助理解 Spark 的工作方式吗?
我有一个使用 python 假设的生成器,如下所示:
@st.composite
def generate_network_fault_only(draw):
fault = {
"impaired": st.just(True), # need to detect if all faults are None to switch this back.
"limit": draw(NetworkFaultGen.generate_limit()),
"delay": draw(NetworkFaultGen.generate_delay()),
"loss_random": draw(NetworkFaultGen.generate_loss_random()),
"corrupt": draw(NetworkFaultGen.generate_corrupt()),
"duplicate": draw(NetworkFaultGen.generate_duplicate()),
"reorder": draw(NetworkFaultGen.generate_reorder()),
"rate": draw(NetworkFaultGen.generate_rate())
}
return draw(st.fixed_dictionaries(fault))
Run Code Online (Sandbox Code Playgroud)
上面的每个函数都会返回类似以下内容的内容:
@st.composite
def generate_reorder(draw):
"""
This must only happen if delay is applied
"""
return st.one_of(st.fixed_dictionaries(
{"percent": st.integers(min_value=0, max_value=100),
"correlation": st.integers(min_value=0, max_value=100),
"distance": st.integers(min_value=0)}),st.none())
Run Code Online (Sandbox Code Playgroud)
reorder我的值和重新排序中的值之间存在依赖关系,delay只有在延迟不是时才能指定None。
我不知道如何实现这一点。过滤似乎遇到了性能问题。此时代码中的delay值还不是具体值。
我正在对节点丢失时发生的 YARN 应用程序故障进行故障排除,因此我正在尝试重新创建此场景。但我只能强制节点关闭而不是丢失。我正在使用 AWS EMR,并且尝试过:
shutdown -h nowsudo stop hadoop-yarn-nodemanager和sudo stop hadoop-hdfs-datanodekill -9 <pid>那些导致 SHUTDOWN 节点但不是 LOST 节点。
如何在 AWS EMR 中创建 LOST 节点?
我刚刚读到 HDF5 允许您访问数据查找,而无需将整个文件读入内存。
这种寻找行为在没有 Java 的 Parquet 文件中是否可能(非 pyspark 解决方案)?我使用 Parquet 是因为它有强大的 dtype 支持。
import h5py
f = h5py.File('my_file.hdf5', 'w')
dset = f.create_dataset('coords', data=my_ndarray)
f.close()
f = h5py.File('my_file.hdf5', 'r')
dset = f['coords']
my_array = dset[-2:]
Run Code Online (Sandbox Code Playgroud)
https://arrow.apache.org/docs/python/parquet.html#inspecting-the-parquet-file-metadata
我在这里看到 Parquet 元数据具有num_row_groups: 1 (or more). 但我不确定这如何帮助我获取行 [23, 42, 117, 99293184]。
我有来自许多物联网传感器的数据。对于每个特定的传感器,数据帧中只有大约 100 行:数据没有倾斜。我正在为每个传感器训练一个单独的机器学习模型。
我正在pandas udf成功地使用并行训练和记录不同模型的 mlflow 指标(据说),如这里所教。
在 Azure 上使用 Databricks 和单节点集群(Standard_DS3_v2 - 14GB 内存 - 4 核)我能够在大约 23分钟内完成所有训练。
因为pandas udf,据说,为每个组并行计算,我认为我可以通过使用具有更多内核的单节点集群或使用具有更多工作人员的集群更快地完成训练。所以我尝试运行相同的笔记本:
对于我惊讶的是,训练时间没有减少:23分钟的选项1,然后26.5min选项2
我尝试使用较新的 applyInPandas,但结果大致相同。
注意:@Chris 回答后,查看 Web UI 上的 Stage Detail 页面(对于具有 1 个 master + 3 个 worker 的集群),我看到我只有一个 stage 负责 udf pandas 培训。花了 20 …
我一直对 Spark 中的函数有点困惑col,无论是 Python 还是 Scala 中的函数。看起来:
df.col("zipcode")在 Scala 中相当于
df["zipcode"]在spark.sql.functions.col("zipcode")Python中。
使用https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.col的最后一个用法让我感到困惑。如何spark.sql.functions.col知道我们指的是哪个Python DataFrame?我们只是传递列的名称。
apache-spark ×3
python ×3
pyspark ×2
amazon-emr ×1
databricks ×1
fastparquet ×1
hadoop ×1
hadoop-yarn ×1
parquet ×1
pyarrow ×1
scala ×1