Python: using sqlite3 with multiprocessing

use*_*733 5 python sqlite

I have an SQLite3 database. I need to parse 10,000 files. From each file I read some data and then query the database with it to get a result. My code works fine in a single-process environment, but I get an error when trying to use a multiprocessing pool.

My approach without multiprocessing (works OK):
1. Open DB connection object
2. for f in files: 
     foo(f, x1=x1, x2=x2, ..., db=DB)
3. Close DB

My approach with multiprocessing (does NOT work):
1. Open DB
2. pool = multiprocessing.Pool(processes=4) 
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), [files])
4. pool.close()
5. Close DB 

I get the following error: sqlite3.ProgrammingError: Base Cursor.__init__ not called.
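The underlying problem is that a sqlite3 connection is bound to the process that created it and cannot be serialized, which multiprocessing needs in order to ship the `partial(..., db=DB)` argument to the workers. A quick sketch (not from the original post) showing that a connection object refuses to pickle:

```python
import pickle
import sqlite3

conn = sqlite3.connect(":memory:")
try:
    # multiprocessing pickles the arguments it sends to worker
    # processes; a live connection object cannot survive that.
    pickle.dumps(conn)
except TypeError as e:
    print("cannot pickle:", e)
finally:
    conn.close()
```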

My database class is implemented as follows:

def open_db(sqlite_file):
    """Open SQLite database connection.

    Args:
    sqlite_file -- File path

    Return:
    Connection
    """

    log.info('Open SQLite database %s', sqlite_file)
    try:
        conn = sqlite3.connect(sqlite_file)
    except sqlite3.Error as e:
        log.error('Unable to open SQLite database %s', e.args[0])
        sys.exit(1)

    return conn

def close_db(conn, sqlite_file):
    """Close SQLite database connection.

    Args:
    conn -- Connection
    """

    if conn:
        log.info('Close SQLite database %s', sqlite_file)
        conn.close()

class MapDB:

    def __init__(self, sqlite_file):
        """Initialize.

        Args:
        sqlite_file -- File path
        """

        # 1. Open database.
        # 2. Setup to receive data as dict().
        # 3. Get cursor to execute queries.
        self._sqlite_file      = sqlite_file
        self._conn             = open_db(sqlite_file)
        self._conn.row_factory = sqlite3.Row
        self._cursor           = self._conn.cursor()

    def close(self):
        """Close DB connection."""

        if self._cursor:
            self._cursor.close()
        close_db(self._conn, self._sqlite_file)

    def check(self):
        ...

    def get_driver_net(self, net):
        ...

    def get_cell_id(self, net):
       ...

The function foo() looks like this:

def foo(f, x1, x2, db):
    # ... extract some data from file f ...
    r1 = db.get_driver_net(...)
    r2 = db.get_cell_id(...)

The overall implementation that does not work looks like this:

mapdb = MapDB(sqlite_file)

log.info('Create NetInfo objects')
pool = multiprocessing.Pool(processes=4)
files = [get list of files to process]                 
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)    
pool.close()
mapdb.close()

To fix this, I think I need to create the MapDB() object inside each pool worker (so there are 4 parallel/independent connections). But I'm not sure how to do that. Can someone show me an example of how to accomplish this with Pool?

Eri*_*ikR 5

How about defining foo like this:

def foo(f, x1, x2, db_path):
    mapdb = MapDB(db_path)  # opens its own connection in this worker
    # ... process data ...
    mapdb.close()

Then change your pool.map call to:

pool.map(functools.partial(foo, x1=x1, x2=x2, db_path="path-to-sqlite3-db"), files)    
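Alternatively, the per-worker connection can be opened once per process instead of once per file, using Pool's `initializer`/`initargs` hooks. The sketch below is self-contained and hedged: the trimmed-down `MapDB`, the `cells` table, and the `get_cell_id` query are hypothetical stand-ins for the asker's real schema and methods.

```python
import os
import sqlite3
import tempfile
from multiprocessing import Pool

class MapDB:
    """Hypothetical, trimmed-down MapDB; the real class is in the question."""

    def __init__(self, sqlite_file):
        self._conn = sqlite3.connect(sqlite_file)
        self._conn.row_factory = sqlite3.Row
        self._cursor = self._conn.cursor()

    def get_cell_id(self, net):
        self._cursor.execute(
            "SELECT cell_id FROM cells WHERE net = ?", (net,))
        row = self._cursor.fetchone()
        return row["cell_id"] if row else None

_mapdb = None  # one MapDB per worker process

def init_worker(db_path):
    # Runs once in each worker process, so every worker gets
    # its own independent sqlite3 connection.
    global _mapdb
    _mapdb = MapDB(db_path)

def foo(net):
    # Stand-in for the asker's foo(); queries via the worker's MapDB.
    return net, _mapdb.get_cell_id(net)

def make_demo_db(db_path):
    # Demo fixture only, so the sketch runs on its own.
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE cells (net TEXT, cell_id INTEGER)")
    conn.executemany("INSERT INTO cells VALUES (?, ?)",
                     [("n1", 10), ("n2", 20)])
    conn.commit()
    conn.close()

if __name__ == "__main__":
    db_path = os.path.join(tempfile.mkdtemp(), "map.db")
    make_demo_db(db_path)
    with Pool(processes=2, initializer=init_worker,
              initargs=(db_path,)) as pool:
        print(dict(pool.map(foo, ["n1", "n2", "n3"])))
        # {'n1': 10, 'n2': 20, 'n3': None}
```

This avoids reopening the database for every single file while still giving each of the 4 workers its own connection.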

Update

Another option is to manage the worker threads yourself and hand work to them through a Queue.

from queue import Queue   # "Queue" in Python 2
from threading import Thread

q = Queue()

def worker():
    mapdb = ...  # open the sqlite database here
    while True:
        item = q.get()
        if item[0] == "file":
            f = item[1]
            # ... process file f ...
            q.task_done()
        else:
            q.task_done()
            break
    # ... close the sqlite connection ...

# Start up the workers

nworkers = 4

for i in range(nworkers):
    t = Thread(target=worker)  # don't rebind the name "worker" here
    t.daemon = True
    t.start()

# Place work on the Queue

for x in ...:  # list of files
    q.put(("file", x))

# Place termination tokens onto the Queue

for i in range(nworkers):
    q.put(("end",))

# Wait for all work to be done.

q.join()

The termination tokens are there to make sure each worker's sqlite connection gets closed - just in case.
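For what it's worth, the same one-connection-per-worker idea can be written more compactly with `concurrent.futures` and thread-local storage instead of a hand-rolled Queue. This is a sketch, not part of the original answer; `process_file` is a hypothetical stand-in that would extract data from the file and query the connection.

```python
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()

def get_conn(db_path):
    # Lazily open one connection per worker thread; a sqlite3
    # connection must stay on the thread that created it.
    if not hasattr(local, "conn"):
        local.conn = sqlite3.connect(db_path)
    return local.conn

def process_file(args):
    db_path, f = args
    conn = get_conn(db_path)
    # ... extract data from file f and query conn here ...
    return f

def run(db_path, files):
    # ex.map preserves input order, so results line up with files.
    with ThreadPoolExecutor(max_workers=4) as ex:
        return list(ex.map(process_file,
                           [(db_path, f) for f in files]))

if __name__ == "__main__":
    print(run("map.db", ["a.txt", "b.txt"]))  # ['a.txt', 'b.txt']
```

The executor's shutdown on exiting the `with` block replaces the explicit termination tokens.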