Why can't Twisted's adbapi retrieve data from within a unit test?

blz*_*blz 8 python sqlite unit-testing twisted python-db-api

Overview

Context

I am writing unit tests for some higher-order logic that depends on writing to a SQLite3 database. For this I am using twisted.trial.unittest and twisted.enterprise.adbapi.ConnectionPool.

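Problem statement

I am able to create a persistent sqlite3 database and store data in it. Using sqlitebrowser, I can verify that the data has been persisted as expected.

The problem is that calls to t.e.a.ConnectionPool.run* (e.g. runQuery) return an empty result set, but only when called from within a TestCase.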

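Notes and significant details

The problem occurs only within Twisted's trial framework. My first debugging attempt was to pull the database code out of the unit test and put it into a standalone test/debug script. That script works as expected, while the unit test code does not (see the examples below).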

Case 1: misbehaving unit test

init.sql

This is the script used to initialize the database. There are no (apparent) errors stemming from this file.

CREATE TABLE ajxp_changes ( seq INTEGER PRIMARY KEY AUTOINCREMENT, node_id NUMERIC, type TEXT, source TEXT, target TEXT, deleted_md5 TEXT );
CREATE TABLE ajxp_index ( node_id INTEGER PRIMARY KEY AUTOINCREMENT, node_path TEXT, bytesize NUMERIC, md5 TEXT, mtime NUMERIC, stat_result BLOB);
CREATE TABLE ajxp_last_buffer ( id INTEGER PRIMARY KEY AUTOINCREMENT, type TEXT, location TEXT, source TEXT, target TEXT );
CREATE TABLE ajxp_node_status ("node_id" INTEGER PRIMARY KEY  NOT NULL , "status" TEXT NOT NULL  DEFAULT 'NEW', "detail" TEXT);
CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, type text, message text, source text, target text, action text, status text, date text);

CREATE TRIGGER LOG_DELETE AFTER DELETE ON ajxp_index BEGIN INSERT INTO ajxp_changes (node_id,source,target,type,deleted_md5) VALUES (old.node_id, old.node_path, "NULL", "delete", old.md5); END;
CREATE TRIGGER LOG_INSERT AFTER INSERT ON ajxp_index BEGIN INSERT INTO ajxp_changes (node_id,source,target,type) VALUES (new.node_id, "NULL", new.node_path, "create"); END;
CREATE TRIGGER LOG_UPDATE_CONTENT AFTER UPDATE ON "ajxp_index" FOR EACH ROW BEGIN INSERT INTO "ajxp_changes" (node_id,source,target,type) VALUES (new.node_id, old.node_path, new.node_path, CASE WHEN old.node_path = new.node_path THEN "content" ELSE "path" END);END;
CREATE TRIGGER STATUS_DELETE AFTER DELETE ON "ajxp_index" BEGIN DELETE FROM ajxp_node_status WHERE node_id=old.node_id; END;
CREATE TRIGGER STATUS_INSERT AFTER INSERT ON "ajxp_index" BEGIN INSERT INTO ajxp_node_status (node_id) VALUES (new.node_id); END;

CREATE INDEX changes_node_id ON ajxp_changes( node_id );
CREATE INDEX changes_type ON ajxp_changes( type );
CREATE INDEX changes_node_source ON ajxp_changes( source );
CREATE INDEX index_node_id ON ajxp_index( node_id );
CREATE INDEX index_node_path ON ajxp_index( node_path );
CREATE INDEX index_bytesize ON ajxp_index( bytesize );
CREATE INDEX index_md5 ON ajxp_index( md5 );
CREATE INDEX node_status_status ON ajxp_node_status( status );
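
As a quick sanity check of the schema, independent of Twisted, the insert triggers can be exercised with the standard-library sqlite3 module. This is only a sketch: it copies a subset of the tables and triggers above into an in-memory database, writes the string literals with single quotes, and uses made-up row values.

```python
import sqlite3

# Subset of init.sql: the three tables touched by the insert triggers.
SCHEMA = """
CREATE TABLE ajxp_changes ( seq INTEGER PRIMARY KEY AUTOINCREMENT, node_id NUMERIC, type TEXT, source TEXT, target TEXT, deleted_md5 TEXT );
CREATE TABLE ajxp_index ( node_id INTEGER PRIMARY KEY AUTOINCREMENT, node_path TEXT, bytesize NUMERIC, md5 TEXT, mtime NUMERIC, stat_result BLOB);
CREATE TABLE ajxp_node_status ("node_id" INTEGER PRIMARY KEY NOT NULL, "status" TEXT NOT NULL DEFAULT 'NEW', "detail" TEXT);
CREATE TRIGGER LOG_INSERT AFTER INSERT ON ajxp_index BEGIN INSERT INTO ajxp_changes (node_id,source,target,type) VALUES (new.node_id, 'NULL', new.node_path, 'create'); END;
CREATE TRIGGER STATUS_INSERT AFTER INSERT ON ajxp_index BEGIN INSERT INTO ajxp_node_status (node_id) VALUES (new.node_id); END;
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

# Insert one row (illustrative values) and let the triggers fire.
conn.execute(
    "INSERT INTO ajxp_index (node_path,bytesize,md5,mtime) VALUES (?,?,?,?)",
    ("/a.txt", 0, "d41d8cd98f00b204e9800998ecf8427e", 0.0),
)
conn.commit()

changes = conn.execute("SELECT type, target FROM ajxp_changes").fetchall()
status = conn.execute("SELECT node_id, status FROM ajxp_node_status").fetchall()
conn.close()
```

With a single connection, one insert into ajxp_index should yield one row in ajxp_changes and one in ajxp_node_status, which is consistent with the script itself not being the source of the problem.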

test_sqlite.py

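This is the unit test class that fails unexpectedly. TestStateManagement.test_db_clean passes, indicating that the tables are created correctly. TestStateManagement.test_inode_create_file fails, reporting that zero results were retrieved.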

import os.path as osp
from shutil import rmtree
from tempfile import mkdtemp

from twisted.internet import defer
from twisted.enterprise import adbapi
from twisted.trial.unittest import TestCase

import sqlengine # see below

class TestStateManagement(TestCase):

    def setUp(self):
        self.meta = mkdtemp()
        # Workspace for files created by the tests. The original snippet used
        # self.ws without defining it; a second temp dir is assumed here.
        self.ws = mkdtemp()

        self.db = adbapi.ConnectionPool(
            "sqlite3", osp.join(self.meta, "db.sqlite"), check_same_thread=False,
        )
        self.stateman = sqlengine.StateManager(self.db)

        with open("init.sql") as f:
            script = f.read()

        self.d = self.db.runInteraction(lambda c, s: c.executescript(s), script)

    def tearDown(self):
        self.db.close()
        del self.db
        del self.stateman
        del self.d

        rmtree(self.meta)
        rmtree(self.ws)

    @defer.inlineCallbacks
    def test_db_clean(self):
        """Canary test to ensure that the db is initialized in a blank state"""

        yield self.d  # wait for db to be initialized

        q = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;"
        for table in ("ajxp_index", "ajxp_changes"):
            res = yield self.db.runQuery(q, (table,))
            self.assertTrue(
                len(res) == 1,
                "table {0} does not exist".format(table)
            )

    @defer.inlineCallbacks
    def test_inode_create_file(self):
        yield self.d

        path = osp.join(self.ws, "test.txt")
        with open(path, "wt") as f:
            pass

        inode = mk_dummy_inode(path)
        yield self.stateman.create(inode, directory=False)

        entry = yield self.db.runQuery("SELECT * FROM ajxp_index")
        emsg = "got {0} results, expected 1.  Are canary tests failing?"
        lentry = len(entry)
        self.assertTrue(lentry == 1, emsg.format(lentry))

sqlengine.py

These are the artifacts under test in the unit tests above.

from twisted.logger import Logger


def values_as_tuple(d, *param):
    """Return the values for each key in `param` as a tuple"""
    return tuple(map(d.get, param))


class StateManager:
    """Manages the SQLite database's state, ensuring that it reflects the state
    of the filesystem.
    """

    log = Logger()

    def __init__(self, db):
        self._db = db

    def create(self, inode, directory=False):
        params = values_as_tuple(
            inode, "node_path", "bytesize", "md5", "mtime", "stat_result"
        )

        directive = (
            "INSERT INTO ajxp_index (node_path,bytesize,md5,mtime,stat_result) "
            "VALUES (?,?,?,?,?);"
        )

        return self._db.runOperation(directive, params)

Case 2: the bug disappears outside of twisted.trial

#! /usr/bin/env python

import os.path as osp
from tempfile import mkdtemp

from twisted.enterprise import adbapi
from twisted.internet.task import react
from twisted.internet.defer import inlineCallbacks

INIT_FILE = "example.sql"


def values_as_tuple(d, *param):
    """Return the values for each key in `param` as a tuple"""
    return tuple(map(d.get, param))


def create(db, inode):
    params = values_as_tuple(
        inode, "node_path", "bytesize", "md5", "mtime", "stat_result"
    )

    directive = (
        "INSERT INTO ajxp_index (node_path,bytesize,md5,mtime,stat_result) "
        "VALUES (?,?,?,?,?);"
    )

    return db.runOperation(directive, params)


def init_database(db):
    with open(INIT_FILE) as f:
        script = f.read()

    return db.runInteraction(lambda c, s: c.executescript(s), script)


@react
@inlineCallbacks
def main(reactor):
    meta = mkdtemp()
    db = adbapi.ConnectionPool(
        "sqlite3", osp.join(meta, "db.sqlite"), check_same_thread=False,
    )

    yield init_database(db)

    # Let's make sure the tables were created as expected and that we're
    # starting from a blank slate
    res = yield db.runQuery("SELECT * FROM ajxp_index LIMIT 1")
    assert not res, "database is not empty [ajxp_index]"

    res = yield db.runQuery("SELECT * FROM ajxp_changes LIMIT 1")
    assert not res, "database is not empty [ajxp_changes]"

    # The details of this are not important.  Suffice to say they (should)
    # conform to the DB schema for ajxp_index.
    test_data = {
        "node_path": "/this/is/some/arbitrary/path.ext",
        "bytesize": 0,
        "mtime": 179273.0,
        "stat_result": b"this simulates a blob of raw binary data",
        "md5": "d41d8cd98f00b204e9800998ecf8427e",  # arbitrary
    }

    # store the test data in the ajxp_index table
    yield create(db, test_data)

    # test if the entry exists in the db
    entry = yield db.runQuery("SELECT * FROM ajxp_index")
    assert len(entry) == 1, "got {0} results, expected 1".format(len(entry))

    print("OK")

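Closing remarks

Again, upon inspection with sqlitebrowser, the data does seem to be written to db.sqlite, so this looks like a retrieval problem. From here I am somewhat stumped... any ideas?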

Edit

This code will produce an inode that can be used for testing.

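import os.path as osp
from os import stat
from pickle import dumps  # assumed: protocol=4 suggests pickle.dumps


def mk_dummy_inode(path, isdir=False):
    """Build a dict describing `path` whose keys match the ajxp_index columns."""
    return {
        "node_path": path,
        "bytesize": osp.getsize(path),
        "mtime": osp.getmtime(path),
        "stat_result": dumps(stat(path), protocol=4),
        "md5": "directory" if isdir else "d41d8cd98f00b204e9800998ecf8427e",
    }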

blz*_*blz 2

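OK, it turns out this one is a bit tricky. Running the tests in isolation (as posted in this question) makes the bug occur only rarely. When run in the context of the entire test suite, however, it fails almost 100% of the time.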

Adding yield task.deferLater(reactor, .00001, lambda: None) after the write to the database and before the read from it solved the problem.

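From there, I suspected this might be a race condition stemming from the connection pool and sqlite's limited tolerance for concurrency. I tried setting the cp_min and cp_max arguments of ConnectionPool to 1, and this also solved the problem.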

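In short: it seems that sqlite does not play very nicely with multiple connections, and the appropriate fix is to avoid concurrency as far as possible.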
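
Outside of Twisted, the same idea can be sketched with the standard library alone: route every statement through one worker thread that owns one connection, so writes and reads are strictly ordered. SerializedDB is a hypothetical name, and this is an illustration of the principle rather than a description of how adbapi is implemented.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor


class SerializedDB:
    """Hypothetical helper: one worker thread owns one SQLite connection."""

    def __init__(self, path):
        self._executor = ThreadPoolExecutor(max_workers=1)
        # Open the connection on the worker thread, the only thread that uses it.
        self._conn = self._executor.submit(sqlite3.connect, path).result()

    def _run(self, sql, params):
        rows = self._conn.execute(sql, params).fetchall()
        self._conn.commit()
        return rows

    def run(self, sql, params=()):
        """Queue a statement; returns a Future that resolves to the fetched rows."""
        return self._executor.submit(self._run, sql, params)

    def close(self):
        self._executor.submit(self._conn.close).result()
        self._executor.shutdown()


db = SerializedDB(":memory:")
db.run("CREATE TABLE ajxp_index (node_id INTEGER PRIMARY KEY AUTOINCREMENT, node_path TEXT)")
db.run("INSERT INTO ajxp_index (node_path) VALUES (?)", ("/some/path.ext",))
rows = db.run("SELECT node_path FROM ajxp_index").result()
db.close()
```

Because the executor has a single worker, the SELECT cannot start until the INSERT has been committed, even though the caller never waits on the INSERT explicitly.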