小编Men*_*kes的帖子

Python - 并行解压缩 .gz 文件

我有多个 .gz 文件,总计达 1TB。如何利用 Python 2.7 并行解压这些文件?循环处理文件需要太多时间。

我也尝试过这段代码:

filenames = [gz for gz in glob.glob(filesFolder + '*.gz')]

def uncompress(path):
    with gzip.open(path, 'rb') as src, open(path.rstrip('.gz'), 'wb') as dest:
        shutil.copyfileobj(src, dest)

with multiprocessing.Pool() as pool:
    for _ in pool.imap_unordered(uncompress, filenames, chunksize=1):
        pass
Run Code Online (Sandbox Code Playgroud)

但是我收到以下错误:

  with multiprocessing.Pool() as pool:

AttributeError: __exit__
Run Code Online (Sandbox Code Playgroud)

谢谢!

python gzip python-2.7

5
推荐指数
1
解决办法
1754
查看次数

为 Pandas 使用多个核心

我有以下使用 Pandas 的 Python (2.7) 函数,我需要在 400 GB 上运行。\n在 150 GB 上的测试运行需要 4 小时才能成功完成(机器上的内存为 128 GB 和 16 个内核,4TB 磁盘)。\n是有一种聪明的方法可以利用 CPU 上的所有核心来执行此操作\n并并行执行此操作以减少处理时间:

\n\n
def create_data(headers, filters, filesFolder, remove_chars, outputFolder):\nfor ds in headers:\n    for data_file in glob.glob(filesFolder + '*' + ds + '*.csv'):\n        x=0\n        for data in pd.read_csv(data_file, sep = '\xe2\x82\xac', names = headers[ds], engine = 'python', chunksize = 10000000):\n            logger.info('Pandas Read %s' %(str(x) + '_' + os.path.basename(data_file)) )\n            for fil in filters[ds]:\n                try:\n                    data[fil] = data[fil].astype('O')\n                    data = data[data[fil] == …
Run Code Online (Sandbox Code Playgroud)

python python-2.7 pandas

5
推荐指数
2
解决办法
8639
查看次数

在 pyspark 中的 5 分钟窗口上聚合

我有以下数据框df

User | Datetime         | amount | length
A    | 2016-01-01 12:01 | 10     | 20
A    | 2016-01-01 12:03 | 6      | 10
A    | 2016-01-01 12:05 | 1      | 3
A    | 2016-01-01 12:06 | 3      | 5
B    | 2016-01-01 12:01 | 10     | 20
B    | 2016-01-01 12:02 | 8      | 20
Run Code Online (Sandbox Code Playgroud)

我想有效地使用 pyspark 来聚合超过 5 分钟的时间窗口并进行一些计算 - 例如计算每 5 分钟时间窗口每次使用的平均数量和长度 - df 将如下所示:

User | Datetime         | amount | length
A    | 2016-01-01 …
Run Code Online (Sandbox Code Playgroud)

python pandas pyspark pyspark-sql

2
推荐指数
1
解决办法
2491
查看次数

将Spark CSV依赖项添加到Zeppelin

我在AWS上运行带有火花簇的EMR.Spark版本是1.6

运行以下命令时:

proxy = sqlContext.read.load("/user/zeppelin/ProxyRaw.csv", 
                          format="com.databricks.spark.csv", 
                          header="true", 
                          inferSchema="true")
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

Py4JJavaError:调用o162.load时发生错误.:java.lang.ClassNotFoundException:无法找到数据源:com.databricks.spark.csv.请在http://spark-packages.org 上找到包 org.apache.spark.sql.execution.datasources.ResolvedDataSource $ .lookupDataSource(ResolvedDataSource.scala:77)

我怎么解决这个问题?我假设我应该添加一个包但是如何安装它在哪里?

csv apache-spark pyspark apache-zeppelin

2
推荐指数
1
解决办法
2228
查看次数

PySpark 当列表中的项目

以下是我试图实现的操作:

types = ["200","300"]
def Count(ID):
    cnd = F.when((**F.col("type") in types**), 1).otherwise(F.lit(0))
    return F.sum(cnd).alias("CountTypes")
Run Code Online (Sandbox Code Playgroud)

粗体的语法不正确,有什么建议可以在这里为 PySpark 获得正确的语法吗?

apache-spark pyspark pyspark-sql

2
推荐指数
1
解决办法
6191
查看次数