Creating a database connection and maintaining it across multiple processes (multiprocessing)

EnE*_*nE_ 21 python database-connection multiprocessing

Much like another post I made, this answers that post and raises a new question.

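Recap: I need to update every record in a spatial database in which a dataset of points overlays a dataset of polygons. For each point feature I want to assign a key that relates it to the polygon feature it lies within. So if my point 'New York City' lies within the polygon USA, and the USA polygon has 'GID = 1', I will assign 'gid_fkey = 1' to my point New York City.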
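To make the recap concrete, here is a minimal pure-Python sketch of the same idea (Python 3). It is only an illustration: in the real setup PostGIS `ST_Within` does the geometry test, while the ray-casting helper, the function names, and the sample coordinates below are all made up for this example.

```python
def point_in_polygon(point, polygon):
    """Ray-casting test: True if point (x, y) lies inside polygon.

    polygon is a list of (x, y) vertices; the last vertex connects back
    to the first. Points exactly on the boundary get no special handling.
    """
    x, y = point
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Does this edge straddle the horizontal line through the point?
        if (y1 > y) != (y2 > y):
            # x coordinate where the edge crosses that line
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def assign_foreign_keys(cities, countries):
    """Return {city_id: gid} for every city that lies inside a country polygon."""
    keys = {}
    for city_id, point in cities.items():
        for gid, polygon in countries.items():
            if point_in_polygon(point, polygon):
                keys[city_id] = gid
                break
    return keys


# Made-up data: one square "country" with gid 1 and two points.
countries = {1: [(0, 0), (10, 0), (10, 10), (0, 10)]}
cities = {"New York City": (3, 4), "Atlantis": (20, 20)}
print(assign_foreign_keys(cities, countries))
```

Only the point inside the square receives a key; the other one is left without a match, which corresponds to `gid_fkey` staying NULL in the table.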

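Okay, so this has been achieved using multiprocessing. I have noticed a 150% increase in speed, so it does work. But I think there is a lot of unnecessary overhead, because a new DB connection is opened for every record.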

So here is the code:

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print 'Tasks Complete'
                self.task_queue.task_done()
                break            
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self):        
        pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        pyConn.set_isolation_level(0)
        pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a

        return self.a

    def __str__(self):
        return 'ARC'
    def run(self):
        print 'IN'

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
    for w in consumers:
        w.start()

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConnX.set_isolation_level(0)
    pyCursorX = pyConnX.cursor()

    pyCursorX.execute('SELECT count(*) FROM city WHERE gid_fkey IS NULL')
    temp = pyCursorX.fetchall()    
    num_job = temp[0]
    num_jobs = num_job[0]

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')    
    cityIdListTuple = pyCursorX.fetchall()    

    cityIdList = []

    for x in cityIdListTuple:
        cityIdList.append(x[0])


    for i in xrange(num_jobs):
        tasks.put(Task(cityIdList[i]))

    for i in xrange(num_consumers):
        tasks.put(None)

    while num_jobs:
        result = results.get()
        print result
        num_jobs -= 1

Each connection appears to take somewhere between 0.3 and 1.5 seconds, as measured with the 'time' module.
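For reference, a measurement like that could be taken roughly as follows (Python 3). This is a sketch, not the code actually used: `time_call` and `fake_connect` are hypothetical names, and `fake_connect` just sleeps in place of a real `psycopg2.connect(...)` call so the snippet runs without a database.

```python
import time


def time_call(fn, *args, **kwargs):
    """Call fn and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


def fake_connect():
    # Hypothetical stand-in for psycopg2.connect(...): sleeps briefly
    # to mimic the latency of opening a connection.
    time.sleep(0.05)
    return object()


conn, seconds = time_call(fake_connect)
print("connect took %.3f s" % seconds)
```

Swapping `fake_connect` for a function that opens the real connection would give the per-connection cost that every Task currently pays.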

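Is there a way to make one DB connection per process and then just pass the city_id as a variable into a query on that open cursor? That way I would create, say, four processes, each with its own DB connection, and then somehow hand each city_id to a process to work on.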

Céd*_*ien 36

Try isolating the creation of the connection in the Consumer constructor, then pass it to the Task being executed:

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        self.pyConn.set_isolation_level(0)


    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print 'Tasks Complete'
                self.task_queue.task_done()
                break            
            answer = next_task(connection=self.pyConn)
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self, connection=None):        
        pyConn = connection
        pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a

        return self.a

    def __str__(self):
        return 'ARC'
    def run(self):
        print 'IN'

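  • Mate, that worked a treat. I don't have the rep to upvote you, but that code is absolutely magic. Getting rid of the constant DB connections easily increases the speed by 50%, and probably closer to 100% in some cases. Thanks again. (2 upvotes)
  • I was under the impression that you should establish a DB connection _after_ you fork the process, but in your example the Consumer is initialized before the fork. Shouldn't the DB connection be established in Consumer.run(), or doesn't it matter in this case? Isn't the Consumer instance (including the DB connection) _copied_ to the new process? (2 upvotes)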
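Following up on the last comment, here is a rough sketch (Python 3) of what opening the connection inside `Consumer.run()` could look like, so that each connection is created in the child process that uses it. This is a variation under stated assumptions, not the answer's code: the `connect` parameter and the `pg_connect` helper are names invented here, the queue carries plain `city_id` values instead of Task objects, and the ids are passed as query parameters to `cursor.execute` rather than formatted into the SQL string.

```python
import multiprocessing

UPDATE_SQL = (
    "UPDATE city SET gid_fkey = gid FROM country "
    "WHERE ST_Within((SELECT the_geom FROM city WHERE city_id = %s), "
    "country.the_geom) AND city_id = %s"
)


def pg_connect():
    """Open one psycopg2 connection (connection string taken from the question)."""
    import psycopg2  # imported lazily so the sketch loads without psycopg2
    conn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    conn.set_isolation_level(0)  # 0 = autocommit, as in the question
    return conn


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue, connect=pg_connect):
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue
        # Only the factory is stored here; no connection exists yet.
        self.connect = connect

    def run(self):
        # run() executes in the child process, so the connection is
        # opened there and reused for every city_id this worker handles.
        conn = self.connect()
        cursor = conn.cursor()
        try:
            while True:
                city_id = self.task_queue.get()
                if city_id is None:  # sentinel: no more work
                    self.task_queue.task_done()
                    break
                cursor.execute(UPDATE_SQL, (city_id, city_id))
                self.task_queue.task_done()
                self.result_queue.put(city_id)
        finally:
            conn.close()
```

The main block from the question would stay much the same, except that it puts `city_id` values on the queue (`tasks.put(city_id)`) followed by one `None` per consumer. Because the Consumer object only holds a module-level function and the two queues, nothing connection-related has to be copied into the new process.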