标签: python-multiprocessing

使用python在多处理中共享字典

在我的程序中,我需要在使用Python进行多处理的进程之间共享一个字典。我简化了代码,在此举一个例子:

import multiprocessing
def folding (return_dict, seq):
    dis = 1
    d = 0
    ddg = 1 
    '''This is irrelevant, actually my program sends seq parameter to other extern program that returns dis, d and ddg parameters'''
    return_dict [seq] = [dis, d, ddg]

seqs = ['atcgtg', 'agcgatcg', 'atcgatcgatc', atcggatcg', agctgctagct']
manager = Manager()
return_dict = manager.dict()
n_cores = 3

for i in range (0, len(seqs), n_cores) #n_cores is the number of cores availables in the computer, defined by the user
    subseqs …
Run Code Online (Sandbox Code Playgroud)

python dictionary python-multiprocessing

1
推荐指数
1
解决办法
2322
查看次数

如何使用多重处理实现发布/订阅模式?

有什么方法可以使用multiprocessing数据结构创建发布/订阅模式?换句话说,我希望有一个类似队列的东西,除了发布者可以同时向多个工作程序发送单个命令。

python queue multiprocessing python-multiprocessing

1
推荐指数
1
解决办法
1793
查看次数

多处理忽略“ __setstate__”

我假设多处理程序包使用pickle在进程之间发送消息。但是,泡菜要注意对象的__getstate____setstate__方法。多处理似乎忽略了它们。它是否正确?我感到困惑吗?

要复制,请安装docker,然后在命令行中键入

$ docker run python:3.4 python -c "import pickle
import multiprocessing
import os

class Tricky:
    def __init__(self,x):
        self.data=x

    def __setstate__(self,d):
        self.data=10

    def __getstate__(self):
        return {}

def report(ar,q):
    print('running report in pid %d, hailing from %d'%(os.getpid(),os.getppid()))
    q.put(ar.data)

print('module loaded in pid %d, hailing from pid %d'%(os.getpid(),os.getppid()))
if __name__ == '__main__':
    print('hello from pid %d'%os.getpid())
    ar = Tricky(5)
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=report, args=(ar, q))
    p.start()
    p.join()
    print(q.get())
    print(pickle.loads(pickle.dumps(ar)).data)"
Run Code Online (Sandbox Code Playgroud)

你应该得到类似

module loaded in pid 1, hailing …
Run Code Online (Sandbox Code Playgroud)

python pickle multiprocessing python-multiprocessing

1
推荐指数
1
解决办法
233
查看次数

Python - 将TCP套接字对象传递给多处理队列

我有一个TCP服务器和客户端.在服务器脚本的某个时刻,我启动一个进程,该进程需要能够获取每个新连接并向其发送数据.为了做到这一点,我有一个multiprocessing.Queue(),我想从主进程中放入每个新连接,以便我打开的进程可以从中获取连接并将数据发送给它们.但是,您似乎无法将任何想要的内容传递给队列.当我尝试传递连接(套接字对象)时,我得到:

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
TypeError: expected string or Unicode object, NoneType found
Run Code Online (Sandbox Code Playgroud)

我可以使用任何替代品吗?

sockets python-2.7 python-multiprocessing

1
推荐指数
1
解决办法
2354
查看次数

Python多处理池超时

我想使用multiprocessing.Pool,但是multiprocessing.Pool不能在超时后中止任务。我找到了解决方案,并对其进行了一些修改。

from multiprocessing import util, Pool, TimeoutError
from multiprocessing.dummy import Pool as ThreadPool
import threading
import sys
from functools import partial
import time


def worker(y):
    print("worker sleep {} sec, thread: {}".format(y, threading.current_thread()))
    start = time.time()
    while True:
       if time.time() - start >= y:
           break
       time.sleep(0.5)
       # show work progress
       print(y)
    return y


def collect_my_result(result):
    print("Got result {}".format(result))


def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        # Wait timeout …
Run Code Online (Sandbox Code Playgroud)

python multithreading multiprocessing python-multithreading python-multiprocessing

1
推荐指数
1
解决办法
3451
查看次数

在Python中生成一个没有分叉的进程

我正在使用Python(2.7)和pymongo(3.3),我需要生成一个子进程来异步运行一个作业.不幸的是pymongo不像这里描述的那样是fork-safe (我需要在生成子进程之前与db进行交互).

我运行了一个实验subprocess.Popen(使用shellset to True然后False)和multiprocessing.Process.据我所知,他们都分叉父进程来创建子进程,但只multiprocessing.Process导致pymongo打印它已检测到分叉进程的警告.

我想知道这样做的pythonic方式是什么.似乎也许os.system会为我做这件事,但subprocess被描述为一个预定的替代品,os.system所以我想知道我是否遗漏了一些东西.

python subprocess python-2.x pymongo python-multiprocessing

1
推荐指数
1
解决办法
2144
查看次数

python asyncio从3.4迁移到3.5+

大家晚上好,我正在尝试创建互联网机器人,我在将我的脚本从python 3.4迁移到3.5或3.6+时遇到了问题.它使用asyncio并且在3.4 python上运行良好但是当我用python3.5 +启动时我得到了错误:RuntimeError: Cannot run the event loop while another loop is running

这是代码方案:

import multiprocessing as mp
import asyncio
import concurrent.futures
import aiohttp

def create_proccesses(separate_loop_creator, coro):
    proccesses = []
    for n in range(2):
        proc = mp.Process(target=separate_loop_creator, args=(coro,))
        proc.start()
        proccesses.append(proc)
    for p in proccesses:
        p.join()

def separate_loop_creator(coro):
    sep_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(sep_loop)
    tasks = [asyncio.async(coro(sep_loop)) for _ in range(100)]
    try:
        sep_loop.run_until_complete(asyncio.wait(tasks))
        sep_loop.close()
    except Exception as err:
        print(err)
        for task in tasks:
            task.cancel()
        sep_loop.close()


@asyncio.coroutine
def manager(exe, loop):
    # some …
Run Code Online (Sandbox Code Playgroud)

python python-3.4 python-asyncio python-multiprocessing python-3.5

1
推荐指数
1
解决办法
1283
查看次数

获取“队列对象仅应通过继承在进程之间共享”,但我没有使用队列

我正在尝试使用ProcessPoolExecutor,但是出现错误“队列对象仅应通过继承在进程之间共享”,但我没有使用队列(至少没有明确地使用)。我找不到任何可以解释我做错事情的信息。

这是一些演示问题的代码(不是我的实际代码):

from concurrent.futures import ProcessPoolExecutor, as_completed

class WhyDoesntThisWork:

    def __init__(self):
        self.executor = ProcessPoolExecutor(4)

    def execute_something(self, starting_letter):
        futures = [self.executor.submit(self.something, starting_letter, d) for d in range(4)]
        letter = None
        for future in as_completed(futures):
            letter = future.result()
        print(letter)

    def something(self, letter, d):
        # do something pointless for the example
        for x in range(d):
            letter = chr(ord(letter) + 1)

if __name__ == '__main__':
    WhyDoesntThisWork(). execute_something('A')
Run Code Online (Sandbox Code Playgroud)

El Ruso指出,使something()成为静态方法或类方法会使错误消失。不幸的是,我的实际代码需要使用self调用其他方法。

python concurrent.futures python-multiprocessing

1
推荐指数
1
解决办法
1112
查看次数

多处理时在哪里调用join()

在Python中使用多处理时,我通常会看到一些示例,其中join()函数在一个单独的循环中调用,以实际创建每个进程.

例如,这个:

processes = []

for i in range(10):
    p = Process(target=my_func)
    processes.append(p)
    p.start()

for p in processes:
    p.join()
Run Code Online (Sandbox Code Playgroud)

比这更常见:

processes = []

for i in range(10):
    p = Process(target=my_func)
    processes.append(p)
    p.start()
    p.join()
Run Code Online (Sandbox Code Playgroud)

但是根据我的理解join(),它只是告诉脚本在该过程完成之前不要退出.因此,join()调用何时无关紧要.那么为什么通常在一个单独的循环中调用呢?

python python-multiprocessing

1
推荐指数
1
解决办法
47
查看次数

python为什么不能向量化map()或列表推导

我对向量化了解不多,但是我想了解为什么像python这样的语言无法通过库接口在可迭代对象上提供向量化,就像它提供线程支持一样。我知道许多numpy方法都是矢量化的,但是对于通用计算必须使用numpy可能会受到限制。

我目前的理解是,即使python与“ SIMD”模式匹配,它们也无法向量化它们。例如,理论上不应该对列表的理解或对该map()函数的使用进行矢量化处理,因为它们会输出一个列表,这是对来自输入列表的独立输入运行相同功能的结果吗?

凭着我的敏锐理解,似乎map()从理论上讲,无论何时我使用,我都应该能够创建一个代表该功能的指令集。那么输入中的每个元素只需通过已编译的同一函数运行即可。设计一种工具的技术挑战是什么,该工具会simd_map(func, iterable)尝试func“及时” 编译,然后从iterable处理器中提取批输入并利用处理器的simd功能来运行这些批处理func()

谢谢!

python parallel-processing simd vectorization python-multiprocessing

1
推荐指数
1
解决办法
78
查看次数