我上传了 130k json 文件。
我用以下方法做到这一点Python:
import os
import json
import pandas as pd
path = "/my_path/"
filename_ending = '.json'
json_list = []
json_files = [file for file in os.listdir(f"{path}") if file.endswith(filename_ending)]
import time
start = time.time()
for jf in json_files:
with open(f"{path}/{jf}", 'r') as f:
json_data = json.load(f)
json_list.append(json_data)
end = time.time()
Run Code Online (Sandbox Code Playgroud)
需要 60 秒。
我用以下方法做到这一点multiprocessing:
import os
import json
import pandas as pd
from multiprocessing import Pool
import time
path = "/my_path/"
filename_ending = '.json'
json_files = [file for file in os.listdir(f"{path}") if file.endswith(filename_ending)]
def read_data(name):
with open(f"/my_path/{name}", 'r') as f:
json_data = json.load(f)
return json_data
if __name__ == '__main__':
start = time.time()
pool = Pool(processes=os.cpu_count())
x = pool.map(read_data, json_files)
end = time.time()
Run Code Online (Sandbox Code Playgroud)
需要 53 秒。
我用以下方法做到这一点ray:
import os
import json
import pandas as pd
from multiprocessing import Pool
import time
import ray
path = "/my_path/"
filename_ending = '.json'
json_files = [file for file in os.listdir(f"{path}") if file.endswith(filename_ending)]
start = time.time()
ray.shutdown()
ray.init(num_cpus=os.cpu_count()-1)
@ray.remote
def read_data(name):
with open(f"/my_path/{name}", 'r') as f:
json_data = json.load(f)
return json_data
all_data = []
for jf in json_files:
all_data.append(read_data.remote(jf))
final = ray.get(all_data)
end = time.time()
Run Code Online (Sandbox Code Playgroud)
需要 146 秒。
我的问题是为什么ray需要这么多时间?
是不是因为:
1)ray 对于相对较小的数据量来说相对较慢?
2)我的代码做错了什么?
3)ray是不是很有用?
我从未使用过射线,但我很有信心,我的解释应该是正确的。
原始代码进行了简单的 json 反序列化。该代码主要需要文件 IO 和少量 CPU。(json 反序列化相当快,这就是 json 成为流行交换格式的原因之一)
Ray 必须将数据从一个进程推送到另一个进程(如果通过网络分布在多台机器上)。为了做到这一点,它自己执行一些序列化/反序列化(也许它使用 pickle 和强大的 TCP 协议来推送参数并收集结果)。而且这个开销可能比实际任务所需的工作量还要大。
如果您对 json 数据进行更多计算(任何 CPU 密集型数据),那么您将能够看到差异。
我的猜测是,您的示例问题太简单,因此 ray 的开销超出了使用多个工作线程的好处。
换句话说。分配任务和收集结果所花费的时间/精力比执行计算结果实际花费的时间/精力要多。