标签: dask

DataFrame中的Dask数组

有没有办法轻松将数值的DataFrame转换为数组?与valuespandas DataFrame 类似.我似乎无法用提供的API找到任何方法,但我认为这是一个常见的操作.

dask

8
推荐指数
1
解决办法
2489
查看次数

在dask数据帧中创建if-else条件列

我需要创建一个基于dask数据帧的某些条件的列.在熊猫中它是相当简单的:

ddf['TEST_VAR'] = ['THIS' if x == 200607 else  
              'NOT THIS' if x == 200608 else 
              'THAT' if x == 200609 else 'NONE'  
              for x in ddf['shop_week'] ]
Run Code Online (Sandbox Code Playgroud)

在dask中,我必须做同样的事情,如下所示:

def f(x):
    if x == 200607:
         y= 'THIS'
    elif x == 200608 :
         y= 'THAT'
    else :
         y= 1 
    return y

ddf1 = ddf.assign(col1 = list(ddf.shop_week.apply(f).compute()))
ddf1.compute()
Run Code Online (Sandbox Code Playgroud)

问题:

  1. 是否有更好/更直接的方法来实现它?
  2. 我无法修改第一个数据帧ddf,我需要创建ddf1来改变是dask dataframe的Immutable对象吗?

python pandas dask

8
推荐指数
1
解决办法
1042
查看次数

处理Dask分布式异常的方法

我在使用DaskDistributed开发数据分析管道方面取得了很大成功.然而,我仍然期待改进的一件事是我处理异常的方式.

现在,如果,我写下面的内容

def my_function (value):
    return 1 / value

results = (dask.bag
    .from_sequence(range(-10, 10))
    .map(my_function))

print(results.compute())
Run Code Online (Sandbox Code Playgroud)

...然后在运行程序时,我得到一个很长很长的追溯列表(每个工人一个,我猜).最相关的部分是

distributed.utils - ERROR - division by zero
Traceback (most recent call last):
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/utils.py", line 193, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 1473, in …
Run Code Online (Sandbox Code Playgroud)

python dask

8
推荐指数
1
解决办法
1107
查看次数

如何在Dask中使用InfiniBand网络?

我有一个具有高性能网络的集群(InfiniBand).但是,当我设置我的Dask调度程序和工作程序时,性能似乎并不像我预期的那么快.我怎么能告诉Dask使用这个网络?

免责声明:我只是问这个问题,以便我可以回答.它已成为一个经常被问到的问题

python infiniband dask

8
推荐指数
1
解决办法
586
查看次数

并行将MongoDB中的数据加载到python中

MongoDB中我的集合中的所有文档都具有相同的字段.我的目标是将它们加载到Python中pandas.DataFramedask.DataFrame.

我想通过并行化来加速加载过程.我的计划是生成几个进程或线程.每个进程都会加载一个集合的块,然后这些块将合并在一起.

如何使用MongoDB正确完成?

我尝试过类似PostgreSQL的方法.我最初的想法是在SQL查询中使用SKIPLIMIT.它失败了,因为为每个特定查询打开的每个游标都从头开始读取数据表,只是跳过了指定的行数.所以我必须创建包含记录号的附加列,并在查询中指定这些数字的范围.

相反,MongoDB为每个文档分配唯一的ObjectID.但是,我发现不可能从另一个ObjectID中减去一个ObjectID,它们只能与排序操作进行比较:less,greater和equal.

另外,pymongo返回支持索引操作的游标对象,并且有一些看似对我的任务有用的方法,比如count,limit.

用于Spark的MongoDB连接器以某种方式完成此任务.不幸的是,我对Scala并不熟悉,因此,我很难找到他们是如何做到的.

那么,从Mongo并行加载数据到python的正确方法是什么?

到目前为止,我已经达到了以下解决方案:

import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

# import other modules.

collection = get_mongo_collection()
cursor = collection.find({ })

def process_document(in_doc):
    out_doc = # process doc keys and values
    return pd.DataFrame(out_doc)

df = dd.from_delayed( (delayed(process_document)(d) for d in cursor) )
Run Code Online (Sandbox Code Playgroud)

但是,它看起来像是dask.dataframe.from_delayed从传递的生成器内部创建一个列表,有效地在一个线程中加载所有集合.

更新.我在docs中发现,这种skip方法pymongo.Cursor从集合的开头也开始,就像PostgreSQL一样.同一页面建议在应用程序中使用分页逻辑.到目前为止,我发现的解决方案使用_id了这个分类.但是,它们也存储在最后一次看到 …

python parallel-processing mongodb pandas dask

8
推荐指数
1
解决办法
4111
查看次数

解包延迟功能的结果

在使用延迟转换我的程序时,我偶然发现了一种常用的编程模式,该模式不适用于延迟.例:

from dask import delayed
@delayed
def myFunction():
    return 1,2

a, b = myFunction()
a.compute()
Run Code Online (Sandbox Code Playgroud)

提高:TypeError: Delayed objects of unspecified length are not iterable 虽然以下解决方法没有.但看起来更笨拙

from dask import delayed
@delayed
def myFunction():
    return 1,2

dummy = myFunction()
a, b = dummy[0], dummy[1]
a.compute()
Run Code Online (Sandbox Code Playgroud)

这是预期的行为吗?

python dask dask-delayed

8
推荐指数
1
解决办法
1200
查看次数

使用Python dask读取CSV时,我可以设置索引列吗?

使用Python Pandas读取CSV时,可以指定索引列.读取文件时是否可以使用Python Dask,而不是之后设置索引?

例如,使用pandas:

df = pandas.read_csv(filename, index_col=0)
Run Code Online (Sandbox Code Playgroud)

理想情况下使用dask可能是这样的:

df = dask.dataframe.read_csv(filename, index_col=0)
Run Code Online (Sandbox Code Playgroud)

我试过了

df = dask.dataframe.read_csv(filename).set_index(?)
Run Code Online (Sandbox Code Playgroud)

但索引列没有名称(这似乎很慢).

python csv dataframe dask

8
推荐指数
1
解决办法
2508
查看次数

如何使用Dask使用所有cpu核心?

我有一个超过35000行的熊猫系列。我想使用dask使其更有效率。但是,我的dask代码和pandas代码同时使用。最初,“ ser”是熊猫系列,而fun1fun2是在系列的各个行中执行模式匹配的基本功能。
大熊猫
ser = ser.apply(fun1).apply(fun2)

达斯克
ser = dd.from_pandas(ser, npartitions = 16) ser = ser.apply(fun1).apply(fun2)

在检查cpu核心的状态时,我发现并不是所有的核心都被使用了。只有一个内核已经习惯了100%。

有没有什么方法可以使用dask加快序列代码的速度,或者在串行执行dask操作时利用cpu的所有核心?

dask dask-delayed dask-distributed

8
推荐指数
1
解决办法
4600
查看次数

什么是关闭Dask LocalCluster的"正确"方法?

我正在尝试使用LocalCluster在我的笔记本电脑上使用dask-distributed,但我仍然没有找到一种方法让我的应用程序关闭而不会引发一些警告或触发matplotlib的一些奇怪的迭代(我正在使用tkAgg后端).

例如,如果我按此顺序关闭客户端和群集,则tk无法以适当的方式从内存中删除图像,我收到以下错误:

Traceback (most recent call last):
  File "/opt/Python-3.6.0/lib/python3.6/tkinter/__init__.py", line 3501, in __del__
    self.tk.call('image', 'delete', self.name)
RuntimeError: main thread is not in main loop
Run Code Online (Sandbox Code Playgroud)

例如,以下代码生成此错误:

from time import sleep
import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    cluster = LocalCluster(
        n_workers=2,
        processes=True,
        threads_per_worker=1
    )
    client = Client(cluster)

    x = np.linspace(0, 1, 100)
    y = x * x
    plt.plot(x, y)

    print('Computation complete! Stopping workers...')
    client.close()
    sleep(1)
    cluster.close()

    print('Execution complete!')
Run Code Online (Sandbox Code Playgroud)

sleep(1)行使问题更容易出现,因为它不会在每次执行时出现. …

python dask dask-distributed

8
推荐指数
2
解决办法
605
查看次数

在Jupyter Notebook中运行Tornado服务器

通过进行标准的Tornado演示并将IOLoop推入后台线程,可以在单个脚本中查询服务器。当Tornado服务器是交互式对象时,这很有用(请参阅Dask或类似内容)。

import asyncio
import requests
import tornado.ioloop
import tornado.web

from concurrent.futures import ThreadPoolExecutor

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

pool = ThreadPoolExecutor(max_workers=2)
loop = tornado.ioloop.IOLoop()

app = make_app()
app.listen(8888)
fut = pool.submit(loop.start)

print(requests.get("https://localhost:8888"))
Run Code Online (Sandbox Code Playgroud)

上面的代码在标准python脚本中可以正常工作(尽管缺少安全关机)。Jupyter Notebook是这些交互式Tornado服务器环境的最佳环境。但是,当涉及到Jupyter时,由于已经存在一个活动的运行循环,所以这个想法失败了:

>>> import asyncio
>>> asyncio.get_event_loop()
<_UnixSelectorEventLoop running=True closed=False debug=False>
Run Code Online (Sandbox Code Playgroud)

在Jupyter笔记本中运行上述脚本时,可以看到这一点,服务器和请求客户端都试图在同一线程中打开连接,并且代码挂起。建立一个新的Asyncio循环和/或Tornado IOLoop似乎没有帮助,我怀疑我在Jupyter本身中缺少了一些东西。

问题:Jupyter笔记本中是否可以在后台运行实时Tornado服务器,以便标准python requests或类似服务器可以从主线程连接到它?我希望尽可能避免在呈现给用户的代码中使用Asyncio,因为它对新手用户而言相对复杂。

python tornado python-asyncio jupyter dask

8
推荐指数
1
解决办法
752
查看次数