标签: python-multiprocessing

python中的多线程系统调用

我有一个类似的python脚本:

def test_run():
     global files_dir
     for f1 in os.listdir(files_dir):
          for f2 os.listdir(files_dir):
               os.system("run program x on f1 and f2")
Run Code Online (Sandbox Code Playgroud)

os.system在不同处理器上调用每个呼叫的最佳方法是什么?使用子进程或多处理池?

注意:程序的每次运行都将生成一个输出文件.

python multiprocessing python-multiprocessing

0
推荐指数
1
解决办法
2115
查看次数

class ForkAwareLocal(threading.local):AttributeError:'module'对象没有属性'local

我是一个蟒蛇新手.我正在尝试手册中的这段代码片段,但是我收到了这个错误.无法弄清楚为什么.任何帮助将不胜感激.谢谢

ABHI

代码段

#/usr/bin/python
# -*- coding: utf-8 -*-

from multiprocessing import Pool
def f(x):
return x*x

p = Pool(1)
p.map(f, [1, 2, 3])
Run Code Online (Sandbox Code Playgroud)

错误

[root@localhost mpls-perf]# python thr_1.py 
Traceback (most recent call last):
  File "thr_1.py", line 4, in <module>
    from multiprocessing import Pool
  File "/usr/lib64/python2.7/multiprocessing/__init__.py", line 65, in <module>
    from multiprocessing.util import SUBDEBUG, SUBWARNING
  File "/usr/lib64/python2.7/multiprocessing/util.py", line 340, in <module>
    class ForkAwareLocal(threading.local):
AttributeError: 'module' object has no attribute 'local'
Exception AttributeError: '_shutdown' in <module 'threading' 
from '/root/nfs/zebos/tests/mpls-             perf/threading.pyc'> ignored …
Run Code Online (Sandbox Code Playgroud)

python multithreading python-multiprocessing

0
推荐指数
1
解决办法
1715
查看次数

python - 导入错误:无法导入名称池

代码在这里:

from multiprocessing import pool
def worker(num):
    print 'Worker:', num
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
Run Code Online (Sandbox Code Playgroud)

对不起,我是python的新手。每当我尝试导入池时,我都会收到以下错误。它说 os.chdir(wdir) 有问题,但我不知道是什么。有什么帮助吗?

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Users\z080302\Desktop\WinPython-32bit-2.7.6.3\python-2.7.6\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 540, in runfile
execfile(filename, namespace)
File "C:/Users/z080302/Desktop/Python_Projects/mp_test.py", line 18, in <module>
p = multiprocessing.Process(target=worker, args=(i,))
NameError: name 'multiprocessing' is not defined
Run Code Online (Sandbox Code Playgroud)

python python-multiprocessing

0
推荐指数
1
解决办法
1万
查看次数

在 Python 3 中使用 Pool.map() 和 range() 运行代码时的奇怪行为

我在使用multiprocessingrange()生成器时遇到了一些奇怪的行为,我无法弄清楚发生了什么。

这是代码:

from multiprocessing import Pool
import time

def worker_thread(param):
    time.sleep(1)
    print(param, end=' ', flush=True)

p = Pool(1)
inp = list(range(0, 100))

p.map(worker_thread, inp)
Run Code Online (Sandbox Code Playgroud)

执行此代码时(只有 1 个线程),输出如预期:

0 1 2 3 4 5 6 7 ...
Run Code Online (Sandbox Code Playgroud)

但是,当我将线程数提高到 2 时,输出变得无法解释:

0 13 1 14 2 15 3 16 4 17 ...
Run Code Online (Sandbox Code Playgroud)

依此类推,这种行为出现在线程数较高的情况下。既然list(range(0,100))按升序生成了从 0 到 99 的数字列表,为什么不map()按它所在的顺序扫描列表?

python python-3.x python-multiprocessing

0
推荐指数
1
解决办法
144
查看次数

当我增加将数据推送到它的线程数时,我的tensorflow队列的填充速度变慢

我已经写了一些代码将数据推送到tensorflow中的队列中,我的队列处理程序的初始化以及所有线程运行的主要功能如下:

def __init__(self):
    self.X = tf.placeholder(tf.int64)
    self.Y = tf.placeholder(tf.int64)
    self.queue = tf.RandomShuffleQueue(dtypes=[tf.int64, tf.int64],
                                       capacity=100,
                                       min_after_dequeue=20)

    self.enqueue_op = self.queue.enqueue([self.X, self.Y])


def thread_main(self, sess, coord):
    """Cycle through the dataset until the main process says stop."""
    train_fs = open(data_train, 'r')
    while not coord.should_stop():
        X_, Y_ = get_batch(train_fs)
        if not Y: #We're at the end of the file
            train_fs = open(data_train, 'r')
            X, Y = get_batch(train_fs)
        sess.run(self.enqueue_op, feed_dict={self.X:X_, self.Y:Y_}) 
Run Code Online (Sandbox Code Playgroud)

在培训期间,我正在监视队列的大小。由于某些原因,当我增加向其中推送数据的线程数量时,队列的填充速度会变慢。知道为什么吗?是因为我正在同时读取python文件吗?

编辑:

这是我正在使用的代码,在数据和图形旁边它是完全相同的。该代码在此虚拟数据上的行为符合预期。我有两个观察结果:

  • 我认为我没有适当地关闭线程,似乎它们在执行后会卡在队列中,而我运行的代码越多,它就会变得越慢。
  • 由于多线程在这里发挥作用,我猜失败的仅有两点是图形和读取数据的方式。

首先,生成一个虚拟数据集:

data_train = "./test.txt"

with open(data_train, 'w') as out_stream:
    out_stream.write("""[1,2,3,4,5,6]|1\n[1,2,3,4]|2\n[1,2,3,4,5,6]|0\n[1,2,3,4,5,6]|1\n[1,2,5,6]|1\n[1,2,5,6]|0""") …
Run Code Online (Sandbox Code Playgroud)

python-multiprocessing tensorflow

0
推荐指数
1
解决办法
1000
查看次数

Python 多处理:增加池大小后出现损坏的管道异常

我得到的例外。我所做的只是增加了池数

代码

 def parse(url):
  r = request.get(url)
POOL_COUNT = 75
with Pool(POOL_COUNT) as p:
    result = p.map(parse, links)



File "/usr/lib64/python3.5/multiprocessing/pool.py", line 130, in worker
    put((job, i, (False, wrapped)))
  File "/usr/lib64/python3.5/multiprocessing/queues.py", line 355, in put
    self._writer.send_bytes(obj)
  File "/usr/lib64/python3.5/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib64/python3.5/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib64/python3.5/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
Process ForkPoolWorker-26:
Traceback (most recent call last):
  File "/usr/lib64/python3.5/multiprocessing/pool.py", line 125, in …
Run Code Online (Sandbox Code Playgroud)

python python-multiprocessing

0
推荐指数
1
解决办法
1万
查看次数

Python 多处理中的变量不会改变

我正在尝试在我的多处理代码中使用字典。问题是当我在流程中更改一个键的值时,更改只会影响该流程。显然,进程的变量参数是该变量的副本。我尝试使用 dict 方法,但它也不起作用。我的错误是什么?

import time 
from multiprocessing import Process, Manager     

def f1(list1, set1):
    list1.append(3)
    set1['func']='f1' 
    print('f1 dic',set1) 
    print('f1 list',list1) 
    while True:
        time.sleep(3)
        print('f1 dic',set1) 
        print('f1 list',list1) 

def f2(list1): 
    list1.append(4)
    settings_dic['func']='f2'
    print('f2 dic',settings_dic) 
    print('f2 list',list1) 
    while True:
        time.sleep(3)
        print('f2 dic',settings_dic) 
        print('f2 list',list1) 

if __name__ == '__main__':     
    # list1 = Manager().dict()    
    settings_dic = Manager().dict()  
    Global = Manager().Namespace()
    list1 = [1,2]
    settings_dic = { 
        'mode' : 'master', 
        'logger' : 'True',
        'func' : ''
    }       
    p_video_get = Process(target=f1, args=(list1,settings_dic,))
    p_video_get.daemon = True
    p_video_get.start() 
    p_packetTrasnmission …
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-multiprocessing

0
推荐指数
1
解决办法
123
查看次数

具有 python 多处理依赖性的作业队列

我有一个函数和一个工作列表:

jobs = [[(2, 'dog'), None],
        [(-1, 'cat'), (0,)], 
        [(-1, 'Bob'), (1,)],
        [(7, 'Alice'), None],
        [(0, 'spam'), (2,3)]]
Run Code Online (Sandbox Code Playgroud)

我想并行地将函数应用于参数(第一个元组),同时满足对先前作业(第二个元组)的依赖性。例如,在狗的工作完成之前,猫的工作不能开始。但是,我不想占用一个核心并等待作业的依赖项完成。相反,我想继续执行可以立即执行的不同作业,以便如果可能的话,所有核心始终保持忙碌。有小费吗?非常感谢!

python multiprocessing python-multiprocessing

0
推荐指数
1
解决办法
461
查看次数

Python 多处理中的 Process.terminate() 和 Process.kill() 有什么区别?

Python 多处理Process.terminate()Process.kill()在 Python 多处理中的区别是什么?

python python-3.x python-multiprocessing

-1
推荐指数
1
解决办法
289
查看次数

使用队列和多处理时出现死锁

我不明白多处理文档(python.org)的这一部分,我引述:

"将陷入僵局的一个例子如下:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()
Run Code Online (Sandbox Code Playgroud)

"首先,它为什么会阻塞?更令人惊讶的是,当我在f的定义中使用一些小于1000000的值时,它可以正常工作(它适用于10,100,1000,10000,但不能使用100000).

非常感谢你的帮助 !

python multithreading multiprocessing python-multithreading python-multiprocessing

-2
推荐指数
1
解决办法
448
查看次数