Ben*_*Ben 6 apache-spark pyspark
在 PySpark 中,我知道 python 工作线程用于在工作节点上执行(至少一些)计算(如https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals中所述)。
在我的测试设置中,我试图让 Spark 使用 4 个工作线程(在独立机器上),但似乎只创建了 1 个 python 工作线程:
import socket
import threading
spark = SparkSession\
.builder\
.master('local[4]')\
.appName("PythonPi")\
.getOrCreate()
partitions = 4
# Print the ident of the local thread:
print(str(threading.get_ident()))
# Print the idents of the threads inside the python workers:
thread_ids = spark.sparkContext.parallelize(range(1, partitions + 1), partitions)\
.map(lambda x: ' threadid: ' + str(threading.get_ident())).collect()
print(thread_ids)
spark.stop()
Run Code Online (Sandbox Code Playgroud)
输出:
140226126948096
[' threadid: 139948131018496', ' threadid: 139948131018496', ' threadid: 139948131018496', ' threadid: 139948131018496']
Run Code Online (Sandbox Code Playgroud)
看看这些线程ID,似乎是使用同一个Python线程(在同一个worker中)来处理所有分区?或者该代码是在 python 工作人员之外评估的吗?
有没有其他方法可以访问 python 工作人员的 ID - 这样我就可以了解代码在哪里运行?
您的错误是相信 PySpark 使用线程。它不是。它一般使用进程和线程 ID,仅在进程内是唯一的(并且可以重用)。
所以你的代码应该是:
import os
(spark.sparkContext.range(partitions)
.map(lambda x: 'pid: {}'.format(os.getpid()))
.collect())
# ['pid: 749', 'pid: 755', 'pid: 753', 'pid: 758']
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2974 次 |
| 最近记录: |