PySpark 中的多个 Python 工作线程(或工作线程)?

Ben*_*Ben 6 apache-spark pyspark

在 PySpark 中,我知道 python 工作线程用于在工作节点上执行(至少一些)计算(如https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals中所述)。

在我的测试设置中,我试图让 Spark 使用 4 个工作线程(在独立机器上),但似乎只创建了 1 个 python 工作线程:

import socket
import threading

spark = SparkSession\
    .builder\
    .master('local[4]')\
    .appName("PythonPi")\
    .getOrCreate()

partitions = 4

# Print the ident of the local thread:
print(str(threading.get_ident()))

# Print the idents of the threads inside the python workers:
thread_ids = spark.sparkContext.parallelize(range(1, partitions + 1), partitions)\
.map(lambda x: ' threadid: ' + str(threading.get_ident())).collect()


print(thread_ids)

spark.stop()
Run Code Online (Sandbox Code Playgroud)

输出:

140226126948096
[' threadid: 139948131018496', ' threadid: 139948131018496', ' threadid: 139948131018496', ' threadid: 139948131018496']
Run Code Online (Sandbox Code Playgroud)

看看这些线程ID,似乎是使用同一个Python线程(在同一个worker中)来处理所有分区?或者该代码是在 python 工作人员之外评估的吗?

有没有其他方法可以访问 python 工作人员的 ID - 这样我就可以了解代码在哪里运行?

hi-*_*zir 2

您的错误是相信 PySpark 使用线程。它不是。它一般使用进程和线程 ID,仅在进程内是唯一的(并且可以重用)。

所以你的代码应该是:

import os

(spark.sparkContext.range(partitions)
    .map(lambda x: 'pid: {}'.format(os.getpid()))
    .collect())

# ['pid: 749', 'pid: 755', 'pid: 753', 'pid: 758']
Run Code Online (Sandbox Code Playgroud)