在我的程序中,我需要在使用Python进行多处理的进程之间共享一个字典。我简化了代码,在此举一个例子:
import multiprocessing
def folding (return_dict, seq):
dis = 1
d = 0
ddg = 1
'''This is irrelevant, actually my program sends seq parameter to other extern program that returns dis, d and ddg parameters'''
return_dict [seq] = [dis, d, ddg]
seqs = ['atcgtg', 'agcgatcg', 'atcgatcgatc', atcggatcg', agctgctagct']
manager = Manager()
return_dict = manager.dict()
n_cores = 3
for i in range (0, len(seqs), n_cores) #n_cores is the number of cores availables in the computer, defined by the user
subseqs …Run Code Online (Sandbox Code Playgroud) 有什么方法可以使用multiprocessing数据结构创建发布/订阅模式?换句话说,我希望有一个类似队列的东西,除了发布者可以同时向多个工作程序发送单个命令。
我假设多处理程序包使用pickle在进程之间发送消息。但是,泡菜要注意对象的__getstate__和__setstate__方法。多处理似乎忽略了它们。它是否正确?我感到困惑吗?
要复制,请安装docker,然后在命令行中键入
$ docker run python:3.4 python -c "import pickle
import multiprocessing
import os
class Tricky:
def __init__(self,x):
self.data=x
def __setstate__(self,d):
self.data=10
def __getstate__(self):
return {}
def report(ar,q):
print('running report in pid %d, hailing from %d'%(os.getpid(),os.getppid()))
q.put(ar.data)
print('module loaded in pid %d, hailing from pid %d'%(os.getpid(),os.getppid()))
if __name__ == '__main__':
print('hello from pid %d'%os.getpid())
ar = Tricky(5)
q = multiprocessing.Queue()
p = multiprocessing.Process(target=report, args=(ar, q))
p.start()
p.join()
print(q.get())
print(pickle.loads(pickle.dumps(ar)).data)"
Run Code Online (Sandbox Code Playgroud)
你应该得到类似
module loaded in pid 1, hailing …Run Code Online (Sandbox Code Playgroud) 我有一个TCP服务器和客户端.在服务器脚本的某个时刻,我启动一个进程,该进程需要能够获取每个新连接并向其发送数据.为了做到这一点,我有一个multiprocessing.Queue(),我想从主进程中放入每个新连接,以便我打开的进程可以从中获取连接并将数据发送给它们.但是,您似乎无法将任何想要的内容传递给队列.当我尝试传递连接(套接字对象)时,我得到:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
send(obj)
TypeError: expected string or Unicode object, NoneType found
Run Code Online (Sandbox Code Playgroud)
我可以使用任何替代品吗?
我想使用multiprocessing.Pool,但是multiprocessing.Pool不能在超时后中止任务。我找到了解决方案,并对其进行了一些修改。
from multiprocessing import util, Pool, TimeoutError
from multiprocessing.dummy import Pool as ThreadPool
import threading
import sys
from functools import partial
import time
def worker(y):
print("worker sleep {} sec, thread: {}".format(y, threading.current_thread()))
start = time.time()
while True:
if time.time() - start >= y:
break
time.sleep(0.5)
# show work progress
print(y)
return y
def collect_my_result(result):
print("Got result {}".format(result))
def abortable_worker(func, *args, **kwargs):
timeout = kwargs.get('timeout', None)
p = ThreadPool(1)
res = p.apply_async(func, args=args)
try:
# Wait timeout …Run Code Online (Sandbox Code Playgroud) python multithreading multiprocessing python-multithreading python-multiprocessing
我正在使用Python(2.7)和pymongo(3.3),我需要生成一个子进程来异步运行一个作业.不幸的是pymongo不像这里描述的那样是fork-safe (我需要在生成子进程之前与db进行交互).
我运行了一个实验subprocess.Popen(使用shellset to True然后False)和multiprocessing.Process.据我所知,他们都分叉父进程来创建子进程,但只multiprocessing.Process导致pymongo打印它已检测到分叉进程的警告.
我想知道这样做的pythonic方式是什么.似乎也许os.system会为我做这件事,但subprocess被描述为一个预定的替代品,os.system所以我想知道我是否遗漏了一些东西.
大家晚上好,我正在尝试创建互联网机器人,我在将我的脚本从python 3.4迁移到3.5或3.6+时遇到了问题.它使用asyncio并且在3.4 python上运行良好但是当我用python3.5 +启动时我得到了错误:RuntimeError: Cannot run the event loop while another loop is running
这是代码方案:
import multiprocessing as mp
import asyncio
import concurrent.futures
import aiohttp
def create_proccesses(separate_loop_creator, coro):
proccesses = []
for n in range(2):
proc = mp.Process(target=separate_loop_creator, args=(coro,))
proc.start()
proccesses.append(proc)
for p in proccesses:
p.join()
def separate_loop_creator(coro):
sep_loop = asyncio.new_event_loop()
asyncio.set_event_loop(sep_loop)
tasks = [asyncio.async(coro(sep_loop)) for _ in range(100)]
try:
sep_loop.run_until_complete(asyncio.wait(tasks))
sep_loop.close()
except Exception as err:
print(err)
for task in tasks:
task.cancel()
sep_loop.close()
@asyncio.coroutine
def manager(exe, loop):
# some …Run Code Online (Sandbox Code Playgroud) python python-3.4 python-asyncio python-multiprocessing python-3.5
我正在尝试使用ProcessPoolExecutor,但是出现错误“队列对象仅应通过继承在进程之间共享”,但我没有使用队列(至少没有明确地使用)。我找不到任何可以解释我做错事情的信息。
这是一些演示问题的代码(不是我的实际代码):
from concurrent.futures import ProcessPoolExecutor, as_completed
class WhyDoesntThisWork:
def __init__(self):
self.executor = ProcessPoolExecutor(4)
def execute_something(self, starting_letter):
futures = [self.executor.submit(self.something, starting_letter, d) for d in range(4)]
letter = None
for future in as_completed(futures):
letter = future.result()
print(letter)
def something(self, letter, d):
# do something pointless for the example
for x in range(d):
letter = chr(ord(letter) + 1)
if __name__ == '__main__':
WhyDoesntThisWork(). execute_something('A')
Run Code Online (Sandbox Code Playgroud)
El Ruso指出,使something()成为静态方法或类方法会使错误消失。不幸的是,我的实际代码需要使用self调用其他方法。
在Python中使用多处理时,我通常会看到一些示例,其中join()函数在一个单独的循环中调用,以实际创建每个进程.
例如,这个:
processes = []
for i in range(10):
p = Process(target=my_func)
processes.append(p)
p.start()
for p in processes:
p.join()
Run Code Online (Sandbox Code Playgroud)
比这更常见:
processes = []
for i in range(10):
p = Process(target=my_func)
processes.append(p)
p.start()
p.join()
Run Code Online (Sandbox Code Playgroud)
但是根据我的理解join(),它只是告诉脚本在该过程完成之前不要退出.因此,join()调用何时无关紧要.那么为什么通常在一个单独的循环中调用呢?
我对向量化了解不多,但是我想了解为什么像python这样的语言无法通过库接口在可迭代对象上提供向量化,就像它提供线程支持一样。我知道许多numpy方法都是矢量化的,但是对于通用计算必须使用numpy可能会受到限制。
我目前的理解是,即使python与“ SIMD”模式匹配,它们也无法向量化它们。例如,理论上不应该对列表的理解或对该map()函数的使用进行矢量化处理,因为它们会输出一个列表,这是对来自输入列表的独立输入运行相同功能的结果吗?
凭着我的敏锐理解,似乎map()从理论上讲,无论何时我使用,我都应该能够创建一个代表该功能的指令集。那么输入中的每个元素只需通过已编译的同一函数运行即可。设计一种工具的技术挑战是什么,该工具会simd_map(func, iterable)尝试func“及时” 编译,然后从iterable处理器中提取批输入并利用处理器的simd功能来运行这些批处理func()?
谢谢!
python parallel-processing simd vectorization python-multiprocessing
python ×9
dictionary ×1
pickle ×1
pymongo ×1
python-2.7 ×1
python-2.x ×1
python-3.4 ×1
python-3.5 ×1
queue ×1
simd ×1
sockets ×1
subprocess ×1