gie*_*s0r 5 python-3.x dask dask-distributed streamz
有 500 个,不断增长DataFrames,我想将(对于每个 DataFrame 独立的)数据的操作提交到dask. 我的主要问题是:可以dask保存不断提交的数据,所以我可以submit对所有提交的数据进行函数处理——而不仅仅是新提交的数据?
但是让我们用一个例子来解释它:
创建一个dask_server.py:
from dask.distributed import Client, LocalCluster
HOST = '127.0.0.1'
SCHEDULER_PORT = 8711
DASHBOARD_PORT = ':8710'
def run_cluster():
cluster = LocalCluster(dashboard_address=DASHBOARD_PORT, scheduler_port=SCHEDULER_PORT, n_workers=8)
print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, DASHBOARD_PORT))
client = Client(cluster)
print(client)
print("Press Enter to quit ...")
input()
if __name__ == '__main__':
run_cluster()
Run Code Online (Sandbox Code Playgroud)
现在,我可以从我的连接my_stream.py,并开始submit和gather数据:
DASK_CLIENT_IP = '127.0.0.1'
dask_con_string = 'tcp://%s:%s' % (DASK_CLIENT_IP, DASK_CLIENT_PORT)
dask_client = Client(self.dask_con_string)
def my_dask_function(lines):
return lines['a'].mean() + lines['b'].mean
def async_stream_redis_to_d(max_chunk_size = 1000):
while 1:
# This is a redis queue, but can be any queueing/file-stream/syslog or whatever
lines = self.queue_IN.get(block=True, max_chunk_size=max_chunk_size)
futures = []
df = pd.DataFrame(data=lines, columns=['a','b','c'])
futures.append(dask_client.submit(my_dask_function, df))
result = self.dask_client.gather(futures)
print(result)
time sleep(0.1)
if __name__ == '__main__':
max_chunk_size = 1000
thread_stream_data_from_redis = threading.Thread(target=streamer.async_stream_redis_to_d, args=[max_chunk_size])
#thread_stream_data_from_redis.setDaemon(True)
thread_stream_data_from_redis.start()
# Lets go
Run Code Online (Sandbox Code Playgroud)
这按预期工作,而且速度非常快!!!
但是接下来,我想真正append的lines第一之前计算发生-而且不知道这是可能的吗?因此,在这里我们的例子中,我想计算出mean在所有已提交,不仅行最后提交的。
问题/方法:
submit 所有数据缓存到集群中
。这就像一个指数级的开销。试过了,可以用,就是很慢!对我来说,将期货列表分配给已发布的数据集似乎是理想的选择。这是相对便宜的(一切都是元数据),并且您将在几毫秒内了解最新情况
client.datasets["x"] = list_of_futures
def worker_function(...):
futures = get_client().datasets["x"]
data = get_client.gather(futures)
... work with data
Run Code Online (Sandbox Code Playgroud)
正如您提到的,还有其他系统,例如 PubSub 或 Actors。从你的说法来看,我怀疑期货+已发布的数据集更简单,也是更务实的选择。
| 归档时间: |
|
| 查看次数: |
216 次 |
| 最近记录: |