import os, re from base64 import b32decode from twisted.trial import unittest from twisted.internet import defer from twisted.python import log from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue from twisted.application import service from allmydata.interfaces import InsufficientVersionError from allmydata.introducer.client import IntroducerClient from allmydata.introducer.server import IntroducerService # test compatibility with old introducer .tac files from allmydata.introducer import IntroducerNode from allmydata.util import pollmixin import common_util as testutil class LoggingMultiService(service.MultiService): def log(self, msg, **kw): log.msg(msg, **kw) class Node(testutil.SignalMixin, unittest.TestCase): def test_loadable(self): basedir = "introducer.IntroducerNode.test_loadable" os.mkdir(basedir) q = IntroducerNode(basedir) d = fireEventually(None) d.addCallback(lambda res: q.startService()) d.addCallback(lambda res: q.when_tub_ready()) d.addCallback(lambda res: q.stopService()) d.addCallback(flushEventualQueue) return d class ServiceMixin: def setUp(self): self.parent = LoggingMultiService() self.parent.startService() def tearDown(self): log.msg("TestIntroducer.tearDown") d = defer.succeed(None) d.addCallback(lambda res: self.parent.stopService()) d.addCallback(flushEventualQueue) return d class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): def test_create(self): ic = IntroducerClient(None, "introducer.furl", u"my_nickname", "my_version", "oldest_version") def test_listen(self): i = IntroducerService() i.setServiceParent(self.parent) def test_duplicate(self): i = IntroducerService() self.failUnlessEqual(len(i.get_announcements()), 0) self.failUnlessEqual(len(i.get_subscribers()), 0) furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra" furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra" ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0") ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0") ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0") i.remote_publish(ann1) self.failUnlessEqual(len(i.get_announcements()), 1) self.failUnlessEqual(len(i.get_subscribers()), 0) i.remote_publish(ann2) self.failUnlessEqual(len(i.get_announcements()), 2) self.failUnlessEqual(len(i.get_subscribers()), 0) i.remote_publish(ann1b) self.failUnlessEqual(len(i.get_announcements()), 2) self.failUnlessEqual(len(i.get_subscribers()), 0) class SystemTestMixin(ServiceMixin, pollmixin.PollMixin): def create_tub(self, portnum=0): tubfile = os.path.join(self.basedir, "tub.pem") self.central_tub = tub = Tub(certFile=tubfile) #tub.setOption("logLocalFailures", True) #tub.setOption("logRemoteFailures", True) tub.setOption("expose-remote-exception-types", False) tub.setServiceParent(self.parent) l = tub.listenOn("tcp:%d" % portnum) self.central_portnum = l.getPortnum() if portnum != 0: assert self.central_portnum == portnum tub.setLocation("localhost:%d" % self.central_portnum) class SystemTest(SystemTestMixin, unittest.TestCase): def test_system(self): self.basedir = "introducer/SystemTest/system" os.makedirs(self.basedir) return self.do_system_test(IntroducerService) test_system.timeout = 480 # occasionally takes longer than 350s on "draco" def do_system_test(self, create_introducer): self.create_tub() introducer = create_introducer() introducer.setServiceParent(self.parent) iff = os.path.join(self.basedir, "introducer.furl") tub = self.central_tub ifurl = self.central_tub.registerReference(introducer, furlFile=iff) self.introducer_furl = ifurl NUMCLIENTS = 5 # we have 5 clients who publish themselves, and an extra one does # which not. When the connections are fully established, all six nodes # should have 5 connections each. clients = [] tubs = {} received_announcements = {} NUM_SERVERS = NUMCLIENTS subscribing_clients = [] publishing_clients = [] for i in range(NUMCLIENTS+1): tub = Tub() #tub.setOption("logLocalFailures", True) #tub.setOption("logRemoteFailures", True) tub.setOption("expose-remote-exception-types", False) tub.setServiceParent(self.parent) l = tub.listenOn("tcp:0") portnum = l.getPortnum() tub.setLocation("localhost:%d" % portnum) log.msg("creating client %d: %s" % (i, tub.getShortTubID())) c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i, "version", "oldest") received_announcements[c] = ra = {} def got(serverid, ann_d, announcements): announcements[serverid] = ann_d c.subscribe_to("storage", got, received_announcements[c]) subscribing_clients.append(c) if i < NUMCLIENTS: node_furl = tub.registerReference(Referenceable()) c.publish(node_furl, "storage", "ri_name") publishing_clients.append(c) # the last one does not publish anything c.setServiceParent(self.parent) clients.append(c) tubs[c] = tub def _wait_for_all_connections(): for c in subscribing_clients: if len(received_announcements[c]) < NUM_SERVERS: return False return True d = self.poll(_wait_for_all_connections) def _check1(res): log.msg("doing _check1") dc = introducer._debug_counts self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS) self.failUnlessEqual(dc["inbound_duplicate"], 0) self.failUnlessEqual(dc["inbound_update"], 0) self.failUnless(dc["outbound_message"]) for c in clients: self.failUnless(c.connected_to_introducer()) for c in subscribing_clients: cdc = c._debug_counts self.failUnless(cdc["inbound_message"]) self.failUnlessEqual(cdc["inbound_announcement"], NUM_SERVERS) self.failUnlessEqual(cdc["wrong_service"], 0) self.failUnlessEqual(cdc["duplicate_announcement"], 0) self.failUnlessEqual(cdc["update"], 0) self.failUnlessEqual(cdc["new_announcement"], NUM_SERVERS) anns = received_announcements[c] self.failUnlessEqual(len(anns), NUM_SERVERS) nodeid0 = b32decode(tubs[clients[0]].tubID.upper()) ann_d = anns[nodeid0] nick = ann_d["nickname"] self.failUnlessEqual(type(nick), unicode) self.failUnlessEqual(nick, u"nickname-0") for c in publishing_clients: cdc = c._debug_counts self.failUnlessEqual(cdc["outbound_message"], 1) d.addCallback(_check1) # force an introducer reconnect, by shutting down the Tub it's using # and starting a new Tub (with the old introducer). Everybody should # reconnect and republish, but the introducer should ignore the # republishes as duplicates. However, because the server doesn't know # what each client does and does not know, it will send them a copy # of the current announcement table anyway. d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub")) d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) def _wait_for_introducer_loss(): for c in clients: if c.connected_to_introducer(): return False return True d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) def _restart_introducer_tub(_ign): log.msg("restarting introducer's Tub") dc = introducer._debug_counts self.expected_count = dc["inbound_message"] + NUM_SERVERS self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1 introducer._debug0 = dc["outbound_message"] for c in subscribing_clients: cdc = c._debug_counts c._debug0 = cdc["inbound_message"] self.create_tub(self.central_portnum) newfurl = self.central_tub.registerReference(introducer, furlFile=iff) assert newfurl == self.introducer_furl d.addCallback(_restart_introducer_tub) def _wait_for_introducer_reconnect(): # wait until: # all clients are connected # the introducer has received publish messages from all of them # the introducer has received subscribe messages from all of them # the introducer has sent (duplicate) announcements to all of them # all clients have received (duplicate) announcements dc = introducer._debug_counts for c in clients: if not c.connected_to_introducer(): return False if dc["inbound_message"] < self.expected_count: return False if dc["inbound_subscribe"] < self.expected_subscribe_count: return False for c in subscribing_clients: cdc = c._debug_counts if cdc["inbound_message"] < c._debug0+1: return False return True d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect)) def _check2(res): log.msg("doing _check2") # assert that the introducer sent out new messages, one per # subscriber dc = introducer._debug_counts self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS) self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS) self.failUnlessEqual(dc["inbound_update"], 0) self.failUnlessEqual(dc["outbound_message"], introducer._debug0 + len(subscribing_clients)) for c in clients: self.failUnless(c.connected_to_introducer()) for c in subscribing_clients: cdc = c._debug_counts self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS) d.addCallback(_check2) # Then force an introducer restart, by shutting down the Tub, # destroying the old introducer, and starting a new Tub+Introducer. # Everybody should reconnect and republish, and the (new) introducer # will distribute the new announcements, but the clients should # ignore the republishes as duplicates. d.addCallback(lambda _ign: log.msg("shutting down introducer")) d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) def _restart_introducer(_ign): log.msg("restarting introducer") self.create_tub(self.central_portnum) for c in subscribing_clients: # record some counters for later comparison. Stash the values # on the client itself, because I'm lazy. cdc = c._debug_counts c._debug1 = cdc["inbound_announcement"] c._debug2 = cdc["inbound_message"] c._debug3 = cdc["new_announcement"] newintroducer = create_introducer() self.expected_message_count = NUM_SERVERS self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients) self.expected_subscribe_count = len(subscribing_clients) newfurl = self.central_tub.registerReference(newintroducer, furlFile=iff) assert newfurl == self.introducer_furl d.addCallback(_restart_introducer) def _wait_for_introducer_reconnect2(): # wait until: # all clients are connected # the introducer has received publish messages from all of them # the introducer has received subscribe messages from all of them # the introducer has sent announcements for everybody to everybody # all clients have received all the (duplicate) announcements # at that point, the system should be quiescent dc = introducer._debug_counts for c in clients: if not c.connected_to_introducer(): return False if dc["inbound_message"] < self.expected_message_count: return False if dc["outbound_announcements"] < self.expected_announcement_count: return False if dc["inbound_subscribe"] < self.expected_subscribe_count: return False for c in subscribing_clients: cdc = c._debug_counts if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS: return False return True d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2)) def _check3(res): log.msg("doing _check3") for c in clients: self.failUnless(c.connected_to_introducer()) for c in subscribing_clients: cdc = c._debug_counts self.failUnless(cdc["inbound_announcement"] > c._debug1) self.failUnless(cdc["inbound_message"] > c._debug2) # there should have been no new announcements self.failUnlessEqual(cdc["new_announcement"], c._debug3) # and the right number of duplicate ones. There were # NUM_SERVERS from the servertub restart, and there should be # another NUM_SERVERS now self.failUnlessEqual(cdc["duplicate_announcement"], 2*NUM_SERVERS) d.addCallback(_check3) return d class TooNewServer(IntroducerService): VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999": { }, "application-version": "greetings from the crazy future", } class NonV1Server(SystemTestMixin, unittest.TestCase): # if the 1.3.0 client connects to a server that doesn't provide the 'v1' # protocol, it is supposed to provide a useful error instead of a weird # exception. def test_failure(self): self.basedir = "introducer/NonV1Server/failure" os.makedirs(self.basedir) self.create_tub() i = TooNewServer() i.setServiceParent(self.parent) self.introducer_furl = self.central_tub.registerReference(i) tub = Tub() tub.setOption("expose-remote-exception-types", False) tub.setServiceParent(self.parent) l = tub.listenOn("tcp:0") portnum = l.getPortnum() tub.setLocation("localhost:%d" % portnum) c = IntroducerClient(tub, self.introducer_furl, u"nickname-client", "version", "oldest") announcements = {} def got(serverid, ann_d): announcements[serverid] = ann_d c.subscribe_to("storage", got) c.setServiceParent(self.parent) # now we wait for it to connect and notice the bad version def _got_bad(): return bool(c._introducer_error) or bool(c._publisher) d = self.poll(_got_bad) def _done(res): self.failUnless(c._introducer_error) self.failUnless(c._introducer_error.check(InsufficientVersionError)) d.addCallback(_done) return d class DecodeFurl(unittest.TestCase): def test_decode(self): # make sure we have a working base64.b32decode. The one in # python2.4.[01] was broken. furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i' m = re.match(r'pb://(\w+)@', furl) assert m nodeid = b32decode(m.group(1).upper()) self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")