mat*_*ick · 5 · tags: dask, dask-distributed
import dask.distributed

def f(x, y):
    return x, y

client = dask.distributed.Client()
client.map(f, [(1, 2), (2, 3)])
This does not work:
[<Future: status: pending, key: f-137239e2f6eafbe900c0087f550bc0ca>,
<Future: status: pending, key: f-64f918a0c730c63955da91694fcf7acc>]
distributed.worker - WARNING - Compute Failed
Function: f
args: ((1, 2))
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
distributed.worker - WARNING - Compute Failed
Function: f
args: ((2, 3))
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
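You don't have quite the right signature; perhaps the docs are unclear (suggestions welcome). Client.map() takes a variable number of iterables, one per positional argument of the function, and the i-th task is called with the i-th element of each iterable. It does not take a single iterable of argument tuples. You should phrase this as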
client.map(f, (1, 2), (2, 3))
Or, if you want to stay closer to your original pattern:
client.map(f, *[(1, 2), (2, 3)])
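Because Client.map(func, *iterables) follows the same convention as Python's builtin map(), the builtin can be used to check which calls a given argument layout produces without starting a cluster. The sketch below uses builtin map() as a stand-in for client.map (an assumption made so it runs without dask). It shows that the unpacked form above yields the intended calls only because [(1, 2), (2, 3)] is unchanged by transposition; for an arbitrary list of argument tuples, transposing with zip(*pairs) first seems to be what is needed.

```python
def f(x, y):
    return x, y

# The question's pairs: unpacking gives the iterables (1, 2) and (2, 3),
# so x takes 1, 2 and y takes 2, 3 -> f(1, 2), f(2, 3). Correct only by coincidence.
symmetric = list(map(f, *[(1, 2), (2, 3)]))
print(symmetric)  # [(1, 2), (2, 3)]

# A general list of argument tuples
pairs = [(1, 2), (3, 4)]

# Unpacking directly: x takes 1, 2 and y takes 3, 4 -> f(1, 3), f(2, 4)
wrong = list(map(f, *pairs))
print(wrong)  # [(1, 3), (2, 4)]

# zip(*pairs) transposes to (1, 3), (2, 4): one iterable per parameter -> f(1, 2), f(3, 4)
right = list(map(f, *zip(*pairs)))
print(right)  # [(1, 2), (3, 4)]
```

With a running client, the corresponding call would presumably be client.map(f, *zip(*pairs)).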
Well, the documentation is admittedly a bit confusing, and I couldn't find an example that clearly demonstrates this, so let me break it down:
from dask.distributed import Client
client = Client()

def test_fn(a, b, c, d, **kwargs):
    return a + b + c + d + kwargs["special"]

# Unpack a list of four iterables: the i-th task gets the i-th element of each one
futures = client.map(test_fn, *[[1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4)], special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]

# Equivalent: pass the four iterables directly, without * unpacking
futures = client.map(test_fn, [1, 2, 3, 4], (1, 2, 3, 4), (1, 2, 3, 4), (1, 2, 3, 4), special=100)
output = [f.result() for f in futures]
# output = [104, 108, 112, 116]
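Notes:
1. On the first call, test_fn gets a=b=c=d=1; on the second, a=b=c=d=2; and so on.
2. **kwargs (such as special) are passed through to the function, but with the same value for every call.

Thinking about it now, this isn't surprising: I think the positional part simply follows the signature of Python's concurrent.futures.ProcessPoolExecutor.map(fn, *iterables), which also takes one iterable per positional argument.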
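P.S. Note that although the docs say "Returns: List, iterator, or Queue of futures, depending on the type of the inputs.", you may actually get this error: Dask no longer supports mapping over Iterators or Queues. Consider using a normal for loop and Client.submit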
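The error message suggests two workarounds: submit one task per item in a plain loop, or materialize the iterator into a list before mapping. The sketch below illustrates both with the stdlib Executor.submit/Executor.map as stand-ins for Client.submit/Client.map (an assumption so it runs without a dask cluster); the generator arg_stream is a hypothetical input source.

```python
from concurrent.futures import ThreadPoolExecutor

def f(x, y):
    return x, y

def arg_stream():
    # Hypothetical generator: the kind of iterator input Client.map now rejects
    yield (1, 2)
    yield (2, 3)

with ThreadPoolExecutor() as pool:
    # Option 1: a plain loop, one submit per argument tuple
    futures = [pool.submit(f, *args) for args in arg_stream()]
    results = [fut.result() for fut in futures]

    # Option 2: materialize the iterator into a list, then transpose it for map
    args_list = list(arg_stream())
    results_map = list(pool.map(f, *zip(*args_list)))

print(results)      # [(1, 2), (2, 3)]
print(results_map)  # [(1, 2), (2, 3)]
```

With dask, the loop version would read futures = [client.submit(f, *args) for args in arg_stream()].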