add OneShotObserverList from the amdlib tree

This commit is contained in:
Brian Warner 2007-03-08 15:07:38 -07:00
parent 228e17560a
commit 929d725577
2 changed files with 166 additions and 0 deletions

View File

@ -0,0 +1,79 @@
from twisted.trial import unittest
from twisted.internet import defer, reactor
from allmydata.util import observer
def nextTurn(res=None):
d = defer.Deferred()
reactor.callLater(1, d.callback, res)
return d
class Observer(unittest.TestCase):
def test_oneshot(self):
ol = observer.OneShotObserverList()
d1 = ol.when_fired()
d2 = ol.when_fired()
def _addmore(res):
self.failUnlessEqual(res, "result")
d3 = ol.when_fired()
d3.addCallback(self.failUnlessEqual, "result")
return d3
d1.addCallback(_addmore)
ol.fire("result")
d4 = ol.when_fired()
dl = defer.DeferredList([d1,d2,d4])
return dl
def test_oneshot_fireagain(self):
ol = observer.OneShotObserverList()
d = ol.when_fired()
def _addmore(res):
self.failUnlessEqual(res, "result")
ol.fire_if_not_fired("result3") # should be ignored
d2 = ol.when_fired()
d2.addCallback(self.failUnlessEqual, "result")
return d2
d.addCallback(_addmore)
ol.fire("result")
ol.fire_if_not_fired("result2")
return d
def test_observerlist(self):
ol = observer.ObserverList()
l1 = []
l2 = []
l3 = []
ol.subscribe(l1.append)
ol.notify(1)
ol.subscribe(l2.append)
ol.notify(2)
ol.unsubscribe(l1.append)
ol.notify(3)
def _check(res):
self.failUnlessEqual(l1, [1,2])
if l2 == [3,2]:
msg = ("ObserverList does not yet guarantee ordering of "
"its calls, although it should. This only actually "
"ever fails under windows because time.time() has "
"low resolution and because Twisted does not "
"guarantee ordering of consecutive "
"reactor.callLater(0) calls, although it should. "
"This will be fixed by adding a dependency upon "
"Foolscap and using foolscap.eventual.eventually() "
"instead of callLater(0)")
self.todo = msg
self.failUnlessEqual(l2, [2,3])
d = nextTurn()
d.addCallback(_check)
def _step2(res):
def _add(a, b, c=None):
l3.append((a,b,c))
ol.unsubscribe(l2.append)
ol.subscribe(_add)
ol.notify(4, 5, c=6)
return nextTurn()
def _check2(res):
self.failUnlessEqual(l3, [(4,5,6)])
d.addCallback(_step2)
d.addCallback(_check2)
return d

View File

@ -0,0 +1,87 @@
# -*- test-case-name: allmydata.test.test_observer -*-
from twisted.internet import defer
from foolscap.eventual import eventually
class OneShotObserverList:
"""A one-shot event distributor."""
def __init__(self):
self._fired = False
self._result = None
self._watchers = []
self.__repr__ = self._unfired_repr
def _unfired_repr(self):
return "<OneShotObserverList [%s]>" % (self._watchers, )
def _fired_repr(self):
return "<OneShotObserverList -> %s>" % (self._result, )
def _get_result(self):
return self._result
def when_fired(self):
if self._fired:
return defer.succeed(self._get_result())
d = defer.Deferred()
self._watchers.append(d)
return d
def fire(self, result):
assert not self._fired
self._fired = True
self._result = result
self._fire(result)
def _fire(self, result):
for w in self._watchers:
eventually(w.callback, result)
del self._watchers
self.__repr__ = self._fired_repr
def fire_if_not_fired(self, result):
if not self._fired:
self.fire(result)
class LazyOneShotObserverList(OneShotObserverList):
"""
a variant of OneShotObserverList which does not retain
the result it handles, but rather retains a callable()
through which is retrieves the data if and when needed.
"""
def __init__(self):
OneShotObserverList.__init__(self)
def _get_result(self):
return self._result_producer()
def fire(self, result_producer):
"""
@param result_producer: a no-arg callable which
returns the data which is to be considered the
'result' for this observer list. note that this
function may be called multiple times - once
upon initial firing, and potentially once more
for each subsequent when_fired() deferred created
"""
assert not self._fired
self._fired = True
self._result_producer = result_producer
if self._watchers: # if not, don't call result_producer
self._fire(self._get_result())
class ObserverList:
"""A simple class to distribute events to a number of subscribers."""
def __init__(self):
self._watchers = []
def subscribe(self, observer):
self._watchers.append(observer)
def unsubscribe(self, observer):
self._watchers.remove(observer)
def notify(self, *args, **kwargs):
for o in self._watchers:
eventually(o, *args, **kwargs)