我构建了一个通过多线程(通过 Jupyter Notebook、python 2.7、anaconda)启动了 XX 次的刮板(worker)。脚本采用以下格式,如 python.org 所述:
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
Run Code Online (Sandbox Code Playgroud)
当我按原样运行脚本时,没有问题。脚本完成后释放内存。
但是,我想运行上述脚本 20 次(排序的批处理),所以我将提到的脚本转换为一个函数,并使用以下代码运行该函数:
def multithreaded_script():
my script #code from above
x = 0
while x<20:
x +=1
multithredaded_script()
Run Code Online (Sandbox Code Playgroud)
每次迭代都会增加内存,最终系统开始将其写入磁盘。
有没有办法在每次运行后清除内存?
我试过:
sleep(30)在每次迭代结束时设置(以防 ram 需要时间释放)似乎没有任何帮助。关于在 While 语句中每次运行后我还可以尝试清除内存的其他想法?如果没有,有没有更好的方法来执行我的脚本 XX 次,这不会吃掉 ram?
先感谢您。
我正在编写一个脚本,它从表中获取 N 条记录,并通过多线程处理所述记录。
以前,我只是在每个工作定义中的 SQL 语句中使用 Order by RAND(),并希望不会有重复。
这种工作(重复数据删除稍后完成),但是,我想通过以下方式使我的脚本更加高效:
1)查询一次表,提取N条记录,并将它们分配给一个列表
2)将大列表拆分为Y列表的相同大小的列表,这可以通过以下方式完成:
number_of_workers = 2
first_names = ['Steve', 'Jane', 'Sara', 'Mary','Jack']
def chunkify(lst,n):
return [lst[i::n] for i in xrange(n)]
list1 = chunkify(first_names, number_of_workers)
print list1
Run Code Online (Sandbox Code Playgroud)
3)在多线程中定义worker函数时,向每个worker传递不同的子列表。注意 - 工作人员的数量(以及我想要将查询结果拆分成的部分)在函数的开头定义。然而,由于我对 Python 相当陌生,我不知道如何将每个子列表传递给单独的工作人员(或者它甚至可行吗?)
任何帮助、其他建议等将不胜感激!
多线程代码示例如下。我将如何使用
import threading
import random
def worker():
assign sublistN to worker N
print sublistN
threads = []
for i in range(number_of_workers):
print i
print ""
t = threading.Thread(target=worker)
threads.append(t)
t.start()
Run Code Online (Sandbox Code Playgroud)
先感谢您!
如何将使用 boto3 上传到 AWS S3 的图像的 ContentType 设置为 content-type:image/jpeg?
目前,我使用以下命令使用 buto3/python 2.7 将图像上传到 S3:
s3.upload_fileobj(bytes_io_file, bucket_name, filename)
Run Code Online (Sandbox Code Playgroud)
但是,要将上传对象的类型设置为 ContentType='image/jpeg',我必须通过 Web 界面手动选择 S3 上的所有“文件夹”,并将元数据设置为 Content-type : Image/jpeg
有没有办法在我上面的上传请求中设置这个标志?
先感谢您!
我无法理解如何将队列实现到下面的多处理示例中。基本上,我希望代码:
1)产生2个进程(完成)
2)将我的 id_list 分成两部分(完成)
3)让每个进程迭代列表打印出每个项目,并且只有在完成列表后才关闭。我知道我必须实现某种类型的排队系统,并将其传递给每个工作人员,但我不确定如何做到这一点。任何帮助将非常感激。
from multiprocessing import Pool,Queue
id_list = [1,2,3,4,5,6,7,8,9,10]
def mp_worker(record):
try:
print record
sleep(1)
except: pass
print "worker closed"
def mp_handler():
p = Pool(processes = 2) #number of processes
p.map(mp_worker, id_list) #devides id_list between 2 processes, defined above
p.close()
p.join()
mp_handler()
Run Code Online (Sandbox Code Playgroud)
注意 - 代码打印出“工人关闭”10 次。我希望这条语句只打印两次(每个工人一次,在每个工人从 id_list 打印出 5 个数字之后)
我有一个字典列表,其键包含空格。
input = [{'books author': 'bob', 'book title': 'three wolves'},{'books author': 'tim', 'book title': 'three apples'}]
Run Code Online (Sandbox Code Playgroud)
我将如何遍历上述字典列表,并用下划线替换包含空格的键,输出为
output = [{'books_author': 'bob', 'book_title': 'three wolves'},{'books_author': 'tim', 'book_title': 'three apples'}]
Run Code Online (Sandbox Code Playgroud)
请注意,实际字典可能包含数百个键,而一个列表将包含数千个dicts。