Mik*_*ike 8 python apache-spark python-multiprocessing pyspark spark-dataframe
用例如下:
我有一个大型数据框,其中包含一个"user_id"列(每个user_id可以出现在很多行中).我有一个用户my_users列表,我需要分析.
Groupby,filter和aggregate可能是一个好主意,但pyspark中包含的可用聚合函数不符合我的需要.在pyspark ver中,用户定义的聚合函数仍然不完全支持,我决定暂时保留它.
相反,我只是迭代my_users列表,过滤数据框中的每个用户,然后进行分析.为了优化这个过程,我决定为my_users中的每个用户使用python多处理池
执行分析(并传递给池)的函数有两个参数:user_id和主数据帧的路径,我在其上执行所有计算(PARQUET格式).在方法中,我加载数据帧,并对其进行处理(DataFrame不能作为参数本身传递)
我得到各种奇怪的错误,在一些进程(每次运行中不同),看起来像:
当我在没有任何多处理的情况下运行它时,一切都运行顺畅,但速度很慢..
这些错误来自哪里?
我会提供一些代码示例,以使事情更清晰:
PYSPRAK_SUBMIT_ARGS = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell' #if it's relevant
# ....
def users_worker(df_path, user_id):
df = spark.read.parquet(df_path) # The problem is here!
## the analysis of user_id in df is here
def user_worker_wrapper(args):
users_worker(*args)
def analyse():
# ...
users_worker_args = [(df_path, user_id) for user_id in my_users]
users_pool = Pool(processes=len(my_users))
users_pool.map(users_worker_wrapper, users_worker_args)
users_pool.close()
users_pool.join()
Run Code Online (Sandbox Code Playgroud)
实际上,正如@ user6910411评论的那样,当我将Pool更改为threadPool(multiprocessing.pool.ThreadPool包)时,一切都按预期工作,这些错误消失了.
错误本身的根本原因现在也很清楚,如果您希望我分享它们,请在下面发表评论.
| 归档时间: |
|
| 查看次数: |
4152 次 |
| 最近记录: |