From ea35563b811f1d8e1737d600bb52f9bfb1b741f1 Mon Sep 17 00:00:00 2001 From: David Stainton Date: Thu, 2 Jun 2016 16:47:58 +0000 Subject: [PATCH] Remove v1 introducer code and fix tests Fixed many of the test_introducer tests. Work-in-progress. --- src/allmydata/introducer/client.py | 80 +---- src/allmydata/introducer/interfaces.py | 20 -- src/allmydata/introducer/old.py | 469 ------------------------- src/allmydata/test/test_introducer.py | 199 ++--------- 4 files changed, 39 insertions(+), 729 deletions(-) delete mode 100644 src/allmydata/introducer/old.py diff --git a/src/allmydata/introducer/client.py b/src/allmydata/introducer/client.py index b042ca2d5..e76c3a205 100644 --- a/src/allmydata/introducer/client.py +++ b/src/allmydata/introducer/client.py @@ -2,12 +2,12 @@ import time, yaml from zope.interface import implements from twisted.application import service +from twisted.internet import defer from foolscap.api import Referenceable, eventually, RemoteInterface from allmydata.interfaces import InsufficientVersionError from allmydata.introducer.interfaces import IIntroducerClient, \ - RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2 + RIIntroducerSubscriberClient_v2 from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\ - convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \ make_index, get_tubid_string_from_ann, get_tubid_string from allmydata.util import log from allmydata.util.rrefutil import add_version_to_remote_reference @@ -16,32 +16,6 @@ from allmydata.util.keyutil import BadSignatureError class InvalidCacheError(Exception): pass -class WrapV2ClientInV1Interface(Referenceable): # for_v1 - """I wrap a v2 IntroducerClient to make it look like a v1 client, so it - can be attached to an old server.""" - implements(RIIntroducerSubscriberClient_v1) - - def __init__(self, original): - self.original = original - - def remote_announce(self, announcements): - lp = self.original.log("received %d announcements (v1)" % - len(announcements)) - anns_v1 = set([convert_announcement_v1_to_v2(ann_v1) - for ann_v1 in announcements]) - return self.original.got_announcements(anns_v1, lp) - -class RIStubClient(RemoteInterface): # for_v1 - """Each client publishes a service announcement for a dummy object called - the StubClient. This object doesn't actually offer any services, but the - announcement helps the Introducer keep track of which clients are - subscribed (so the grid admin can keep track of things like the size of - the grid and the client versions in use. This is the (empty) - RemoteInterface for the StubClient.""" - -class StubClient(Referenceable): # for_v1 - implements(RIStubClient) - V1 = "http://allmydata.org/tahoe/protocols/introducer/v1" V2 = "http://allmydata.org/tahoe/protocols/introducer/v2" @@ -68,8 +42,6 @@ class IntroducerClient(service.Service, Referenceable): "my-version": self._my_version, "oldest-supported": self._oldest_supported, } - self._stub_client = None # for_v1 - self._stub_client_furl = None self._outbound_announcements = {} # not signed self._published_announcements = {} # signed @@ -156,7 +128,7 @@ class IntroducerClient(service.Service, Referenceable): def _got_introducer(self, publisher): self.log("connected to introducer, getting versions") - default = { "http://allmydata.org/tahoe/protocols/introducer/v1": + default = { "http://allmydata.org/tahoe/protocols/introducer/v2": { }, "application-version": "unknown: no get_version()", } @@ -170,9 +142,9 @@ class IntroducerClient(service.Service, Referenceable): def _got_versioned_introducer(self, publisher): self.log("got introducer version: %s" % (publisher.version,)) - # we require an introducer that speaks at least one of (V1, V2) - if not (V1 in publisher.version or V2 in publisher.version): - raise InsufficientVersionError("V1 or V2", publisher.version) + # we require an introducer that speaks at least V2 + if V2 not in publisher.version: + raise InsufficientVersionError("V2", publisher.version) self._publisher = publisher publisher.notifyOnDisconnect(self._disconnected) self._maybe_publish() @@ -213,32 +185,10 @@ class IntroducerClient(service.Service, Referenceable): self._my_subscriber_info) d.addBoth(self._debug_retired) else: - d = self._subscribe_handle_v1(service_name) # for_v1 + d = defer.fail(InsufficientVersionError("V2", self._publisher.version)) d.addErrback(log.err, facility="tahoe.introducer.client", level=log.WEIRD, umid="2uMScQ") - def _subscribe_handle_v1(self, service_name): # for_v1 - # they don't speak V2: must be a v1 introducer. Fall back to the v1 - # 'subscribe' method, using a client adapter. - ca = WrapV2ClientInV1Interface(self) - self._debug_outstanding += 1 - d = self._publisher.callRemote("subscribe", ca, service_name) - d.addBoth(self._debug_retired) - # We must also publish an empty 'stub_client' object, so the - # introducer can count how many clients are connected and see what - # versions they're running. - if not self._stub_client_furl: - self._stub_client = sc = StubClient() - self._stub_client_furl = self._tub.registerReference(sc) - def _publish_stub_client(ignored): - furl = self._stub_client_furl - self.publish("stub_client", - { "anonymous-storage-FURL": furl, - "permutation-seed-base32": get_tubid_string(furl), - }) - d.addCallback(_publish_stub_client) - return d - def create_announcement_dict(self, service_name, ann): ann_d = { "version": 0, # "seqnum" and "nonce" will be populated with new values in @@ -281,29 +231,17 @@ class IntroducerClient(service.Service, Referenceable): self._canary) d.addBoth(self._debug_retired) else: - d = self._handle_v1_publisher(ann_t) # for_v1 + d = defer.fail(InsufficientVersionError("V2", self._publisher.version)) d.addErrback(log.err, ann_t=ann_t, facility="tahoe.introducer.client", level=log.WEIRD, umid="xs9pVQ") - def _handle_v1_publisher(self, ann_t): # for_v1 - # they don't speak V2, so fall back to the old 'publish' method - # (which takes an unsigned tuple of bytestrings) - self.log("falling back to publish_v1", - level=log.UNUSUAL, umid="9RCT1A") - ann_v1 = convert_announcement_v2_to_v1(ann_t) - self._debug_outstanding += 1 - d = self._publisher.callRemote("publish", ann_v1) - d.addBoth(self._debug_retired) - return d - - def remote_announce_v2(self, announcements): lp = self.log("received %d announcements (v2)" % len(announcements)) return self.got_announcements(announcements, lp) def got_announcements(self, announcements, lp=None): - # this is the common entry point for both v1 and v2 announcements + # this is the common entry point for announcements self._debug_counts["inbound_message"] += 1 for ann_t in announcements: try: diff --git a/src/allmydata/introducer/interfaces.py b/src/allmydata/introducer/interfaces.py index 8d46b6663..fdd63580c 100644 --- a/src/allmydata/introducer/interfaces.py +++ b/src/allmydata/introducer/interfaces.py @@ -2,24 +2,8 @@ from zope.interface import Interface from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \ RemoteInterface, Referenceable -from old import RIIntroducerSubscriberClient_v1 FURL = StringConstraint(1000) -# old introducer protocol (v1): -# -# Announcements are (FURL, service_name, remoteinterface_name, -# nickname, my_version, oldest_supported) -# the (FURL, service_name, remoteinterface_name) refer to the service being -# announced. The (nickname, my_version, oldest_supported) refer to the -# client as a whole. The my_version/oldest_supported strings can be parsed -# by an allmydata.util.version.Version instance, and then compared. The -# first goal is to make sure that nodes are not confused by speaking to an -# incompatible peer. The second goal is to enable the development of -# backwards-compatibility code. - -Announcement_v1 = TupleOf(FURL, str, str, - str, str, str) - # v2 protocol over foolscap: Announcements are 3-tuples of (bytes, str, str) # or (bytes, none, none) Announcement_v2 = Any() @@ -41,12 +25,8 @@ class RIIntroducerPublisherAndSubscriberService_v2(RemoteInterface): __remote_name__ = "RIIntroducerPublisherAndSubscriberService_v2.tahoe.allmydata.com" def get_version(): return DictOf(str, Any()) - def publish(announcement=Announcement_v1): - return None def publish_v2(announcement=Announcement_v2, canary=Referenceable): return None - def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): - return None def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2, service_name=str, subscriber_info=SubscriberInfo): """Give me a subscriber reference, and I will call its announce_v2() diff --git a/src/allmydata/introducer/old.py b/src/allmydata/introducer/old.py deleted file mode 100644 index acb3c3f78..000000000 --- a/src/allmydata/introducer/old.py +++ /dev/null @@ -1,469 +0,0 @@ - -import time -from base64 import b32decode -from zope.interface import implements, Interface -from twisted.application import service -import allmydata -from allmydata.interfaces import InsufficientVersionError -from allmydata.util import log, idlib, rrefutil -from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \ - RemoteInterface, Referenceable, eventually, SturdyRef -from allmydata.introducer.common import SubscriberDescriptor, \ - AnnouncementDescriptor -FURL = StringConstraint(1000) - -# We keep a copy of the old introducer (both client and server) here to -# support compatibility tests. The old client is supposed to handle the new -# server, and new client is supposed to handle the old server. - - -# Announcements are (FURL, service_name, remoteinterface_name, -# nickname, my_version, oldest_supported) -# the (FURL, service_name, remoteinterface_name) refer to the service being -# announced. The (nickname, my_version, oldest_supported) refer to the -# client as a whole. The my_version/oldest_supported strings can be parsed -# by an allmydata.util.version.Version instance, and then compared. The -# first goal is to make sure that nodes are not confused by speaking to an -# incompatible peer. The second goal is to enable the development of -# backwards-compatibility code. - -Announcement = TupleOf(FURL, str, str, - str, str, str) - -class RIIntroducerSubscriberClient_v1(RemoteInterface): - __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com" - - def announce(announcements=SetOf(Announcement)): - """I accept announcements from the publisher.""" - return None - -# When Foolscap can handle multiple interfaces (Foolscap#17), the -# full-powered introducer will implement both RIIntroducerPublisher and -# RIIntroducerSubscriberService. Until then, we define -# RIIntroducerPublisherAndSubscriberService as a combination of the two, and -# make everybody use that. - -class RIIntroducerPublisher_v1(RemoteInterface): - """To publish a service to the world, connect to me and give me your - announcement message. I will deliver a copy to all connected subscribers.""" - __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com" - - def publish(announcement=Announcement): - # canary? - return None - -class RIIntroducerSubscriberService_v1(RemoteInterface): - __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com" - - def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): - """Give me a subscriber reference, and I will call its new_peers() - method will any announcements that match the desired service name. I - will ignore duplicate subscriptions. - """ - return None - -class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface): - __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com" - def get_version(): - return DictOf(str, Any()) - def publish(announcement=Announcement): - return None - def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): - return None - -class IIntroducerClient(Interface): - """I provide service introduction facilities for a node. I help nodes - publish their services to the rest of the world, and I help them learn - about services available on other nodes.""" - - def publish(furl, service_name, remoteinterface_name): - """Once you call this, I will tell the world that the Referenceable - available at FURL is available to provide a service named - SERVICE_NAME. The precise definition of the service being provided is - identified by the Foolscap 'remote interface name' in the last - parameter: this is supposed to be a globally-unique string that - identifies the RemoteInterface that is implemented.""" - - def subscribe_to(service_name, callback, *args, **kwargs): - """Call this if you will eventually want to use services with the - given SERVICE_NAME. This will prompt me to subscribe to announcements - of those services. Your callback will be invoked with at least two - arguments: a serverid (binary string), and an announcement - dictionary, followed by any additional callback args/kwargs you give - me. I will run your callback for both new announcements and for - announcements that have changed, but you must be prepared to tolerate - duplicates. - - The announcement dictionary that I give you will have the following - keys: - - version: 0 - service-name: str('storage') - - FURL: str(furl) - remoteinterface-name: str(ri_name) - nickname: unicode - app-versions: {} - my-version: str - oldest-supported: str - - Note that app-version will be an empty dictionary until #466 is done - and both the introducer and the remote client have been upgraded. For - current (native) server types, the serverid will always be equal to - the binary form of the FURL's tubid. - """ - - def connected_to_introducer(): - """Returns a boolean, True if we are currently connected to the - introducer, False if not.""" - - -class IntroducerClient_v1(service.Service, Referenceable): - implements(RIIntroducerSubscriberClient_v1, IIntroducerClient) - - def __init__(self, tub, introducer_furl, - nickname, my_version, oldest_supported): - self._tub = tub - self.introducer_furl = introducer_furl - - assert type(nickname) is unicode - self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8 - self._my_version = my_version - self._oldest_supported = oldest_supported - - self._published_announcements = set() - - self._publisher = None - - self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples - self._subscribed_service_names = set() - self._subscriptions = set() # requests we've actually sent - - # _current_announcements remembers one announcement per - # (servicename,serverid) pair. Anything that arrives with the same - # pair will displace the previous one. This stores unpacked - # announcement dictionaries, which can be compared for equality to - # distinguish re-announcement from updates. It also provides memory - # for clients who subscribe after startup. - self._current_announcements = {} - - # hooks for unit tests - self._debug_counts = { - "inbound_message": 0, - "inbound_announcement": 0, - "wrong_service": 0, - "duplicate_announcement": 0, - "update": 0, - "new_announcement": 0, - "outbound_message": 0, - } - self._debug_outstanding = 0 - - def _debug_retired(self, res): - self._debug_outstanding -= 1 - return res - - def startService(self): - service.Service.startService(self) - self._introducer_error = None - rc = self._tub.connectTo(self.introducer_furl, self._got_introducer) - self._introducer_reconnector = rc - def connect_failed(failure): - self.log("Initial Introducer connection failed: perhaps it's down", - level=log.WEIRD, failure=failure, umid="c5MqUQ") - d = self._tub.getReference(self.introducer_furl) - d.addErrback(connect_failed) - - def _got_introducer(self, publisher): - self.log("connected to introducer, getting versions") - default = { "http://allmydata.org/tahoe/protocols/introducer/v1": - { }, - "application-version": "unknown: no get_version()", - } - d = rrefutil.add_version_to_remote_reference(publisher, default) - d.addCallback(self._got_versioned_introducer) - d.addErrback(self._got_error) - - def _got_error(self, f): - # TODO: for the introducer, perhaps this should halt the application - self._introducer_error = f # polled by tests - - def _got_versioned_introducer(self, publisher): - self.log("got introducer version: %s" % (publisher.version,)) - # we require a V1 introducer - needed = "http://allmydata.org/tahoe/protocols/introducer/v1" - if needed not in publisher.version: - raise InsufficientVersionError(needed, publisher.version) - self._publisher = publisher - publisher.notifyOnDisconnect(self._disconnected) - self._maybe_publish() - self._maybe_subscribe() - - def _disconnected(self): - self.log("bummer, we've lost our connection to the introducer") - self._publisher = None - self._subscriptions.clear() - - def log(self, *args, **kwargs): - if "facility" not in kwargs: - kwargs["facility"] = "tahoe.introducer" - return log.msg(*args, **kwargs) - - - def publish(self, furl, service_name, remoteinterface_name): - assert type(self._nickname_utf8) is str # we always send UTF-8 - ann = (furl, service_name, remoteinterface_name, - self._nickname_utf8, self._my_version, self._oldest_supported) - self._published_announcements.add(ann) - self._maybe_publish() - - def subscribe_to(self, service_name, cb, *args, **kwargs): - self._local_subscribers.append( (service_name,cb,args,kwargs) ) - self._subscribed_service_names.add(service_name) - self._maybe_subscribe() - for (servicename,nodeid),ann_d in self._current_announcements.items(): - if servicename == service_name: - eventually(cb, nodeid, ann_d) - - def _maybe_subscribe(self): - if not self._publisher: - self.log("want to subscribe, but no introducer yet", - level=log.NOISY) - return - for service_name in self._subscribed_service_names: - if service_name not in self._subscriptions: - # there is a race here, but the subscription desk ignores - # duplicate requests. - self._subscriptions.add(service_name) - self._debug_outstanding += 1 - d = self._publisher.callRemote("subscribe", self, service_name) - d.addBoth(self._debug_retired) - d.addErrback(rrefutil.trap_deadref) - d.addErrback(log.err, format="server errored during subscribe", - facility="tahoe.introducer", - level=log.WEIRD, umid="2uMScQ") - - def _maybe_publish(self): - if not self._publisher: - self.log("want to publish, but no introducer yet", level=log.NOISY) - return - # this re-publishes everything. The Introducer ignores duplicates - for ann in self._published_announcements: - self._debug_counts["outbound_message"] += 1 - self._debug_outstanding += 1 - d = self._publisher.callRemote("publish", ann) - d.addBoth(self._debug_retired) - d.addErrback(rrefutil.trap_deadref) - d.addErrback(log.err, - format="server errored during publish %(ann)s", - ann=ann, facility="tahoe.introducer", - level=log.WEIRD, umid="xs9pVQ") - - - - def remote_announce(self, announcements): - self.log("received %d announcements" % len(announcements)) - self._debug_counts["inbound_message"] += 1 - for ann in announcements: - try: - self._process_announcement(ann) - except: - log.err(format="unable to process announcement %(ann)s", - ann=ann) - # Don't let a corrupt announcement prevent us from processing - # the remaining ones. Don't return an error to the server, - # since they'd just ignore it anyways. - pass - - def _process_announcement(self, ann): - self._debug_counts["inbound_announcement"] += 1 - (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann - if service_name not in self._subscribed_service_names: - self.log("announcement for a service we don't care about [%s]" - % (service_name,), level=log.UNUSUAL, umid="dIpGNA") - self._debug_counts["wrong_service"] += 1 - return - self.log("announcement for [%s]: %s" % (service_name, ann), - umid="BoKEag") - assert type(furl) is str - assert type(service_name) is str - assert type(ri_name) is str - assert type(nickname_utf8) is str - nickname = nickname_utf8.decode("utf-8") - assert type(nickname) is unicode - assert type(ver) is str - assert type(oldest) is str - - nodeid = b32decode(SturdyRef(furl).tubID.upper()) - nodeid_s = idlib.shortnodeid_b2a(nodeid) - - ann_d = { "version": 0, - "service-name": service_name, - - "FURL": furl, - "nickname": nickname, - "app-versions": {}, # need #466 and v2 introducer - "my-version": ver, - "oldest-supported": oldest, - } - - index = (service_name, nodeid) - if self._current_announcements.get(index, None) == ann_d: - self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring", - service=service_name, nodeid=nodeid_s, - level=log.UNUSUAL, umid="B1MIdA") - self._debug_counts["duplicate_announcement"] += 1 - return - if index in self._current_announcements: - self._debug_counts["update"] += 1 - else: - self._debug_counts["new_announcement"] += 1 - - self._current_announcements[index] = ann_d - # note: we never forget an index, but we might update its value - - for (service_name2,cb,args,kwargs) in self._local_subscribers: - if service_name2 == service_name: - eventually(cb, nodeid, ann_d, *args, **kwargs) - - def connected_to_introducer(self): - return bool(self._publisher) - -class IntroducerService_v1(service.MultiService, Referenceable): - implements(RIIntroducerPublisherAndSubscriberService_v1) - name = "introducer" - VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": - { }, - "application-version": str(allmydata.__full_version__), - } - - def __init__(self, basedir="."): - service.MultiService.__init__(self) - self.introducer_url = None - # 'index' is (service_name, tubid) - self._announcements = {} # dict of index -> (announcement, timestamp) - self._subscribers = {} # [service_name]->[rref]->timestamp - self._debug_counts = {"inbound_message": 0, - "inbound_duplicate": 0, - "inbound_update": 0, - "outbound_message": 0, - "outbound_announcements": 0, - "inbound_subscribe": 0} - self._debug_outstanding = 0 - - def _debug_retired(self, res): - self._debug_outstanding -= 1 - return res - - def log(self, *args, **kwargs): - if "facility" not in kwargs: - kwargs["facility"] = "tahoe.introducer" - return log.msg(*args, **kwargs) - - def get_announcements(self, include_stub_clients=True): - announcements = [] - for index, (ann_t, when) in self._announcements.items(): - (furl, service_name, ri_name, nickname, ver, oldest) = ann_t - if service_name == "stub_client" and not include_stub_clients: - continue - ann_d = {"nickname": nickname.decode("utf-8", "replace"), - "my-version": ver, - "service-name": service_name, - "anonymous-storage-FURL": furl, - } - # the V2 introducer uses (service_name, key_s, tubid_s) as an - # index, so match that format for AnnouncementDescriptor - new_index = (index[0], None, idlib.nodeid_b2a(index[1])) - ad = AnnouncementDescriptor(when, new_index, None, ann_d) - announcements.append(ad) - return announcements - - def get_subscribers(self): - s = [] - for service_name, subscribers in self._subscribers.items(): - for rref, when in subscribers.items(): - tubid = rref.getRemoteTubID() or "?" - remote_address = rrefutil.stringify_remote_address(rref) - nickname, version, app_versions = u"?", u"?", {} - sd = SubscriberDescriptor(service_name, when, - nickname, version, app_versions, - remote_address, tubid) - s.append(sd) - return s - - def remote_get_version(self): - return self.VERSION - - def remote_publish(self, announcement): - try: - self._publish(announcement) - except: - log.err(format="Introducer.remote_publish failed on %(ann)s", - ann=announcement, level=log.UNUSUAL, umid="620rWA") - raise - - def _publish(self, announcement): - self._debug_counts["inbound_message"] += 1 - self.log("introducer: announcement published: %s" % (announcement,) ) - (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement - #print "PUB", service_name, nickname_utf8 - - nodeid = b32decode(SturdyRef(furl).tubID.upper()) - index = (service_name, nodeid) - - if index in self._announcements: - (old_announcement, timestamp) = self._announcements[index] - if old_announcement == announcement: - self.log("but we already knew it, ignoring", level=log.NOISY) - self._debug_counts["inbound_duplicate"] += 1 - return - else: - self.log("old announcement being updated", level=log.NOISY) - self._debug_counts["inbound_update"] += 1 - self._announcements[index] = (announcement, time.time()) - - for s in self._subscribers.get(service_name, []): - self._debug_counts["outbound_message"] += 1 - self._debug_counts["outbound_announcements"] += 1 - self._debug_outstanding += 1 - d = s.callRemote("announce", set([announcement])) - d.addBoth(self._debug_retired) - d.addErrback(rrefutil.trap_deadref) - d.addErrback(log.err, - format="subscriber errored on announcement %(ann)s", - ann=announcement, facility="tahoe.introducer", - level=log.UNUSUAL, umid="jfGMXQ") - - def remote_subscribe(self, subscriber, service_name): - self.log("introducer: subscription[%s] request at %s" % (service_name, - subscriber)) - self._debug_counts["inbound_subscribe"] += 1 - if service_name not in self._subscribers: - self._subscribers[service_name] = {} - subscribers = self._subscribers[service_name] - if subscriber in subscribers: - self.log("but they're already subscribed, ignoring", - level=log.UNUSUAL) - return - subscribers[subscriber] = time.time() - def _remove(): - self.log("introducer: unsubscribing[%s] %s" % (service_name, - subscriber)) - subscribers.pop(subscriber, None) - subscriber.notifyOnDisconnect(_remove) - - announcements = set( - [ ann - for (sn2,nodeid),(ann,when) in self._announcements.items() - if sn2 == service_name] ) - - self._debug_counts["outbound_message"] += 1 - self._debug_counts["outbound_announcements"] += len(announcements) - self._debug_outstanding += 1 - d = subscriber.callRemote("announce", announcements) - d.addBoth(self._debug_retired) - d.addErrback(rrefutil.trap_deadref) - d.addErrback(log.err, - format="subscriber errored during subscribe %(anns)s", - anns=announcements, facility="tahoe.introducer", - level=log.UNUSUAL, umid="1XChxA") diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index 4feb2aa0c..57053ed19 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -11,13 +11,11 @@ from twisted.python.filepath import FilePath from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue from twisted.application import service from allmydata.interfaces import InsufficientVersionError -from allmydata.introducer.client import IntroducerClient, \ - WrapV2ClientInV1Interface +from allmydata.introducer.client import IntroducerClient from allmydata.introducer.server import IntroducerService, FurlFileConflictError from allmydata.introducer.common import get_tubid_string_from_ann, \ get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \ UnknownKeyError -from allmydata.introducer import old # test compatibility with old introducer .tac files from allmydata.introducer import IntroducerNode from allmydata.web import introweb @@ -172,56 +170,6 @@ def make_ann_t(ic, furl, privkey, seqnum): return ann_t class Client(unittest.TestCase): - def test_duplicate_receive_v1(self): - ic = IntroducerClient(None, - "introducer.furl", u"my_nickname", - "my_version", "oldest_version", {}, fakeseq, - FilePath(self.mktemp())) - announcements = [] - ic.subscribe_to("storage", - lambda key_s,ann: announcements.append(ann)) - furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra" - ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0") - ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0") - ca = WrapV2ClientInV1Interface(ic) - - ca.remote_announce([ann1]) - d = fireEventually() - def _then(ign): - self.failUnlessEqual(len(announcements), 1) - self.failUnlessEqual(announcements[0]["nickname"], u"nick1") - self.failUnlessEqual(announcements[0]["my-version"], "ver23") - self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1) - self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) - self.failUnlessEqual(ic._debug_counts["update"], 0) - self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0) - # now send a duplicate announcement: this should not notify clients - ca.remote_announce([ann1]) - return fireEventually() - d.addCallback(_then) - def _then2(ign): - self.failUnlessEqual(len(announcements), 1) - self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2) - self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) - self.failUnlessEqual(ic._debug_counts["update"], 0) - self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1) - # and a replacement announcement: same FURL, new other stuff. - # Clients should be notified. - ca.remote_announce([ann1b]) - return fireEventually() - d.addCallback(_then2) - def _then3(ign): - self.failUnlessEqual(len(announcements), 2) - self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3) - self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) - self.failUnlessEqual(ic._debug_counts["update"], 1) - self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1) - # test that the other stuff changed - self.failUnlessEqual(announcements[-1]["nickname"], u"nick1") - self.failUnlessEqual(announcements[-1]["my-version"], "ver24") - d.addCallback(_then3) - return d - def test_duplicate_receive_v2(self): ic1 = IntroducerClient(None, "introducer.furl", u"my_nickname", @@ -330,45 +278,6 @@ class Client(unittest.TestCase): d.addCallback(_then5) return d - def test_id_collision(self): - # test replacement case where tubid equals a keyid (one should - # not replace the other) - ic = IntroducerClient(None, - "introducer.furl", u"my_nickname", - "my_version", "oldest_version", {}, fakeseq, - FilePath(self.mktemp())) - announcements = [] - ic.subscribe_to("storage", - lambda key_s,ann: announcements.append(ann)) - sk_s, vk_s = keyutil.make_keypair() - sk, _ignored = keyutil.parse_privkey(sk_s) - keyid = keyutil.remove_prefix(vk_s, "pub-v0-") - furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short") - furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid - ann_t = make_ann_t(ic, furl1, sk, 1) - ic.remote_announce_v2([ann_t]) - d = fireEventually() - def _then(ign): - # first announcement has been processed - self.failUnlessEqual(len(announcements), 1) - self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"], - furl1) - # now submit a second one, with a tubid that happens to look just - # like the pubkey-based serverid we just processed. They should - # not overlap. - ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0") - ca = WrapV2ClientInV1Interface(ic) - ca.remote_announce([ann2]) - return fireEventually() - d.addCallback(_then) - def _then2(ign): - # if they overlapped, the second announcement would be ignored - self.failUnlessEqual(len(announcements), 2) - self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"], - furl2) - d.addCallback(_then2) - return d - class Server(unittest.TestCase): def test_duplicate(self): i = IntroducerService() @@ -518,12 +427,9 @@ class Queue(SystemTestMixin, unittest.TestCase): V1 = "v1"; V2 = "v2" class SystemTest(SystemTestMixin, unittest.TestCase): - def do_system_test(self, server_version): + def do_system_test(self): self.create_tub() - if server_version == V1: - introducer = old.IntroducerService_v1() - else: - introducer = IntroducerService() + introducer = IntroducerService() introducer.setServiceParent(self.parent) iff = os.path.join(self.basedir, "introducer.furl") tub = self.central_tub @@ -558,16 +464,11 @@ class SystemTest(SystemTestMixin, unittest.TestCase): tub.setLocation("localhost:%d" % portnum) log.msg("creating client %d: %s" % (i, tub.getShortTubID())) - if i == 0: - c = old.IntroducerClient_v1(tub, self.introducer_furl, - NICKNAME % str(i), - "version", "oldest") - else: - c = IntroducerClient(tub, self.introducer_furl, - NICKNAME % str(i), - "version", "oldest", - {"component": "component-v1"}, fakeseq, - FilePath(self.mktemp())) + c = IntroducerClient(tub, self.introducer_furl, + NICKNAME % str(i), + "version", "oldest", + {"component": "component-v1"}, fakeseq, + FilePath(self.mktemp())) received_announcements[c] = {} def got(key_s_or_tubid, ann, announcements, i): if i == 0: @@ -582,19 +483,18 @@ class SystemTest(SystemTestMixin, unittest.TestCase): node_furl = tub.registerReference(Referenceable()) if i < NUM_STORAGE: if i == 0: - c.publish(node_furl, "storage", "ri_name") - printable_serverids[i] = get_tubid_string(node_furl) + # XXX wtf this makes no sense + #c.publish(node_furl, "storage", "ri_name") + #printable_serverids[i] = get_tubid_string(node_furl) + pass elif i == 1: # sign the announcement privkey_s, pubkey_s = keyutil.make_keypair() privkey, _ignored = keyutil.parse_privkey(privkey_s) privkeys[c] = privkey c.publish("storage", make_ann(node_furl), privkey) - if server_version == V1: - printable_serverids[i] = get_tubid_string(node_furl) - else: - assert pubkey_s.startswith("pub-") - printable_serverids[i] = pubkey_s[len("pub-"):] + assert pubkey_s.startswith("pub-") + printable_serverids[i] = pubkey_s[len("pub-"):] else: c.publish("storage", make_ann(node_furl)) printable_serverids[i] = get_tubid_string(node_furl) @@ -608,7 +508,8 @@ class SystemTest(SystemTestMixin, unittest.TestCase): # 'stub_client' record (somewhat after they published the # 'storage' record), so the introducer could see their # version. Match that behavior. - c.publish(node_furl, "stub_client", "stub_ri_name") + #c.publish(node_furl, "stub_client", "stub_ri_name") + pass if i == 2: # also publish something that nobody cares about @@ -661,24 +562,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase): def _check1(res): log.msg("doing _check1") dc = self.the_introducer._debug_counts - if server_version == V1: - # each storage server publishes a record, and (after its - # 'subscribe' has been ACKed) also publishes a "stub_client". - # The non-storage client (which subscribes) also publishes a - # stub_client. There is also one "boring" service. The number - # of messages is higher, because the stub_clients aren't - # published until after we get the 'subscribe' ack (since we - # don't realize that we're dealing with a v1 server [which - # needs stub_clients] until then), and the act of publishing - # the stub_client causes us to re-send all previous - # announcements. - self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"], - NUM_STORAGE + NUM_CLIENTS + 1) - else: - # each storage server publishes a record. There is also one - # "stub_client" and one "boring" - self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2) - self.failUnlessEqual(dc["inbound_duplicate"], 0) + # each storage server publishes a record. There is also one + # "stub_client" and one "boring" + self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2) + self.failUnlessEqual(dc["inbound_duplicate"], 0) self.failUnlessEqual(dc["inbound_update"], 0) self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) # the number of outbound messages is tricky.. I think it depends @@ -706,30 +593,14 @@ class SystemTest(SystemTestMixin, unittest.TestCase): nick = ann["nickname"] self.failUnlessEqual(type(nick), unicode) self.failUnlessEqual(nick, NICKNAME % "0") - if server_version == V1: - for c in publishing_clients: - cdc = c._debug_counts - expected = 1 # storage - if c is clients[2]: - expected += 1 # boring - if c is not clients[0]: - # the v2 client tries to call publish_v2, which fails - # because the server is v1. It then re-sends - # everything it has so far, plus a stub_client record - expected = 2*expected + 1 - if c is clients[0]: - # we always tell v1 client to send stub_client - expected += 1 - self.failUnlessEqual(cdc["outbound_message"], expected) - else: - for c in publishing_clients: - cdc = c._debug_counts - expected = 1 - if c in [clients[0], # stub_client - clients[2], # boring - ]: - expected = 2 - self.failUnlessEqual(cdc["outbound_message"], expected) + for c in publishing_clients: + cdc = c._debug_counts + expected = 1 + if c in [clients[0], # stub_client + clients[2], # boring + ]: + expected = 2 + self.failUnlessEqual(cdc["outbound_message"], expected) # now check the web status, make sure it renders without error ir = introweb.IntroducerRoot(self.parent) self.parent.nodeid = "NODEID" @@ -825,10 +696,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase): for k in c._debug_counts: c._debug_counts[k] = 0 expected_announcements[i] += 1 # new 'storage' for everyone - if server_version == V1: - introducer = old.IntroducerService_v1() - else: - introducer = IntroducerService() + introducer = IntroducerService() self.the_introducer = introducer newfurl = self.central_tub.registerReference(self.the_introducer, furlFile=iff) @@ -861,17 +729,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase): def test_system_v2_server(self): self.basedir = "introducer/SystemTest/system_v2_server" os.makedirs(self.basedir) - return self.do_system_test(V2) + return self.do_system_test() test_system_v2_server.timeout = 480 # occasionally takes longer than 350s on "draco" - def test_system_v1_server(self): - self.basedir = "introducer/SystemTest/system_v1_server" - os.makedirs(self.basedir) - return self.do_system_test(V1) - test_system_v1_server.timeout = 480 - # occasionally takes longer than 350s on "draco" - class FakeRemoteReference: def notifyOnDisconnect(self, *args, **kwargs): pass def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"