mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-24 07:06:41 +00:00
UNDO: forget about old peers (closes #26)
Add a new method to RIIntroducerClient, to allow the central introducer node to remove peers from each client's active set after they've gone away. Without this, client nodes accumulate stale peer FURLs forever. This introduces a compatibility break, as old clients won't know about the 'lost_peers' message, although the errors produced are probably harmless.
This commit is contained in:
parent 04af4a48b5
commit 82ba0f8540
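For orientation, here is the shape of the peer-tracking flow that the undone change had introduced. This is a minimal, foolscap-free sketch: the class and method names mirror the diff below, but the direct method calls stand in for foolscap's callRemote, and the whole thing is illustrative rather than the project's actual code.

    class Introducer(object):
        """Tracks live peers and fans announcements out to every client."""
        def __init__(self):
            self.nodes = set()   # connected client stand-ins
            self.pburls = set()  # FURLs of all live peers

        def hello(self, node, pburl):
            self.nodes.add(node)
            self.pburls.add(pburl)
            for othernode in self.nodes:
                othernode.new_peers(set(self.pburls))

        def _remove(self, node, pburl):
            # the 'lost_peers' half that this commit backs out: without it,
            # clients hold stale FURLs forever
            self.nodes.discard(node)
            self.pburls.discard(pburl)
            for othernode in self.nodes:
                othernode.lost_peers(set([pburl]))

    class Client(object):
        def __init__(self):
            self.known = set()
        def new_peers(self, pburls):
            self.known |= pburls
        def lost_peers(self, pburls):
            self.known -= pburls

    intro = Introducer()
    a, b = Client(), Client()
    intro.hello(a, "pb://a")
    intro.hello(b, "pb://b")
    intro._remove(a, "pb://a")
    assert "pb://a" not in b.known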
@@ -19,8 +19,6 @@ ShareData = StringConstraint(100000)
 class RIIntroducerClient(RemoteInterface):
     def new_peers(pburls=SetOf(PBURL)):
         return None
-    def lost_peers(pburls=SetOf(PBURL)):
-        return None
 
 class RIIntroducer(RemoteInterface):
     def hello(node=RIIntroducerClient, pburl=PBURL):
@@ -7,15 +7,6 @@ from foolscap import Referenceable
 from allmydata.interfaces import RIIntroducer, RIIntroducerClient
 from allmydata.util import idlib, observer
 
-def ignoreDeadRef(target, *args, **kwargs):
-    from twisted.internet import error
-    from foolscap import DeadReferenceError
-    d = target.callRemote(*args, **kwargs)
-    def _ignore(f):
-        f.trap(error.ConnectionDone, error.ConnectError,
-               error.ConnectionLost, DeadReferenceError)
-    d.addErrback(_ignore)
-
 class Introducer(service.MultiService, Referenceable):
     implements(RIIntroducer)
 
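The deleted ignoreDeadRef() helper is a small wrapper around a Twisted idiom: fire a remote call, then attach an errback whose trap() absorbs only the connection-loss failures and lets anything else propagate. A standalone illustration with plain Twisted (no foolscap, so DeadReferenceError is omitted, and FakeRemote is invented for the demo):

    from twisted.internet import defer, error

    class FakeRemote(object):
        # stands in for a foolscap RemoteReference whose peer has gone away
        def callRemote(self, name, *args, **kwargs):
            return defer.fail(error.ConnectionLost("peer went away"))

    def call_ignoring_dead_refs(target, *args, **kwargs):
        d = target.callRemote(*args, **kwargs)
        def _ignore(f):
            # trap() re-raises anything not listed here, so genuine bugs
            # still surface instead of being silently swallowed
            f.trap(error.ConnectionDone, error.ConnectError,
                   error.ConnectionLost)
        d.addErrback(_ignore)
        return d

    d = call_ignoring_dead_refs(FakeRemote(), "lost_peers", set(["pb://x"]))
    # d has fired with None: the ConnectionLost failure was absorbed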
@@ -30,11 +21,6 @@ class Introducer(service.MultiService, Referenceable):
             log.msg(" introducer: removing %s %s" % (node, pburl))
             self.nodes.remove(node)
             self.pburls.remove(pburl)
-            for othernode in self.nodes:
-                #othernode.callRemote("lost_peers", set([pburl]))
-                #othernode.callRemoteOnly("lost_peers", set([pburl]))
-                ignoreDeadRef(othernode, "lost_peers", set([pburl]))
-
         node.notifyOnDisconnect(_remove)
         self.pburls.add(pburl)
         node.callRemote("new_peers", self.pburls)
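In the hunk above, the broadcast being removed lived inside _remove, which the introducer registered via node.notifyOnDisconnect(_remove): cleanup and notification are driven by the disconnect event itself. A hedged sketch of that callback-on-disconnect shape, with a hypothetical Connection class in place of a real foolscap RemoteReference:

    class Connection(object):
        """Hypothetical stand-in for a foolscap RemoteReference."""
        def __init__(self):
            self._on_disconnect = []
        def notifyOnDisconnect(self, cb, *args, **kwargs):
            self._on_disconnect.append((cb, args, kwargs))
        def connectionLost(self):
            # the real foolscap invokes these callbacks when the
            # underlying connection dies
            for cb, args, kwargs in self._on_disconnect:
                cb(*args, **kwargs)

    removed = []
    conn = Connection()
    conn.notifyOnDisconnect(removed.append, "pb://stale-peer-furl")
    conn.connectionLost()
    assert removed == ["pb://stale-peer-furl"]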
@@ -54,7 +40,7 @@ class IntroducerClient(service.Service, Referenceable):
         self.connections = {} # k: nodeid, v: ref
         self.reconnectors = {} # k: PBURL, v: reconnector
 
-        self.change_observers = observer.ObserverList()
+        self.connection_observers = observer.ObserverList()
 
     def startService(self):
         self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl,
@@ -67,16 +53,11 @@ class IntroducerClient(service.Service, Referenceable):
         for pburl in pburls:
             self._new_peer(pburl)
 
-    def remote_lost_peers(self, pburls):
-        for pburl in pburls:
-            self._lost_peer(pburl)
-
     def stopService(self):
         service.Service.stopService(self)
         self.introducer_reconnector.stopConnecting()
         for reconnector in self.reconnectors.itervalues():
             reconnector.stopConnecting()
-        self.reconnectors = {}
 
     def _new_peer(self, pburl):
         if pburl in self.reconnectors:
@@ -95,7 +76,7 @@ class IntroducerClient(service.Service, Referenceable):
         nodeid = idlib.a2b(m.group(1))
         def _got_peer(rref):
             self.log(" connected to(%s)" % idlib.b2a(nodeid))
-            self.change_observers.notify("add", nodeid, rref)
+            self.connection_observers.notify(nodeid, rref)
             self.connections[nodeid] = rref
         def _lost():
             # TODO: notifyOnDisconnect uses eventually(), but connects do
@@ -111,19 +92,8 @@ class IntroducerClient(service.Service, Referenceable):
                              node=self,
                              pburl=self.my_pburl)
 
-    def notify_on_change(self, cb):
-        """Register a callback that will be fired (with ('add',nodeid,rref)
-        or ('remove',pburl) ) when a new connection is established or a peer
-        is lost. This is used by the unit tests."""
-        self.change_observers.subscribe(cb)
+    def notify_on_new_connection(self, cb):
+        """Register a callback that will be fired (with nodeid, rref) when
+        a new connection is established."""
+        self.connection_observers.subscribe(cb)
 
-    def _lost_peer(self, pburl):
-        if pburl in self.reconnectors:
-            self.reconnectors[pburl].stopConnecting()
-            del self.reconnectors[pburl]
-        self.change_observers.notify("remove", pburl)
-        # TODO: we don't currently bother to terminate any connections we
-        # might have to this peer. The assumption is that, since the
-        # introducer lost their connection to this peer, we'll probably lose
-        # our connection too. Also, foolscap doesn't currently provide a
-        # clean way to terminate a given connection.
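Both observer methods above sit on allmydata.util.observer.ObserverList: subscribers register a callback, and notify() fans each event out to all of them. The undone change had widened the events into tagged ('add', ...) / ('remove', ...) tuples; the restored code keeps a connection-only observer. A simplified reimplementation of the pattern (the real ObserverList delivers notifications asynchronously, which this sketch does not):

    class ObserverList(object):
        """Simplified synchronous version of the observer helper."""
        def __init__(self):
            self._watchers = []
        def subscribe(self, cb):
            self._watchers.append(cb)
        def notify(self, *args, **kwargs):
            for cb in self._watchers:
                cb(*args, **kwargs)

    events = []
    observers = ObserverList()
    observers.subscribe(lambda *a: events.append(a))
    observers.notify("add", "nodeid", "rref")  # change-style event, now removed
    observers.notify("remove", "pburl")        # the shape _lost_peer used
    assert events == [("add", "nodeid", "rref"), ("remove", "pburl")]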
@@ -48,7 +48,7 @@ class TestIntroducer(unittest.TestCase):
         ic = IntroducerClient(None, "introducer", "mypburl")
         def _ignore(nodeid, rref):
             pass
-        ic.notify_on_change(_ignore)
+        ic.notify_on_new_connection(_ignore)
 
     def test_listen(self):
         i = Introducer()
@@ -71,23 +71,11 @@ class TestIntroducer(unittest.TestCase):
 
         self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
         d = self._done_counting = defer.Deferred()
-        self._done_counting_down = defer.Deferred()
-        self.waiting_for_disconnections = None
-
-        def _count(changetype, *args):
-            if changetype == "add":
-                nodeid, rref = args
+        def _count(nodeid, rref):
             log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
             self.waiting_for_connections -= 1
             if self.waiting_for_connections == 0:
                 self._done_counting.callback("done!")
-            else:
-                pburl = args[0]
-                log.msg("LOST PEER! %s" % (pburl,))
-                if self.waiting_for_disconnections is not None:
-                    self.waiting_for_disconnections -= 1
-                    if self.waiting_for_disconnections == 0:
-                        self._done_counting_down.callback("done")
 
         clients = []
         tubs = {}
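The test harness in this hunk is a countdown: every connection event decrements waiting_for_connections, and a Deferred fires when it reaches zero, letting the Deferred chain advance to the next check. The same skeleton in isolation (plain Twisted; names chosen to mirror the test, not taken from it):

    from twisted.internet import defer

    class ConnectionCounter(object):
        """Fires self.done once the expected number of events arrives."""
        def __init__(self, expected):
            self.waiting_for_connections = expected
            self.done = defer.Deferred()
        def count(self, nodeid, rref):
            self.waiting_for_connections -= 1
            if self.waiting_for_connections == 0:
                self.done.callback("done!")

    results = []
    counter = ConnectionCounter(2)
    counter.done.addCallback(results.append)
    counter.count("nodeA", None)
    counter.count("nodeB", None)  # second event fires the Deferred
    assert results == ["done!"]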
@@ -103,7 +91,7 @@ class TestIntroducer(unittest.TestCase):
             n = MyNode()
             node_pburl = tub.registerReference(n)
             c = IntroducerClient(tub, iurl, node_pburl)
-            c.notify_on_change(_count)
+            c.notify_on_new_connection(_count)
             c.setServiceParent(self.parent)
             clients.append(c)
             tubs[c] = tub
@@ -114,8 +102,7 @@ class TestIntroducer(unittest.TestCase):
             log.msg("doing _check")
             for c in clients:
                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
-            # now disconnect somebody's connection to someone else, and check
-            # to see that the connection is reestablished
+            # now disconnect somebody's connection to someone else
             self.waiting_for_connections = 2
             d2 = self._done_counting = defer.Deferred()
             origin_c = clients[0]
@@ -129,14 +116,12 @@ class TestIntroducer(unittest.TestCase):
             log.msg(" did disconnect")
             return d2
         d.addCallback(_check)
-
         def _check_again(res):
             log.msg("doing _check_again")
             for c in clients:
                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
-            # now disconnect somebody's connection to themselves, and make
-            # sure it reconnects. This will only result in one new
-            # connection, since it is a loopback.
+            # now disconnect somebody's connection to themselves. This will
+            # only result in one new connection, since it is a loopback.
             self.waiting_for_connections = 1
             d2 = self._done_counting = defer.Deferred()
             origin_c = clients[0]
@@ -150,40 +135,12 @@ class TestIntroducer(unittest.TestCase):
             log.msg(" did disconnect")
             return d2
         d.addCallback(_check_again)
 
         def _check_again2(res):
             log.msg("doing _check_again2")
             for c in clients:
                 self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+            # now disconnect somebody's connection to themselves
         d.addCallback(_check_again2)
-
-        def _shutdown_one_client(res):
-            log.msg("_shutdown_one_client, waiting for %d shutdowns" %
-                    (NUMCLIENTS-1,))
-            # shutdown a single client, make sure everyone else notices
-            self.waiting_for_disconnections = NUMCLIENTS-1
-            victim_client = clients[0]
-            victim_tub = tubs[victim_client]
-            # disownServiceParent will stop the service too
-            d1 = defer.maybeDeferred(victim_client.disownServiceParent)
-            def _stoptub(res):
-                log.msg("_stoptub")
-                return victim_tub.disownServiceParent()
-            d1.addCallback(_stoptub)
-            def _wait_for_counting_down(res):
-                log.msg("_wait_for_counting_down")
-                return self._done_counting_down
-            d1.addCallback(_wait_for_counting_down)
-            return d1
-        d.addCallback(_shutdown_one_client)
-
-        def _check_shutdown(res):
-            log.msg("_check_shutdown")
-            c = clients[1]
-            self.failUnlessEqual(len(c.connections), NUMCLIENTS-1)
-            self.failUnlessEqual(len(c.reconnectors), NUMCLIENTS-1)
-        d.addCallback(_check_shutdown)
-
         return d
     test_system.timeout = 2400
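One detail worth noting in the removed _shutdown_one_client block: defer.maybeDeferred(victim_client.disownServiceParent) is used because disownServiceParent may return either a plain value or a Deferred, and maybeDeferred normalizes both into a Deferred so the subsequent _stoptub and _wait_for_counting_down callbacks can be chained uniformly. In isolation (stop_sync and stop_async are invented stand-ins, not tahoe code):

    from twisted.internet import defer

    def stop_sync():
        return "stopped"                 # plain value

    def stop_async():
        return defer.succeed("stopped")  # already a Deferred

    results = []
    # maybeDeferred wraps both shapes in a Deferred, so chaining is uniform
    d = defer.maybeDeferred(stop_sync)
    d.addCallback(lambda _res: stop_async())  # a returned Deferred is chained
    d.addCallback(results.append)
    assert results == ["stopped"]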