Dask:持续提交,处理所有提交的数据

gie*_*s0r 5 python-3.x dask dask-distributed streamz

有 500 个,不断增长DataFrames,我想将(对于每个 DataFrame 独立的)数据的操作提交到dask. 我的主要问题是:可以dask保存不断提交的数据,所以我可以submit对所有提交的数据进行函数处理——而不仅仅是新提交的数据?

但是让我们用一个例子来解释它:

创建一个dask_server.py

from dask.distributed import Client, LocalCluster
HOST = '127.0.0.1'
SCHEDULER_PORT = 8711
DASHBOARD_PORT = ':8710'

def run_cluster():
    cluster = LocalCluster(dashboard_address=DASHBOARD_PORT, scheduler_port=SCHEDULER_PORT, n_workers=8)
    print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, DASHBOARD_PORT))
    client = Client(cluster)
    print(client)
    print("Press Enter to quit ...")
    input()

if __name__ == '__main__':
    run_cluster()
Run Code Online (Sandbox Code Playgroud)

现在,我可以从我的连接my_stream.py,并开始submitgather数据:

DASK_CLIENT_IP = '127.0.0.1'
dask_con_string = 'tcp://%s:%s' % (DASK_CLIENT_IP, DASK_CLIENT_PORT)
dask_client = Client(self.dask_con_string)

def my_dask_function(lines):
    return lines['a'].mean() + lines['b'].mean

def async_stream_redis_to_d(max_chunk_size = 1000):
    while 1:

        # This is a redis queue, but can be any queueing/file-stream/syslog or whatever
        lines = self.queue_IN.get(block=True, max_chunk_size=max_chunk_size)

        futures = []
        df = pd.DataFrame(data=lines, columns=['a','b','c'])
        futures.append(dask_client.submit(my_dask_function, df))

        result = self.dask_client.gather(futures)
        print(result)

        time sleep(0.1)

if __name__ == '__main__':
    max_chunk_size = 1000
    thread_stream_data_from_redis = threading.Thread(target=streamer.async_stream_redis_to_d, args=[max_chunk_size])
    #thread_stream_data_from_redis.setDaemon(True)
    thread_stream_data_from_redis.start()
    # Lets go
Run Code Online (Sandbox Code Playgroud)

这按预期工作,而且速度非常快!!!

但是接下来,我想真正appendlines第一之前计算发生-而且不知道这是可能的吗?因此,在这里我们的例子中,我想计算出mean所有已提交,不仅行最后提交的。

问题/方法:

  1. 这种累积计算可能吗?
  2. 糟糕的替代方案 1:每次新行到达时,我都将所有行缓存在本地并将submit 所有数据缓存到集群中 。这就像一个指数级的开销。试过了,可以用,就是很慢!
  3. 黄金选项:Python 程序 1 推送数据。可以将另一个客户端(来自另一个 python 程序)连接到该累积数据并将分析逻辑从插入逻辑中移开。我认为发布的数据集是要走的路,但是否适用于这种高速附加?

可能相关:分布式变量Actors Worker

MRo*_*lin 1

对我来说,将期货列表分配给已发布的数据集似乎是理想的选择。这是相对便宜的(一切都是元数据),并且您将在几毫秒内了解最新情况

client.datasets["x"] = list_of_futures

def worker_function(...):
    futures = get_client().datasets["x"]
    data = get_client.gather(futures)
    ... work with data
Run Code Online (Sandbox Code Playgroud)

正如您提到的,还有其他系统,例如 PubSub 或 Actors。从你的说法来看,我怀疑期货+已发布的数据集更简单,也是更务实的选择。