小编the*_*ler的帖子

如何在spark中使用repartition()指定文件大小

我正在使用 pyspark,并且我有一个大型数据源,我想对其进行重新分区,并明确指定每个分区的文件大小。

我知道使用该repartition(500)函数会将我的镶木地板分成 500 个大小几乎相等的文件。问题是每天都会有新数据添加到该数据源中。有些日子可能会有较大的输入,有些日子可能会有较小的输入。因此,当查看一段时间内的分区文件大小分布时,它会在每个文件200KB之间变化。700KB

我正在考虑指定每个分区的最大大小,以便无论文件数量多少,每天每个文件的文件大小或多或少相同。这将有助于我稍后在这个大型数据集上运行我的工作,以避免执行程序时间和洗牌时间等偏差。

有没有办法使用repartition()函数或在将数据帧写入镶木地板时指定它?

partitioning apache-spark parquet pyspark

7
推荐指数
1
解决办法
8821
查看次数

无法在 pyspark 中导入 s3fs

当我尝试使用以下代码在 pyspark 中导入 s3fs 库时:

import s3fs
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

遇到错误:无法从 'fsspec.asyn' 导入名称 'maybe_sync' (/usr/local/lib/python3.7/site-packages/fsspec/asyn.py) 回溯(最近一次调用最后一次):文件“/ usr/local/lib/python3.7/site-packages/s3fs/ init .py", line 1, in from .core import S3FileSystem, S3File File "/usr/local/lib/python3.7/site-packages/s3fs /core.py", line 12, in from fsspec.asyn import AsyncFileSystem,sync,sync_wrapper,maybe_syncImportError: cannot import name 'maybe_sync' from 'fsspec.asyn' (/usr/local/lib/python3.7/site-包/fsspec/asyn.py)

fsspec软件包已安装在我的笔记本中。我实际上已经使用它很长时间了,突然发生了这种情况。我试过谷歌搜索,但找不到这个特定的错误。有没有人遇到过这个?如果是这样,您知道如何解决吗?

filesystems amazon-s3 apache-spark pyspark

4
推荐指数
1
解决办法
1212
查看次数

在pyspark中查找列表的最大值/最小值

我知道这是一个非常微不足道的问题,我很惊讶我在互联网上找不到答案,但是可以在 pyspark 中找到最大值或最小值 oa 列表吗?在 Python 中,它很容易通过

max(list)
Run Code Online (Sandbox Code Playgroud)

但是,当我在 pyspark 中尝试相同的操作时,出现以下错误:

An error was encountered:
An error occurred while calling z:org.apache.spark.sql.functions.max. Trace:
py4j.Py4JException: Method max([class java.util.ArrayList]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:276)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)

关于我做错了什么的任何想法?

更新:添加我所做的事情: 这是我的清单:

cur_datelist

输出:

['2020-06-10', '2020-06-11', '2020-06-12', '2020-06-13', '2020-06-14', '2020-06-15', '2020-06-16', '2020-06-17', '2020-06-18', '2020-06-19', '2020-06-20', '2020-06-21', '2020-06-22', '2020-06-23', '2020-06-24', '2020-06-25', '2020-06-26', '2020-06-27', '2020-06-28', '2020-06-29', '2020-06-30', '2020-07-01', '2020-07-02', '2020-07-03', '2020-07-04', '2020-07-05', '2020-07-06', '2020-07-07', '2020-07-08', '2020-07-09', '2020-07-10', '2020-07-11', …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services apache-spark-sql pyspark

1
推荐指数
1
解决办法
1184
查看次数