diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 1058a0493..440bd7e6d 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -19,8 +19,6 @@ ShareData = StringConstraint(100000) class RIIntroducerClient(RemoteInterface): def new_peers(pburls=SetOf(PBURL)): return None - def lost_peers(pburls=SetOf(PBURL)): - return None class RIIntroducer(RemoteInterface): def hello(node=RIIntroducerClient, pburl=PBURL): diff --git a/src/allmydata/introducer.py b/src/allmydata/introducer.py index 7060cc467..d2fe6c3c6 100644 --- a/src/allmydata/introducer.py +++ b/src/allmydata/introducer.py @@ -7,15 +7,6 @@ from foolscap import Referenceable from allmydata.interfaces import RIIntroducer, RIIntroducerClient from allmydata.util import idlib, observer -def ignoreDeadRef(target, *args, **kwargs): - from twisted.internet import error - from foolscap import DeadReferenceError - d = target.callRemote(*args, **kwargs) - def _ignore(f): - f.trap(error.ConnectionDone, error.ConnectError, - error.ConnectionLost, DeadReferenceError) - d.addErrback(_ignore) - class Introducer(service.MultiService, Referenceable): implements(RIIntroducer) @@ -30,11 +21,6 @@ class Introducer(service.MultiService, Referenceable): log.msg(" introducer: removing %s %s" % (node, pburl)) self.nodes.remove(node) self.pburls.remove(pburl) - for othernode in self.nodes: - #othernode.callRemote("lost_peers", set([pburl])) - #othernode.callRemoteOnly("lost_peers", set([pburl])) - ignoreDeadRef(othernode, "lost_peers", set([pburl])) - node.notifyOnDisconnect(_remove) self.pburls.add(pburl) node.callRemote("new_peers", self.pburls) @@ -54,7 +40,7 @@ class IntroducerClient(service.Service, Referenceable): self.connections = {} # k: nodeid, v: ref self.reconnectors = {} # k: PBURL, v: reconnector - self.change_observers = observer.ObserverList() + self.connection_observers = observer.ObserverList() def startService(self): self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl, @@ -67,16 +53,11 @@ class IntroducerClient(service.Service, Referenceable): for pburl in pburls: self._new_peer(pburl) - def remote_lost_peers(self, pburls): - for pburl in pburls: - self._lost_peer(pburl) - def stopService(self): service.Service.stopService(self) self.introducer_reconnector.stopConnecting() for reconnector in self.reconnectors.itervalues(): reconnector.stopConnecting() - self.reconnectors = {} def _new_peer(self, pburl): if pburl in self.reconnectors: @@ -95,7 +76,7 @@ class IntroducerClient(service.Service, Referenceable): nodeid = idlib.a2b(m.group(1)) def _got_peer(rref): self.log(" connected to(%s)" % idlib.b2a(nodeid)) - self.change_observers.notify("add", nodeid, rref) + self.connection_observers.notify(nodeid, rref) self.connections[nodeid] = rref def _lost(): # TODO: notifyOnDisconnect uses eventually(), but connects do @@ -111,19 +92,8 @@ class IntroducerClient(service.Service, Referenceable): node=self, pburl=self.my_pburl) - def notify_on_change(self, cb): - """Register a callback that will be fired (with ('add',nodeid,rref) - or ('remove',pburl) ) when a new connection is established or a peer - is lost. This is used by the unit tests.""" - self.change_observers.subscribe(cb) + def notify_on_new_connection(self, cb): + """Register a callback that will be fired (with nodeid, rref) when + a new connection is established.""" + self.connection_observers.subscribe(cb) - def _lost_peer(self, pburl): - if pburl in self.reconnectors: - self.reconnectors[pburl].stopConnecting() - del self.reconnectors[pburl] - self.change_observers.notify("remove", pburl) - # TODO: we don't currently bother to terminate any connections we - # might have to this peer. The assumption is that, since the - # introducer lost their connection to this peer, we'll probably lose - # our connection too. Also, foolscap doesn't currently provide a - # clean way to terminate a given connection. diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index 40d8527fb..057926a60 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -48,7 +48,7 @@ class TestIntroducer(unittest.TestCase): ic = IntroducerClient(None, "introducer", "mypburl") def _ignore(nodeid, rref): pass - ic.notify_on_change(_ignore) + ic.notify_on_new_connection(_ignore) def test_listen(self): i = Introducer() @@ -71,23 +71,11 @@ class TestIntroducer(unittest.TestCase): self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS d = self._done_counting = defer.Deferred() - self._done_counting_down = defer.Deferred() - self.waiting_for_disconnections = None - - def _count(changetype, *args): - if changetype == "add": - nodeid, rref = args - log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref)) - self.waiting_for_connections -= 1 - if self.waiting_for_connections == 0: - self._done_counting.callback("done!") - else: - pburl = args[0] - log.msg("LOST PEER! %s" % (pburl,)) - if self.waiting_for_disconnections is not None: - self.waiting_for_disconnections -= 1 - if self.waiting_for_disconnections == 0: - self._done_counting_down.callback("done") + def _count(nodeid, rref): + log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref)) + self.waiting_for_connections -= 1 + if self.waiting_for_connections == 0: + self._done_counting.callback("done!") clients = [] tubs = {} @@ -103,7 +91,7 @@ class TestIntroducer(unittest.TestCase): n = MyNode() node_pburl = tub.registerReference(n) c = IntroducerClient(tub, iurl, node_pburl) - c.notify_on_change(_count) + c.notify_on_new_connection(_count) c.setServiceParent(self.parent) clients.append(c) tubs[c] = tub @@ -114,8 +102,7 @@ class TestIntroducer(unittest.TestCase): log.msg("doing _check") for c in clients: self.failUnlessEqual(len(c.connections), NUMCLIENTS) - # now disconnect somebody's connection to someone else, and check - # to see that the connection is reestablished + # now disconnect somebody's connection to someone else self.waiting_for_connections = 2 d2 = self._done_counting = defer.Deferred() origin_c = clients[0] @@ -129,14 +116,12 @@ class TestIntroducer(unittest.TestCase): log.msg(" did disconnect") return d2 d.addCallback(_check) - def _check_again(res): log.msg("doing _check_again") for c in clients: self.failUnlessEqual(len(c.connections), NUMCLIENTS) - # now disconnect somebody's connection to themselves, and make - # sure it reconnects. This will only result in one new - # connection, since it is a loopback. + # now disconnect somebody's connection to themselves. This will + # only result in one new connection, since it is a loopback. self.waiting_for_connections = 1 d2 = self._done_counting = defer.Deferred() origin_c = clients[0] @@ -150,40 +135,12 @@ class TestIntroducer(unittest.TestCase): log.msg(" did disconnect") return d2 d.addCallback(_check_again) - def _check_again2(res): log.msg("doing _check_again2") for c in clients: self.failUnlessEqual(len(c.connections), NUMCLIENTS) + # now disconnect somebody's connection to themselves d.addCallback(_check_again2) - - def _shutdown_one_client(res): - log.msg("_shutdown_one_client, waiting for %d shutdowns" % - (NUMCLIENTS-1,)) - # shutdown a single client, make sure everyone else notices - self.waiting_for_disconnections = NUMCLIENTS-1 - victim_client = clients[0] - victim_tub = tubs[victim_client] - # disownServiceParent will stop the service too - d1 = defer.maybeDeferred(victim_client.disownServiceParent) - def _stoptub(res): - log.msg("_stoptub") - return victim_tub.disownServiceParent() - d1.addCallback(_stoptub) - def _wait_for_counting_down(res): - log.msg("_wait_for_counting_down") - return self._done_counting_down - d1.addCallback(_wait_for_counting_down) - return d1 - d.addCallback(_shutdown_one_client) - - def _check_shutdown(res): - log.msg("_check_shutdown") - c = clients[1] - self.failUnlessEqual(len(c.connections), NUMCLIENTS-1) - self.failUnlessEqual(len(c.reconnectors), NUMCLIENTS-1) - d.addCallback(_check_shutdown) - return d test_system.timeout = 2400