小编era*_*man的帖子

如何在单元测试中统计sqlalchemy查询

在Django中,我经常声明应该进行的查询数量,以便单元测试能够捕获新的N + 1查询问题

from django import db
from django.conf import settings
settings.DEBUG=True

class SendData(TestCase):
    def test_send(self):
        db.connection.queries = []
        event = Events.objects.all()[1:]
        s = str(event) # QuerySet is lazy, force retrieval
        self.assertEquals(len(db.connection.queries), 2)
Run Code Online (Sandbox Code Playgroud)

在SQLAlchemy中,通过echo在引擎上设置标志来启用对STDOUT的跟踪

engine.echo=True
Run Code Online (Sandbox Code Playgroud)

编写计算SQLAlchemy查询数量的测试的最佳方法是什么?

class SendData(TestCase):
    def test_send(self):
        event = session.query(Events).first()
        s = str(event)
        self.assertEquals( ... , 2)
Run Code Online (Sandbox Code Playgroud)

sqlalchemy

16
推荐指数
2
解决办法
1929
查看次数

如何教SQLAlchemy从断开连接中恢复?

根据http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html#disconnect-handling-pessimistic,如果连接池中的条目不再有效,可以检测SQLAlchemy重新连接.我创建了以下测试用例来测试它:

import subprocess
from sqlalchemy import create_engine, event
from sqlalchemy import exc
from sqlalchemy.pool import Pool

@event.listens_for(Pool, "checkout")
def ping_connection(dbapi_connection, connection_record, connection_proxy):
    cursor = dbapi_connection.cursor()
    try:
        print "pinging server"
        cursor.execute("SELECT 1")
    except:
        print "raising disconnect error"
        raise exc.DisconnectionError()
    cursor.close()

engine = create_engine('postgresql://postgres@localhost/test')

connection = engine.connect()

subprocess.check_call(['psql', str(engine.url), '-c',
    "select pg_terminate_backend(pid) from pg_stat_activity " +
    "where pid <> pg_backend_pid() " +
    "and datname='%s';" % engine.url.database],
    stdout=subprocess.PIPE)

result = connection.execute("select 'OK'")
for row in result:
    print "Success!", " ".join(row)
Run Code Online (Sandbox Code Playgroud)

但是没有恢复,我收到了这个例外: …

python sqlalchemy psycopg2

10
推荐指数
1
解决办法
4049
查看次数

使用kqueue响应多个事件类型

当事件注册时,kqueue提供与该事件类型相关的ID; 例如,文件描述符用于标识要监视的文件

int kq;
struct kevent ke;

kq = kqueue();
fd = open(argv[1], O_RDONLY);
EV_SET(&ke, fd, EVFILT_VNODE, EV_ADD, NOTE_DELETE | NOTE_RENAME, 0, NULL);
kevent(kq, &ke, 1, NULL, 0, NULL);

while (1) {
    kevent(kq, NULL, 0, &ke, 1, NULL);
    /* respond to file system event */
}
Run Code Online (Sandbox Code Playgroud)

现在,如果我还需要响应其他事件类型这样的信号,我们需要一个新的kqueue实例,以避免与ident参数的 冲突kevent().

kq_sig = kqueue();
struct kevent ke_sig;

/* set the handler and ignore SIGINT */
signal(SIGINT, SIG_IGN);
EV_SET(&ke_sig, SIGINT, EVFILT_SIGNAL, EV_ADD, 0, 0, NULL);
kevent(kq_sig, &ke_sig, 1, …
Run Code Online (Sandbox Code Playgroud)

kqueue

6
推荐指数
1
解决办法
1508
查看次数

标签 统计

sqlalchemy ×2

kqueue ×1

psycopg2 ×1

python ×1