logpublisher: implement subscribe/publish for log, add a sample client

This commit is contained in:
Brian Warner 2007-11-16 20:07:50 -07:00
parent c2765bd8c6
commit b29ce1c30a
3 changed files with 123 additions and 2 deletions

43
misc/get-logs.py Normal file
View File

@ -0,0 +1,43 @@
#! /usr/bin/python
import sys
import foolscap
from foolscap.schema import DictOf, Any
from twisted.internet import reactor
from zope.interface import implements
from twisted.python import log
#log.startLogging(sys.stderr)
class RILogObserver(foolscap.RemoteInterface):
def msg(logmsg=DictOf(str, Any())):
return None
class LogFetcher(foolscap.Referenceable):
implements(RILogObserver)
def start(self, target_furl):
print "Connecting.."
self._tub = foolscap.Tub()
self._tub.startService()
d = self._tub.getReference(target_furl)
d.addCallback(self._got_logpublisher)
d.addErrback(self._error)
def _error(self, f):
print "ERROR", f
reactor.stop()
def _got_logpublisher(self, publisher):
print "Connected"
d = publisher.callRemote("subscribe_to_all", self)
d.addErrback(self._error)
def remote_msg(self, d):
print d
target_furl = sys.argv[1]
lf = LogFetcher()
lf.start(target_furl)
#print "starting.."
reactor.run()

View File

@ -2,23 +2,42 @@
import os.path
from zope.interface import implements
from twisted.application import service
from twisted.python import log
from foolscap import Referenceable, RemoteInterface
from foolscap.schema import DictOf
from foolscap.schema import DictOf, Any
from foolscap.eventual import eventually
class RILogObserver(RemoteInterface):
def msg(logmsg=DictOf(str, Any())):
return None
class RISubscription(RemoteInterface):
pass
class RILogPublisher(RemoteInterface):
def get_versions():
return DictOf(str, str)
def subscribe_to_all(observer=RILogObserver):
return RISubscription
def unsubscribe(subscription=Any()):
# I don't know how to get the constraint right: unsubscribe() should
# accept return value of subscribe_to_all()
return None
class RILogGatherer(RemoteInterface):
def logport(nodeid=str, logport=RILogPublisher):
return None
class Subscription(Referenceable):
implements(RISubscription)
class LogPublisher(Referenceable, service.MultiService):
implements(RILogPublisher)
name = "log_publisher"
def __init__(self):
service.MultiService.__init__(self)
self._subscribers = {}
self._notifyOnDisconnectors = {}
def startService(self):
service.MultiService.startService(self)
@ -26,6 +45,28 @@ class LogPublisher(Referenceable, service.MultiService):
self.parent.tub.registerReference(self, furlFile=furlfile)
os.chmod(furlfile, 0600)
log.addObserver(self._twisted_log_observer)
def stopService(self):
log.removeObserver(self._twisted_log_observer)
return service.MultiService.stopService(self)
def _twisted_log_observer(self, d):
# Twisted will remove this for us if it fails.
# keys:
# ['message']: *args
# ['time']: float
# ['isError']: bool, usually False
# ['system']: string
for o in self._subscribers.values():
o.callRemoteOnly("msg", d)
#f = open("/tmp/f.out", "a")
#print >>f, d['message']
#f.close()
def remote_get_versions(self):
versions = self.parent.get_versions()
# our __version__ attributes are actually instances of
@ -34,3 +75,15 @@ class LogPublisher(Referenceable, service.MultiService):
return dict([(k,str(v))
for k,v in versions.items()])
def remote_subscribe_to_all(self, observer):
s = Subscription()
self._subscribers[s] = observer
c = observer.notifyOnDisconnect(self.remote_unsubscribe, s)
self._notifyOnDisconnectors[s] = c
return s
def remote_unsubscribe(self, s):
observer = self._subscribers.pop(s)
c = self._notifyOnDisconnectors.pop(s)
observer.dontNotifyOnDisconnect(c)

View File

@ -6,7 +6,7 @@ from twisted.internet import defer
from twisted.python import log
from foolscap import Tub, Referenceable
from foolscap.eventual import flushEventualQueue
from foolscap.eventual import fireEventually, flushEventualQueue
from twisted.application import service
import allmydata
from allmydata.node import Node, formatTimeTahoeStyle
@ -61,6 +61,7 @@ class TestCase(unittest.TestCase, testutil.SignalMixin):
def test_logpublisher(self):
basedir = "test_node/test_logpublisher"
fileutil.make_dirs(basedir)
observer = LogObserver()
n = TestNode(basedir)
n.setServiceParent(self.parent)
d = n.when_tub_ready()
@ -75,6 +76,23 @@ class TestCase(unittest.TestCase, testutil.SignalMixin):
self.failUnlessEqual(versions["allmydata"],
allmydata.__version__)
d.addCallback(_check)
d.addCallback(lambda res:
logport.callRemote("subscribe_to_all", observer))
def _emit(subscription):
self._subscription = subscription
log.msg("message here")
d.addCallback(_emit)
d.addCallback(fireEventually)
d.addCallback(fireEventually)
def _check_observer(res):
msgs = observer.messages
self.failUnlessEqual(len(msgs), 1)
#print msgs
self.failUnlessEqual(msgs[0]["message"], ("message here",) )
d.addCallback(_check_observer)
def _done(res):
return logport.callRemote("unsubscribe", self._subscription)
d.addCallback(_done)
return d
d.addCallback(_got_logport)
return d
@ -122,3 +140,10 @@ class Gatherer(Referenceable):
def remote_logport(self, nodeid, logport):
d = logport.callRemote("get_versions")
d.addCallback(self.d.callback)
class LogObserver(Referenceable):
implements(logpublisher.RILogObserver)
def __init__(self):
self.messages = []
def remote_msg(self, d):
self.messages.append(d)