SWI-Prolog:使用消息队列进行线程安全的数据库读/写与`library(persistency)`

Sho*_*hon 4 multithreading message-queue prolog swi-prolog

SWI-Prolog 宣称自己是LAMP堆栈的单一语言替代品.在更换M(ySQL)时,文档列举了几种方法,其中library(persistency)似乎是最简单的方法,能够提供持久的,可更新的数据库.

该文档library(persistency)提供了以下示例,该示例演示了如何在从数据库读取和更新数据时使用互斥锁来避免无效状态:

:- module(user_db,
          [ attach_user_db/1,       % +File
            current_user_role/2,    % ?User, ?Role
            add_user/2,         % +User, +Role
            set_user_role/2     % +User, +Role
          ]).
:- use_module(library(persistency)).

:- persistent
        user_role(name:atom, role:oneof([user,administrator])).

attach_user_db(File) :-
        db_attach(File, []).

%%  current_user_role(+Name, -Role) is semidet.

current_user_role(Name, Role) :-
        with_mutex(user_db, user_role(Name, Role)).

add_user(Name, Role) :-
        assert_user_role(Name, Role).

set_user_role(Name, Role) :-
        user_role(Name, Role), !.
set_user_role(Name, Role) :-
        with_mutex(user_db,
                   (  retractall_user_role(Name, _),
                      assert_user_role(Name, Role))).
Run Code Online (Sandbox Code Playgroud)

但是,在有关线程同步的文档中mutex,它指出了这一点

谓词的with_mutex/2行为once/1与守卫目标相同.这意味着我们的谓词address/2不再是一个很好的逻辑非确定性关系.

不得不放弃"良好的逻辑非确定性关系"以保持有效性似乎是一个糟糕的讨价还价,因为良好的逻辑非确定性关系是Prolog的主要优势!幸运的是,建议采用更具吸引力的替代方案:

消息队列(请参阅参考资料message_queue_create/3)通常为线程提供更简单,更健壮的通信方式.

听起来我应该能够使用消息队列,如线程通信文档中所示,以便在不牺牲LP范例的本质的情况下实现安全读取/更新.不幸的是,对线程来说还是一个新手,我无法弄清楚消息队列的用途是什么样的!

我希望有人能够改变这个library(persistency)例子,用更合适的消息队列替换互斥体的使用,确保数据库状态的恒定有效性而不牺牲非确定性关系.

Eya*_*yal 7

在SWI Prolog中,每个线程都有自己的消息队列.因此,您可以在线程上运行数据库服务器,并让客户端将查询发布到数据库的消息队列.数据库将一次处理一个请求,以便数据库始终有效.查询仍然是确定性的(如一次/ 1),但正如您所指出的,与情况不同with_mutex/2,数据库关系本身可以相关地指定.

[注意,我正在展示如何使用内置的SWI Prolog消息队列来执行此操作,但您也可以使用Pengines,这可能更加用户友好,并且内置了对远程执行的支持.

首先,我将删除with_mutex电话:

:- module(user_db,
          [ attach_user_db/1,       % +File
            current_user_role/2,    % ?User, ?Role
            add_user/2,         % +User, +Role
            set_user_role/2     % +User, +Role
          ]).
:- use_module(library(persistency)).

:- persistent
        user_role(name:atom, role:oneof([user,administrator])).

attach_user_db(File) :-
        db_attach(File, []).

%%  current_user_role(+Name, -Role) is semidet.

current_user_role(Name, Role) :-
        user_role(Name, Role).

add_user(Name, Role) :-
        assert_user_role(Name, Role).

set_user_role(Name, Role) :-
    user_role(Name, Role), !.
set_user_role(Name, Role) :-
    retractall_user_role(Name, _),
    assert_user_role(Name, Role).
Run Code Online (Sandbox Code Playgroud)

我刚刚在同一个文件中添加了数据库服务器代码,但它应该可以去其他地方.另外,打开:- debug(db),我在多线程代码中发现必不可少的调试消息.

我们需要一个谓词来启动db_thread.它的名字是db,它是"分离的",所以当系统退出时它将被清理.线程通过调用启动db_run/0.

db_up(File, DbThreadId) :-
    db_attach(File, []),
    thread_create(db_run, DbThreadId, [detached(true), alias(db)]),
    debug(db, 'db thread created~n').   
Run Code Online (Sandbox Code Playgroud)

`db_run/0'是一个故障驱动的循环,它在db线程中运行并检查其消息队列中的新消息.收到消息后,将调用该消息.完成后,循环再次启动.

db_run :-
    debug(db, 'db_run:...', []),
    repeat, 
      thread_get_message(db, Msg, []),
      debug(db, 'Received: ~p', [Msg]),
      Msg,            
      debug(db, 'db_query succeeded', []),
      fail.
Run Code Online (Sandbox Code Playgroud)

客户端发送表单的消息db_query(<Query>, <ClientThreadId>),因此我们需要一个db_query/2实际运行计算的谓词.它向客户端线程发送成功,失败或异常消息.

:- meta_predicate user_db:db_query(0,*).
db_query(Goal, ClientId) :-
    catch((Goal -> Status = true; Status = false),
          Err,
          Status = err(Err)), 
    (   Status = true ->
            Response = db_response(succ(Goal))
     ;
     Status = false ->
         Response = db_response(fail)
     ;
     Status = err(Err) ->
         Response = db_response(err(Err))
    ),
    debug(db, 'db_query/2: sending message ~w to ~p', [Response, ClientId]),
    thread_send_message(ClientId, Response).
Run Code Online (Sandbox Code Playgroud)

最后,我们需要一个谓词,它将查询从客户端发布到数据库.发送消息后,客户端将等待响应client_wait/1.

:- meta_predicate client_post(0).
client_post(Goal) :-
    thread_self(Me),
    Msg = db_query(Goal, Me),
    debug(db, 'client_post/1: sending message ~p...', [Msg]),
    thread_send_message(db, Msg),
    debug(db, 'client_post/1: waiting...', []),
    client_wait(Goal).
Run Code Online (Sandbox Code Playgroud)

client_wait/1等待db_response()形式的消息(在失败之前最多1秒,但你可能想要更聪明地做一些事情).它

:- meta_predicate client_wait(0).
client_wait(Goal) :-
    thread_self(Me), 
    thread_get_message(Me, db_response(Term), [timeout(1)]), % blocks until db_response(_) arrives
    Msg = db_response(Term),
    debug(db, 'Client received ~p', [Msg]),
    (   Term = succ(Goal) ->
            debug(db, 'client_wait/1: exit with true', []),
            true
     ;
     Term = fail ->
         fail
     ;
     Term = err(Err) ->
         throw(Err)
     ;
     domain_error(db_response_message, Msg)
    ).
Run Code Online (Sandbox Code Playgroud)

有了这个,我们可以创建数据库并发送查询:

$ swipl -l db_thread.pl 
Welcome to SWI-Prolog (Multi-threaded, 64 bits, Version 7.3.24-127-g9b94a9f-DIRTY)
Copyright (c) 1990-2016 University of Amsterdam, VU Amsterdam
SWI-Prolog comes with ABSOLUTELY NO WARRANTY. This is free software,
and you are welcome to redistribute it under certain conditions.
Please visit http://www.swi-prolog.org for details.

For help, use ?- help(Topic). or ?- apropos(Word).

?- user_db:db_up('db.pl', DB).
db thread created
DB = db.

?- Xs = [bob-administrator, john-user, bill-user], user_db:client_post(forall(member(U-R, Xs), add_user(U, R))).
Xs = [bob-administrator, john-user, bill-user].

?- findall(U, user_db:client_post(current_user_role(U, user)), Users).  %% queries are posted as in once/1
Users = [john].

?- user_db:client_post(findall(U, current_user_role(U, user), Users)).  %% but db predicates are themselves relational
Users = [john, bill].
Run Code Online (Sandbox Code Playgroud)

这个设置的一个小测试保留了数据库的一致性.在这个文件中test_db.pl,我创建了db,并运行了两个线程.运行的一个toggle/0在两个用户角色dbs之间切换,另一个运行print/0只打印出用户及其角色.我们切换世界,随机间隔时间,200次.同时,另一个线程以200个随机间隔的时间打印出db.

test_db.pl:

:- use_module(user_db).

:- initialization user_db:db_up('db.pl', _), test.

world(1, [bob-administrator,
       john-user]).

world(2, [bob-user,
         john-administrator]).

set_world(I) :-
    world(I, Xs),
    forall(member(U-R, Xs),
           set_user_role(U, R)).

print_world :-
    findall(U-R,
            current_user_role(U, R),
            URs),
    sort(URs, URs1),
    format('~p~n', [URs1]).



random_sleep :-
    random(R), 
    X is R * 0.05,
    sleep(X).

toggle(0) :- !. 
toggle(N) :-
    forall(world(I, _), 
           (user_db:client_post(set_world(I)),
             random_sleep)),
    succ(N0, N),
    toggle(N0).

print(0) :- !. 
print(N) :-
    user_db:client_post(print_world),
    succ(N0, N),
    random_sleep, 
    print(N0).



test :-

    thread_create(toggle(100), Id1, []), 
    thread_create(print(200), Id2, []),
    thread_join(Id1, _),
    thread_join(Id2, _).
Run Code Online (Sandbox Code Playgroud)

我们运行这个$ swipl -l test_db.pl:

[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
...
Run Code Online (Sandbox Code Playgroud)