我有这个使用该apscheduler库提交进程的 python 代码,它工作正常:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
array = [ 1, 3, 5, 7]
for elem in array:
scheduler.add_job(function_to_submit, kwargs={ 'elem': elem })
scheduler.start()
def function_to_submit(elem):
print(str(elem))
Run Code Online (Sandbox Code Playgroud)
请注意,进程是并行提交的,并且代码不会等待进程结束。
我需要的是将此代码迁移到dask distributed使用工作人员。我遇到的问题是,如果我使用dask提交方法,代码将等待所有函数结束,我需要代码继续。如何实现这一目标?
client = Client('127.0.0.1:8786')
future1 = client.submit(function_to_submit, 1)
future3 = client.submit(function_to_submit, 3)
L = [future1, future3]
client.gather(L) # <-- this waits until all the futures end
Run Code Online (Sandbox Code Playgroud) 人们经常谈论使用镶木地板和熊猫。我正在努力了解与 pandas 一起使用时我们是否可以利用 parquet 文件的全部功能。例如,假设我有一个大 parquet 文件(按年份分区),有 30 列(包括年份、州、性别、姓氏)和许多行。我想加载镶木地板文件并执行随后的类似计算
import pandas as pd
df = pd.read_parquet("file.parquet")
df_2002 = df[df.year == 2002]
df_2002.groupby(["state", "gender"])["last_name"].count()
Run Code Online (Sandbox Code Playgroud)
在此查询中仅使用 4 列(共 30 列)并且仅2002使用年份分区。这意味着我们只想引入此计算所需的列和行,并且在具有谓词和投影下推的 parquet 中可以实现类似的操作(以及我们使用 parquet 的原因)。
但我试图了解这个查询在 pandas 中的行为方式。当我们打电话的那一刻,它会把所有的事情都记起来吗df = pd.read_parquet("file.parquet)?或者这里应用了任何惰性因素来引入投影和谓词下推?如果情况并非如此,那么将 pandas 与 parquet 一起使用还有什么意义呢?任何这一切都可以通过arrow package?
虽然我没用过dask只是想知道这种情况是否是在 dask 中处理的,因为他们是懒惰地执行的。
我确信这种情况在 Spark 世界中处理得很好,但只是想知道在本地场景中如何使用 pandas、arrow、dask、ibis 等包处理这些情况。
假设我们有 pandas dataframepd和 dask dataframe dd。当我想用 matplotlib 绘制 pandas 时,我可以轻松做到:
fig, ax = plt.subplots()
ax.bar(pd["series1"], pd["series2"])
fig.savefig(path)
Run Code Online (Sandbox Code Playgroud)
然而,当我尝试对 dask dataframe 执行相同操作时,我得到的Type Errors是:
TypeError: Cannot interpret 'string[python]' as a data type
Run Code Online (Sandbox Code Playgroud)
string[python]这只是一个示例,无论您的dd["series1"]数据类型是什么,都将在此处输入。
所以我的问题是:使用matplotlibwith 的正确方法是什么dask?将这两个库结合起来是否是一个好主意?
我正在尝试DataFrame通过读取由'#####'分隔的5个csv文件来创建一个哈希
代码是:
import dask.dataframe as dd
df = dd.read_csv('D:\temp.csv',sep='#####',engine='python')
res = df.compute()
Run Code Online (Sandbox Code Playgroud)
错误是:
dask.async.ValueError:
Dask dataframe inspected the first 1,000 rows of your csv file to guess the
data types of your columns. These first 1,000 rows led us to an incorrect
guess.
For example a column may have had integers in the first 1000
rows followed by a float or missing value in the 1,001-st row.
You will need to specify some dtype information explicitly using the …Run Code Online (Sandbox Code Playgroud) Dask数据帧看起来和感觉像是pandas数据帧,但是使用多个线程对大于内存的数据集进行操作.
但后来在同一页面中:
一个dask DataFrame由沿索引分隔的几个内存中的pandas DataFrame组成.
Dask是否从磁盘中顺序读取不同的DataFrame分区并执行计算以适应内存?是否在需要时将某些分区溢出到磁盘?一般来说,Dask如何管理数据的内存< - >磁盘IO以允许大于内存的数据分析?
我尝试在10M MovieLens数据集上执行一些基本计算(例如平均评级),并且我的笔记本电脑(8GB RAM)开始交换.
所以我有一个dask DataFrame.append的问题.我从主数据中生成了许多衍生特性,并将它们附加到主数据框中.之后,任何一组列的dask图表都会被炸毁.这是一个小例子:
%pylab inline
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.dot import dot_graph
df=pd.DataFrame({'x%s'%i:np.random.rand(20) for i in range(5)})
ddf = dd.from_pandas(df, npartitions=2)
dot_graph(ddf['x0'].dask)
Run Code Online (Sandbox Code Playgroud)
g=ddf.assign(y=ddf['x0']+ddf['x1'])
dot_graph(g['x0'].dask)
Run Code Online (Sandbox Code Playgroud)
想象一下,我有很多产生的列.因此,任何特定列的计算图包括所有其他列的无关计算.即在我的情况下,我有len(ddf ['someColumn'].dask)> 100000.因此很快就会无法使用.
所以我的问题是这个问题可以解决吗?有没有现成的方法呢?如果不是 - 我应该朝哪个方向实施这个目标?
谢谢!
我正在处理内存不足的大型数据集,因此被引入了Dask数据框。我从文档中了解到,Dask不会将整个数据集加载到内存中。相反,它创建了多个线程,这些线程将根据需要从磁盘中获取记录。因此,我假设批处理大小为500的keras模型,在训练时它在内存中应该只有500条记录。但是当我开始训练时。这需要永远。可能是我做错了。请提出建议。
训练数据的形状:1000000 * 1290
import glob
import dask.dataframe
paths_train = glob.glob(r'x_train_d_final*.csv')
X_train_d = dd.read_csv('.../x_train_d_final0.csv')
Y_train1 = keras.utils.to_categorical(Y_train.iloc[,1], num_classes)
batch_size = 500
num_classes = 2
epochs = 5
model = Sequential()
model.add(Dense(645, activation='sigmoid', input_shape=(1290,),kernel_initializer='glorot_normal'))
#model.add(Dense(20, activation='sigmoid',kernel_initializer='glorot_normal'))
model.add(Dense(num_classes, activation='sigmoid'))
model.compile(loss='binary_crossentropy',
optimizer=Adam(decay=0),
metrics=['accuracy'])
history = model.fit(X_train_d.to_records(), Y_train,
batch_size=batch_size,
epochs=epochs,
verbose=1,
class_weight = {0:1,1:6.5},
shuffle=False)
Run Code Online (Sandbox Code Playgroud) 一个简单的代码片段如下:注释后跟###很重要..
from dask.distributed import Client
### this code-piece will get executed on a dask worker.
def task_to_perform():
print("task in progress.")
## do something here..
print("task is over.!")
### whereas the below code will run on client side,
### assume on a different node than worker
client= Client("127.0.01:8786")
future = client.submit(task_to_perform)
print("task results::", future.result())
Run Code Online (Sandbox Code Playgroud)
因此执行的控制流程将如下所示:dask-client将任务提交给dask-scheduler,调度程序将根据可用的工作程序调用必须提交给任务的工作者.
但是我们在dask中是否有任何机制可以通过它来绕过dask-scheduler在专用/特定工作者上提交我的任务?
我有一个dask数据框,其中一列具有索引。问题是,如果我执行df.head(),则它总是调出一个空的df,而df.tail总是返回正确的df。我检查了df.head总是检查第一个分区中的前n个条目。所以,如果我做df.reset_index(),它应该可以工作,但事实并非如此
下面是重现此代码:
import dask.dataframe as dd
import pandas as pd
data = pd.DataFrame({
'i64': np.arange(1000, dtype=np.int64),
'Ii32': np.arange(1000, dtype=np.int32),
'bhello': np.random.choice(['hello', 'Yo', 'people'], size=1000).astype("O")
})
Run Code Online (Sandbox Code Playgroud)
daskDf = dd.from_pandas(data, chunksize=3)
daskDf = daskDf.set_index('bhello')
print(daskDf.head())
Run Code Online (Sandbox Code Playgroud) 我刚刚开始冒险,DASK就在json格式的示例数据集上学习。我知道对于初学者来说,这不是世界上最简单的数据格式:)
我有一个json格式的数据集。我通过dd.read_json将数据加载到数据帧,一切顺利。例如,compute()或len()函数出现了问题。
我收到此错误:
ValueError: Metadata mismatch found in `from_delayed`.
Partition type: `DataFrame`
+----------+-------+----------+
| Column | Found | Expected |
+----------+-------+----------+
| column1 | - | object |
| column2 | - | object |
+----------+-------+----------+
Run Code Online (Sandbox Code Playgroud)
我尝试了不同的方法,但没有任何帮助。我不知道该如何处理该错误。
请帮助,我将非常感谢!
dask ×10
python ×8
bigdata ×2
dataframe ×2
pandas ×2
apscheduler ×1
assign ×1
csv ×1
dataset ×1
ibis ×1
keras ×1
large-data ×1
matplotlib ×1
optimization ×1
parquet ×1
pyarrow ×1
separator ×1