Python观察者模式:示例,提示?

Jim*_*nis 43 python design-patterns listener observer-pattern

是否有在Python中实现的GoF Observer的示例性示例?我有一些代码,当前有一些调试代码通过密钥类(如果设置了魔法env,当前生成消息给stderr).此外,该类还有一个接口,用于递增返回结果以及存储它们(在内存中)以进行后期处理.(该类本身是一个作业管理器,用于通过ssh在远程机器上同时执行命令).

目前该类的用法类似于:

job = SSHJobMan(hostlist, cmd)
job.start()
while not job.done():
    for each in job.poll():
        incrementally_process(job.results[each])
        time.sleep(0.2) # or other more useful work
post_process(job.results)
Run Code Online (Sandbox Code Playgroud)

一个易用的使用模型是:

job = SSHJobMan(hostlist, cmd)
job.wait()  # implicitly performs a start()
process(job.results)
Run Code Online (Sandbox Code Playgroud)

这一切都适用于当前的实用程序.但它确实缺乏灵活性.例如,我目前支持简短的输出格式或进度条作为增量结果,我还支持该post_process()功能的简短,完整和"合并消息"输出.

但是,我想支持多个结果/输出流(到终端的进度条,对日志文件的调试和警告,从成功作业到一个文件/目录的输出,错误消息以及从非成功作业到另一个作业的其他结果)等).

这听起来像是一个需要Observer的情况......我的类的实例接受来自其他对象的注册,并在发生时使用特定类型的事件回调它们.

我正在看PyPubSub,因为我在SO相关问题中看到了几个引用.我不确定我是否已经准备好将外部依赖项添加到我的实用程序中,但是我可以看到使用它们的接口作为我的模型的价值,如果这将使其他人更容易使用它.(该项目既可以作为独立的命令行实用程序,也可以作为编写其他脚本/实用程序的类).

总之,我知道如何做我想要的......但是有很多方法可以实现它.从长远来看,我想知道最有可能为代码的其他用户工作的建议.

代码本身位于:classh.

Jas*_*rff 50

但它确实缺乏灵活性.

嗯......实际上,如果您想要的是异步API,这对我来说就像是一个很好的设计.通常是.也许你只需要从stderr切换到Python的logging模块,它有一种自己的发布/订阅模型,Logger.addHandler()等等.

如果你确实想支持观察者,我的建议是保持简单.你真的只需要几行代码.

class Event(object):
    pass

class Observable(object):
    def __init__(self):
        self.callbacks = []
    def subscribe(self, callback):
        self.callbacks.append(callback)
    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.iteritems():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)
Run Code Online (Sandbox Code Playgroud)

您的Job类可以是子类Observable.当感兴趣的事情发生时,打电话self.fire(type="progress", percent=50)等.


Pit*_*kos 21

我认为其他人的回答是过分的.您可以使用少于15行代码轻松地在Python中实现事件.

你简单有两个类:EventObserver.任何想要监听事件的类都需要继承Observer并设置为侦听(观察)特定事件.当Event被实例化和燃煤,听该事件的所有观察员将运行指定的回调函数.

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observables = {}
    def observe(self, event_name, callback):
        self._observables[event_name] = callback


class Event():
    def __init__(self, name, data, autofire = True):
        self.name = name
        self.data = data
        if autofire:
            self.fire()
    def fire(self):
        for observer in Observer._observers:
            if self.name in observer._observables:
                observer._observables[self.name](self.data)
Run Code Online (Sandbox Code Playgroud)

示例:

class Room(Observer):

    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # Observer's init needs to be called
    def someone_arrived(self, who):
        print(who + " has arrived!")

room = Room()
room.observe('someone arrived',  room.someone_arrived)

Event('someone arrived', 'Lenard')
Run Code Online (Sandbox Code Playgroud)

输出:

Room is ready.
Lenard has arrived!
Run Code Online (Sandbox Code Playgroud)


Jas*_*rff 13

还有一些方法......

示例:日志记录模块

也许您只需要从stderr切换到Python的logging模块,该模块具有强大的发布/订阅模型.

开始生成日志记录很容易.

# producer
import logging

log = logging.getLogger("myjobs")  # that's all the setup you need

class MyJob(object):
    def run(self):
        log.info("starting job")
        n = 10
        for i in range(n):
            log.info("%.1f%% done" % (100.0 * i / n))
        log.info("work complete")
Run Code Online (Sandbox Code Playgroud)

在消费者方面,还有一些工作要做.不幸的是,配置记录器输出需要7个完整的代码行.;)

# consumer
import myjobs, sys, logging

if user_wants_log_output:
    ch = logging.StreamHandler(sys.stderr)
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    myjobs.log.addHandler(ch)
    myjobs.log.setLevel(logging.INFO)

myjobs.MyJob().run()
Run Code Online (Sandbox Code Playgroud)

另一方面,日志包中有大量的东西.如果您需要将日志数据发送到一组旋转文件,一个电子邮件地址和Windows事件日志,那么您就可以了.

示例:最简单的观察者

但是你根本不需要使用任何库.支持观察者的一种非常简单的方法是调用一个什么都不做的方法.

# producer
class MyJob(object):
    def on_progress(self, pct):
        """Called when progress is made. pct is the percent complete.
        By default this does nothing. The user may override this method
        or even just assign to it."""
        pass

    def run(self):
        n = 10
        for i in range(n):
            self.on_progress(100.0 * i / n)
        self.on_progress(100.0)

# consumer
import sys, myjobs
job = myjobs.MyJob()
job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()
Run Code Online (Sandbox Code Playgroud)

有时不是写一个lambda,你可以说job.on_progress = progressBar.update,这很好.

这很简单.一个缺点是它自然不支持订阅相同事件的多个监听器.

示例:类似C#的事件

通过一些支持代码,您可以在Python中获得类似C#的事件.这是代码:

# glue code
class event(object):
    def __init__(self, func):
        self.__doc__ = func.__doc__
        self._key = ' ' + func.__name__
    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError, exc:
            be = obj.__dict__[self._key] = boundevent()
            return be

class boundevent(object):
    def __init__(self):
        self._fns = []
    def __iadd__(self, fn):
        self._fns.append(fn)
        return self
    def __isub__(self, fn):
        self._fns.remove(fn)
        return self
    def __call__(self, *args, **kwargs):
        for f in self._fns[:]:
            f(*args, **kwargs)
Run Code Online (Sandbox Code Playgroud)

生产者使用装饰器声明事件:

# producer
class MyJob(object):
    @event
    def progress(pct):
        """Called when progress is made. pct is the percent complete."""

    def run(self):
        n = 10
        for i in range(n+1):
            self.progress(100.0 * i / n)

#consumer
import sys, myjobs
job = myjobs.MyJob()
job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()
Run Code Online (Sandbox Code Playgroud)

这与上面的"简单观察者"代码完全相同,但您可以根据需要添加任意数量的侦听器+=.(与C#不同,没有事件处理程序类型,new EventHandler(foo.bar)在订阅事件时没有必要,并且在触发事件之前不必检查null.就像C#一样,事件不会压制异常.)

如何选择

如果logging您需要的一切,请使用它.否则做最适合你的事情.需要注意的关键是你不需要承担大的外部依赖.


Dan*_*ank 7

如果一个实现对象不是因为他们正在观察某些东西而保持活着的呢?请在下面找到具有以下功能的观察者模式的实现:

  1. 用法是pythonic.要将观察者添加到.bar实例的绑定方法foo,只需执行foo.bar.addObserver(observer).
  2. 由于是观察者,观察者不会活着.换句话说,观察者代码不使用强引用.
  3. 不需要子分类(描述符ftw).
  4. 可以与不可用的类型一起使用.
  5. 可以在单个类中多次使用.
  6. (奖金)截至今天,代码存在于github上的适当可下载,可安装的软件包中.

这是代码(github包PyPI包具有最新的实现):

import weakref
import functools

class ObservableMethod(object):
    """
    A proxy for a bound method which can be observed.

    I behave like a bound method, but other bound methods can subscribe to be
    called whenever I am called.
    """

    def __init__(self, obj, func):
        self.func = func
        functools.update_wrapper(self, func)
        self.objectWeakRef = weakref.ref(obj)
        self.callbacks = {}  #observing object ID -> weak ref, methodNames

    def addObserver(self, boundMethod):
        """
        Register a bound method to observe this ObservableMethod.

        The observing method will be called whenever this ObservableMethod is
        called, and with the same arguments and keyword arguments. If a
        boundMethod has already been registered to as a callback, trying to add
        it again does nothing. In other words, there is no way to sign up an
        observer to be called back multiple times.
        """
        obj = boundMethod.__self__
        ID = id(obj)
        if ID in self.callbacks:
            s = self.callbacks[ID][1]
        else:
            wr = weakref.ref(obj, Cleanup(ID, self.callbacks))
            s = set()
            self.callbacks[ID] = (wr, s)
        s.add(boundMethod.__name__)

    def discardObserver(self, boundMethod):
        """
        Un-register a bound method.
        """
        obj = boundMethod.__self__
        if id(obj) in self.callbacks:
            self.callbacks[id(obj)][1].discard(boundMethod.__name__)

    def __call__(self, *arg, **kw):
        """
        Invoke the method which I proxy, and all of it's callbacks.

        The callbacks are called with the same *args and **kw as the main
        method.
        """
        result = self.func(self.objectWeakRef(), *arg, **kw)
        for ID in self.callbacks:
            wr, methodNames = self.callbacks[ID]
            obj = wr()
            for methodName in methodNames:
                getattr(obj, methodName)(*arg, **kw)
        return result

    @property
    def __self__(self):
        """
        Get a strong reference to the object owning this ObservableMethod

        This is needed so that ObservableMethod instances can observe other
        ObservableMethod instances.
        """
        return self.objectWeakRef()


class ObservableMethodDescriptor(object):

    def __init__(self, func):
        """
        To each instance of the class using this descriptor, I associate an
        ObservableMethod.
        """
        self.instances = {}  # Instance id -> (weak ref, Observablemethod)
        self._func = func

    def __get__(self, inst, cls):
        if inst is None:
            return self
        ID = id(inst)
        if ID in self.instances:
            wr, om = self.instances[ID]
            if not wr():
                msg = "Object id %d should have been cleaned up"%(ID,)
                raise RuntimeError(msg)
        else:
            wr = weakref.ref(inst, Cleanup(ID, self.instances))
            om = ObservableMethod(inst, self._func)
            self.instances[ID] = (wr, om)
        return om

    def __set__(self, inst, val):
        raise RuntimeError("Assigning to ObservableMethod not supported")


def event(func):
    return ObservableMethodDescriptor(func)


class Cleanup(object):
    """
    I manage remove elements from a dict whenever I'm called.

    Use me as a weakref.ref callback to remove an object's id from a dict
    when that object is garbage collected.
    """
    def __init__(self, key, d):
        self.key = key
        self.d = d

    def __call__(self, wr):
        del self.d[self.key]
Run Code Online (Sandbox Code Playgroud)

要使用它,我们只需装饰我们想要观察的方法@event.这是一个例子

class Foo(object):
    def __init__(self, name):
        self.name = name

    @event
    def bar(self):
        print("%s called bar"%(self.name,))

    def baz(self):
        print("%s called baz"%(self.name,))

a = Foo('a')
b = Foo('b')
a.bar.addObserver(b.bar)
a.bar()
Run Code Online (Sandbox Code Playgroud)


Ewa*_*odd 5

来自维基百科

from collections import defaultdict

class Observable (defaultdict):

  def __init__ (self):
      defaultdict.__init__(self, object)

  def emit (self, *args):
      '''Pass parameters to all observers and update states.'''
      for subscriber in self:
          response = subscriber(*args)
          self[subscriber] = response

  def subscribe (self, subscriber):
      '''Add a new subscriber to self.'''
      self[subscriber]

  def stat (self):
      '''Return a tuple containing the state of each observer.'''
      return tuple(self.values())
Run Code Online (Sandbox Code Playgroud)

Observable 就是这样使用的。

myObservable = Observable ()

# subscribe some inlined functions.
# myObservable[lambda x, y: x * y] would also work here.
myObservable.subscribe(lambda x, y: x * y)
myObservable.subscribe(lambda x, y: float(x) / y)
myObservable.subscribe(lambda x, y: x + y)
myObservable.subscribe(lambda x, y: x - y)

# emit parameters to each observer
myObservable.emit(6, 2)

# get updated values
myObservable.stat()         # returns: (8, 3.0, 4, 12)
Run Code Online (Sandbox Code Playgroud)

  • 我在发布之前阅读了维基百科示例。它似乎不适用于我的需求,也似乎没有描述 Python 社区的一些常见或最佳实践。此外,我认为 defaultdict 和 self[subscriber] 的使用是难以理解的,并且我选择与 Python 2.4.x 兼容(通常随 Linux 发行版一起提供——我的主要目标受众)排除了我的选择。 (3认同)
  • @Jim Denis:请用此评论更新您的问题。这是改变答案的信息。 (2认同)