标签: python-multiprocessing

Python - 使用队列时不会关闭多处理线程

这适用于 Python 3.x

我正在从 CSV 文件中以 300 个块为单位加载记录,然后生成工作线程以将它们提交给 REST API。我将 HTTP 响应保存在队列中,以便在处理整个 CSV 文件后获得跳过记录数的计数。但是,在我向我的工作人员添加一个队列之后,线程似乎不再关闭。我想监视线程数有两个原因:(1) 一旦全部完成,我可以计算并显示跳过计数;(2) 我想增强我的脚本以生成不超过 20 个左右的线程,所以我不要耗尽内存。

我有两个问题:

  • 有人可以解释为什么线程在使用时保持活动状态q.put()吗?
  • 是否有不同的方法来管理线程数并监视所有线程是否已完成?

这是我的代码(有些简化,因为我无法分享我正在调用的 API 的确切细节):

import requests, json, csv, time, datetime, multiprocessing

TEST_FILE = 'file.csv'

def read_test_data(path, chunksize=300):
    leads = []
    with open(path, 'rU') as data:
        reader = csv.DictReader(data)
        for index, row in enumerate(reader):
            if (index % chunksize == 0 and index > 0):
                yield leads
                del leads[:]
            leads.append(row)
        yield leads

def worker(leads, q):
    payload = {"action":"createOrUpdate","input":leads}
    r = …
Run Code Online (Sandbox Code Playgroud)

python queue multithreading python-3.x python-multiprocessing

2
推荐指数
1
解决办法
4029
查看次数

键盘中断与python的多处理池和映射功能

我发现这篇文章解释了如何使用 ctr+c 终止正在运行的多处理代码。以下代码完全正常工作(可以使用 ctrl+c 终止它):

#!/usr/bin/env python

# Copyright (c) 2011 John Reese
# Licensed under the MIT License

import multiprocessing
import os
import signal
import time

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def run_worker():
    time.sleep(15)

def main():
    print "Initializng 5 workers"
    pool = multiprocessing.Pool(5, init_worker)

    print "Starting 3 jobs of 15 seconds each"
    for i in range(3):
        pool.apply_async(run_worker)

    try:
        print "Waiting 10 seconds"
        time.sleep(10)

    except KeyboardInterrupt:
        print "Caught KeyboardInterrupt, terminating workers"
        pool.terminate()
        pool.join()

    else:
        print "Quitting normally"
        pool.close()
        pool.join() …
Run Code Online (Sandbox Code Playgroud)

python python-multiprocessing

2
推荐指数
1
解决办法
5439
查看次数

joblib的中间结果

我正在尝试学习该joblib模块,以替代multiprocessingpython中的内置模块。我习惯于multiprocessing.imap在一个可迭代的函数上运行一个函数,并在返回结果时返回结果。在这个最小的工作示例中,我不知道如何使用joblib来做到这一点:

import joblib, time

def hello(n):
    time.sleep(1)
    print "Inside function", n
    return n

with joblib.Parallel(n_jobs=1) as MP:

    func = joblib.delayed(hello)
    for x in MP(func(x) for x in range(3)):
        print "Outside function", x
Run Code Online (Sandbox Code Playgroud)

哪些打印:

Inside function 0
Inside function 1
Inside function 2
Outside function 0
Outside function 1
Outside function 2
Run Code Online (Sandbox Code Playgroud)

我想看看输出:

Inside function 0
Outside function 0
Inside function 1
Outside function 1
Inside function 2
Outside function 2
Run Code Online (Sandbox Code Playgroud)

或类似的内容,表明迭代MP(...)器没有等待所有结果完成。对于更长的演示更改n_jobs=-1 …

python parallel-processing multiprocessing joblib python-multiprocessing

2
推荐指数
2
解决办法
1096
查看次数

在Python的Multiprocessing库中获取Manager.Queue的长度

我有一个队列,其中包含我希望一组进程执行的所有工作.我想获得此队列中剩余的元素数量,并想知道如何做到这一点?该LEN功能似乎并没有工作,虽然我可以重复的队列中获取的每个元素,并把它放回,直到我走了一圈,我宁愿避免这种情况的畏缩编码的原因.

python python-2.7 python-multiprocessing

2
推荐指数
1
解决办法
6272
查看次数

Python 触发文件更改事件

我有一组由应用程序创建的 100 个文件。文件不是按顺序动态更新的。使用 Python,我正在尝试读取文件。但是,我不知道哪个文件在什么时间更新。

我不想每次都遍历每个文件来检查实例中更新了哪些文件。我可以创建多个进程/线程来触发文件更新的主进程。有没有其他方式像文件更新可以通知主python进程,以便只读取那些文件??

谢谢。

python-2.7 python-multiprocessing

2
推荐指数
1
解决办法
5686
查看次数

多处理 - 使用 Managers 命名空间来节省内存

我有几个进程,每个进程都需要一个大的 numpy 数组来完成任务,这只是被读取(线程正在搜索它以寻找适当的值)。

如果每个进程加载数据,我会收到内存错误。

因此,我试图通过使用管理器在进程之间共享相同的数组来最小化内存使用量。

但是我仍然收到内存错误。我可以在主进程中加载一次数组,但是当我尝试将其作为管理器命名空间的属性时,我收到了内存错误。我假设管理器就像指针一样,并允许单独的进程(通常只能访问自己的内存)也可以访问这个共享内存。但是错误提到了酸洗:

Traceback (most recent call last):
  File <PATH>, line 63, in <module>
    ns.pp = something
  File "C:\Program Files (x86)\Python35-32\lib\multiprocessing\managers.py", line 1021, in __setattr__
    return callmethod('__setattr__', (key, value))
  File "C:\Program Files (x86)\Python35-32\lib\multiprocessing\managers.py", line 716, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "C:\Program Files (x86)\Python35-32\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "C:\Program Files (x86)\Python35-32\lib\multiprocessing\reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
MemoryError
Run Code Online (Sandbox Code Playgroud)

我假设 numpy 数组在分配给经理时实际上正在被复制,但我可能错了。

更令人恼火的是,我在一台具有 32GB 内存的机器上,看着内存使用情况,它只会在崩溃前稍微增加一点,最多可能增加 …

python memory numpy out-of-memory python-multiprocessing

2
推荐指数
1
解决办法
4374
查看次数

Python Kafka多进程与线程

我可以用来KafkaConsumer在单独的线程中使用消息。

但是,当我使用multiprocessing.Process而不是时threading.Thread,出现错误:

OSError: [Errno 9] Bad file descriptor

问题文档表明使用多处理并行使用消息是可能的。有人可以分享一个有效的例子吗?

编辑

这是一些示例代码。抱歉,原始代码太复杂了,因此我在这里创建了一个示例,希望可以传达正在发生的事情。如果我使用threading.Thread而不是,此代码可以正常工作multiprocessing.Process

from multiprocessing import Process

class KafkaWrapper():
    def __init__(self):
        self.consumer = KafkaConsumer(bootstrap_servers='my.server.com')

    def consume(self, topic):
        self.consumer.subscribe(topic)
        for message in self.consumer:
            print(message.value)

class ServiceInterface():
    def __init__(self):
        self.kafka_wrapper = KafkaWrapper()

    def start(self, topic):
        self.kafka_wrapper.consume(topic)

class ServiceA(ServiceInterface):
    pass

class ServiceB(ServiceInterface):
    pass


def main():

    serviceA = ServiceA()
    serviceB = ServiceB()

    jobs=[]
    # The code works fine if I used threading.Thread …
Run Code Online (Sandbox Code Playgroud)

python python-multithreading apache-kafka python-multiprocessing

2
推荐指数
1
解决办法
2692
查看次数

在多处理中与Pool结合使用时,map如何划分数据?

我有一个函数f,我想要并行计算某些大数据.数据可以分为多种方式,我试图决定如何划分它.我试图理解多处理中的"映射".Pool准确地分配/分配数据,以便我做出正确的决定,即分割我的数据以及选择处理器的数量.我的输入数据不仅仅是一个列表,如下例所示,而是字典列表和列表列表,因此理解Pool.map如何划分数据似乎很关键.

话虽如此,我认为理解这个简单的例子可以说明更复杂的例子.

以下scipt表明我们正在选择一个包含5个进程的池和[1,2,3]中的数据.这里为分割数据做出的隐含选择是什么?

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))
Run Code Online (Sandbox Code Playgroud)

python multiprocessing python-multiprocessing

2
推荐指数
1
解决办法
475
查看次数

如何在python多处理中使用全局/公共变量

我最近开始在python中使用多处理,我有以下代码来更新多个进程的列表项.但它正在给出空列表.

from multiprocessing import Pool
import time

global_list = list()


def testfun(n):
    print('started ', n)
    time.sleep(1)
    global_list.append(n)
    print('completed ', n)


def call_multiprocessing_function():
    mytasks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n']
    with Pool() as pool:
        pool.map(testfun, mytasks)


if __name__ == "__main__":
    print('starting the script')

    print(global_list)
    call_multiprocessing_function()
    print(global_list)

    print('completed the script')
Run Code Online (Sandbox Code Playgroud)

我得到以下输出

starting the script
[]
started  a
started  b
started  c
started  d
completed  a
started  e
completed  b
started  f
completed  c …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing python-3.x python-multiprocessing

2
推荐指数
1
解决办法
80
查看次数

与Python中的multiprocessing / pool.map无关的块大小?

我尝试利用python的池多处理功能。

与我如何设置块大小无关(在Windows 7和Ubuntu下-后者在下面具有4个内核),并行线程的数量似乎保持不变。

from multiprocessing import Pool
from multiprocessing import cpu_count
import multiprocessing
import time


def f(x):
    print("ready to sleep", x, multiprocessing.current_process())
    time.sleep(20)
    print("slept with:", x, multiprocessing.current_process())


if __name__ == '__main__':
    processes = cpu_count()
    print('-' * 20)
    print('Utilizing %d cores' % processes)
    print('-' * 20)
    pool = Pool(processes)
    myList = []
    runner = 0
    while runner < 40:
        myList.append(runner)
        runner += 1
    print("len(myList):", len(myList))

    # chunksize = int(len(myList) / processes)
    # chunksize = processes
    chunksize = 1
    print("chunksize:", chunksize)
    pool.map(f, …
Run Code Online (Sandbox Code Playgroud)

python multithreading multiprocessing python-multithreading python-multiprocessing

2
推荐指数
1
解决办法
1199
查看次数