标签: python-multiprocessing

Python多处理重新启动脚本?

multi processing在Python中使用。以下是我的代码的演示:

在功能上main

from multiprocessing import Process

def __name__ == "__main__":
    print "Main program starts here."

    SOME CODE....

    process_1 = Process(target=proc1,  args = (arg1, arg2))
    process_2 = Process(target=proc2,  args = (arg3, arg4))
    
    process_1.start()
    process_2.start()
    
    process_1.join()
    process_2.join()
Run Code Online (Sandbox Code Playgroud)

在函数proc1和中proc2

def proc1(arg1, arg2):
    print "Proc1 starts from here."

    SOME CODE....
Run Code Online (Sandbox Code Playgroud)

所以我期望看到的输出是:

主程序从这里开始。

Proc1从这里开始。

Proc2从这里开始。

然而,我得到的是:

主程序从这里开始。

主程序从这里开始。

主程序从这里开始。

看来 和 都proc1启动proc2main而不是进程。

我可以知道我的代码有什么问题吗?

非常感谢。

python python-multiprocessing

1
推荐指数
1
解决办法
701
查看次数

Python requests 模块多线程

有没有可能使用多处理接口来加速我的代码?问题是这个接口使用了map函数,它只适用于1个函数。但我的代码有3个功能。我尝试将我的功能合并为一个,但没有成功。我的脚本从文件中读取站点的 URL 并对其执行 3 个功能。For 循环使它非常慢,因为我有很多 URL

import requests

def Login(url): #Log in     
    payload = {
        'UserName_Text'     : 'user',
        'UserPW_Password'   : 'pass',
        'submit_ButtonOK'   : 'return buttonClick;'  
      }

    try:
        p = session.post(url+'/login.jsp', data = payload, timeout=10)
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
        print "site is DOWN! :", url[8:]
        session.cookies.clear()
        session.close() 
    else:
        print 'OK: ', p.url

def Timer(url): #Measure request time
    try:
        timer = requests.get(url+'/login.jsp').elapsed.total_seconds()
    except (requests.exceptions.ConnectionError):
        print 'Request time: None'
        print '-----------------------------------------------------------------'
    else: 
        print 'Request time:', round(timer, 2), 'sec'

def Logout(url): # Log out …
Run Code Online (Sandbox Code Playgroud)

python multithreading python-2.7 python-requests python-multiprocessing

1
推荐指数
1
解决办法
7558
查看次数

Pathos:在 Linux 上强制生成

我有可以在 Windows 上运行的 Python 代码,但是在 Linux 上运行时它就会挂起。我正在使用 JPype,因此我怀疑多个共享进程尝试使用同一管道访问 Java 可能存在一些问题(创建了不同的进程,但挂在 JPype 行)。有没有办法强制在 Pathos 中生成以复制 Windows 实现?(例如常规多处理库中的 set_start_method 或 get_context ?)

谢谢。

linux spawn python-multiprocessing pathos python-3.5

1
推荐指数
1
解决办法
1086
查看次数

围绕现有数组创建共享内存(python)

我在任何地方看到 python 的共享内存实现(例如在 参考资料中multiprocessing),创建共享内存总是分配新内存。有没有办法创建共享内存对象并让它引用现有内存?目的是预先初始化数据值,或者更确切地说,如果我们已经有一个数组,则可以避免复制到新的共享内存中。根据我的经验,分配大型共享数组比将值复制到其中要快得多。

python shared-memory python-multiprocessing

1
推荐指数
1
解决办法
1258
查看次数

Python 进程未清理以供重用

流程未清理以供重复使用

你好呀,

我偶然发现了一个问题ProcessPoolExecutor,进程访问数据时,它们不应该能够。让我解释:

我遇到的情况类似于下面的示例:我进行了多次运行,每次都以不同的参数开始。他们并行计算自己的东西,没有理由互相交互。现在,据我了解,当一个进程分叉时,它会复制自身。子进程与其父进程具有相同的(内存)数据,但如果它更改任何内容,它会在自己的副本上进行更改。如果我希望更改在子进程的生命周期内持续存在,我会调用队列、管道和其他 IPC 内容。

但其实我不知道!每个进程都为自己操作数据,这些数据不应传递到任何其他运行。不过,下面的示例显示了不同的情况。下一次运行(不是并行运行的运行)可以访问上一次运行的数据,这意味着数据尚未从进程中清除。

代码/示例

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process, set_start_method

class Static:
    integer: int = 0

def inprocess(run: int) -> None:
    cp = current_process()
    # Print current state
    print(f"[{run:2d} {cp.pid} {cp.name}] int: {Static.integer}", flush=True)

    # Check value
    if Static.integer != 0:
        raise Exception(f"[{run:2d} {cp.pid} {cp.name}] Variable already set!")

    # Update value
    Static.integer = run + 1

def pooling():
    cp = current_process()
    # Get master's pid
    print(f"[{cp.pid} {cp.name}] Start")
    with ProcessPoolExecutor(max_workers=2) as …
Run Code Online (Sandbox Code Playgroud)

python python-3.x concurrent.futures python-multiprocessing

1
推荐指数
1
解决办法
1491
查看次数

如何确保Python多处理中所有处理器都得到利用?

我是多处理概念的新手。

我的代码

from multiprocessing import Process
def square(x):
 
    for x in numbers:
        print('%s squared  is  %s' % (x, x**2))
 
if __name__ == '__main__':
    numbers = [43, 50, 5, 98, 34, 35]
 
    p = Process(target=square, args=('x',))
    p.start()
    p.join
    print "Done"
     
Run Code Online (Sandbox Code Playgroud)

结果

Done
43 squared  is  1849
50 squared  is  2500
5 squared  is  25
98 squared  is  9604
34 squared  is  1156
35 squared  is  1225
Run Code Online (Sandbox Code Playgroud)

我明白了,我们可以用来multiprocessing.cpu_count()获取系统中的CPU数量

然而,我未能实现两件感兴趣的事情。-

  1. 将所有任务平均分配给所有CPU
  2. 检查哪个CPU被哪个进程使用

python parallel-processing multiprocessing python-multiprocessing

1
推荐指数
1
解决办法
79
查看次数

Python 中的多重处理:处理多个工作线程

在我的代码中,我需要在 python 程序中运行多个工作线程实例。我最初创建了几个工作线程实例(比如 10 个),然后将它们添加到池中。每当客户端请求服务时,就应该调用并为客户端保留一个线程。完成任务后,线程应添加回池中。

到目前为止我已经编写了以下代码。但我不确定如何在池中永远运行线程(它们应该在池内休眠),在需要时调用并获取服务,并在处理后将它们添加回池中(应该再次休眠)。任何帮助,将不胜感激。

PRED = Queue(10)

class Worker(threading.Thread):
    def __init__(self, threadID, name):
        threading.Thread.__init__(self)
        self.threadID  =threadID
        self.name = name

    def run(self):
        print("starting " + self.name + " thread")
        while True:
            ??
        print("Exiting " + self.name + " thread")


def work():
    print("working")
    time.sleep(3)
Run Code Online (Sandbox Code Playgroud)
  • 假设工作线程位于 PRED 队列中。
  • work() 是我应该调用来为客户提供服务的方法。

python multithreading python-3.x python-multiprocessing

1
推荐指数
1
解决办法
1万
查看次数

Python 3 asyncio 和 GIL(如何使用所有 cpu 内核 - 除 ProcessPoolExecutor 之外的任何其他选项)?

如何使用所有 cpu 内核进行 asyncio - 除了 ProcessPoolExecutor 之外的任何其他选项?

我认为 asyncio 无法打破 GIL 限制(也许我错了),因此程序的执行速度将比踩踏版本快,但会在一个核心上执行。

我研究了一些例子,我发现一种方法是多处理和 ProcessPoolExecutor。

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
    result = await loop.run_in_executor(
        pool, cpu_bound)
    print('custom process pool', result)
Run Code Online (Sandbox Code Playgroud)

这很好,但需要在进程之间进行“pickle”,因此需要一些开销并对传递的参数进行一些优化以减少“pickle”序列化。

使用上面这个简单的模式,我写了这样的测试代码(如果你不喜欢它,你可以跳过这段代码阅读,因为它和以前一样)。顺便说一句,这是我解析文件问题的最快解决方案。这部分代码不是整个程序。

def _match_general_and_specific_file_chunk(file_name):
    with codecs.open(file_name, encoding='utf8') as f:
        while True:
            lines = f.readlines(sizehint=10000)
            if not lines:
                break
            for line in lines:
                general_match = RE_RULES.match(line)
                if general_match:
                    specific_match = RULES[general_match.lastindex].match(line)
                    groups = list(specific_match.groups())
                    continue


async def _async_process_executor_match_general_and_specific_read_lines_with_limit_file_chunk():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() …
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-asyncio python-multiprocessing

1
推荐指数
1
解决办法
2585
查看次数

Python 多处理并行写入单个 gzip

我正在尝试使用 python 将一个大的压缩文件(.gz)复制到另一个压缩文件(.gz)。我将对代码示例中不存在的数据执行中间处理。我希望能够使用带有锁的多处理来从多个进程并行写入新的 gzip,但我在输出 gz 文件上收到无效格式错误。

我认为这是因为锁不足以支持并行写入 gzip。由于压缩数据需要“了解”之前的数据,以便将正确的条目写入存档中,因此我认为 python 默认情况下无法处理此问题。我猜想每个进程都会保持自己对 gzip 输出的感知,并且这种状态在第一次写入后会有所不同。

如果我在不使用 gzip 的情况下打开脚本中的目标文件,那么这一切都有效。我还可以写入多个 gzip 并将它们合并,但如果可能的话更愿意避免这种情况。

这是我的源代码:

#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break
        
def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
    print("Writer Started.")
    while True:
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            outfile.flush() 
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            print("Writer",ID,"-","Write Lock:",write_lock) …
Run Code Online (Sandbox Code Playgroud)

compression gzip python-multiprocessing

1
推荐指数
1
解决办法
1669
查看次数

两个进程之间如何共享数据?

如何与另一个流程共享一个流程的价值?显然我可以通过多线程而不是多处理来做到这一点。多线程对于我的程序来说很慢。

我无法显示我的确切代码,所以我做了这个简单的例子。

from multiprocessing import Process
from threading import Thread
import time

class exp:
    def __init__(self):
        self.var1 = 0
            
    def func1(self):

        self.var1 = 5
        print(self.var1)

    def func2(self):

        print(self.var1) 


if __name__ == "__main__":

    #multithreading
    obj1 = exp()
    t1 = Thread(target = obj1.func1)
    t2 = Thread(target = obj1.func2)
    print("multithreading")
    t1.start()
    time.sleep(1)
    t2.start()

    time.sleep(3)


    #multiprocessing
    obj = exp()
    p1 = Process(target = obj.func1)
    p2 = Process(target = obj.func2)

    print("multiprocessing")
    p1.start()
    time.sleep(2)
    p2.start()
Run Code Online (Sandbox Code Playgroud)

预期输出

from multiprocessing import Process
from threading import Thread
import …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing python-multithreading python-3.x python-multiprocessing

1
推荐指数
1
解决办法
2464
查看次数