用例如下:
我有一个大型数据框,其中包含一个"user_id"列(每个user_id可以出现在很多行中).我有一个用户my_users列表,我需要分析.
Groupby,filter和aggregate可能是一个好主意,但pyspark中包含的可用聚合函数不符合我的需要.在pyspark ver中,用户定义的聚合函数仍然不完全支持,我决定暂时保留它.
相反,我只是迭代my_users列表,过滤数据框中的每个用户,然后进行分析.为了优化这个过程,我决定为my_users中的每个用户使用python多处理池
执行分析(并传递给池)的函数有两个参数:user_id和主数据帧的路径,我在其上执行所有计算(PARQUET格式).在方法中,我加载数据帧,并对其进行处理(DataFrame不能作为参数本身传递)
我得到各种奇怪的错误,在一些进程(每次运行中不同),看起来像:
当我在没有任何多处理的情况下运行它时,一切都运行顺畅,但速度很慢..
这些错误来自哪里?
我会提供一些代码示例,以使事情更清晰:
PYSPRAK_SUBMIT_ARGS = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell' #if it's relevant
# ....
def users_worker(df_path, user_id):
df = spark.read.parquet(df_path) # The problem is here!
## the analysis of user_id in df is here
def user_worker_wrapper(args):
users_worker(*args)
def analyse():
# ...
users_worker_args …
Run Code Online (Sandbox Code Playgroud) python apache-spark python-multiprocessing pyspark spark-dataframe
我有这个奇怪的错误.我有一个例程,如果它存在(或者创建一个数据帧),则读取数据帧,修改它,然后使用'overwrite'模式将它再次保存在镶木地板格式的同一目标路径中.
在第一次运行中,当没有数据帧时,我创建一个,然后保存它.它在输出文件夹中生成4个文件:
然后,在第二次运行中,我读取数据帧,修改它,当我尝试覆盖它时,它会抛出一个异常*part-r-<.....>.snappy.parquet file does not exist*
.
发生异常时输出文件夹为空,但在执行df.write.parquet(path, 'overwrite')
文件夹包含此文件之前.
我试图将spark.sql.cacheMetadata设置为'false',但它没有帮助.spark.catalog.listTables()返回一个空列表,因此无需刷新任何内容.
现在,我只是删除输出文件夹的项目,并写入数据帧.有用.但为什么带有'覆盖'模式的原始方法失败?
谢谢.
关于 Spark DataFrames 方法可视化的问题。
就目前而言(我使用 v.2.0.0),Spark DataFrames 还没有任何可视化功能。通常的解决方案是将DataFrame 的一些样本收集到驱动程序中,将其加载到 Pandas DataFrame 等中,并使用其可视化功能。
我的问题是:我如何知道最大限度地利用驱动程序内存以可视化数据的最佳采样大小?或者,解决此问题的最佳实践是什么?
谢谢!