forget about old peers (closes #26)

Add a new method to RIIntroducer, to allow the central introducer node to
remove peers from the active set after they've gone away. Without this,
client nodes accumulate stale peer FURLs forever. This introduces a
compatibility break, as old introducers won't know about the 'lost_peers'
message, although the errors produced are probably harmless.
This commit is contained in:
Brian Warner 2007-05-07 19:10:24 -07:00
parent e0bedb64ea
commit 04af4a48b5
3 changed files with 92 additions and 17 deletions

View File

@ -19,6 +19,8 @@ ShareData = StringConstraint(100000)
class RIIntroducerClient(RemoteInterface):
def new_peers(pburls=SetOf(PBURL)):
return None
def lost_peers(pburls=SetOf(PBURL)):
return None
class RIIntroducer(RemoteInterface):
def hello(node=RIIntroducerClient, pburl=PBURL):

View File

@ -7,6 +7,15 @@ from foolscap import Referenceable
from allmydata.interfaces import RIIntroducer, RIIntroducerClient
from allmydata.util import idlib, observer
def ignoreDeadRef(target, *args, **kwargs):
from twisted.internet import error
from foolscap import DeadReferenceError
d = target.callRemote(*args, **kwargs)
def _ignore(f):
f.trap(error.ConnectionDone, error.ConnectError,
error.ConnectionLost, DeadReferenceError)
d.addErrback(_ignore)
class Introducer(service.MultiService, Referenceable):
implements(RIIntroducer)
@ -21,6 +30,11 @@ class Introducer(service.MultiService, Referenceable):
log.msg(" introducer: removing %s %s" % (node, pburl))
self.nodes.remove(node)
self.pburls.remove(pburl)
for othernode in self.nodes:
#othernode.callRemote("lost_peers", set([pburl]))
#othernode.callRemoteOnly("lost_peers", set([pburl]))
ignoreDeadRef(othernode, "lost_peers", set([pburl]))
node.notifyOnDisconnect(_remove)
self.pburls.add(pburl)
node.callRemote("new_peers", self.pburls)
@ -40,7 +54,7 @@ class IntroducerClient(service.Service, Referenceable):
self.connections = {} # k: nodeid, v: ref
self.reconnectors = {} # k: PBURL, v: reconnector
self.connection_observers = observer.ObserverList()
self.change_observers = observer.ObserverList()
def startService(self):
self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl,
@ -53,11 +67,16 @@ class IntroducerClient(service.Service, Referenceable):
for pburl in pburls:
self._new_peer(pburl)
def remote_lost_peers(self, pburls):
for pburl in pburls:
self._lost_peer(pburl)
def stopService(self):
service.Service.stopService(self)
self.introducer_reconnector.stopConnecting()
for reconnector in self.reconnectors.itervalues():
reconnector.stopConnecting()
self.reconnectors = {}
def _new_peer(self, pburl):
if pburl in self.reconnectors:
@ -76,7 +95,7 @@ class IntroducerClient(service.Service, Referenceable):
nodeid = idlib.a2b(m.group(1))
def _got_peer(rref):
self.log(" connected to(%s)" % idlib.b2a(nodeid))
self.connection_observers.notify(nodeid, rref)
self.change_observers.notify("add", nodeid, rref)
self.connections[nodeid] = rref
def _lost():
# TODO: notifyOnDisconnect uses eventually(), but connects do
@ -92,8 +111,19 @@ class IntroducerClient(service.Service, Referenceable):
node=self,
pburl=self.my_pburl)
def notify_on_new_connection(self, cb):
"""Register a callback that will be fired (with nodeid, rref) when
a new connection is established."""
self.connection_observers.subscribe(cb)
def notify_on_change(self, cb):
"""Register a callback that will be fired (with ('add',nodeid,rref)
or ('remove',pburl) ) when a new connection is established or a peer
is lost. This is used by the unit tests."""
self.change_observers.subscribe(cb)
def _lost_peer(self, pburl):
if pburl in self.reconnectors:
self.reconnectors[pburl].stopConnecting()
del self.reconnectors[pburl]
self.change_observers.notify("remove", pburl)
# TODO: we don't currently bother to terminate any connections we
# might have to this peer. The assumption is that, since the
# introducer lost their connection to this peer, we'll probably lose
# our connection too. Also, foolscap doesn't currently provide a
# clean way to terminate a given connection.

View File

@ -48,7 +48,7 @@ class TestIntroducer(unittest.TestCase):
ic = IntroducerClient(None, "introducer", "mypburl")
def _ignore(nodeid, rref):
pass
ic.notify_on_new_connection(_ignore)
ic.notify_on_change(_ignore)
def test_listen(self):
i = Introducer()
@ -71,11 +71,23 @@ class TestIntroducer(unittest.TestCase):
self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
d = self._done_counting = defer.Deferred()
def _count(nodeid, rref):
log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
self.waiting_for_connections -= 1
if self.waiting_for_connections == 0:
self._done_counting.callback("done!")
self._done_counting_down = defer.Deferred()
self.waiting_for_disconnections = None
def _count(changetype, *args):
if changetype == "add":
nodeid, rref = args
log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
self.waiting_for_connections -= 1
if self.waiting_for_connections == 0:
self._done_counting.callback("done!")
else:
pburl = args[0]
log.msg("LOST PEER! %s" % (pburl,))
if self.waiting_for_disconnections is not None:
self.waiting_for_disconnections -= 1
if self.waiting_for_disconnections == 0:
self._done_counting_down.callback("done")
clients = []
tubs = {}
@ -91,7 +103,7 @@ class TestIntroducer(unittest.TestCase):
n = MyNode()
node_pburl = tub.registerReference(n)
c = IntroducerClient(tub, iurl, node_pburl)
c.notify_on_new_connection(_count)
c.notify_on_change(_count)
c.setServiceParent(self.parent)
clients.append(c)
tubs[c] = tub
@ -102,7 +114,8 @@ class TestIntroducer(unittest.TestCase):
log.msg("doing _check")
for c in clients:
self.failUnlessEqual(len(c.connections), NUMCLIENTS)
# now disconnect somebody's connection to someone else
# now disconnect somebody's connection to someone else, and check
# to see that the connection is reestablished
self.waiting_for_connections = 2
d2 = self._done_counting = defer.Deferred()
origin_c = clients[0]
@ -116,12 +129,14 @@ class TestIntroducer(unittest.TestCase):
log.msg(" did disconnect")
return d2
d.addCallback(_check)
def _check_again(res):
log.msg("doing _check_again")
for c in clients:
self.failUnlessEqual(len(c.connections), NUMCLIENTS)
# now disconnect somebody's connection to themselves. This will
# only result in one new connection, since it is a loopback.
# now disconnect somebody's connection to themselves, and make
# sure it reconnects. This will only result in one new
# connection, since it is a loopback.
self.waiting_for_connections = 1
d2 = self._done_counting = defer.Deferred()
origin_c = clients[0]
@ -135,12 +150,40 @@ class TestIntroducer(unittest.TestCase):
log.msg(" did disconnect")
return d2
d.addCallback(_check_again)
def _check_again2(res):
log.msg("doing _check_again2")
for c in clients:
self.failUnlessEqual(len(c.connections), NUMCLIENTS)
# now disconnect somebody's connection to themselves
d.addCallback(_check_again2)
def _shutdown_one_client(res):
log.msg("_shutdown_one_client, waiting for %d shutdowns" %
(NUMCLIENTS-1,))
# shutdown a single client, make sure everyone else notices
self.waiting_for_disconnections = NUMCLIENTS-1
victim_client = clients[0]
victim_tub = tubs[victim_client]
# disownServiceParent will stop the service too
d1 = defer.maybeDeferred(victim_client.disownServiceParent)
def _stoptub(res):
log.msg("_stoptub")
return victim_tub.disownServiceParent()
d1.addCallback(_stoptub)
def _wait_for_counting_down(res):
log.msg("_wait_for_counting_down")
return self._done_counting_down
d1.addCallback(_wait_for_counting_down)
return d1
d.addCallback(_shutdown_one_client)
def _check_shutdown(res):
log.msg("_check_shutdown")
c = clients[1]
self.failUnlessEqual(len(c.connections), NUMCLIENTS-1)
self.failUnlessEqual(len(c.reconnectors), NUMCLIENTS-1)
d.addCallback(_check_shutdown)
return d
test_system.timeout = 2400