I have an application that reads a series of XML files containing logs of road-vehicle passages. The application then processes each record, transforms some of the information to match the database columns, and inserts it into a Cassandra database (a single node running on a remote server [it is on an internal network, so connectivity is not really an issue]). After the data is inserted, the per-file process goes on to read this data back and generate information for summary tables, which makes the information available for in-depth analysis in an unrelated part of the application.
I am using multiprocessing to process many XML files in parallel, and the trouble I am having is the communication with the Cassandra server. Schematically, the process is as follows:
… .execute_async(query))
Now, this runs smoothly across several parallel processes until one of them reaches step 6, where its request (made with .execute(query), i.e. a call where I do wait for the response) always hits a timeout. The error I get is:
Process ProcessoImportacaoPNCT-1:
Traceback (most recent call last):
File "C:\Users\Lucas\Miniconda\lib\multiprocessing\process.py", line 258, in _bootstrap
self.run()
File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\ImportacaoArquivosPNCT.py", line 231, in run
core.CalculoIndicadoresPNCT.processa_equipamento(sessao_cassandra, equipamento, data, sentido, faixa)
File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 336, in processa_equipamento
desvio_medias(sessao_cassandra, equipamento, data_referencia, sentido, faixa)
File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 206, in desvio_medias
veiculos = sessao_cassandra.execute(sql_pronto)
File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 1594, in execute
result = future.result(timeout)
File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 3296, in result
raise self._final_exception
ReadTimeout: code=1200 …
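As a note for anyone hitting the same ReadTimeout: the heavy step-6 read is the usual suspect, and two common mitigations are paging the result set and raising the client-side timeout for that one call. A minimal sketch with the DataStax driver; the contact point, keyspace, and table name are hypothetical stand-ins, not the asker's schema:

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

# Hypothetical contact point and keyspace; substitute the real ones.
cluster = Cluster(['192.168.0.10'])
session = cluster.connect('pnct')

# Page the heavy read in chunks instead of one huge response, and give
# this one call more than the driver's default client-side timeout.
stmt = SimpleStatement("SELECT * FROM vehicle_records", fetch_size=1000)
for row in session.execute(stmt, timeout=120.0):  # timeout in seconds
    pass  # aggregate into the summary tables here

Note that the server enforces its own limit (read_request_timeout_in_ms in cassandra.yaml), so the client-side timeout only helps up to that cap.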
I am trying to run 2 loops at the same time using multiprocessing, but they seem to just run sequentially. When the first loop starts tkinter's mainloop() process, the other loop does not start until the GUI window is closed, and only then does the counting loop begin. I have tried multithreading as well as multiprocessing, with the same result. I need them to run simultaneously. Below is a simple example that demonstrates the problem. I am using Python 2.7.10.

from multiprocessing import Process
from Tkinter import *
import time

count = 0

def counting():
    while True:
        global count
        count = count + 1
        print count
        time.sleep(1)

class App():
    def __init__(self):
        self.myGUI = Tk()
        self.myGUI.geometry('800x600')
        self.labelVar = StringVar()
        self.labelVar.set("test")
        self.label1 = Label(self.myGUI, textvariable=self.labelVar)
        self.label1.grid(row=0, column=0)

app = App()

t1 = Process(target = app.myGUI.mainloop())
t2 = Process(target = counting())

t1.start()
t2.start()
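The trailing parentheses are the culprit here: target=app.myGUI.mainloop() and target=counting() call both functions immediately in the parent process, so mainloop() blocks until the window closes and the return value None is what actually gets passed as target. A sketch of the corrected pattern, assuming the GUI stays in the parent process (Tk objects cannot be pickled into a child):

from multiprocessing import Process
from Tkinter import Tk, Label
import time

def counting():
    n = 0
    while True:
        n += 1
        print n
        time.sleep(1)

if __name__ == '__main__':
    t2 = Process(target=counting)  # pass the function itself, no ()
    t2.start()
    root = Tk()
    Label(root, text="test").grid(row=0, column=0)
    root.mainloop()  # the GUI runs in the parent process
    t2.terminate()   # stop the counter once the window closes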
I have the following Python code:
jobs = []

p = Process(target=self.verify_process, args=(vm_ha1, creds, run_cmd_ha1, ip_ha1))
jobs.append(p)
p.start()

p = Process(target=self.verify_process, args=(vm_ha2, creds, run_cmd_ha2, ip_ha2))
jobs.append(p)
p.start

p = Process(target=self.verify_process, args=(vm_client, creds, run_cmd_client, ip_client))
jobs.append(p)
p.start

for p in jobs:
    p.join()
The target is a small def that checks whether the process on the VM has finished, and it prints the exit code.
When it runs, I get an error after the first process finishes and writes out its output:
File "/usr/lib/python2.7/multiprocessing/process.py", line 144, in join
assert self._popen is not None, 'can only join a started process'
AssertionError: can only join a started process
Run Code Online (Sandbox Code Playgroud)
When I searched around, I found a few mentions of this error. Most were cases where people were calling p.run() and should have been using p.start(). But I am already using p.start(). This is my first time tangling with multiprocessing, so I may be using it incorrectly. From what I can see, all 3 processes should be started.
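The traceback has the same shape of cause as the p.run() cases: the second and third p.start lines are missing their parentheses, so they merely reference the bound method without calling it, those two processes are never started, and join() then asserts on them. A loop avoids repeating the call by hand; a sketch with stand-in arguments for the originals:

from multiprocessing import Process

def verify_process(vm, creds, cmd, ip):
    print vm, ip  # stand-in for the real exit-code check

if __name__ == '__main__':
    work = [
        ('vm_ha1', 'creds', 'run_cmd_ha1', 'ip_ha1'),
        ('vm_ha2', 'creds', 'run_cmd_ha2', 'ip_ha2'),
        ('vm_client', 'creds', 'run_cmd_client', 'ip_client'),
    ]
    jobs = []
    for args in work:
        p = Process(target=verify_process, args=args)
        jobs.append(p)
        p.start()  # note the parentheses: this actually starts it
    for p in jobs:
        p.join()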
I assume the multiprocessing package uses pickle to send things between processes. However, pickle pays attention to an object's __getstate__ and __setstate__ methods, and multiprocessing seems to ignore them. Is that correct? Am I confused?
To reproduce, install docker, and type into a command line:
$ docker run python:3.4 python -c "import pickle
import multiprocessing
import os

class Tricky:
    def __init__(self,x):
        self.data=x
    def __setstate__(self,d):
        self.data=10
    def __getstate__(self):
        return {}

def report(ar,q):
    print('running report in pid %d, hailing from %d'%(os.getpid(),os.getppid()))
    q.put(ar.data)

print('module loaded in pid %d, hailing from pid %d'%(os.getpid(),os.getppid()))

if __name__ == '__main__':
    print('hello from pid %d'%os.getpid())
    ar = Tricky(5)
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=report, args=(ar, q))
    p.start()
    p.join()
    print(q.get())
    print(pickle.loads(pickle.dumps(ar)).data)"
You should get something like:
module loaded in pid 1, hailing …
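The behavior points at the start method rather than at multiprocessing ignoring the hooks: under the default fork start method on Linux, the Process argument reaches the child through the forked memory image, so pickle, and hence __setstate__, is never involved; the final pickle.dumps round-trip does use the hooks, which is where the 10 comes from. A sketch that forces the pickle-based spawn start method (available since Python 3.4) to confirm this:

import multiprocessing

class Tricky:
    def __init__(self, x):
        self.data = x
    def __getstate__(self):
        return {}
    def __setstate__(self, d):
        self.data = 10  # marker: proves the object went through pickle

def report(ar, q):
    q.put(ar.data)

if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')  # always pickles arguments
    q = ctx.Queue()
    p = ctx.Process(target=report, args=(Tricky(5), q))
    p.start()
    p.join()
    print(q.get())  # 10 under spawn; 5 under the default fork on Linux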
I am using multiprocessing with a pool. I need to pass a structure as an argument to a function that has to be used in separate processes. I cannot use the map functions of multiprocessing.Pool because a Queue cannot be copied, nor can an Array. The structure is to be used on the fly to log the result of each terminating process. Here is my code:

import multiprocessing
from multiprocessing import Process, Manager, Queue, Array
import itertools
import time

def do_work(number, out_queue=None):
    if out_queue is not None:
        print "Treated nb ", number
        out_queue.append("Treated nb " + str(number))
    return 0

def multi_run_wrapper(iter_values):
    return do_work(*iter_values)

def test_pool():
    # Get the max cpu
    nb_proc = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=nb_proc)

    total_tasks = 16
    tasks = range(total_tasks)
    out_queue = Queue()  # Use it instead of out_array and change out_queue.append() into out_queue.put() in the do_work() function. …
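For the record, a multiprocessing.Queue cannot be passed to pool workers as an argument at all (it fails with "Queue objects should only be shared between processes through inheritance"), which is presumably the copy problem described above. A Manager().Queue() is a picklable proxy and goes through fine; a sketch of that variant:

import multiprocessing

def do_work(args):
    number, out_queue = args
    out_queue.put("Treated nb " + str(number))
    return 0

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    out_queue = manager.Queue()  # proxy object: safe to pass to workers
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    pool.map(do_work, [(n, out_queue) for n in range(16)])
    pool.close()
    pool.join()
    while not out_queue.empty():
        print out_queue.get()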
I have a multiprocessing script that I have tried on both Linux and Windows. On Linux it works fine, but on Windows the script produces some random, unknown results and does not even finish. The script:
from multiprocessing.pool import Pool

def get_urls1():
    res = [1,2,3,4,5]
    nprocs = 20 # nprocs is the number of processes to run
    ParsePool = Pool(nprocs)
    #ParsePool.map(btl_test,url)
    ParsedURLS = ParsePool.map(extractData,res)

def extractData(r):
    print r

get_urls1()
Linux output:
1
3
2
5
4
But when I run the same script on Windows, it does not give the same result as on Linux, and the script does not even finish (although if I remove multiprocessing, the script works). How should I fix it so that multiprocessing works?
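The Windows symptom is characteristic of a missing if __name__ == '__main__' guard: Windows has no fork, so every worker process re-imports the module, and the unguarded get_urls1() call at the bottom makes each worker spawn workers of its own. A guarded sketch:

from multiprocessing.pool import Pool

def extractData(r):
    print r

def get_urls1():
    res = [1, 2, 3, 4, 5]
    pool = Pool(5)  # 20 processes buy nothing for 5 items
    pool.map(extractData, res)
    pool.close()
    pool.join()

if __name__ == '__main__':
    # On Windows, workers re-import this module; the guard keeps them
    # from re-running get_urls1() themselves.
    get_urls1()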
I want to use multiprocessing.Pool, but multiprocessing.Pool cannot abort a task after a timeout. I found a solution and modified it a little:
from multiprocessing import util, Pool, TimeoutError
from multiprocessing.dummy import Pool as ThreadPool
import threading
import sys
from functools import partial
import time

def worker(y):
    print("worker sleep {} sec, thread: {}".format(y, threading.current_thread()))
    start = time.time()
    while True:
        if time.time() - start >= y:
            break
        time.sleep(0.5)
        # show work progress
        print(y)
    return y

def collect_my_result(result):
    print("Got result {}".format(result))

def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        # Wait timeout …
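The snippet is cut off above, but the usual shape of this pattern (a sketch of the idea, not the asker's exact code) runs each task inside a one-thread pool and turns AsyncResult.get(timeout) into an abort:

import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
import time

def worker(y):
    time.sleep(y)
    return y

def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    try:
        res = p.apply_async(func, args=args)
        # get() raises TimeoutError if func has not finished in time.
        return res.get(timeout)
    except multiprocessing.TimeoutError:
        return None  # note: the abandoned worker thread keeps running
    finally:
        p.close()

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    results = [pool.apply_async(abortable_worker, args=(worker, t),
                                kwds={'timeout': 1.5}) for t in (1, 3)]
    pool.close()
    pool.join()
    print([r.get() for r in results])  # expect [1, None] with these timings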
How can I check the OpenMP schedule at runtime?
I compile the code with a parallel loop and a runtime schedule:
#pragma omp parallel for schedule(runtime) collapse(2)
for(j=1;j>-2;j-=2){
    for(i=0;i<n;i++){
        //nested loop code here
    }
}
Then I set the environment variable OMP_SCHEDULE=dynamic,50.
How can I check at runtime that my program is actually using the OMP_SCHEDULE variable?
I am using OpenMP 3.1 with gcc 4.7.3.
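Staying in C, since that is what the question compiles: OpenMP 3.0 added omp_get_schedule(), which reports the kind and chunk size that schedule(runtime) will resolve to, so printing it at startup shows whether OMP_SCHEDULE was honored. A sketch:

#include <stdio.h>
#include <omp.h>

int main(void) {
    const char *names[] = {"?", "static", "dynamic", "guided", "auto"};
    omp_sched_t kind;
    int chunk;

    /* Reports what schedule(runtime) will actually use. */
    omp_get_schedule(&kind, &chunk);
    printf("runtime schedule: %s, chunk %d\n",
           (kind >= 1 && kind <= 4) ? names[kind] : "unknown", chunk);
    return 0;
}

Built with gcc -fopenmp and run with OMP_SCHEDULE=dynamic,50, this should print dynamic with chunk 50.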
I am trying to run some Python functions in parallel, and the function prints throughout. What I want is for every subprocess to run the same function and have its output reach the main stdout in grouped form, meaning that each subprocess's output should only be printed once it has finished its task. If some kind of error occurs along the way, however, I still want to output whatever the subprocess had done so far.
A small example:
from time import sleep
import multiprocessing as mp

def foo(x):
    print('foo')
    for i in range(5):
        print('Process {}: in foo {}'.format(x, i))
        sleep(0.5)

if __name__ == '__main__':
    pool = mp.Pool()
    jobs = []
    for i in range(4):
        job = pool.apply_async(foo, args=[i])
        jobs.append(job)
    for job in jobs:
        job.wait()
This runs in parallel, but what gets output is:
foo
Process 0: in foo 0
foo
Process 1: in foo 0
foo
Process 2: in foo 0
foo
Process 3: in foo 0
Process 1: in foo 1
Process 0: …
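One way to get the grouped behavior (a sketch, not the only approach, assuming Python 3 for contextlib.redirect_stdout) is to have each worker capture its own prints into a buffer and return the text, so the parent prints each job's output in one piece; catching the exception inside the worker preserves whatever was printed before a failure:

import io
import multiprocessing as mp
from contextlib import redirect_stdout
from time import sleep

def foo(x):
    buf = io.StringIO()
    try:
        with redirect_stdout(buf):  # everything foo prints lands in buf
            print('foo')
            for i in range(5):
                print('Process {}: in foo {}'.format(x, i))
                sleep(0.5)
    except Exception as exc:
        buf.write('Process {} failed: {}\n'.format(x, exc))
    return buf.getvalue()

if __name__ == '__main__':
    pool = mp.Pool()
    for text in pool.imap(foo, range(4)):  # one grouped block per job
        print(text, end='')
    pool.close()
    pool.join()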
I was always sure that there is no point in having more threads/processes than CPU cores (from a performance point of view). However, my Python example shows different results.

import concurrent.futures
import random
import time

def doSomething(task_num):
    print("executing...", task_num)
    time.sleep(1)  # simulate heavy operation that takes ~ 1 second
    return random.randint(1, 10) * random.randint(1, 500)  # real operation, used random to avoid caches and so on...

def main():
    # This part is not taken in consideration because I don't want to
    # measure the worker creation time
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=60)

    start_time = time.time()
    for i in range(1, 100):  # execute 100 tasks
        executor.map(doSomething, [i, ])
    executor.shutdown(wait=True)
    print("--- %s …