Jim*_*nis 43 python design-patterns listener observer-pattern
是否有在Python中实现的GoF Observer的示例性示例?我有一些代码,当前有一些调试代码通过密钥类(如果设置了魔法env,当前生成消息给stderr).此外,该类还有一个接口,用于递增返回结果以及存储它们(在内存中)以进行后期处理.(该类本身是一个作业管理器,用于通过ssh在远程机器上同时执行命令).
目前该类的用法类似于:
job = SSHJobMan(hostlist, cmd)
job.start()
while not job.done():
for each in job.poll():
incrementally_process(job.results[each])
time.sleep(0.2) # or other more useful work
post_process(job.results)
Run Code Online (Sandbox Code Playgroud)
一个易用的使用模型是:
job = SSHJobMan(hostlist, cmd)
job.wait() # implicitly performs a start()
process(job.results)
Run Code Online (Sandbox Code Playgroud)
这一切都适用于当前的实用程序.但它确实缺乏灵活性.例如,我目前支持简短的输出格式或进度条作为增量结果,我还支持该post_process()
功能的简短,完整和"合并消息"输出.
但是,我想支持多个结果/输出流(到终端的进度条,对日志文件的调试和警告,从成功作业到一个文件/目录的输出,错误消息以及从非成功作业到另一个作业的其他结果)等).
这听起来像是一个需要Observer的情况......我的类的实例接受来自其他对象的注册,并在发生时使用特定类型的事件回调它们.
我正在看PyPubSub,因为我在SO相关问题中看到了几个引用.我不确定我是否已经准备好将外部依赖项添加到我的实用程序中,但是我可以看到使用它们的接口作为我的模型的价值,如果这将使其他人更容易使用它.(该项目既可以作为独立的命令行实用程序,也可以作为编写其他脚本/实用程序的类).
总之,我知道如何做我想要的......但是有很多方法可以实现它.从长远来看,我想知道最有可能为代码的其他用户工作的建议.
代码本身位于:classh.
Jas*_*rff 50
但它确实缺乏灵活性.
嗯......实际上,如果您想要的是异步API,这对我来说就像是一个很好的设计.通常是.也许你只需要从stderr切换到Python的logging
模块,它有一种自己的发布/订阅模型,Logger.addHandler()
等等.
如果你确实想支持观察者,我的建议是保持简单.你真的只需要几行代码.
class Event(object):
pass
class Observable(object):
def __init__(self):
self.callbacks = []
def subscribe(self, callback):
self.callbacks.append(callback)
def fire(self, **attrs):
e = Event()
e.source = self
for k, v in attrs.iteritems():
setattr(e, k, v)
for fn in self.callbacks:
fn(e)
Run Code Online (Sandbox Code Playgroud)
您的Job类可以是子类Observable
.当感兴趣的事情发生时,打电话self.fire(type="progress", percent=50)
等.
Pit*_*kos 21
我认为其他人的回答是过分的.您可以使用少于15行代码轻松地在Python中实现事件.
你简单有两个类:Event
和Observer
.任何想要监听事件的类都需要继承Observer并设置为侦听(观察)特定事件.当Event
被实例化和燃煤,听该事件的所有观察员将运行指定的回调函数.
class Observer():
_observers = []
def __init__(self):
self._observers.append(self)
self._observables = {}
def observe(self, event_name, callback):
self._observables[event_name] = callback
class Event():
def __init__(self, name, data, autofire = True):
self.name = name
self.data = data
if autofire:
self.fire()
def fire(self):
for observer in Observer._observers:
if self.name in observer._observables:
observer._observables[self.name](self.data)
Run Code Online (Sandbox Code Playgroud)
示例:
class Room(Observer):
def __init__(self):
print("Room is ready.")
Observer.__init__(self) # Observer's init needs to be called
def someone_arrived(self, who):
print(who + " has arrived!")
room = Room()
room.observe('someone arrived', room.someone_arrived)
Event('someone arrived', 'Lenard')
Run Code Online (Sandbox Code Playgroud)
输出:
Room is ready.
Lenard has arrived!
Run Code Online (Sandbox Code Playgroud)
Jas*_*rff 13
还有一些方法......
也许您只需要从stderr切换到Python的logging
模块,该模块具有强大的发布/订阅模型.
开始生成日志记录很容易.
# producer
import logging
log = logging.getLogger("myjobs") # that's all the setup you need
class MyJob(object):
def run(self):
log.info("starting job")
n = 10
for i in range(n):
log.info("%.1f%% done" % (100.0 * i / n))
log.info("work complete")
Run Code Online (Sandbox Code Playgroud)
在消费者方面,还有一些工作要做.不幸的是,配置记录器输出需要7个完整的代码行.;)
# consumer
import myjobs, sys, logging
if user_wants_log_output:
ch = logging.StreamHandler(sys.stderr)
ch.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
myjobs.log.addHandler(ch)
myjobs.log.setLevel(logging.INFO)
myjobs.MyJob().run()
Run Code Online (Sandbox Code Playgroud)
另一方面,日志包中有大量的东西.如果您需要将日志数据发送到一组旋转文件,一个电子邮件地址和Windows事件日志,那么您就可以了.
但是你根本不需要使用任何库.支持观察者的一种非常简单的方法是调用一个什么都不做的方法.
# producer
class MyJob(object):
def on_progress(self, pct):
"""Called when progress is made. pct is the percent complete.
By default this does nothing. The user may override this method
or even just assign to it."""
pass
def run(self):
n = 10
for i in range(n):
self.on_progress(100.0 * i / n)
self.on_progress(100.0)
# consumer
import sys, myjobs
job = myjobs.MyJob()
job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()
Run Code Online (Sandbox Code Playgroud)
有时不是写一个lambda,你可以说job.on_progress = progressBar.update
,这很好.
这很简单.一个缺点是它自然不支持订阅相同事件的多个监听器.
通过一些支持代码,您可以在Python中获得类似C#的事件.这是代码:
# glue code
class event(object):
def __init__(self, func):
self.__doc__ = func.__doc__
self._key = ' ' + func.__name__
def __get__(self, obj, cls):
try:
return obj.__dict__[self._key]
except KeyError, exc:
be = obj.__dict__[self._key] = boundevent()
return be
class boundevent(object):
def __init__(self):
self._fns = []
def __iadd__(self, fn):
self._fns.append(fn)
return self
def __isub__(self, fn):
self._fns.remove(fn)
return self
def __call__(self, *args, **kwargs):
for f in self._fns[:]:
f(*args, **kwargs)
Run Code Online (Sandbox Code Playgroud)
生产者使用装饰器声明事件:
# producer
class MyJob(object):
@event
def progress(pct):
"""Called when progress is made. pct is the percent complete."""
def run(self):
n = 10
for i in range(n+1):
self.progress(100.0 * i / n)
#consumer
import sys, myjobs
job = myjobs.MyJob()
job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()
Run Code Online (Sandbox Code Playgroud)
这与上面的"简单观察者"代码完全相同,但您可以根据需要添加任意数量的侦听器+=
.(与C#不同,没有事件处理程序类型,new EventHandler(foo.bar)
在订阅事件时没有必要,并且在触发事件之前不必检查null.就像C#一样,事件不会压制异常.)
如果logging
您需要的一切,请使用它.否则做最适合你的事情.需要注意的关键是你不需要承担大的外部依赖.
如果一个实现对象不是因为他们正在观察某些东西而保持活着的呢?请在下面找到具有以下功能的观察者模式的实现:
.bar
实例的绑定方法foo
,只需执行foo.bar.addObserver(observer)
.import weakref
import functools
class ObservableMethod(object):
"""
A proxy for a bound method which can be observed.
I behave like a bound method, but other bound methods can subscribe to be
called whenever I am called.
"""
def __init__(self, obj, func):
self.func = func
functools.update_wrapper(self, func)
self.objectWeakRef = weakref.ref(obj)
self.callbacks = {} #observing object ID -> weak ref, methodNames
def addObserver(self, boundMethod):
"""
Register a bound method to observe this ObservableMethod.
The observing method will be called whenever this ObservableMethod is
called, and with the same arguments and keyword arguments. If a
boundMethod has already been registered to as a callback, trying to add
it again does nothing. In other words, there is no way to sign up an
observer to be called back multiple times.
"""
obj = boundMethod.__self__
ID = id(obj)
if ID in self.callbacks:
s = self.callbacks[ID][1]
else:
wr = weakref.ref(obj, Cleanup(ID, self.callbacks))
s = set()
self.callbacks[ID] = (wr, s)
s.add(boundMethod.__name__)
def discardObserver(self, boundMethod):
"""
Un-register a bound method.
"""
obj = boundMethod.__self__
if id(obj) in self.callbacks:
self.callbacks[id(obj)][1].discard(boundMethod.__name__)
def __call__(self, *arg, **kw):
"""
Invoke the method which I proxy, and all of it's callbacks.
The callbacks are called with the same *args and **kw as the main
method.
"""
result = self.func(self.objectWeakRef(), *arg, **kw)
for ID in self.callbacks:
wr, methodNames = self.callbacks[ID]
obj = wr()
for methodName in methodNames:
getattr(obj, methodName)(*arg, **kw)
return result
@property
def __self__(self):
"""
Get a strong reference to the object owning this ObservableMethod
This is needed so that ObservableMethod instances can observe other
ObservableMethod instances.
"""
return self.objectWeakRef()
class ObservableMethodDescriptor(object):
def __init__(self, func):
"""
To each instance of the class using this descriptor, I associate an
ObservableMethod.
"""
self.instances = {} # Instance id -> (weak ref, Observablemethod)
self._func = func
def __get__(self, inst, cls):
if inst is None:
return self
ID = id(inst)
if ID in self.instances:
wr, om = self.instances[ID]
if not wr():
msg = "Object id %d should have been cleaned up"%(ID,)
raise RuntimeError(msg)
else:
wr = weakref.ref(inst, Cleanup(ID, self.instances))
om = ObservableMethod(inst, self._func)
self.instances[ID] = (wr, om)
return om
def __set__(self, inst, val):
raise RuntimeError("Assigning to ObservableMethod not supported")
def event(func):
return ObservableMethodDescriptor(func)
class Cleanup(object):
"""
I manage remove elements from a dict whenever I'm called.
Use me as a weakref.ref callback to remove an object's id from a dict
when that object is garbage collected.
"""
def __init__(self, key, d):
self.key = key
self.d = d
def __call__(self, wr):
del self.d[self.key]
Run Code Online (Sandbox Code Playgroud)
要使用它,我们只需装饰我们想要观察的方法@event
.这是一个例子
class Foo(object):
def __init__(self, name):
self.name = name
@event
def bar(self):
print("%s called bar"%(self.name,))
def baz(self):
print("%s called baz"%(self.name,))
a = Foo('a')
b = Foo('b')
a.bar.addObserver(b.bar)
a.bar()
Run Code Online (Sandbox Code Playgroud)
来自维基百科:
from collections import defaultdict
class Observable (defaultdict):
def __init__ (self):
defaultdict.__init__(self, object)
def emit (self, *args):
'''Pass parameters to all observers and update states.'''
for subscriber in self:
response = subscriber(*args)
self[subscriber] = response
def subscribe (self, subscriber):
'''Add a new subscriber to self.'''
self[subscriber]
def stat (self):
'''Return a tuple containing the state of each observer.'''
return tuple(self.values())
Run Code Online (Sandbox Code Playgroud)
Observable 就是这样使用的。
myObservable = Observable ()
# subscribe some inlined functions.
# myObservable[lambda x, y: x * y] would also work here.
myObservable.subscribe(lambda x, y: x * y)
myObservable.subscribe(lambda x, y: float(x) / y)
myObservable.subscribe(lambda x, y: x + y)
myObservable.subscribe(lambda x, y: x - y)
# emit parameters to each observer
myObservable.emit(6, 2)
# get updated values
myObservable.stat() # returns: (8, 3.0, 4, 12)
Run Code Online (Sandbox Code Playgroud)