使用 pydbus 库通过会话总线发送信号

Mar*_*Aja 2 python dbus

我正在使用pydbus并且我已经成功地使用它来监听会话总线上的信号(在“客户端”)。我现在想编写服务器端,程序在每次触发操作时发送信号(例如,当它在 FS 上写入文件时)。我真的没有在他们的 GitHub 上得到任何例子。他们只展示了如何编写一个基本的服务器,它有一堆客户端可以调用的方法(但这不是我想要的信号)。

仅供参考,客户端非常简单,如下所示:

# Subscribe to DBus dump signal
session_bus = SessionBus()
session_bus.subscribe(iface='my.iface',
                      signal='my_signal_name',
                      object='/my/object',
                      signal_fired=my_callback)

# Create and run main loop
loop = GObject.MainLoop()
loop.run()
Run Code Online (Sandbox Code Playgroud)

my_callback每次监听事件弹出时调用的方法在哪里(my_signal_name在这种情况下)

谢谢你的帮助。

小智 5

下面的 python3 / pydbus 程序将每秒发布一个随机整数到会话 D-Bus 上。

#!/usr/bin/env python3
#!
from pydbus.generic import signal
from pydbus import SessionBus
from gi.repository import GLib
import random
loop = GLib.MainLoop()
interface_name = "org.example.project_1.server_1"
bus = SessionBus()


class Server_XML(object):
    """
    Server_XML definition.
    Emit / Publish a signal that is a random integer every second 
    type='i' for integer. 
    """
    dbus = """
    <node>
        <interface name="org.example.project_1.server_1">
            <signal name="app_1_signal">
                <arg type='i'/>
            </signal>
        </interface>
    </node>
    """
    app_1_signal = signal()

def repeating_timer():
    """Generate random integer between 0 and 100 and emit over Session D-Bus
    return True to keep the GLib timer calling this function once a second."""
    random_integer = random.randint(0,100)
    print(random_integer)
    emit.app_1_signal(random_integer)
    return True


if __name__=="__main__":
    # Setup server to emit signals over the DBus
    emit = Server_XML()
    bus.publish(interface_name, emit)
    # Setup repeating timer
    GLib.timeout_add_seconds(interval=1, function=repeating_timer) 
    # Run loop with graceful ctrl C exiting.
    try:
        loop.run()
    except KeyboardInterrupt as e:
        loop.quit()
        print("\nExit by Control C")
Run Code Online (Sandbox Code Playgroud)

打开另一个 linux 控制台终端并使用 gdbus 实用程序来验证是否正在会话总线上发布整数。例如...

$ gdbus monitor --session --dest org.example.project_1.server_1
Monitoring signals from all objects owned by org.example.project_1.server_1
The name org.example.project_1.server_1 is owned by :1.618
/org/example/project_1/server_1: org.example.project_1.server_1.app_1_signal (36,)
/org/example/project_1/server_1: org.example.project_1.server_1.app_1_signal (37,)
/org/example/project_1/server_1: org.example.project_1.server_1.app_1_signal (25,)
Run Code Online (Sandbox Code Playgroud)

以下 python3 / pydbus 客户端程序订阅在会话 D-Bus 上发布的随机整数...

#!/usr/bin/env python3
#!
from pydbus.generic import signal
from pydbus import SessionBus
from gi.repository import GLib
loop = GLib.MainLoop()
dbus_filter = "/org/example/project_1/server_1"
bus = SessionBus()


def cb_server_signal_emission(*args):
    """
    Callback on emitting signal from server
    """
    print("Message: ", args)
    print("Data: ", str(args[4][0]))

if __name__=="__main__":
    # Subscribe to bus to monitor for server signal emissions
    bus.subscribe(object = dbus_filter, 
                  signal_fired = cb_server_signal_emission)    
    loop.run()
Run Code Online (Sandbox Code Playgroud)

此客户端程序的控制台输出将类似于...

$ python3 client_pydbus.py
Message:  (':1.621', '/org/example/project_1/server_1', 'org.example.project_1.server_1', 'app_1_signal', (72,))
Data:  72
Message:  (':1.621', '/org/example/project_1/server_1', 'org.example.project_1.server_1', 'app_1_signal', (46,))
Data:  46
Message:  (':1.621', '/org/example/project_1/server_1', 'org.example.project_1.server_1', 'app_1_signal', (47,))
Data:  47
Run Code Online (Sandbox Code Playgroud)