标签: multiprocessing

Cassandra在阻止同步请求的多个进程中同步执行

我有一个应用程序,它读取一系列包含道路车辆通道日志的XML文件.然后,应用程序处理每个记录,转换一些信息以匹配数据库列并将其插入到cassandra数据库中(在远程服务器中运行单个节点[它在内部网络中,因此连接不是真正的问题]) .在数据库中插入数据之后,每个文件的进程继续读取此数据并生成摘要表的信息,这使得信息可以在应用程序的不相关部分中进行深入分析.

我正在使用多处理来并行处理许多XML文件,而我遇到的麻烦就是与cassandra服务器进行通信.示意性地,该过程如下:

  1. 从XML文件中读取记录
  2. 流程记录的数据
  3. 将处理后的数据插入数据库(使用.execute_async(query))
  4. 重复1到3,直到XMl文件结束
  5. 等待我所做的所有插入查询的响应
  6. 从数据库中读取数据
  7. 处理读取数据
  8. 将已处理的数据插入摘要表中

现在,这在多个并行进程中运行顺畅,直到一个进程进入步骤6,其请求(使用.execute(query),即我将等待响应)始终面临超时.我收到的错误是:

 Process ProcessoImportacaoPNCT-1:
Traceback (most recent call last):
  File "C:\Users\Lucas\Miniconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\ImportacaoArquivosPNCT.py", line 231, in run
    core.CalculoIndicadoresPNCT.processa_equipamento(sessao_cassandra, equipamento, data, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 336, in processa_equipamento
    desvio_medias(sessao_cassandra, equipamento, data_referencia, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 206, in desvio_medias
    veiculos = sessao_cassandra.execute(sql_pronto)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 1594, in execute
    result = future.result(timeout)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 3296, in result
    raise self._final_exception
ReadTimeout: code=1200 …
Run Code Online (Sandbox Code Playgroud)

python database cql multiprocessing cassandra

1
推荐指数
1
解决办法
1994
查看次数

同时运行多个循环

我试图使用多处理同时运行2个循环,但它们似乎只是按顺序运行.当第一个循环启动tkinter的mainloop()进程时,另一个循环在GUI窗口关闭之前不会启动,然后计数循环开始.我尝试过多线程和多处理,结果相同.我需要它们同时运行.下面是一个演示该问题的简单示例.我正在使用python 2.7.10.

from multiprocessing import Process
from Tkinter import *
import time



count = 0

def counting():
    while True:
        global count
        count = count + 1
        print count
        time.sleep(1)

class App():

    def __init__(self):
        self.myGUI = Tk()
        self.myGUI.geometry('800x600')

        self.labelVar = StringVar()
        self.labelVar.set("test")

        self.label1 = Label(self.myGUI, textvariable=self.labelVar)
        self.label1.grid(row=0, column=0)


app = App()

t1 = Process(target = app.myGUI.mainloop())
t2 = Process(target = counting())

t1.start()
t2.start()
Run Code Online (Sandbox Code Playgroud)

python multiprocessing

1
推荐指数
1
解决办法
349
查看次数

多处理只能加入已启动的进程

我有以下Python代码

jobs = []
p = Process(target=self.verify_process, args=(vm_ha1, creds, run_cmd_ha1, ip_ha1))
jobs.append(p)
p.start()
p = Process(target=self.verify_process, args=(vm_ha2, creds, run_cmd_ha2, ip_ha2))
jobs.append(p)
p.start
p = Process(target=self.verify_process, args=(vm_client, creds, run_cmd_client, ip_client))
jobs.append(p)
p.start
for p in jobs:
    p.join()
Run Code Online (Sandbox Code Playgroud)

如果VM上的进程已完成,则目标是一个小的def检查,并将打印退出代码.

当它运行时,我在第一个进程完成后得到一个错误,并写出它的输出

  File "/usr/lib/python2.7/multiprocessing/process.py", line 144, in join
    assert self._popen is not None, 'can only join a started process'
AssertionError: can only join a started process
Run Code Online (Sandbox Code Playgroud)

当我四处寻找时,我发现了一些这样的提及.其中大多数是因为他们都在使用p.run()并且应该使用p.start().

但我已经在使用p.start()了.

这是我第一次捆绑Multiprocessing,所以我可能会错误地使用它.

从我所看到的,应该开始所有3个过程.

python multiprocessing

1
推荐指数
1
解决办法
1964
查看次数

多处理忽略“ __setstate__”

我假设多处理程序包使用pickle在进程之间发送消息。但是,泡菜要注意对象的__getstate____setstate__方法。多处理似乎忽略了它们。它是否正确?我感到困惑吗?

要复制,请安装docker,然后在命令行中键入

$ docker run python:3.4 python -c "import pickle
import multiprocessing
import os

class Tricky:
    def __init__(self,x):
        self.data=x

    def __setstate__(self,d):
        self.data=10

    def __getstate__(self):
        return {}

def report(ar,q):
    print('running report in pid %d, hailing from %d'%(os.getpid(),os.getppid()))
    q.put(ar.data)

print('module loaded in pid %d, hailing from pid %d'%(os.getpid(),os.getppid()))
if __name__ == '__main__':
    print('hello from pid %d'%os.getpid())
    ar = Tricky(5)
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=report, args=(ar, q))
    p.start()
    p.join()
    print(q.get())
    print(pickle.loads(pickle.dumps(ar)).data)"
Run Code Online (Sandbox Code Playgroud)

你应该得到类似

module loaded in pid 1, hailing …
Run Code Online (Sandbox Code Playgroud)

python pickle multiprocessing python-multiprocessing

1
推荐指数
1
解决办法
233
查看次数

多处理池和队列

我在池中使用多处理。我需要将结构作为参数传递给必须在单独的进程中使用的函数。我无法使用的映射功能multiprocessing.Pool,因为我无法复制Pool.Queue,也不能复制Pool.Array。该结构将在运行中用于记录每个终止过程的结果。这是我的代码:

import multiprocessing
from multiprocessing import Process, Manager, Queue, Array
import itertools
import time

def do_work(number, out_queue=None):
    if out_queue is not None:
        print "Treated nb ", number
        out_queue.append("Treated nb " + str(number))
    return 0


def multi_run_wrapper(iter_values):
    return do_work(*iter_values)

def test_pool():
    # Get the max cpu
    nb_proc = multiprocessing.cpu_count()

    pool = multiprocessing.Pool(processes=nb_proc)
    total_tasks = 16
    tasks = range(total_tasks)

    out_queue= Queue()  # Use it instead of out_array and change out_queue.append() into out_queue.put() in the do_work() function. …
Run Code Online (Sandbox Code Playgroud)

python pool multiprocessing python-2.7

1
推荐指数
1
解决办法
5471
查看次数

Python多处理工作在Linux中,但不在Windows中

我有一个多处理脚本,我在linux和windows中都尝试过

在Linux中它工作正常但在Windows中脚本运行一些随机的未知结果,脚本甚至没有结束

脚本

from multiprocessing.pool import Pool
def get_urls1():
    res = [1,2,3,4,5]

    nprocs = 20 # nprocs is the number of processes to run
    ParsePool = Pool(nprocs)
    #ParsePool.map(btl_test,url)
    ParsedURLS = ParsePool.map(extractData,res)

def extractData(r):
    print r

get_urls1()
Run Code Online (Sandbox Code Playgroud)

Linux输出

1
3
2
5
4
Run Code Online (Sandbox Code Playgroud)

但是当我在Windows中运行相同的脚本时,它并没有给出确切的结果,因为linux和脚本甚至没有结束(但是如果我删除多处理脚本工作)

我应该怎样修复多处理工作?

python linux windows multiprocessing

1
推荐指数
1
解决办法
948
查看次数

Python多处理池超时

我想使用multiprocessing.Pool,但是multiprocessing.Pool不能在超时后中止任务。我找到了解决方案,并对其进行了一些修改。

from multiprocessing import util, Pool, TimeoutError
from multiprocessing.dummy import Pool as ThreadPool
import threading
import sys
from functools import partial
import time


def worker(y):
    print("worker sleep {} sec, thread: {}".format(y, threading.current_thread()))
    start = time.time()
    while True:
       if time.time() - start >= y:
           break
       time.sleep(0.5)
       # show work progress
       print(y)
    return y


def collect_my_result(result):
    print("Got result {}".format(result))


def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        # Wait timeout …
Run Code Online (Sandbox Code Playgroud)

python multithreading multiprocessing python-multithreading python-multiprocessing

1
推荐指数
1
解决办法
3451
查看次数

我正在运行哪个openmp时间表?

如何在运行时检查openmp时间表?

我使用并行循环和运行时计划来编译代码

#pragma omp parallel for schedule(runtime) collapse(2)
for(j=1;j>-2;j-=2){
    for(i=0;i<n;i++){
    //nested loop code here
    }
}
Run Code Online (Sandbox Code Playgroud)

然后指定环境变量OMP_SCHEDULE=dynamic,50

如何在运行时检查我的程序实际使用了OMP_SCHEDULE变量?

我在gcc 4.7.3中使用openmp 3.1

multithreading openmp multiprocessing

1
推荐指数
1
解决办法
614
查看次数

有序打印的Python多处理子流程?

我正在尝试并行运行一些Python函数,该函数在整个函数中都有打印命令。我想要的是让每个子进程运行相同的功能,以分组的方式输出到主标准输出。我的意思是,我希望每个子流程的输出仅在完成其任务后才打印。但是,如果在此过程中发生某种错误,我仍要输出子过程中所做的任何事情。

一个小例子:

from time import sleep
import multiprocessing as mp


def foo(x):
    print('foo')
    for i in range(5):
        print('Process {}: in foo {}'.format(x, i))
        sleep(0.5)


if __name__ == '__main__':
    pool = mp.Pool()

    jobs = []
    for i in range(4):
        job = pool.apply_async(foo, args=[i])
        jobs.append(job)

    for job in jobs:
        job.wait()
Run Code Online (Sandbox Code Playgroud)

这是并行运行的,但是输出的是:

foo
Process 0: in foo 0
foo
Process 1: in foo 0
foo
Process 2: in foo 0
foo
Process 3: in foo 0
Process 1: in foo 1
Process 0: …
Run Code Online (Sandbox Code Playgroud)

python stdout multiprocessing

1
推荐指数
1
解决办法
2290
查看次数

为什么增加工人数量(超过核心数量)仍然会减少执行时间?

我始终确信没有必要拥有比CPU内核更多的线程/进程(从性能角度来看).但是,我的python示例显示了不同的结果.

import concurrent.futures
import random
import time


def doSomething(task_num):
    print("executing...", task_num)
    time.sleep(1)  # simulate heavy operation that takes ~ 1 second    
    return random.randint(1, 10) * random.randint(1, 500)  # real operation, used random to avoid caches and so on...


def main():
    # This part is not taken in consideration because I don't want to
    # measure the worker creation time
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=60)

    start_time = time.time()

    for i in range(1, 100): # execute 100 tasks
        executor.map(doSomething, [i, ])
    executor.shutdown(wait=True)

    print("--- %s …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing multithreading multiprocessing

1
推荐指数
1
解决办法
741
查看次数