标签: dask

dask 的本地使用:到 Client() 还是不到 Client()?

我试图了解 Dask 在本地计算机上的使用模式。

具体来说,

  • 我有一个适合内存的数据集
  • 我想做一些熊猫操作
    • 通过...分组...
    • 日期解析
    • ETC。

Pandas 通过单个核心执行这些操作,这些操作对我来说需要几个小时。我的机器上有 8 个核心,因此,我想使用 Dask 尽可能地并行化这些操作。

我的问题如下: Dask 中执行此操作的两种方式有什么区别:

import pandas as pd
from sklearn.datasets import load_iris

iris = load_iris()
Run Code Online (Sandbox Code Playgroud)

(1)

import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame(iris.data, columns=iris.feature_names),
    npartitions=2
)

df.mean().compute()
Run Code Online (Sandbox Code Playgroud)

(2)

import dask.dataframe as dd
from distributed import Client

client = Client()

df = client.persist(
    dd.from_pandas(
        pd.DataFrame(iris.data, columns=iris.feature_names),
        npartitions=2
    )
)

df.mean().compute()
Run Code Online (Sandbox Code Playgroud)

一种使用模式相对于另一种使用模式有什么好处?为什么我应该使用其中一种而不是另一种?

python dask data-science dask-distributed

7
推荐指数
1
解决办法
2950
查看次数

将 Spark 数据帧转换为 dask 数据帧

有没有办法直接将 Spark 数据帧转换为 Dask 数据帧?

我目前正在使用 Spark 的 .toPandas()函数将其转换为 pandas 数据帧,然后转换为 dask 数据帧。我相信这是低效的操作,并且没有利用dask的分布式处理能力,因为pandas永远是瓶颈。

pandas pyspark dask dask-distributed

7
推荐指数
1
解决办法
3701
查看次数

pydata BLAZE 项目走向何方?

我发现 blaze 生态系统*很棒,因为它涵盖了大多数数据工程用例。在 2015-2016 年期间,这些项目肯定引起了很多兴趣,但最近却被忽视了。我说这是查看 github 存储库上的提交。

所以我对社区的问题是

- 2016 年发生了什么导致失去兴趣的事情?

- 是否有其他基于 Python 的库取代了 blaze?

火焰生态系统:

  • Blaze:查询不同存储系统数据的接口
  • Dask:通过任务调度和阻塞算法进行并行计算
  • Datashape:一种数据描述语言
  • DyND:用于动态多维数组的 C++ 库
  • Odo:不同存储系统间的数据迁移

参考资料:http : //blaze.pydata.org/

blaze datashape dask odo

7
推荐指数
1
解决办法
553
查看次数

Pandas hub_table 更快的替代品

pivot_table我在大型数据集(1000 万行,6 列)上使用 Pandas函数。由于执行时间至关重要,因此我尝试加快流程。目前,处理整个数据集大约需要 8 秒,这太慢了,我希望找到替代方案来提高速度/性能。

我当前的 Pandas 数据透视表:

df_pivot = df_original.pivot_table(index="industry", columns = "months",
                    values = ["orders", "client_name"],
                    aggfunc ={"orders": np.sum, "client_name": pd.Series.nunique})
Run Code Online (Sandbox Code Playgroud)

df_original包括所有数据(10m 行,从 csv 导入)。行业为客户所在行业,月份为订单月份(1月至12月),订单为订单数量。categorical除订单数(数据类型)外,所有数据均转换为数据int。最初,行业、月份和客户名称都是字符串。

我尝试使用pandas.DataFrame.unstack- 速度更慢。我也尝试过Dask。产生dask pivot_table了一些改进(6 秒执行时间 - 因此少了 2 秒)。然而,它仍然很慢。是否有更快的替代方案(对于大型数据集)?groupy也许用, ,重新创建数据透视表crosstab...不幸的是,我根本没有找到可以工作的替代方案,而且我对 Python 和 Pandas 仍然很陌生...期待您的建议。提前致谢!

更新:

我通过以下方式找出了 groupby 方式:

df_new = df_original.groupby(["months", "industry"]).agg({"orders": np.sum, "client_name": pd.Series.nunique}).unstack(level="months").fillna(0)
Run Code Online (Sandbox Code Playgroud)

现在速度快得多,大约需要 2-3 秒。还有一些选项可以进一步提高速度吗?

python performance numpy pandas dask

7
推荐指数
1
解决办法
2万
查看次数

dask.compute() 中的重试不清楚

从文档来看,Number of allowed automatic retries if computing a result fails.

“结果”是指每个单独的任务还是整个compute()调用?

如果是指整个调用,那么dask.delayed中如何实现每个任务的重试呢?

另外,我不确定重试是否有效,如下面的代码所示。

import dask
import random

@dask.delayed
def add(x, y):
    return x + y

@dask.delayed
def divide(sum_i):
    n = random.randint(0, 1)
    result = sum_i / n
    return result

tasks = []
for i in range(3):
    sum_i = add(i, i+1)
    divide_n = divide(sum_i)
    tasks.append(divide_n)

dask.compute(*tasks, retries=1000)
Run Code Online (Sandbox Code Playgroud)

预期输出为 (1, 3, 5),实际输出为 ZeroDivisionError。

dask dask-delayed

7
推荐指数
1
解决办法
862
查看次数

dtype 映射参数中的键只能使用列名

我已经使用 dask read_sql_table 从 oracle 数据库成功引入了一张表。但是,当我尝试引入另一个表时,我收到此错误KeyError: 'Only a column name can be used for the key in a dtype mappings argument.'

我检查了我的连接字符串和架构,所有这些都很好。我知道表名存在,我试图用作索引的列是 oracle 数据库中表的主键。

有人可以解释为什么当列名明确存在时会发生此错误吗?

我知道我可以使用 Pandas 块,但在这种情况下宁愿使用 dask。

下面是我如何连接到 oracle 数据库和错误消息的最后一点

host='*******'
port='*****'
sid='****'
user='******'
password='*****'

con_string = 'oracle://' + user + ':' + password + '@' + host + ':' + port + '/' + sid 
engine = create_engine(con_string)

df =ddf.read_sql_table('table_name', uri=con_string, index_col='id', npartitions=None, schema='*****')
Run Code Online (Sandbox Code Playgroud)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\generic.py in astype(self, dtype, copy, errors, **kwargs) 5855
如果 col_name 不在 self: …

python pandas dask

7
推荐指数
1
解决办法
1万
查看次数

强制 dask to_parquet 写入单个文件

使用时会创建dask.to_parquet(df, filename)一个子文件夹filename,并将多个文件写入该文件夹,而pandas.to_parquet(df, filename)只写入一个文件。我可以使用 dask to_parquet(不使用compute()创建 pandas df )只写入单个文件吗?

python pandas parquet dask

7
推荐指数
1
解决办法
5785
查看次数

如何在 docker 镜像上执行完美的 Flow?

我的目标:

我有一个构建的 docker 映像,并希望在该映像上运行我的所有流程。

现在:

我有以下任务正在本地 Dask 执行器上运行。运行代理的服务器与执行所需的环境是不同的 python 环境my_task- 因此需要在预构建映像中运行。

我的问题是: 如何在 Dask Executor 上运行此流程,以便它在我提供的 docker 映像(作为环境)上运行?

import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment


@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Docker!")


with Flow("My Flow") as flow:
    results = hello_task()

flow.environment = LocalEnvironment(
    labels=[], executor=LocalDaskExecutor(scheduler="threads", num_workers=2),
)
Run Code Online (Sandbox Code Playgroud)

我认为我需要首先在该 docker 映像上启动服务器和代理(如此处所述,但我想可以有一种方法可以简单地在提供的映像上运行 Flow。

更新1

按照教程,我尝试了以下操作:

import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from …
Run Code Online (Sandbox Code Playgroud)

etl docker dask docker-image prefect

7
推荐指数
1
解决办法
7165
查看次数

使用一定数量的非 NaN 整数在 Pandas DataFrame 中保留索引

假设我有以下数据框:

df1 = pd.DataFrame(data    = [1,np.nan,np.nan,1,1,np.nan,1,1,1], 
                   columns = ['X'], 
                   index   = ['a', 'a', 'a', 
                              'b', 'b', 'b',
                              'c', 'c', 'c'])
print(df1)
     X
a  1.0
a  NaN
a  NaN
b  1.0
b  1.0
b  NaN
c  1.0
c  1.0
c  1.0
Run Code Online (Sandbox Code Playgroud)

我只想保留具有 2 个或更多非 NaN 条目的索引。在这种情况下,“a”条目只有一个非 NaN 值,所以我想删除它并让我的结果是:

     X
b  1.0
b  1.0
b  NaN
c  1.0
c  1.0
c  1.0
Run Code Online (Sandbox Code Playgroud)

做这个的最好方式是什么?理想情况下,我也想要一些适用于 Dask 的东西,尽管通常如果它适用于 Pandas,它也适用于 Dask。

python pandas dask

7
推荐指数
3
解决办法
214
查看次数

在另一个成对的bin数组中获取数据数组最小值的最快方法

我有三个一维数组:

  • idxs: 索引数据
  • weights: 中每个指标的权重 idxs
  • bins:用于计算其中最小重量的 bin。

这是我当前使用的方法idxs来检查weights在哪个 bin 中调用的数据,然后计算 bin 权重的最小值/最大值:

插图

  1. 获取slices显示每个垃圾箱idxs元素所属的。
  2. 排序slicesweights同时。
  3. 计算weights每个 bin(切片)中的最小值。

numpy 方法

import random
import numpy as np

# create example data
out_size = int(10)
bins = np.arange(3, out_size-3)
idxs = np.arange(0, out_size)
#random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs

# get which bin idxs belong …
Run Code Online (Sandbox Code Playgroud)

python numpy scipy pandas dask

7
推荐指数
1
解决办法
246
查看次数