标签: parallel-python

并行Python:什么是回调?

并行Python中,它在提交函数中有一些叫做回调(文档)的东西,但它似乎没有解释得太好.我几天前在他们的论坛上发帖,我没有收到回复.有人会解释回调是什么以及它用于什么?

谢谢.

python callback parallel-python

45
推荐指数
2
解决办法
6万
查看次数

并行Python:如何为'submit'提供参数?

这只是parallel-python标签的第二个问题.通过浏览文档和谷歌搜索主题后,我来到这里,因为它是我得到答案和建议最好的运气.

以下是将所有相关信息提交给pp的API(我认为它被称为).

    def submit(self, func, args=(), depfuncs=(), modules=(),
        callback=None, callbackargs=(), group='default', globals=None):
    """Submits function to the execution queue

        func - function to be executed
        args - tuple with arguments of the 'func'
        depfuncs - tuple with functions which might be called from 'func'
        modules - tuple with module names to import
        callback - callback function which will be called with argument
                list equal to callbackargs+(result,)
                as soon as calculation is done
        callbackargs - additional arguments for callback function
        group - …
Run Code Online (Sandbox Code Playgroud)

python parallel-python

6
推荐指数
1
解决办法
3455
查看次数

并行Python-RuntimeError:通信管道读取错误

我正在使用并行python使用称为OrcFxAPI的模块运行多个动态仿真。如果该程序作为python程序在我的计算机上运行,​​则该程序运行完美,但是如果我使用py2exe将其转换为exe文件,然后运行,则会收到以下错误消息:

Traceback (most recent call last):
    File "Analysis.pyc", Line 500, in multiprocessor
    File "pp.pyc", Line 342, in __init__
    File "pp.pyc", Line 506, in set_ncpus
    File "pp.pyc", Line 140, in __init__
    File "pp.pyc", Line 152, in start
    File "pptransport.pyc", Line 140, in receive
RuntimeError: Communication pipe read error
Run Code Online (Sandbox Code Playgroud)

我的程序在这一行失败了:

job_server = pp.Server(ppservers=ppservers)
Run Code Online (Sandbox Code Playgroud)

但我认为这可能与提交作业时用于导入OrcFxAPI模块的路径有关:

job = job_server.submit(max_seastate, (gui_vars, case_list, case, line_info, output_vars), (), ("OrcFxAPI",), callback=finished, callbackargs=(case_no, no_of_cases,))
Run Code Online (Sandbox Code Playgroud)

python py2exe parallel-python

5
推荐指数
0
解决办法
642
查看次数

使用并行python时记录

我正在使用并行python executePipeline多次执行一个大函数().此功能也使用多处理(使用multiprocessing模块).
我使用并行python模块在我的控制台上正确显示日志消息时遇到了一些麻烦.当我不使用它时,日志消息很好地显示.

下面是它的工作原理.我有一个服务器每次从客户端收到请求时都会调用一个worker:

job = self.server.job_server.submit(func = executeWorker, args = (config, ) )
Run Code Online (Sandbox Code Playgroud)

每次有来自客户端的新请求时,都会从新线程执行此函数.然后,worker正在executePipeline使用多处理调用正在执行不同processus 的函数.

SocketServer.TCPServer我使用线程的服务器.我使用根记录器在服务器中设置了一个记录器:

self.logger = logging.getLogger()
self.logger.setLevel(logging.INFO)
self.logger.addHandler(logging.StreamHandler() 
self.job_server = pp.Server(ncpus = 8) # for test
self.jobs = []
Run Code Online (Sandbox Code Playgroud)

当我运行我的服务器时,我只能从而executePipeline不是从子进程获取日志记录.此外,我只在作业结束时才记录执行管道,而不是在运行时.

这里还有工人代码.在"Executing pipeline with worker number "很好地显示在我的终端

'''
Setup logging
'''

logger = logging.getLogger()
logger.setLevel(logging.INFO)  

# worker name
publicIP = socket.gethostbyname(socket.gethostname()) 
pid = os.getpid()
workerID = unicode(str(publicIP) + ":" + str(pid))

logger.info( "Executing pipeline with worker {}".format(workerID)) …
Run Code Online (Sandbox Code Playgroud)

python logging parallel-python

5
推荐指数
1
解决办法
2487
查看次数

Python在线程之间共享类实例

我有一个类,该类将应用程序所需的所有资源(主要是图像)加载到内存中。

然后,几个线程需要通过此类访问这些资源。我不希望每个实例都重新加载所有资源,所以我以为我使用了Singleton模式。我这样做是这样的:

class DataContainer(object):
    _instance = None
    _lock = threading.Lock()
    _initialised = True

    def __new__(cls, *args, **kwargs):
        with cls._lock:
            if not cls._instance:
                cls._initialised = False
                cls._instance = object.__new__(cls, *args, **kwargs)
        return cls._instance

    def __init__(self, map_name = None):

        # instance has already been created
        if self._initialised:
            return

        self._initialised = True

        # load images
Run Code Online (Sandbox Code Playgroud)

只要我不使用多个线程,它就可以正常工作。但是对于多个线程,每个线程都有一个不同的实例。因此,使用4个线程,它们每个都创建一个新实例。我希望所有线程都使用此类的相同实例,因此资源仅一次加载到内存中。

我也尝试在定义类的同一模块中执行此操作,但在类定义之外:

def getDataContainer():
    global dataContainer
    return dataContainer

dataContainer = DataContainer()
Run Code Online (Sandbox Code Playgroud)

但是每个线程仍然有自己的实例。

我是python的新手,如果这是错误的方法,请让我知道,谢谢您的帮助

python singleton multithreading multiprocessing parallel-python

5
推荐指数
1
解决办法
6998
查看次数

将并行Python代码移动到云端

听说科学计算项目(恰好是这里描述的随机纤维束成像方法)我目前正在为一名研究人员运行,我们的50个节点集群需要4个月,研究人员要求我检查其他选项.该项目目前正在使用并行python将4d数组的块分配到不同的集群节点,并将处理后的块重新组合在一起.

我目前正在处理的工作可能过于粗糙,(5秒到10分钟,我不得不在并行python中增加超时默认值)并且我估计通过重写它可以将进程加速2-4倍为了更好地利用资源(将数据拆分和重新组合在一起花费的时间太长,也应该并行化).大部分工作都是由numpy数组完成的.

我们假设2-4次是不够的,我决定从我们的本地硬件中获取代码.对于像这样的高吞吐量计算,我的商业选择是什么以及如何修改代码?

python parallel-processing mapreduce amazon-ec2 parallel-python

4
推荐指数
1
解决办法
1454
查看次数

使用Parallel Python记录工作进程

我继承了在集群上使用Parallel Python维护一些科学计算.使用Parallel Python,作业将被提交给ppserver,在这种情况下,它会与其他计算机上已经运行的ppserver进程进行通信,将任务输出到ppworkers进程.

我想使用标准库日志记录模块来记录提交给ppserver的函数中的错误和调试信息.由于这些ppworkers作为单独的进程运行(在不同的计算机上),我不确定如何正确构建日志记录.我必须为每个进程登录一个单独的文件吗?也许有一个日志处理程序可以让它变得更好?

另外,我想要报告什么计算机遇到错误的过程,但我正在编写登录的代码可能不知道这些事情; 也许这应该发生在ppserver级别?

(在并行Python论坛上交叉发布的问题的版本,如果我从非SO用户那里得到关于此的内容,我会在这里发布答案)

python logging parallel-python

2
推荐指数
1
解决办法
1818
查看次数