我有一个json文件目录,我试图转换为dask DataFrame并将其保存到castra.有200个文件,它们之间包含O(10**7)个json记录.代码非常简单,主要遵循教程示例.
import dask.dataframe as dd
import dask.bag as db
import json
txt = db.from_filenames('part-*.json')
js = txt.map(json.loads)
df = js.to_dataframe()
cs=df.to_castra("data.castra")
Run Code Online (Sandbox Code Playgroud)
我在32核机器上运行它,但代码只使用100%的一个核心.我对文档的理解是这段代码并行执行.为什么不呢?我误解了什么吗?
有没有办法轻松将数值的DataFrame转换为数组?与valuespandas DataFrame 类似.我似乎无法用提供的API找到任何方法,但我认为这是一个常见的操作.
我需要创建一个基于dask数据帧的某些条件的列.在熊猫中它是相当简单的:
ddf['TEST_VAR'] = ['THIS' if x == 200607 else
'NOT THIS' if x == 200608 else
'THAT' if x == 200609 else 'NONE'
for x in ddf['shop_week'] ]
Run Code Online (Sandbox Code Playgroud)
在dask中,我必须做同样的事情,如下所示:
def f(x):
if x == 200607:
y= 'THIS'
elif x == 200608 :
y= 'THAT'
else :
y= 1
return y
ddf1 = ddf.assign(col1 = list(ddf.shop_week.apply(f).compute()))
ddf1.compute()
Run Code Online (Sandbox Code Playgroud)
问题:
我在使用Dask和Distributed开发数据分析管道方面取得了很大成功.然而,我仍然期待改进的一件事是我处理异常的方式.
现在,如果,我写下面的内容
def my_function (value):
return 1 / value
results = (dask.bag
.from_sequence(range(-10, 10))
.map(my_function))
print(results.compute())
Run Code Online (Sandbox Code Playgroud)
...然后在运行程序时,我得到一个很长很长的追溯列表(每个工人一个,我猜).最相关的部分是
distributed.utils - ERROR - division by zero
Traceback (most recent call last):
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/utils.py", line 193, in f
result[0] = yield gen.maybe_future(func(*args, **kwargs))
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 1473, in …Run Code Online (Sandbox Code Playgroud) 我有一个具有高性能网络的集群(InfiniBand).但是,当我设置我的Dask调度程序和工作程序时,性能似乎并不像我预期的那么快.我怎么能告诉Dask使用这个网络?
免责声明:我只是问这个问题,以便我可以回答.它已成为一个经常被问到的问题
MongoDB中我的集合中的所有文档都具有相同的字段.我的目标是将它们加载到Python中pandas.DataFrame或dask.DataFrame.
我想通过并行化来加速加载过程.我的计划是生成几个进程或线程.每个进程都会加载一个集合的块,然后这些块将合并在一起.
如何使用MongoDB正确完成?
我尝试过类似PostgreSQL的方法.我最初的想法是在SQL查询中使用SKIP和LIMIT.它失败了,因为为每个特定查询打开的每个游标都从头开始读取数据表,只是跳过了指定的行数.所以我必须创建包含记录号的附加列,并在查询中指定这些数字的范围.
相反,MongoDB为每个文档分配唯一的ObjectID.但是,我发现不可能从另一个ObjectID中减去一个ObjectID,它们只能与排序操作进行比较:less,greater和equal.
另外,pymongo返回支持索引操作的游标对象,并且有一些看似对我的任务有用的方法,比如count,limit.
用于Spark的MongoDB连接器以某种方式完成此任务.不幸的是,我对Scala并不熟悉,因此,我很难找到他们是如何做到的.
那么,从Mongo并行加载数据到python的正确方法是什么?
到目前为止,我已经达到了以下解决方案:
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
# import other modules.
collection = get_mongo_collection()
cursor = collection.find({ })
def process_document(in_doc):
out_doc = # process doc keys and values
return pd.DataFrame(out_doc)
df = dd.from_delayed( (delayed(process_document)(d) for d in cursor) )
Run Code Online (Sandbox Code Playgroud)
但是,它看起来像是dask.dataframe.from_delayed从传递的生成器内部创建一个列表,有效地在一个线程中加载所有集合.
更新.我在docs中发现,这种skip方法pymongo.Cursor从集合的开头也开始,就像PostgreSQL一样.同一页面建议在应用程序中使用分页逻辑.到目前为止,我发现的解决方案使用_id了这个分类.但是,它们也存储在最后一次看到 …
默认情况下describe,Dask DataFrame的方法仅汇总数字列.根据文档,我应该能够通过提供include参数来获得分类列的描述.然而
df.describe(include=['category']).compute()
导致一个
TypeError: describe() got an unexpected keyword argument 'include'.
我也试过了一个不同的方法:
df.select_dtypes(include=['category']).describe().compute()
而这次我得到了
ValueError: DataFrame contains only non-numeric data.
您能否告诉我们在Dask DataFrame中总结分类列的最佳方法是什么?
我们如何在Dask分布式中为每个工作人员选择--nthreads和--nprocs?我有3个工作线程,每个工作线程有2个线程,每个内核有4个内核,每个内核有1个线程(根据每个工作线程上'lscpu'Linux命令的输出)
我正在尝试使用LocalCluster在我的笔记本电脑上使用dask-distributed,但我仍然没有找到一种方法让我的应用程序关闭而不会引发一些警告或触发matplotlib的一些奇怪的迭代(我正在使用tkAgg后端).
例如,如果我按此顺序关闭客户端和群集,则tk无法以适当的方式从内存中删除图像,我收到以下错误:
Traceback (most recent call last):
File "/opt/Python-3.6.0/lib/python3.6/tkinter/__init__.py", line 3501, in __del__
self.tk.call('image', 'delete', self.name)
RuntimeError: main thread is not in main loop
Run Code Online (Sandbox Code Playgroud)
例如,以下代码生成此错误:
from time import sleep
import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster
if __name__ == '__main__':
cluster = LocalCluster(
n_workers=2,
processes=True,
threads_per_worker=1
)
client = Client(cluster)
x = np.linspace(0, 1, 100)
y = x * x
plt.plot(x, y)
print('Computation complete! Stopping workers...')
client.close()
sleep(1)
cluster.close()
print('Execution complete!')
Run Code Online (Sandbox Code Playgroud)
该sleep(1)行使问题更容易出现,因为它不会在每次执行时出现. …
通过进行标准的Tornado演示并将IOLoop推入后台线程,可以在单个脚本中查询服务器。当Tornado服务器是交互式对象时,这很有用(请参阅Dask或类似内容)。
import asyncio
import requests
import tornado.ioloop
import tornado.web
from concurrent.futures import ThreadPoolExecutor
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
def make_app():
return tornado.web.Application([
(r"/", MainHandler),
])
pool = ThreadPoolExecutor(max_workers=2)
loop = tornado.ioloop.IOLoop()
app = make_app()
app.listen(8888)
fut = pool.submit(loop.start)
print(requests.get("https://localhost:8888"))
Run Code Online (Sandbox Code Playgroud)
上面的代码在标准python脚本中可以正常工作(尽管缺少安全关机)。Jupyter Notebook是这些交互式Tornado服务器环境的最佳环境。但是,当涉及到Jupyter时,由于已经存在一个活动的运行循环,所以这个想法失败了:
>>> import asyncio
>>> asyncio.get_event_loop()
<_UnixSelectorEventLoop running=True closed=False debug=False>
Run Code Online (Sandbox Code Playgroud)
在Jupyter笔记本中运行上述脚本时,可以看到这一点,服务器和请求客户端都试图在同一线程中打开连接,并且代码挂起。建立一个新的Asyncio循环和/或Tornado IOLoop似乎没有帮助,我怀疑我在Jupyter本身中缺少了一些东西。
问题:Jupyter笔记本中是否可以在后台运行实时Tornado服务器,以便标准python requests或类似服务器可以从主线程连接到它?我希望尽可能避免在呈现给用户的代码中使用Asyncio,因为它对新手用户而言相对复杂。
dask ×10
python ×8
pandas ×2
castra ×1
concurrency ×1
infiniband ×1
jupyter ×1
mongodb ×1
tornado ×1