我试图了解 Dask 在本地计算机上的使用模式。
具体来说,
Pandas 通过单个核心执行这些操作,这些操作对我来说需要几个小时。我的机器上有 8 个核心,因此,我想使用 Dask 尽可能地并行化这些操作。
我的问题如下: Dask 中执行此操作的两种方式有什么区别:
import pandas as pd
from sklearn.datasets import load_iris
iris = load_iris()
Run Code Online (Sandbox Code Playgroud)
(1)
import dask.dataframe as dd
df = dd.from_pandas(
pd.DataFrame(iris.data, columns=iris.feature_names),
npartitions=2
)
df.mean().compute()
Run Code Online (Sandbox Code Playgroud)
(2)
import dask.dataframe as dd
from distributed import Client
client = Client()
df = client.persist(
dd.from_pandas(
pd.DataFrame(iris.data, columns=iris.feature_names),
npartitions=2
)
)
df.mean().compute()
Run Code Online (Sandbox Code Playgroud)
一种使用模式相对于另一种使用模式有什么好处?为什么我应该使用其中一种而不是另一种?
有没有办法直接将 Spark 数据帧转换为 Dask 数据帧?
我目前正在使用 Spark 的 .toPandas()函数将其转换为 pandas 数据帧,然后转换为 dask 数据帧。我相信这是低效的操作,并且没有利用dask的分布式处理能力,因为pandas永远是瓶颈。
我发现 blaze 生态系统*很棒,因为它涵盖了大多数数据工程用例。在 2015-2016 年期间,这些项目肯定引起了很多兴趣,但最近却被忽视了。我说这是查看 github 存储库上的提交。
所以我对社区的问题是
火焰生态系统:
参考资料:http : //blaze.pydata.org/
pivot_table我在大型数据集(1000 万行,6 列)上使用 Pandas函数。由于执行时间至关重要,因此我尝试加快流程。目前,处理整个数据集大约需要 8 秒,这太慢了,我希望找到替代方案来提高速度/性能。
我当前的 Pandas 数据透视表:
df_pivot = df_original.pivot_table(index="industry", columns = "months",
values = ["orders", "client_name"],
aggfunc ={"orders": np.sum, "client_name": pd.Series.nunique})
Run Code Online (Sandbox Code Playgroud)
df_original包括所有数据(10m 行,从 csv 导入)。行业为客户所在行业,月份为订单月份(1月至12月),订单为订单数量。categorical除订单数(数据类型)外,所有数据均转换为数据int。最初,行业、月份和客户名称都是字符串。
我尝试使用pandas.DataFrame.unstack- 速度更慢。我也尝试过Dask。产生dask pivot_table了一些改进(6 秒执行时间 - 因此少了 2 秒)。然而,它仍然很慢。是否有更快的替代方案(对于大型数据集)?groupy也许用, ,重新创建数据透视表crosstab...不幸的是,我根本没有找到可以工作的替代方案,而且我对 Python 和 Pandas 仍然很陌生...期待您的建议。提前致谢!
更新:
我通过以下方式找出了 groupby 方式:
df_new = df_original.groupby(["months", "industry"]).agg({"orders": np.sum, "client_name": pd.Series.nunique}).unstack(level="months").fillna(0)
Run Code Online (Sandbox Code Playgroud)
现在速度快得多,大约需要 2-3 秒。还有一些选项可以进一步提高速度吗?
从文档来看,Number of allowed automatic retries if computing a result fails.
“结果”是指每个单独的任务还是整个compute()调用?
如果是指整个调用,那么dask.delayed中如何实现每个任务的重试呢?
另外,我不确定重试是否有效,如下面的代码所示。
import dask
import random
@dask.delayed
def add(x, y):
return x + y
@dask.delayed
def divide(sum_i):
n = random.randint(0, 1)
result = sum_i / n
return result
tasks = []
for i in range(3):
sum_i = add(i, i+1)
divide_n = divide(sum_i)
tasks.append(divide_n)
dask.compute(*tasks, retries=1000)
Run Code Online (Sandbox Code Playgroud)
预期输出为 (1, 3, 5),实际输出为 ZeroDivisionError。
我已经使用 dask read_sql_table 从 oracle 数据库成功引入了一张表。但是,当我尝试引入另一个表时,我收到此错误KeyError: 'Only a column name can be used for the key in a dtype mappings argument.'
我检查了我的连接字符串和架构,所有这些都很好。我知道表名存在,我试图用作索引的列是 oracle 数据库中表的主键。
有人可以解释为什么当列名明确存在时会发生此错误吗?
我知道我可以使用 Pandas 块,但在这种情况下宁愿使用 dask。
下面是我如何连接到 oracle 数据库和错误消息的最后一点
host='*******'
port='*****'
sid='****'
user='******'
password='*****'
con_string = 'oracle://' + user + ':' + password + '@' + host + ':' + port + '/' + sid
engine = create_engine(con_string)
df =ddf.read_sql_table('table_name', uri=con_string, index_col='id', npartitions=None, schema='*****')
Run Code Online (Sandbox Code Playgroud)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\core\generic.py in astype(self, dtype, copy, errors, **kwargs) 5855
如果 col_name 不在 self: …
使用时会创建dask.to_parquet(df, filename)一个子文件夹filename,并将多个文件写入该文件夹,而pandas.to_parquet(df, filename)只写入一个文件。我可以使用 dask to_parquet(不使用compute()创建 pandas df )只写入单个文件吗?
我的目标:
我有一个构建的 docker 映像,并希望在该映像上运行我的所有流程。
现在:
我有以下任务正在本地 Dask 执行器上运行。运行代理的服务器与执行所需的环境是不同的 python 环境my_task- 因此需要在预构建映像中运行。
我的问题是: 如何在 Dask Executor 上运行此流程,以便它在我提供的 docker 映像(作为环境)上运行?
import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment
@task
def hello_task():
logger = prefect.context.get("logger")
logger.info("Hello, Docker!")
with Flow("My Flow") as flow:
results = hello_task()
flow.environment = LocalEnvironment(
labels=[], executor=LocalDaskExecutor(scheduler="threads", num_workers=2),
)
Run Code Online (Sandbox Code Playgroud)
我认为我需要首先在该 docker 映像上启动服务器和代理(如此处所述),但我想可以有一种方法可以简单地在提供的映像上运行 Flow。
更新1
按照本教程,我尝试了以下操作:
import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from …Run Code Online (Sandbox Code Playgroud) 假设我有以下数据框:
df1 = pd.DataFrame(data = [1,np.nan,np.nan,1,1,np.nan,1,1,1],
columns = ['X'],
index = ['a', 'a', 'a',
'b', 'b', 'b',
'c', 'c', 'c'])
print(df1)
X
a 1.0
a NaN
a NaN
b 1.0
b 1.0
b NaN
c 1.0
c 1.0
c 1.0
Run Code Online (Sandbox Code Playgroud)
我只想保留具有 2 个或更多非 NaN 条目的索引。在这种情况下,“a”条目只有一个非 NaN 值,所以我想删除它并让我的结果是:
X
b 1.0
b 1.0
b NaN
c 1.0
c 1.0
c 1.0
Run Code Online (Sandbox Code Playgroud)
做这个的最好方式是什么?理想情况下,我也想要一些适用于 Dask 的东西,尽管通常如果它适用于 Pandas,它也适用于 Dask。
我有三个一维数组:
idxs: 索引数据weights: 中每个指标的权重 idxsbins:用于计算其中最小重量的 bin。这是我当前使用的方法idxs来检查weights在哪个 bin 中调用的数据,然后计算 bin 权重的最小值/最大值:
slices显示每个垃圾箱idxs元素所属的。slices和weights同时。weights每个 bin(切片)中的最小值。import random
import numpy as np
# create example data
out_size = int(10)
bins = np.arange(3, out_size-3)
idxs = np.arange(0, out_size)
#random.shuffle(idxs)
# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]
weights = idxs
# get which bin idxs belong …Run Code Online (Sandbox Code Playgroud) dask ×10
pandas ×6
python ×6
numpy ×2
blaze ×1
dask-delayed ×1
data-science ×1
datashape ×1
docker ×1
docker-image ×1
etl ×1
odo ×1
parquet ×1
performance ×1
prefect ×1
pyspark ×1
scipy ×1