请告诉我之间的差异ThreadPool,并Pool在multiprocessing模块.当我尝试我的代码时,这是我看到的主要区别:
from multiprocessing import Pool
import os, time
print("hi outside of main()")
def hello(x):
print("inside hello()")
print("Proccess id: ", os.getpid())
time.sleep(3)
return x*x
if __name__ == "__main__":
p = Pool(5)
pool_output = p.map(hello, range(3))
print(pool_output)
Run Code Online (Sandbox Code Playgroud)
我看到以下输出:
hi outside of main()
hi outside of main()
hi outside of main()
hi outside of main()
hi outside of main()
hi outside of main()
inside hello()
Proccess id: 13268
inside hello()
Proccess id: 11104
inside hello()
Proccess id: …Run Code Online (Sandbox Code Playgroud) python multiprocessing threadpool python-3.x python-multiprocessing
这是我的主要分解程序,我添加了一个回调函数pool.apply_async(findK, args=(N,begin,end)),prime factorization is over当分解结束时消息提示,它工作正常.
import math
import multiprocessing
def findK(N,begin,end):
for k in range(begin,end):
if N% k == 0:
print(N,"=" ,k ,"*", N/k)
return True
return False
def prompt(result):
if result:
print("prime factorization is over")
def mainFun(N,process_num):
pool = multiprocessing.Pool(process_num)
for i in range(process_num):
if i ==0 :
begin =2
else:
begin = int(math.sqrt(N)/process_num*i)+1
end = int(math.sqrt(N)/process_num*(i+1))
pool.apply_async(findK, args=(N,begin,end) , callback = prompt)
pool.close()
pool.join()
if __name__ == "__main__":
N = 684568031001583853
process_num = 16 …Run Code Online (Sandbox Code Playgroud) 我有一些代码需要针对可能挂起或有不受我控制的问题的其他几个系统运行.我想使用python的多处理来生成子进程独立于主程序运行,然后当它们挂起或有问题终止它们时,但我不确定最好的方法来解决这个问题.
当终止被调用时,它确实会终止子进程,但它会变成一个已经失效的僵尸,直到进程对象消失才会被释放.循环永远不会结束的下面的示例代码可以杀死它并在再次调用时允许重新生成,但似乎不是一个很好的解决方法(即multiprocessing.Process()在__init __()中会更好.
有人有建议吗?
class Process(object):
def __init__(self):
self.thing = Thing()
self.running_flag = multiprocessing.Value("i", 1)
def run(self):
self.process = multiprocessing.Process(target=self.thing.worker, args=(self.running_flag,))
self.process.start()
print self.process.pid
def pause_resume(self):
self.running_flag.value = not self.running_flag.value
def terminate(self):
self.process.terminate()
class Thing(object):
def __init__(self):
self.count = 1
def worker(self,running_flag):
while True:
if running_flag.value:
self.do_work()
def do_work(self):
print "working {0} ...".format(self.count)
self.count += 1
time.sleep(1)
Run Code Online (Sandbox Code Playgroud) RabbieMQ Cluster不接受新连接!连接的套接字数量很少,Rabbitmq日志中唯一的消息是:
**警告**Mnesia超载
那是什么意思?我该如何解决?
我对python很新.我正在使用多处理模块读取stdin上的文本行,以某种方式转换它们并将它们写入数据库.这是我的代码片段:
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
batch.append(content)
if len(batch) >= 10000:
pool.apply_async(insert, args=(batch,i+1))
batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
现在一切正常,直到我处理我输入我的python程序的巨大输入文件(数亿行).在某些时候,当我的数据库变慢时,我看到内存已满.
经过一番播放后,事实证明pool.apply_async以及pool.map_async永远不会阻塞,因此要处理的调用队列越来越大.
我的问题的正确方法是什么?我希望我能设置一个参数,一旦达到某个队列长度,就会阻塞pool.apply_async调用.Java中的AFAIR可以为ThreadPoolExecutor提供一个具有固定长度的BlockingQueue用于此目的.
谢谢!
python queue design-patterns multiprocessing python-multiprocessing
我有一个我想要使用多处理修改的元素列表.问题是对于某些特定的输入(在尝试之前不可观察),我的部分功能会停止.我已经在概念上用下面的代码展示了这个函数,其中函数sometimes_stalling_processing()偶尔会无限期地停止.
为了将其置于上下文中,我正在使用Web scraper处理一堆链接,并且即使在请求模块中使用超时,其中一些链接也会停止.我尝试了不同的方法(例如使用eventlet),但得出的结论是,在多处理级别处理它可能更容易.
def stable_processing(obs):
...
return processed_obs
def sometimes_stalling_processing(obs):
...
return processed_obs
def extract_info(obs):
new_obs = stable_processing(obs)
try:
new_obs = sometimes_stalling_processing(obs)
except MyTimedOutError: # error doesn't exist, just here for conceptual purposes
pass
return new_obs
pool = Pool(processes=n_threads)
processed_dataset = pool.map(extract_info, dataset)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
这个问题(如何在超时后中断多处理.Pool中的任务?)似乎非常相似,但我一直无法将其转换为使用map而不是apply.我也尝试过使用该eventlet软件包,但这不起作用.请注意,我使用的是Python 2.7.
如何pool.map()对个别观察结果进行超时并杀死sometimes_stalling_processing?
我正在尝试使用Python的multiprocessing.Pool对象来理解最佳实践.
在我的程序中,我经常使用Pool.imap.通常每次我并行启动任务时,我都会创建一个新的池对象,然后在完成后关闭它.
我最近遇到了挂起,其中提交到池的任务数量少于进程数.奇怪的是它只发生在我的测试管道中,它之前运行了很多东西.作为独立运行测试不会导致手.我认为它与制作多个池有关.
我真的很想找到一些资源来帮助我理解使用Python多处理的最佳实践.具体来说,我目前正在尝试了解制作多个池对象与仅使用一个池对象的含义.
我试图在循环中运行一些计算,每个计算都会创建、使用和关闭一个池。但计算只运行一次,然后抛出错误:“池未运行”。当然旧的没有运行,但新的不应该创建吗?
下面是一个简化的例子,类似于我的代码。更奇怪的是,在我的实际代码计算中,在崩溃之前运行了 7 次,所以我真的很困惑是什么问题。任何建议表示赞赏!
from pathos.multiprocessing import ProcessingPool as Pool
def add_two(number):
return (number + 2)
def parallel_function(numbers):
pool = Pool(10)
result = pool.imap(add_two, numbers)
pool.close()
pool.join()
return(result)
sets=[
[1, 2, 3],
[2, 3, 4],
[3, 4, 5]
]
for one_set in sets:
x = parallel_function(one_set)
for i in x:
print(i)
Run Code Online (Sandbox Code Playgroud) 我想使用多个进程(not threads)进行一些预处理并将结果排入tf.RandomShuffleQueue,我的主图可以使用它来进行训练.
有没有办法做到这一点 ?
我已将我的数据集转换为分割为256个分片的TFRecords.我想开始使用20个进程multiprocessing,让每个进程处理一系列分片.每个过程都应该读取图像,然后对它们进行扩充并将它们推入一个tf.RandomShuffleQueue可以将输入提供给图形进行训练的图像.
有人建议我通过这个inception例子tensorflow.但是,这是一种非常不同的情况,因为只有数据分片的读取是由多个线程(not processes)完成的,而预处理(例如 - 扩充)是在主线程中进行的.
我正在寻求设计一个基本上需要根据输入做出决策的系统。输入将是一个人。
class Person:
def __init__(self, name, age, sex, weight, height, nationality):
self.name = name
self.age = age
self.sex = sex
self.weight = weight
self.height = height
self.nationality = nationality
Run Code Online (Sandbox Code Playgroud)
我们希望根据某些规则将每个人分配到一个学校班级。
例如:
22至25岁之间的英国女性应进入B级。75岁以上的男性应进入A级。6英尺以上的女性应进入C级。
我们将有大约400个不同的规则,并且应该应用第一个满足的规则-我们需要保持规则的顺序。
我正在考虑如何在此处存储/表示规则。显然,您可能只想发表一段很长的if, elif, elif声明,但这并不有效。另一种选择是将规则存储在数据库中,并可能在内存表中。
我希望能够在不发布规则的情况下编辑规则-可能具有前端,以允许非技术人员添加,删除和重新排序规则。
一切都在这里-唯一的要求是实际的编程语言必须是Python。
添加了更多内容
我想我的问题是如何存储规则。目前,这是一条很长的if elif elif语句,因此只要业务逻辑发生变化,PM都会制定新规则,然后将它们转换为if语句。
系统的所有输入将通过相同的规则列表发送,并且将应用匹配的第一个规则。多个规则可以应用于每个输入,但始终是第一个应用的规则。
例如
25岁以上的
女性去B级女性去A级。
即使第二条规则也适用,所有25岁以上的女性都将被送入B级。
输入将始终包含相同格式的输入-尚未确定它是对象还是字典的位置,但其中一些值可能是None。有些人可能没有与之相关的体重。
我想共享BlockingChannel多个python进程。为了basic_ack从其他python进程发送
。
如何BlockingChannel在多个python进程之间共享。
以下是代码:
self.__connection__ = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.__channel__ = self.__connection__.channel()
Run Code Online (Sandbox Code Playgroud)
我尝试转储使用,pickle但它确实允许转储频道并can't pickle select.epoll objects
使用以下代码给出错误
filepath = "temp/" + "merger_channel.sav"
pickle.dump(self.__channel__, open(filepath, 'wb'))
Run Code Online (Sandbox Code Playgroud)
目标:
目标是basic_ack从其他python进程的通道发送。
python ×10
rabbitmq ×2
callback ×1
class ×1
function ×1
pathos ×1
pika ×1
python-3.x ×1
queue ×1
rules ×1
tensorflow ×1
threadpool ×1