Remove v1 introducer code and fix tests

Fixed many of the test_introducer tests.
Work-in-progress.
This commit is contained in:
David Stainton 2016-06-02 16:47:58 +00:00
parent 73b08d2a54
commit ea35563b81
4 changed files with 39 additions and 729 deletions

View File

@ -2,12 +2,12 @@
import time, yaml
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
from foolscap.api import Referenceable, eventually, RemoteInterface
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.interfaces import IIntroducerClient, \
RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
RIIntroducerSubscriberClient_v2
from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
make_index, get_tubid_string_from_ann, get_tubid_string
from allmydata.util import log
from allmydata.util.rrefutil import add_version_to_remote_reference
@ -16,32 +16,6 @@ from allmydata.util.keyutil import BadSignatureError
class InvalidCacheError(Exception):
pass
class WrapV2ClientInV1Interface(Referenceable): # for_v1
"""I wrap a v2 IntroducerClient to make it look like a v1 client, so it
can be attached to an old server."""
implements(RIIntroducerSubscriberClient_v1)
def __init__(self, original):
self.original = original
def remote_announce(self, announcements):
lp = self.original.log("received %d announcements (v1)" %
len(announcements))
anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)
for ann_v1 in announcements])
return self.original.got_announcements(anns_v1, lp)
class RIStubClient(RemoteInterface): # for_v1
"""Each client publishes a service announcement for a dummy object called
the StubClient. This object doesn't actually offer any services, but the
announcement helps the Introducer keep track of which clients are
subscribed (so the grid admin can keep track of things like the size of
the grid and the client versions in use. This is the (empty)
RemoteInterface for the StubClient."""
class StubClient(Referenceable): # for_v1
implements(RIStubClient)
V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
@ -68,8 +42,6 @@ class IntroducerClient(service.Service, Referenceable):
"my-version": self._my_version,
"oldest-supported": self._oldest_supported,
}
self._stub_client = None # for_v1
self._stub_client_furl = None
self._outbound_announcements = {} # not signed
self._published_announcements = {} # signed
@ -156,7 +128,7 @@ class IntroducerClient(service.Service, Referenceable):
def _got_introducer(self, publisher):
self.log("connected to introducer, getting versions")
default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
default = { "http://allmydata.org/tahoe/protocols/introducer/v2":
{ },
"application-version": "unknown: no get_version()",
}
@ -170,9 +142,9 @@ class IntroducerClient(service.Service, Referenceable):
def _got_versioned_introducer(self, publisher):
self.log("got introducer version: %s" % (publisher.version,))
# we require an introducer that speaks at least one of (V1, V2)
if not (V1 in publisher.version or V2 in publisher.version):
raise InsufficientVersionError("V1 or V2", publisher.version)
# we require an introducer that speaks at least V2
if V2 not in publisher.version:
raise InsufficientVersionError("V2", publisher.version)
self._publisher = publisher
publisher.notifyOnDisconnect(self._disconnected)
self._maybe_publish()
@ -213,32 +185,10 @@ class IntroducerClient(service.Service, Referenceable):
self._my_subscriber_info)
d.addBoth(self._debug_retired)
else:
d = self._subscribe_handle_v1(service_name) # for_v1
d = defer.fail(InsufficientVersionError("V2", self._publisher.version))
d.addErrback(log.err, facility="tahoe.introducer.client",
level=log.WEIRD, umid="2uMScQ")
def _subscribe_handle_v1(self, service_name): # for_v1
# they don't speak V2: must be a v1 introducer. Fall back to the v1
# 'subscribe' method, using a client adapter.
ca = WrapV2ClientInV1Interface(self)
self._debug_outstanding += 1
d = self._publisher.callRemote("subscribe", ca, service_name)
d.addBoth(self._debug_retired)
# We must also publish an empty 'stub_client' object, so the
# introducer can count how many clients are connected and see what
# versions they're running.
if not self._stub_client_furl:
self._stub_client = sc = StubClient()
self._stub_client_furl = self._tub.registerReference(sc)
def _publish_stub_client(ignored):
furl = self._stub_client_furl
self.publish("stub_client",
{ "anonymous-storage-FURL": furl,
"permutation-seed-base32": get_tubid_string(furl),
})
d.addCallback(_publish_stub_client)
return d
def create_announcement_dict(self, service_name, ann):
ann_d = { "version": 0,
# "seqnum" and "nonce" will be populated with new values in
@ -281,29 +231,17 @@ class IntroducerClient(service.Service, Referenceable):
self._canary)
d.addBoth(self._debug_retired)
else:
d = self._handle_v1_publisher(ann_t) # for_v1
d = defer.fail(InsufficientVersionError("V2", self._publisher.version))
d.addErrback(log.err, ann_t=ann_t,
facility="tahoe.introducer.client",
level=log.WEIRD, umid="xs9pVQ")
def _handle_v1_publisher(self, ann_t): # for_v1
# they don't speak V2, so fall back to the old 'publish' method
# (which takes an unsigned tuple of bytestrings)
self.log("falling back to publish_v1",
level=log.UNUSUAL, umid="9RCT1A")
ann_v1 = convert_announcement_v2_to_v1(ann_t)
self._debug_outstanding += 1
d = self._publisher.callRemote("publish", ann_v1)
d.addBoth(self._debug_retired)
return d
def remote_announce_v2(self, announcements):
lp = self.log("received %d announcements (v2)" % len(announcements))
return self.got_announcements(announcements, lp)
def got_announcements(self, announcements, lp=None):
# this is the common entry point for both v1 and v2 announcements
# this is the common entry point for announcements
self._debug_counts["inbound_message"] += 1
for ann_t in announcements:
try:

View File

@ -2,24 +2,8 @@
from zope.interface import Interface
from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
RemoteInterface, Referenceable
from old import RIIntroducerSubscriberClient_v1
FURL = StringConstraint(1000)
# old introducer protocol (v1):
#
# Announcements are (FURL, service_name, remoteinterface_name,
# nickname, my_version, oldest_supported)
# the (FURL, service_name, remoteinterface_name) refer to the service being
# announced. The (nickname, my_version, oldest_supported) refer to the
# client as a whole. The my_version/oldest_supported strings can be parsed
# by an allmydata.util.version.Version instance, and then compared. The
# first goal is to make sure that nodes are not confused by speaking to an
# incompatible peer. The second goal is to enable the development of
# backwards-compatibility code.
Announcement_v1 = TupleOf(FURL, str, str,
str, str, str)
# v2 protocol over foolscap: Announcements are 3-tuples of (bytes, str, str)
# or (bytes, none, none)
Announcement_v2 = Any()
@ -41,12 +25,8 @@ class RIIntroducerPublisherAndSubscriberService_v2(RemoteInterface):
__remote_name__ = "RIIntroducerPublisherAndSubscriberService_v2.tahoe.allmydata.com"
def get_version():
return DictOf(str, Any())
def publish(announcement=Announcement_v1):
return None
def publish_v2(announcement=Announcement_v2, canary=Referenceable):
return None
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
return None
def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2,
service_name=str, subscriber_info=SubscriberInfo):
"""Give me a subscriber reference, and I will call its announce_v2()

View File

@ -1,469 +0,0 @@
import time
from base64 import b32decode
from zope.interface import implements, Interface
from twisted.application import service
import allmydata
from allmydata.interfaces import InsufficientVersionError
from allmydata.util import log, idlib, rrefutil
from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
RemoteInterface, Referenceable, eventually, SturdyRef
from allmydata.introducer.common import SubscriberDescriptor, \
AnnouncementDescriptor
FURL = StringConstraint(1000)
# We keep a copy of the old introducer (both client and server) here to
# support compatibility tests. The old client is supposed to handle the new
# server, and new client is supposed to handle the old server.
# Announcements are (FURL, service_name, remoteinterface_name,
# nickname, my_version, oldest_supported)
# the (FURL, service_name, remoteinterface_name) refer to the service being
# announced. The (nickname, my_version, oldest_supported) refer to the
# client as a whole. The my_version/oldest_supported strings can be parsed
# by an allmydata.util.version.Version instance, and then compared. The
# first goal is to make sure that nodes are not confused by speaking to an
# incompatible peer. The second goal is to enable the development of
# backwards-compatibility code.
Announcement = TupleOf(FURL, str, str,
str, str, str)
class RIIntroducerSubscriberClient_v1(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
def announce(announcements=SetOf(Announcement)):
"""I accept announcements from the publisher."""
return None
# When Foolscap can handle multiple interfaces (Foolscap#17), the
# full-powered introducer will implement both RIIntroducerPublisher and
# RIIntroducerSubscriberService. Until then, we define
# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
# make everybody use that.
class RIIntroducerPublisher_v1(RemoteInterface):
"""To publish a service to the world, connect to me and give me your
announcement message. I will deliver a copy to all connected subscribers."""
__remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
def publish(announcement=Announcement):
# canary?
return None
class RIIntroducerSubscriberService_v1(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
"""Give me a subscriber reference, and I will call its new_peers()
method will any announcements that match the desired service name. I
will ignore duplicate subscriptions.
"""
return None
class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
__remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
def get_version():
return DictOf(str, Any())
def publish(announcement=Announcement):
return None
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
return None
class IIntroducerClient(Interface):
"""I provide service introduction facilities for a node. I help nodes
publish their services to the rest of the world, and I help them learn
about services available on other nodes."""
def publish(furl, service_name, remoteinterface_name):
"""Once you call this, I will tell the world that the Referenceable
available at FURL is available to provide a service named
SERVICE_NAME. The precise definition of the service being provided is
identified by the Foolscap 'remote interface name' in the last
parameter: this is supposed to be a globally-unique string that
identifies the RemoteInterface that is implemented."""
def subscribe_to(service_name, callback, *args, **kwargs):
"""Call this if you will eventually want to use services with the
given SERVICE_NAME. This will prompt me to subscribe to announcements
of those services. Your callback will be invoked with at least two
arguments: a serverid (binary string), and an announcement
dictionary, followed by any additional callback args/kwargs you give
me. I will run your callback for both new announcements and for
announcements that have changed, but you must be prepared to tolerate
duplicates.
The announcement dictionary that I give you will have the following
keys:
version: 0
service-name: str('storage')
FURL: str(furl)
remoteinterface-name: str(ri_name)
nickname: unicode
app-versions: {}
my-version: str
oldest-supported: str
Note that app-version will be an empty dictionary until #466 is done
and both the introducer and the remote client have been upgraded. For
current (native) server types, the serverid will always be equal to
the binary form of the FURL's tubid.
"""
def connected_to_introducer():
"""Returns a boolean, True if we are currently connected to the
introducer, False if not."""
class IntroducerClient_v1(service.Service, Referenceable):
implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)
def __init__(self, tub, introducer_furl,
nickname, my_version, oldest_supported):
self._tub = tub
self.introducer_furl = introducer_furl
assert type(nickname) is unicode
self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
self._my_version = my_version
self._oldest_supported = oldest_supported
self._published_announcements = set()
self._publisher = None
self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
self._subscribed_service_names = set()
self._subscriptions = set() # requests we've actually sent
# _current_announcements remembers one announcement per
# (servicename,serverid) pair. Anything that arrives with the same
# pair will displace the previous one. This stores unpacked
# announcement dictionaries, which can be compared for equality to
# distinguish re-announcement from updates. It also provides memory
# for clients who subscribe after startup.
self._current_announcements = {}
# hooks for unit tests
self._debug_counts = {
"inbound_message": 0,
"inbound_announcement": 0,
"wrong_service": 0,
"duplicate_announcement": 0,
"update": 0,
"new_announcement": 0,
"outbound_message": 0,
}
self._debug_outstanding = 0
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def startService(self):
service.Service.startService(self)
self._introducer_error = None
rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
self._introducer_reconnector = rc
def connect_failed(failure):
self.log("Initial Introducer connection failed: perhaps it's down",
level=log.WEIRD, failure=failure, umid="c5MqUQ")
d = self._tub.getReference(self.introducer_furl)
d.addErrback(connect_failed)
def _got_introducer(self, publisher):
self.log("connected to introducer, getting versions")
default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
{ },
"application-version": "unknown: no get_version()",
}
d = rrefutil.add_version_to_remote_reference(publisher, default)
d.addCallback(self._got_versioned_introducer)
d.addErrback(self._got_error)
def _got_error(self, f):
# TODO: for the introducer, perhaps this should halt the application
self._introducer_error = f # polled by tests
def _got_versioned_introducer(self, publisher):
self.log("got introducer version: %s" % (publisher.version,))
# we require a V1 introducer
needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
if needed not in publisher.version:
raise InsufficientVersionError(needed, publisher.version)
self._publisher = publisher
publisher.notifyOnDisconnect(self._disconnected)
self._maybe_publish()
self._maybe_subscribe()
def _disconnected(self):
self.log("bummer, we've lost our connection to the introducer")
self._publisher = None
self._subscriptions.clear()
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
return log.msg(*args, **kwargs)
def publish(self, furl, service_name, remoteinterface_name):
assert type(self._nickname_utf8) is str # we always send UTF-8
ann = (furl, service_name, remoteinterface_name,
self._nickname_utf8, self._my_version, self._oldest_supported)
self._published_announcements.add(ann)
self._maybe_publish()
def subscribe_to(self, service_name, cb, *args, **kwargs):
self._local_subscribers.append( (service_name,cb,args,kwargs) )
self._subscribed_service_names.add(service_name)
self._maybe_subscribe()
for (servicename,nodeid),ann_d in self._current_announcements.items():
if servicename == service_name:
eventually(cb, nodeid, ann_d)
def _maybe_subscribe(self):
if not self._publisher:
self.log("want to subscribe, but no introducer yet",
level=log.NOISY)
return
for service_name in self._subscribed_service_names:
if service_name not in self._subscriptions:
# there is a race here, but the subscription desk ignores
# duplicate requests.
self._subscriptions.add(service_name)
self._debug_outstanding += 1
d = self._publisher.callRemote("subscribe", self, service_name)
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err, format="server errored during subscribe",
facility="tahoe.introducer",
level=log.WEIRD, umid="2uMScQ")
def _maybe_publish(self):
if not self._publisher:
self.log("want to publish, but no introducer yet", level=log.NOISY)
return
# this re-publishes everything. The Introducer ignores duplicates
for ann in self._published_announcements:
self._debug_counts["outbound_message"] += 1
self._debug_outstanding += 1
d = self._publisher.callRemote("publish", ann)
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err,
format="server errored during publish %(ann)s",
ann=ann, facility="tahoe.introducer",
level=log.WEIRD, umid="xs9pVQ")
def remote_announce(self, announcements):
self.log("received %d announcements" % len(announcements))
self._debug_counts["inbound_message"] += 1
for ann in announcements:
try:
self._process_announcement(ann)
except:
log.err(format="unable to process announcement %(ann)s",
ann=ann)
# Don't let a corrupt announcement prevent us from processing
# the remaining ones. Don't return an error to the server,
# since they'd just ignore it anyways.
pass
def _process_announcement(self, ann):
self._debug_counts["inbound_announcement"] += 1
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
if service_name not in self._subscribed_service_names:
self.log("announcement for a service we don't care about [%s]"
% (service_name,), level=log.UNUSUAL, umid="dIpGNA")
self._debug_counts["wrong_service"] += 1
return
self.log("announcement for [%s]: %s" % (service_name, ann),
umid="BoKEag")
assert type(furl) is str
assert type(service_name) is str
assert type(ri_name) is str
assert type(nickname_utf8) is str
nickname = nickname_utf8.decode("utf-8")
assert type(nickname) is unicode
assert type(ver) is str
assert type(oldest) is str
nodeid = b32decode(SturdyRef(furl).tubID.upper())
nodeid_s = idlib.shortnodeid_b2a(nodeid)
ann_d = { "version": 0,
"service-name": service_name,
"FURL": furl,
"nickname": nickname,
"app-versions": {}, # need #466 and v2 introducer
"my-version": ver,
"oldest-supported": oldest,
}
index = (service_name, nodeid)
if self._current_announcements.get(index, None) == ann_d:
self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
service=service_name, nodeid=nodeid_s,
level=log.UNUSUAL, umid="B1MIdA")
self._debug_counts["duplicate_announcement"] += 1
return
if index in self._current_announcements:
self._debug_counts["update"] += 1
else:
self._debug_counts["new_announcement"] += 1
self._current_announcements[index] = ann_d
# note: we never forget an index, but we might update its value
for (service_name2,cb,args,kwargs) in self._local_subscribers:
if service_name2 == service_name:
eventually(cb, nodeid, ann_d, *args, **kwargs)
def connected_to_introducer(self):
return bool(self._publisher)
class IntroducerService_v1(service.MultiService, Referenceable):
implements(RIIntroducerPublisherAndSubscriberService_v1)
name = "introducer"
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
{ },
"application-version": str(allmydata.__full_version__),
}
def __init__(self, basedir="."):
service.MultiService.__init__(self)
self.introducer_url = None
# 'index' is (service_name, tubid)
self._announcements = {} # dict of index -> (announcement, timestamp)
self._subscribers = {} # [service_name]->[rref]->timestamp
self._debug_counts = {"inbound_message": 0,
"inbound_duplicate": 0,
"inbound_update": 0,
"outbound_message": 0,
"outbound_announcements": 0,
"inbound_subscribe": 0}
self._debug_outstanding = 0
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
return log.msg(*args, **kwargs)
def get_announcements(self, include_stub_clients=True):
announcements = []
for index, (ann_t, when) in self._announcements.items():
(furl, service_name, ri_name, nickname, ver, oldest) = ann_t
if service_name == "stub_client" and not include_stub_clients:
continue
ann_d = {"nickname": nickname.decode("utf-8", "replace"),
"my-version": ver,
"service-name": service_name,
"anonymous-storage-FURL": furl,
}
# the V2 introducer uses (service_name, key_s, tubid_s) as an
# index, so match that format for AnnouncementDescriptor
new_index = (index[0], None, idlib.nodeid_b2a(index[1]))
ad = AnnouncementDescriptor(when, new_index, None, ann_d)
announcements.append(ad)
return announcements
def get_subscribers(self):
s = []
for service_name, subscribers in self._subscribers.items():
for rref, when in subscribers.items():
tubid = rref.getRemoteTubID() or "?"
remote_address = rrefutil.stringify_remote_address(rref)
nickname, version, app_versions = u"?", u"?", {}
sd = SubscriberDescriptor(service_name, when,
nickname, version, app_versions,
remote_address, tubid)
s.append(sd)
return s
def remote_get_version(self):
return self.VERSION
def remote_publish(self, announcement):
try:
self._publish(announcement)
except:
log.err(format="Introducer.remote_publish failed on %(ann)s",
ann=announcement, level=log.UNUSUAL, umid="620rWA")
raise
def _publish(self, announcement):
self._debug_counts["inbound_message"] += 1
self.log("introducer: announcement published: %s" % (announcement,) )
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
#print "PUB", service_name, nickname_utf8
nodeid = b32decode(SturdyRef(furl).tubID.upper())
index = (service_name, nodeid)
if index in self._announcements:
(old_announcement, timestamp) = self._announcements[index]
if old_announcement == announcement:
self.log("but we already knew it, ignoring", level=log.NOISY)
self._debug_counts["inbound_duplicate"] += 1
return
else:
self.log("old announcement being updated", level=log.NOISY)
self._debug_counts["inbound_update"] += 1
self._announcements[index] = (announcement, time.time())
for s in self._subscribers.get(service_name, []):
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += 1
self._debug_outstanding += 1
d = s.callRemote("announce", set([announcement]))
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err,
format="subscriber errored on announcement %(ann)s",
ann=announcement, facility="tahoe.introducer",
level=log.UNUSUAL, umid="jfGMXQ")
def remote_subscribe(self, subscriber, service_name):
self.log("introducer: subscription[%s] request at %s" % (service_name,
subscriber))
self._debug_counts["inbound_subscribe"] += 1
if service_name not in self._subscribers:
self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name]
if subscriber in subscribers:
self.log("but they're already subscribed, ignoring",
level=log.UNUSUAL)
return
subscribers[subscriber] = time.time()
def _remove():
self.log("introducer: unsubscribing[%s] %s" % (service_name,
subscriber))
subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove)
announcements = set(
[ ann
for (sn2,nodeid),(ann,when) in self._announcements.items()
if sn2 == service_name] )
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += len(announcements)
self._debug_outstanding += 1
d = subscriber.callRemote("announce", announcements)
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err,
format="subscriber errored during subscribe %(anns)s",
anns=announcements, facility="tahoe.introducer",
level=log.UNUSUAL, umid="1XChxA")

View File

@ -11,13 +11,11 @@ from twisted.python.filepath import FilePath
from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
from twisted.application import service
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.client import IntroducerClient, \
WrapV2ClientInV1Interface
from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.server import IntroducerService, FurlFileConflictError
from allmydata.introducer.common import get_tubid_string_from_ann, \
get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
UnknownKeyError
from allmydata.introducer import old
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.web import introweb
@ -172,56 +170,6 @@ def make_ann_t(ic, furl, privkey, seqnum):
return ann_t
class Client(unittest.TestCase):
def test_duplicate_receive_v1(self):
ic = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"my_version", "oldest_version", {}, fakeseq,
FilePath(self.mktemp()))
announcements = []
ic.subscribe_to("storage",
lambda key_s,ann: announcements.append(ann))
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
ca = WrapV2ClientInV1Interface(ic)
ca.remote_announce([ann1])
d = fireEventually()
def _then(ign):
self.failUnlessEqual(len(announcements), 1)
self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
self.failUnlessEqual(announcements[0]["my-version"], "ver23")
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 0)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
# now send a duplicate announcement: this should not notify clients
ca.remote_announce([ann1])
return fireEventually()
d.addCallback(_then)
def _then2(ign):
self.failUnlessEqual(len(announcements), 1)
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 0)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
# and a replacement announcement: same FURL, new other stuff.
# Clients should be notified.
ca.remote_announce([ann1b])
return fireEventually()
d.addCallback(_then2)
def _then3(ign):
self.failUnlessEqual(len(announcements), 2)
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 1)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
# test that the other stuff changed
self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
d.addCallback(_then3)
return d
def test_duplicate_receive_v2(self):
ic1 = IntroducerClient(None,
"introducer.furl", u"my_nickname",
@ -330,45 +278,6 @@ class Client(unittest.TestCase):
d.addCallback(_then5)
return d
def test_id_collision(self):
# test replacement case where tubid equals a keyid (one should
# not replace the other)
ic = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"my_version", "oldest_version", {}, fakeseq,
FilePath(self.mktemp()))
announcements = []
ic.subscribe_to("storage",
lambda key_s,ann: announcements.append(ann))
sk_s, vk_s = keyutil.make_keypair()
sk, _ignored = keyutil.parse_privkey(sk_s)
keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
ann_t = make_ann_t(ic, furl1, sk, 1)
ic.remote_announce_v2([ann_t])
d = fireEventually()
def _then(ign):
# first announcement has been processed
self.failUnlessEqual(len(announcements), 1)
self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
furl1)
# now submit a second one, with a tubid that happens to look just
# like the pubkey-based serverid we just processed. They should
# not overlap.
ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
ca = WrapV2ClientInV1Interface(ic)
ca.remote_announce([ann2])
return fireEventually()
d.addCallback(_then)
def _then2(ign):
# if they overlapped, the second announcement would be ignored
self.failUnlessEqual(len(announcements), 2)
self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
furl2)
d.addCallback(_then2)
return d
class Server(unittest.TestCase):
def test_duplicate(self):
i = IntroducerService()
@ -518,12 +427,9 @@ class Queue(SystemTestMixin, unittest.TestCase):
V1 = "v1"; V2 = "v2"
class SystemTest(SystemTestMixin, unittest.TestCase):
def do_system_test(self, server_version):
def do_system_test(self):
self.create_tub()
if server_version == V1:
introducer = old.IntroducerService_v1()
else:
introducer = IntroducerService()
introducer = IntroducerService()
introducer.setServiceParent(self.parent)
iff = os.path.join(self.basedir, "introducer.furl")
tub = self.central_tub
@ -558,16 +464,11 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
tub.setLocation("localhost:%d" % portnum)
log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
if i == 0:
c = old.IntroducerClient_v1(tub, self.introducer_furl,
NICKNAME % str(i),
"version", "oldest")
else:
c = IntroducerClient(tub, self.introducer_furl,
NICKNAME % str(i),
"version", "oldest",
{"component": "component-v1"}, fakeseq,
FilePath(self.mktemp()))
c = IntroducerClient(tub, self.introducer_furl,
NICKNAME % str(i),
"version", "oldest",
{"component": "component-v1"}, fakeseq,
FilePath(self.mktemp()))
received_announcements[c] = {}
def got(key_s_or_tubid, ann, announcements, i):
if i == 0:
@ -582,19 +483,18 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
node_furl = tub.registerReference(Referenceable())
if i < NUM_STORAGE:
if i == 0:
c.publish(node_furl, "storage", "ri_name")
printable_serverids[i] = get_tubid_string(node_furl)
# XXX wtf this makes no sense
#c.publish(node_furl, "storage", "ri_name")
#printable_serverids[i] = get_tubid_string(node_furl)
pass
elif i == 1:
# sign the announcement
privkey_s, pubkey_s = keyutil.make_keypair()
privkey, _ignored = keyutil.parse_privkey(privkey_s)
privkeys[c] = privkey
c.publish("storage", make_ann(node_furl), privkey)
if server_version == V1:
printable_serverids[i] = get_tubid_string(node_furl)
else:
assert pubkey_s.startswith("pub-")
printable_serverids[i] = pubkey_s[len("pub-"):]
assert pubkey_s.startswith("pub-")
printable_serverids[i] = pubkey_s[len("pub-"):]
else:
c.publish("storage", make_ann(node_furl))
printable_serverids[i] = get_tubid_string(node_furl)
@ -608,7 +508,8 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
# 'stub_client' record (somewhat after they published the
# 'storage' record), so the introducer could see their
# version. Match that behavior.
c.publish(node_furl, "stub_client", "stub_ri_name")
#c.publish(node_furl, "stub_client", "stub_ri_name")
pass
if i == 2:
# also publish something that nobody cares about
@ -661,24 +562,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
def _check1(res):
log.msg("doing _check1")
dc = self.the_introducer._debug_counts
if server_version == V1:
# each storage server publishes a record, and (after its
# 'subscribe' has been ACKed) also publishes a "stub_client".
# The non-storage client (which subscribes) also publishes a
# stub_client. There is also one "boring" service. The number
# of messages is higher, because the stub_clients aren't
# published until after we get the 'subscribe' ack (since we
# don't realize that we're dealing with a v1 server [which
# needs stub_clients] until then), and the act of publishing
# the stub_client causes us to re-send all previous
# announcements.
self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
NUM_STORAGE + NUM_CLIENTS + 1)
else:
# each storage server publishes a record. There is also one
# "stub_client" and one "boring"
self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
self.failUnlessEqual(dc["inbound_duplicate"], 0)
# each storage server publishes a record. There is also one
# "stub_client" and one "boring"
self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
self.failUnlessEqual(dc["inbound_duplicate"], 0)
self.failUnlessEqual(dc["inbound_update"], 0)
self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
# the number of outbound messages is tricky.. I think it depends
@ -706,30 +593,14 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
nick = ann["nickname"]
self.failUnlessEqual(type(nick), unicode)
self.failUnlessEqual(nick, NICKNAME % "0")
if server_version == V1:
for c in publishing_clients:
cdc = c._debug_counts
expected = 1 # storage
if c is clients[2]:
expected += 1 # boring
if c is not clients[0]:
# the v2 client tries to call publish_v2, which fails
# because the server is v1. It then re-sends
# everything it has so far, plus a stub_client record
expected = 2*expected + 1
if c is clients[0]:
# we always tell v1 client to send stub_client
expected += 1
self.failUnlessEqual(cdc["outbound_message"], expected)
else:
for c in publishing_clients:
cdc = c._debug_counts
expected = 1
if c in [clients[0], # stub_client
clients[2], # boring
]:
expected = 2
self.failUnlessEqual(cdc["outbound_message"], expected)
for c in publishing_clients:
cdc = c._debug_counts
expected = 1
if c in [clients[0], # stub_client
clients[2], # boring
]:
expected = 2
self.failUnlessEqual(cdc["outbound_message"], expected)
# now check the web status, make sure it renders without error
ir = introweb.IntroducerRoot(self.parent)
self.parent.nodeid = "NODEID"
@ -825,10 +696,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
for k in c._debug_counts:
c._debug_counts[k] = 0
expected_announcements[i] += 1 # new 'storage' for everyone
if server_version == V1:
introducer = old.IntroducerService_v1()
else:
introducer = IntroducerService()
introducer = IntroducerService()
self.the_introducer = introducer
newfurl = self.central_tub.registerReference(self.the_introducer,
furlFile=iff)
@ -861,17 +729,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
def test_system_v2_server(self):
self.basedir = "introducer/SystemTest/system_v2_server"
os.makedirs(self.basedir)
return self.do_system_test(V2)
return self.do_system_test()
test_system_v2_server.timeout = 480
# occasionally takes longer than 350s on "draco"
def test_system_v1_server(self):
self.basedir = "introducer/SystemTest/system_v1_server"
os.makedirs(self.basedir)
return self.do_system_test(V1)
test_system_v1_server.timeout = 480
# occasionally takes longer than 350s on "draco"
class FakeRemoteReference:
def notifyOnDisconnect(self, *args, **kwargs): pass
def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"