如何在Python 3和PyQt5中实现多核处理?

Heo*_*eoN 6 multicore image-processing python-3.x pyqt5 python-multiprocessing

背景:我正在尝试在一个使用 PyQt5 的 Python 3.4 应用程序中实现多核处理。

在我的应用程序中,有一些按帧存储的 numpy.ndarray,可以把它想象成一个 [n,m,t] 数组。我需要处理每个 [n,m,:] 数组,使用多核可以线性地加速这一处理过程。

我尝试过 multiprocessing,并参考了一些示例脚本,写出了下面这个简单的脚本:

简单无 GUI 代码

import multiprocessing as mp
import numpy

# Random sample data; the script processes it one slice along the last axis at a time.
aa = numpy.random.rand(4, 2, 3)


def random_function(x):
    """Return the input together with its cube as an ``(x, x**3)`` tuple."""
    cubed = x ** 3
    return x, cubed

if __name__ == '__main__':
    # One 2-D slice per index along the last axis of ``aa``.
    slices = [aa[:, :, x] for x in range(aa.shape[2])]

    # pool.apply() blocks until each call has returned, so a list
    # comprehension over it processes the slices one after another.
    # pool.map() hands all slices to the workers at once and returns the
    # results in input order.  (A list of pool.apply_async(...) handles
    # followed by .get() on each is an equivalent alternative.)
    # The ``with`` block releases the worker processes once the results
    # have been collected.
    with mp.Pool(processes=4) as pool:
        results = pool.map(random_function, slices)

    test_va = numpy.asarray(results)
Run Code Online (Sandbox Code Playgroud)

这有效并且可以满足我需要它做的事情。

问题:现在,当我在 PyQt5 中实现这一点时,遇到了“pickling”(序列化)问题。因此,我按照此处针对 PyQt4 的建议,制作了一个简单的 GUI,在其中生成一个线程并使用 multiprocessing。结果是同一个 GUI 被复制出了 4 份,而且程序似乎无法正常工作。

无法正常工作的 PyQt5 GUI 代码:

import sys, time
from PyQt5.QtCore import * 
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *

import multiprocessing as mp
import numpy

class WorkThread(QThread):
    """QThread that cubes slices of a random array in a multiprocessing pool.

    The collected results are reported through the ``finished`` signal.
    """

    # Emitted from run() with the loop index and the array of pool results.
    finished = pyqtSignal(int,object)


    def __del__(self):
      # Wait for the thread before the Python object goes away.
      self.wait()

    # NOTE(review): declared without ``self`` but referenced as ``self.cube``
    # in run(), so through that access ``x`` is bound to the instance.
    def cube(x):
        """Return ``(x, x**3)``."""
        return x,x**3

    def run(self):
        """Cube each slice of a random array in a pool, then emit the result five times."""
        aa = numpy.random.rand(4,2,3)

        # One task per index along the last axis; the pool is created on each
        # run() and is not closed in this method.
        pool = mp.Pool(processes=4)
        results = [pool.apply_async(self.cube, args=(aa[:,:,x],)) for x in range(0,aa.shape[2])]
        # .get() blocks until the corresponding task has finished.
        output = [p.get() for p in results]
        test_va = numpy.asarray( output)

        for i in range(5):

            # NOTE(review): QThread.sleep is documented in whole seconds -
            # confirm that a fractional value gives the intended delay.
            QThread.sleep(0.3) # artificial time delay

            self.finished.emit(i,test_va)




class test_multicore(QWidget):
    """Demo window: a "test" button above a list that logs received entries."""

    def __init__(self, parent=None):
        super().__init__(parent)

        self.setGeometry(300, 300, 280, 600)
        self.setWindowTitle('Qthreads and multicore')

        # Vertical layout owned by this widget: button on top, list below.
        self.layout = QVBoxLayout(self)

        button = QPushButton("test")
        button.clicked.connect(self.test)
        self.testButton = button

        self.listwidget = QListWidget(self)

        for child in (self.testButton, self.listwidget):
            self.layout.addWidget(child)
        self.threadPool = []

    def add(self, text, random_matrix):
        """Print ``text`` and ``random_matrix``, append ``text`` to the list and re-sort it."""
        label = str(text)
        print("Add: " + label + str(random_matrix))
        self.listwidget.addItem(label)
        self.listwidget.sortItems()

    def addBatch(self, text="text", iters=6, delay=0.3):
        """Add ``iters`` numbered entries, sleeping ``delay`` seconds before each one."""
        for index in range(iters):
            time.sleep(delay)  # artificial time delay
            entry = text + " " + str(index)
            self.add(entry, 0)

    def test(self):
        """Clear the list, add the blocking batch, then start a WorkThread wired to ``add``."""
        self.listwidget.clear()
        self.addBatch("_non_thread_entries", iters=6, delay=0.3)

        # Keep the thread on the instance and connect its signal before starting it.
        self.workThread = worker = WorkThread()
        worker.finished[int, object].connect(self.add)
        worker.start()






# run
# NOTE: these statements are not under an ``if __name__ == '__main__':``
# guard, so they execute whenever this module is imported, not only when it
# is run as a script.
app = QApplication(sys.argv)
test = test_multicore()
test.show()
# Enter the Qt event loop.
app.exec_()
Run Code Online (Sandbox Code Playgroud)

我也尝试过使用 QObject,并通过 moveToThread 将其移到线程中,但还是遇到了同样的问题。

问题:如何在 Python 3.4 + PyQt5 应用程序中实现多核处理?需要考虑的是,我将使用 cx_freeze 在 Windows 和 Mac 上进行部署。

Heo*_*eoN 2

添加

if __name__ == '__main__':
Run Code Online (Sandbox Code Playgroud)

把它加在创建应用程序(QApplication)的代码之前,可以确保应用程序只被创建一次。

下面是在 Python 3.4 和 PyQt5 中使用 multiprocessing 的可运行代码示例。

import sys, time
from PyQt5.QtCore import * 
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *

import multiprocessing as mp
import numpy

def _cube(x):
    """Return ``(x, x**3)``.

    Kept at module level so the pool can send it to worker processes by
    name.  Submitting the bound method ``self.cube`` instead would require
    pickling the QThread instance along with it.
    """
    return x, x**3


class WorkThread(QThread):
    """QThread that cubes slices of a random array in a multiprocessing pool.

    The collected results are reported through the ``finished`` signal as
    ``(index, results_array)``.
    """

    # NOTE: this (int, object) signal reuses the name of QThread's own
    # ``finished`` signal.  The name is kept because callers connect to
    # ``finished[int, object]``.
    finished = pyqtSignal(int,object)

    def __del__(self):
        # Wait for run() to return before the Python object goes away.
        self.wait()

    def cube(self, x):
        """Return ``(x, x**3)``; kept for callers that use the method form."""
        return _cube(x)

    def run(self):
        """Cube each slice of a random array in a pool, then emit the result five times."""
        aa = numpy.random.rand(4, 2, 3)

        # The ``with`` block releases the worker processes once every result
        # has been fetched, so repeated runs do not accumulate idle workers.
        with mp.Pool(processes=4) as pool:
            pending = [
                pool.apply_async(_cube, args=(aa[:, :, k],))
                for k in range(aa.shape[2])
            ]
            # .get() blocks until the corresponding task has finished.
            output = [job.get() for job in pending]
        test_va = numpy.asarray(output)

        for i in range(5):
            # QThread.sleep() takes whole seconds; msleep() expresses the
            # intended 0.3 s artificial delay.
            QThread.msleep(300)

            self.finished.emit(i, test_va)




class test_multicore(QWidget):
    """Demo window: a "test" button above a list that logs received entries."""

    def __init__(self, parent=None):
        super().__init__(parent)

        self.setGeometry(300, 300, 280, 600)
        self.setWindowTitle('Qthreads and multicore')

        # Vertical layout owned by this widget: button on top, list below.
        self.layout = QVBoxLayout(self)

        button = QPushButton("test")
        button.clicked.connect(self.test)
        self.testButton = button

        self.listwidget = QListWidget(self)

        for child in (self.testButton, self.listwidget):
            self.layout.addWidget(child)
        self.threadPool = []

    def add(self, text, random_matrix):
        """Print ``text`` and ``random_matrix``, append ``text`` to the list and re-sort it."""
        label = str(text)
        print("Add: " + label + str(random_matrix))
        self.listwidget.addItem(label)
        self.listwidget.sortItems()

    def addBatch(self, text="text", iters=6, delay=0.3):
        """Add ``iters`` numbered entries, sleeping ``delay`` seconds before each one."""
        for index in range(iters):
            time.sleep(delay)  # artificial time delay
            entry = text + " " + str(index)
            self.add(entry, 0)

    def test(self):
        """Clear the list, add the blocking batch, then start a WorkThread wired to ``add``."""
        self.listwidget.clear()
        self.addBatch("_non_thread_entries", iters=6, delay=0.3)

        # Keep the thread on the instance and connect its signal before starting it.
        self.workThread = worker = WorkThread()
        worker.finished[int, object].connect(self.add)
        worker.start()






# run
if __name__ == '__main__':
    # Must come first in the guarded block: lets multiprocessing start its
    # worker processes correctly when the program is frozen into a Windows
    # executable (e.g. with cx_Freeze).  It has no effect otherwise.
    mp.freeze_support()

    app = QApplication(sys.argv)
    test = test_multicore()
    test.show()
    # Hand the event loop's return code back to the operating system.
    sys.exit(app.exec_())
Run Code Online (Sandbox Code Playgroud)

若要改用 apply_async:

# Submit one asynchronous task per slice along the last axis of ``aa``.
results = [
    pool.apply_async(cube, args=(aa[:, :, k],))
    for k in range(aa.shape[2])
]
# Fetch the results in submission order; each get() blocks until its task is done.
output = [job.get() for job in results]
test_va = numpy.asarray(output)
Run Code Online (Sandbox Code Playgroud)