我有一个类似的python脚本:
def test_run():
global files_dir
for f1 in os.listdir(files_dir):
for f2 os.listdir(files_dir):
os.system("run program x on f1 and f2")
Run Code Online (Sandbox Code Playgroud)
os.system在不同处理器上调用每个呼叫的最佳方法是什么?使用子进程或多处理池?
注意:程序的每次运行都将生成一个输出文件.
我是一个蟒蛇新手.我正在尝试手册中的这段代码片段,但是我收到了这个错误.无法弄清楚为什么.任何帮助将不胜感激.谢谢
#/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import Pool
def f(x):
return x*x
p = Pool(1)
p.map(f, [1, 2, 3])
Run Code Online (Sandbox Code Playgroud)
[root@localhost mpls-perf]# python thr_1.py
Traceback (most recent call last):
File "thr_1.py", line 4, in <module>
from multiprocessing import Pool
File "/usr/lib64/python2.7/multiprocessing/__init__.py", line 65, in <module>
from multiprocessing.util import SUBDEBUG, SUBWARNING
File "/usr/lib64/python2.7/multiprocessing/util.py", line 340, in <module>
class ForkAwareLocal(threading.local):
AttributeError: 'module' object has no attribute 'local'
Exception AttributeError: '_shutdown' in <module 'threading'
from '/root/nfs/zebos/tests/mpls- perf/threading.pyc'> ignored …Run Code Online (Sandbox Code Playgroud) 代码在这里:
from multiprocessing import pool
def worker(num):
print 'Worker:', num
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
Run Code Online (Sandbox Code Playgroud)
对不起,我是python的新手。每当我尝试导入池时,我都会收到以下错误。它说 os.chdir(wdir) 有问题,但我不知道是什么。有什么帮助吗?
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Users\z080302\Desktop\WinPython-32bit-2.7.6.3\python-2.7.6\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 540, in runfile
execfile(filename, namespace)
File "C:/Users/z080302/Desktop/Python_Projects/mp_test.py", line 18, in <module>
p = multiprocessing.Process(target=worker, args=(i,))
NameError: name 'multiprocessing' is not defined
Run Code Online (Sandbox Code Playgroud) 我在使用multiprocessing和range()生成器时遇到了一些奇怪的行为,我无法弄清楚发生了什么。
这是代码:
from multiprocessing import Pool
import time
def worker_thread(param):
time.sleep(1)
print(param, end=' ', flush=True)
p = Pool(1)
inp = list(range(0, 100))
p.map(worker_thread, inp)
Run Code Online (Sandbox Code Playgroud)
执行此代码时(只有 1 个线程),输出如预期:
0 1 2 3 4 5 6 7 ...
Run Code Online (Sandbox Code Playgroud)
但是,当我将线程数提高到 2 时,输出变得无法解释:
0 13 1 14 2 15 3 16 4 17 ...
Run Code Online (Sandbox Code Playgroud)
依此类推,这种行为出现在线程数较高的情况下。既然list(range(0,100))按升序生成了从 0 到 99 的数字列表,为什么不map()按它所在的顺序扫描列表?
我已经写了一些代码将数据推送到tensorflow中的队列中,我的队列处理程序的初始化以及所有线程运行的主要功能如下:
def __init__(self):
self.X = tf.placeholder(tf.int64)
self.Y = tf.placeholder(tf.int64)
self.queue = tf.RandomShuffleQueue(dtypes=[tf.int64, tf.int64],
capacity=100,
min_after_dequeue=20)
self.enqueue_op = self.queue.enqueue([self.X, self.Y])
def thread_main(self, sess, coord):
"""Cycle through the dataset until the main process says stop."""
train_fs = open(data_train, 'r')
while not coord.should_stop():
X_, Y_ = get_batch(train_fs)
if not Y: #We're at the end of the file
train_fs = open(data_train, 'r')
X, Y = get_batch(train_fs)
sess.run(self.enqueue_op, feed_dict={self.X:X_, self.Y:Y_})
Run Code Online (Sandbox Code Playgroud)
在培训期间,我正在监视队列的大小。由于某些原因,当我增加向其中推送数据的线程数量时,队列的填充速度会变慢。知道为什么吗?是因为我正在同时读取python文件吗?
编辑:
这是我正在使用的代码,在数据和图形旁边它是完全相同的。该代码在此虚拟数据上的行为符合预期。我有两个观察结果:
首先,生成一个虚拟数据集:
data_train = "./test.txt"
with open(data_train, 'w') as out_stream:
out_stream.write("""[1,2,3,4,5,6]|1\n[1,2,3,4]|2\n[1,2,3,4,5,6]|0\n[1,2,3,4,5,6]|1\n[1,2,5,6]|1\n[1,2,5,6]|0""") …Run Code Online (Sandbox Code Playgroud) 我得到的例外。我所做的只是增加了池数
代码
def parse(url):
r = request.get(url)
POOL_COUNT = 75
with Pool(POOL_COUNT) as p:
result = p.map(parse, links)
File "/usr/lib64/python3.5/multiprocessing/pool.py", line 130, in worker
put((job, i, (False, wrapped)))
File "/usr/lib64/python3.5/multiprocessing/queues.py", line 355, in put
self._writer.send_bytes(obj)
File "/usr/lib64/python3.5/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/usr/lib64/python3.5/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/usr/lib64/python3.5/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
Process ForkPoolWorker-26:
Traceback (most recent call last):
File "/usr/lib64/python3.5/multiprocessing/pool.py", line 125, in …Run Code Online (Sandbox Code Playgroud) 我正在尝试在我的多处理代码中使用字典。问题是当我在流程中更改一个键的值时,更改只会影响该流程。显然,进程的变量参数是该变量的副本。我尝试使用 dict 方法,但它也不起作用。我的错误是什么?
import time
from multiprocessing import Process, Manager
def f1(list1, set1):
list1.append(3)
set1['func']='f1'
print('f1 dic',set1)
print('f1 list',list1)
while True:
time.sleep(3)
print('f1 dic',set1)
print('f1 list',list1)
def f2(list1):
list1.append(4)
settings_dic['func']='f2'
print('f2 dic',settings_dic)
print('f2 list',list1)
while True:
time.sleep(3)
print('f2 dic',settings_dic)
print('f2 list',list1)
if __name__ == '__main__':
# list1 = Manager().dict()
settings_dic = Manager().dict()
Global = Manager().Namespace()
list1 = [1,2]
settings_dic = {
'mode' : 'master',
'logger' : 'True',
'func' : ''
}
p_video_get = Process(target=f1, args=(list1,settings_dic,))
p_video_get.daemon = True
p_video_get.start()
p_packetTrasnmission …Run Code Online (Sandbox Code Playgroud) 我有一个函数和一个工作列表:
jobs = [[(2, 'dog'), None],
[(-1, 'cat'), (0,)],
[(-1, 'Bob'), (1,)],
[(7, 'Alice'), None],
[(0, 'spam'), (2,3)]]
Run Code Online (Sandbox Code Playgroud)
我想并行地将函数应用于参数(第一个元组),同时满足对先前作业(第二个元组)的依赖性。例如,在狗的工作完成之前,猫的工作不能开始。但是,我不想占用一个核心并等待作业的依赖项完成。相反,我想继续执行可以立即执行的不同作业,以便如果可能的话,所有核心始终保持忙碌。有小费吗?非常感谢!
Python 多处理Process.terminate()和Process.kill()在 Python 多处理中的区别是什么?
我不明白多处理文档(python.org)的这一部分,我引述:
"将陷入僵局的一个例子如下:
from multiprocessing import Process, Queue
def f(q):
q.put('X' * 1000000)
if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
obj = queue.get()
Run Code Online (Sandbox Code Playgroud)
"首先,它为什么会阻塞?更令人惊讶的是,当我在f的定义中使用一些小于1000000的值时,它可以正常工作(它适用于10,100,1000,10000,但不能使用100000).
非常感谢你的帮助 !
python multithreading multiprocessing python-multithreading python-multiprocessing