小编Mik*_*ike的帖子

使用spark进行多处理(PySpark)

用例如下:

我有一个大型数据框,其中包含一个"user_id"列(每个user_id可以出现在很多行中).我有一个用户my_users列表,我需要分析.

Groupby,filteraggregate可能是一个好主意,但pyspark中包含的可用聚合函数不符合我的需要.在pyspark ver中,用户定义的聚合函数仍然不完全支持,我决定暂时保留它.

相反,我只是迭代my_users列表,过滤数据框中的每个用户,然后进行分析.为了优化这个过程,我决定为my_users中的每个用户使用python多处理池

执行分析(并传递给池)的函数有两个参数:user_id主数据帧路径,我在其上执行所有计算(PARQUET格式).在方法中,我加载数据帧,并对其进行处理(DataFrame不能作为参数本身传递)

我得到各种奇怪的错误,在一些进程(每次运行中不同),看起来像:

  • JVM中不存在PythonUtils(读取'镶木地板'数据时)

打印错误消息的屏幕

  • KeyError:找不到'c'(同样,当读取'镶木地板'数据框时.无论如何,'c'是什么?)

当我在没有任何多处理的情况下运行它时,一切都运行顺畅,但速度很慢..

这些错误来自哪里?

我会提供一些代码示例,以使事情更清晰:

PYSPRAK_SUBMIT_ARGS = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell' #if it's relevant

# ....

def users_worker(df_path, user_id):
    df = spark.read.parquet(df_path) # The problem is here!
    ## the analysis of user_id in df is here

def user_worker_wrapper(args):
    users_worker(*args)

def analyse():
    # ...
    users_worker_args …
Run Code Online (Sandbox Code Playgroud)

python apache-spark python-multiprocessing pyspark spark-dataframe

8
推荐指数
1
解决办法
4152
查看次数

尝试将DataFrame保存为镶木地板格式时,使用'覆盖'模式时出现FileNotFoundException

我有这个奇怪的错误.我有一个例程,如果它存在(或者创建一个数据帧),则读取数据帧,修改它,然后使用'overwrite'模式将它再次保存在镶木地板格式的同一目标路径中.

在第一次运行中,当没有数据帧时,我创建一个,然后保存它.它在输出文件夹中生成4个文件:

  • _SUCCESS.crc
  • 部分-R - <.....> snappy.parquet.crc
  • _成功
  • 部分-R - <.....> snappy.parquet

然后,在第二次运行中,我读取数据帧,修改它,当我尝试覆盖它时,它会抛出一个异常*part-r-<.....>.snappy.parquet file does not exist*.

发生异常时输出文件夹为空,但在执行df.write.parquet(path, 'overwrite')文件夹包含此文件之前.

我试图将spark.sql.cacheMetadata设置为'false',但它没有帮助.spark.catalog.listTables()返回一个空列表,因此无需刷新任何内容.

现在,我只是删除输出文件夹的项目,并写入数据帧.有用.但为什么带有'覆盖'模式的原始方法失败?

谢谢.

apache-spark apache-spark-sql pyspark

7
推荐指数
1
解决办法
2418
查看次数

(Py)Spark 框架中数据帧的数据可视化

关于 Spark DataFrames 方法可视化的问题。

就目前而言(我使用 v.2.0.0),Spark DataFrames 还没有任何可视化功能。通常的解决方案是将DataFrame 的一些样本收集到驱动程序中,将其加载到 Pandas DataFrame 等中,并使用其可视化功能。

我的问题是:我如何知道最大限度地利用驱动程序内存以可视化数据的最佳采样大小?或者,解决此问题的最佳实践是什么?

谢谢!

data-visualization apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
8560
查看次数