共享 txpostgres 连接池

swi*_*ard 4 python postgresql psycopg2 twisted

我们有一个 RESTful(-ish)扭曲的应用程序,它使用 txpostgres 来访问 postgres 数据库。txpostgres.Connection目前,每次客户端 ping 服务器进行数据库调用时,我们都会生成新实例。这是低效的,会导致我们的数据库很快就不堪重负。我一直在尝试改用它来代替使用txpostgres.ConnectionPool,但遇到了麻烦。现在我有一些看起来像这样的东西:

class DBTester(object):
    def __init__(self):
        self.cfg = load_config('local')  # load the db settings from a JSON file
        self.pool = ConnectionPool(None, min=1, **self.cfg) # create the pool

    @defer.inlineCallbacks
    def get_pool(self):
        yield self.pool.start()
        defer.returnValue(self.pool)


class DBT(object):
    def __init__(self):
        self.db = DBTester()

    @defer.inlineCallbacks
    def t(self):
        conn = yield self.db.get_pool()
        res = yield conn.runQuery('select * from clients')
        println('DBT.t result: {}'.format(res))


if __name__ == "__main__":
    dbt = DBT()
    dbt.t()
    dbt.t()

    reactor.run()
Run Code Online (Sandbox Code Playgroud)

问题在于pool.start()通话的时间。如果我把它放进去DBTester.__init__,我就明白了psycopg2.OperationalError: asynchronous connection attempt underway。如果我将其放入DBTester.get_pool,则一个db.t()调用有效,而其他调用则失败并显示exceptions.AttributeError: 'NoneType' object has no attribute 'runQuery'。我基本上一整天都在努力解决这个问题,但无法破解它,也无法在网上找到太多信息。

我真的只需要一个指向一些如何ConnectionPool使用的最小示例的指针。有什么建议么?

bru*_*ard 6

听起来您的问题不在于 txpostgres,而在于扭曲和异步的思维方式。

exceptions.AttributeError: 'NoneType' object has no attribute 'runQuery' 表示: 在建立连接之前,您试图在数据库之后抛出 SQL 查询。那是愚蠢的!所以现在我想我会抛出一个异常,让亲爱的用户知道这种疯狂。

所以,如果你有类似的东西,这可能会发生

pool = ConnectionPool(None, min=1)
d1 = pool.start()
d2 = pool.runQuery('select tablename from pg_tables')
Run Code Online (Sandbox Code Playgroud)

这段代码在反应堆中创建了两个 deferred 和 thorws 。只有调度算法知道首先执行两者中的哪一个,如果是d2,则发生错误。

txpostgres.txpostgres.AlreadyConnected 的意思是:很好的自我解释,启动一个已经启动的池是没有意义的。

psycopg2.OperationalError:正在进行异步连接尝试意味着:
当您开始执行 SQL 语句时,我正在设置一个不错的异步数据库连接。数据库连接还没有准备好,因此没有执行 sql 查询,这让我很伤心。我想我会抛出一个操作错误,所以亲爱的用户知道语句失败了。

好的,所以我们需要一种方法确保在我们在数据库之后抛出 sql 查询之前建立连接。下面是一个使用回调来实现这一点的代码示例。

from txpostgres.txpostgres import ConnectionPool
from twisted.internet import reactor, defer
from twisted.python import log, util


class SomeClass(object):

    pool = ConnectionPool(
        None,
        min=1,
        user="user",
        password="pass",
        host='host.com')

    @defer.inlineCallbacks
    def fetch_tables(self):
        res = yield self.pool.runQuery('select tablename from pg_tables')
        defer.returnValue(res)


if __name__ == "__main__":

    def querydb(n=10):
        dl = []
        for i in range(n):
            d = s.fetch_tables()
            d.addCallback(lambda tables: util.println(len(tables)))
            dl.append(d)
        return defer.DeferredList(dl)

    s = SomeClass()
    d_startpool = s.pool.start()
    d_startpool.addCallback(lambda _: querydb())
    d_startpool.addCallback(lambda _: s.pool.close())
    d_startpool.addErrback(log.err)
    d_startpool.addBoth(lambda _: reactor.stop())

    reactor.run()
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助。