Merge branch '2784-remove-v1-introducer'

Closes PR #289
Closes ticket:2784
This commit is contained in:
Brian Warner 2016-07-05 15:31:34 -07:00
commit 78810d5851
7 changed files with 99 additions and 1091 deletions

View File

@ -2,13 +2,12 @@
import time, yaml
from zope.interface import implements
from twisted.application import service
from foolscap.api import Referenceable, eventually, RemoteInterface
from foolscap.api import Referenceable, eventually
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.interfaces import IIntroducerClient, \
RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
RIIntroducerSubscriberClient_v2
from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
make_index, get_tubid_string_from_ann, get_tubid_string
get_tubid_string_from_ann
from allmydata.util import log
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.keyutil import BadSignatureError
@ -16,33 +15,6 @@ from allmydata.util.keyutil import BadSignatureError
class InvalidCacheError(Exception):
pass
class WrapV2ClientInV1Interface(Referenceable): # for_v1
"""I wrap a v2 IntroducerClient to make it look like a v1 client, so it
can be attached to an old server."""
implements(RIIntroducerSubscriberClient_v1)
def __init__(self, original):
self.original = original
def remote_announce(self, announcements):
lp = self.original.log("received %d announcements (v1)" %
len(announcements))
anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)
for ann_v1 in announcements])
return self.original.got_announcements(anns_v1, lp)
class RIStubClient(RemoteInterface): # for_v1
"""Each client publishes a service announcement for a dummy object called
the StubClient. This object doesn't actually offer any services, but the
announcement helps the Introducer keep track of which clients are
subscribed (so the grid admin can keep track of things like the size of
the grid and the client versions in use. This is the (empty)
RemoteInterface for the StubClient."""
class StubClient(Referenceable): # for_v1
implements(RIStubClient)
V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
class IntroducerClient(service.Service, Referenceable):
@ -68,8 +40,6 @@ class IntroducerClient(service.Service, Referenceable):
"my-version": self._my_version,
"oldest-supported": self._oldest_supported,
}
self._stub_client = None # for_v1
self._stub_client_furl = None
self._outbound_announcements = {} # not signed
self._published_announcements = {} # signed
@ -170,9 +140,9 @@ class IntroducerClient(service.Service, Referenceable):
def _got_versioned_introducer(self, publisher):
self.log("got introducer version: %s" % (publisher.version,))
# we require an introducer that speaks at least one of (V1, V2)
if not (V1 in publisher.version or V2 in publisher.version):
raise InsufficientVersionError("V1 or V2", publisher.version)
# we require an introducer that speaks at least V2
if V2 not in publisher.version:
raise InsufficientVersionError("V2", publisher.version)
self._publisher = publisher
publisher.notifyOnDisconnect(self._disconnected)
self._maybe_publish()
@ -206,39 +176,14 @@ class IntroducerClient(service.Service, Referenceable):
if service_name in self._subscriptions:
continue
self._subscriptions.add(service_name)
if V2 in self._publisher.version:
self._debug_outstanding += 1
d = self._publisher.callRemote("subscribe_v2",
self, service_name,
self._my_subscriber_info)
d.addBoth(self._debug_retired)
else:
d = self._subscribe_handle_v1(service_name) # for_v1
d.addErrback(log.err, facility="tahoe.introducer.client",
level=log.WEIRD, umid="2uMScQ")
def _subscribe_handle_v1(self, service_name): # for_v1
# they don't speak V2: must be a v1 introducer. Fall back to the v1
# 'subscribe' method, using a client adapter.
ca = WrapV2ClientInV1Interface(self)
self._debug_outstanding += 1
d = self._publisher.callRemote("subscribe", ca, service_name)
d.addBoth(self._debug_retired)
# We must also publish an empty 'stub_client' object, so the
# introducer can count how many clients are connected and see what
# versions they're running.
if not self._stub_client_furl:
self._stub_client = sc = StubClient()
self._stub_client_furl = self._tub.registerReference(sc)
def _publish_stub_client(ignored):
furl = self._stub_client_furl
self.publish("stub_client",
{ "anonymous-storage-FURL": furl,
"permutation-seed-base32": get_tubid_string(furl),
})
d.addCallback(_publish_stub_client)
return d
def create_announcement_dict(self, service_name, ann):
ann_d = { "version": 0,
# "seqnum" and "nonce" will be populated with new values in
@ -253,7 +198,7 @@ class IntroducerClient(service.Service, Referenceable):
ann_d.update(ann)
return ann_d
def publish(self, service_name, ann, signing_key=None):
def publish(self, service_name, ann, signing_key):
# we increment the seqnum every time we publish something new
current_seqnum, current_nonce = self._sequencer()
@ -275,35 +220,18 @@ class IntroducerClient(service.Service, Referenceable):
# this re-publishes everything. The Introducer ignores duplicates
for ann_t in self._published_announcements.values():
self._debug_counts["outbound_message"] += 1
if V2 in self._publisher.version:
self._debug_outstanding += 1
d = self._publisher.callRemote("publish_v2", ann_t,
self._canary)
d = self._publisher.callRemote("publish_v2", ann_t, self._canary)
d.addBoth(self._debug_retired)
else:
d = self._handle_v1_publisher(ann_t) # for_v1
d.addErrback(log.err, ann_t=ann_t,
facility="tahoe.introducer.client",
level=log.WEIRD, umid="xs9pVQ")
def _handle_v1_publisher(self, ann_t): # for_v1
# they don't speak V2, so fall back to the old 'publish' method
# (which takes an unsigned tuple of bytestrings)
self.log("falling back to publish_v1",
level=log.UNUSUAL, umid="9RCT1A")
ann_v1 = convert_announcement_v2_to_v1(ann_t)
self._debug_outstanding += 1
d = self._publisher.callRemote("publish", ann_v1)
d.addBoth(self._debug_retired)
return d
def remote_announce_v2(self, announcements):
lp = self.log("received %d announcements (v2)" % len(announcements))
return self.got_announcements(announcements, lp)
def got_announcements(self, announcements, lp=None):
# this is the common entry point for both v1 and v2 announcements
self._debug_counts["inbound_message"] += 1
for ann_t in announcements:
try:
@ -343,7 +271,7 @@ class IntroducerClient(service.Service, Referenceable):
description = "/".join(desc_bits)
# the index is used to track duplicates
index = make_index(ann, key_s)
index = (service_name, key_s)
# is this announcement a duplicate?
if (index in self._inbound_announcements

View File

@ -2,20 +2,6 @@
import re, simplejson
from allmydata.util import keyutil, base32, rrefutil
def make_index(ann, key_s):
"""Return something that can be used as an index (e.g. a tuple of
strings), such that two messages that refer to the same 'thing' will have
the same index. This is a tuple of (service-name, signing-key, None) for
signed announcements, or (service-name, None, tubid_s) for unsigned
announcements."""
service_name = str(ann["service-name"])
if key_s:
return (service_name, key_s, None)
else:
tubid_s = get_tubid_string_from_ann(ann)
return (service_name, None, tubid_s)
def get_tubid_string_from_ann(ann):
return get_tubid_string(str(ann.get("anonymous-storage-FURL")
or ann.get("FURL")))
@ -25,52 +11,15 @@ def get_tubid_string(furl):
assert m
return m.group(1).lower()
def convert_announcement_v1_to_v2(ann_t):
(furl, service_name, ri_name, nickname, ver, oldest) = ann_t
assert type(furl) is str
assert type(service_name) is str
# ignore ri_name
assert type(nickname) is str
assert type(ver) is str
assert type(oldest) is str
ann = {"version": 0,
"nickname": nickname.decode("utf-8", "replace"),
"app-versions": {},
"my-version": ver,
"oldest-supported": oldest,
"service-name": service_name,
"anonymous-storage-FURL": furl,
"permutation-seed-base32": get_tubid_string(furl),
}
msg = simplejson.dumps(ann).encode("utf-8")
return (msg, None, None)
def convert_announcement_v2_to_v1(ann_v2):
(msg, sig, pubkey) = ann_v2
ann = simplejson.loads(msg)
assert ann["version"] == 0
ann_t = (str(ann["anonymous-storage-FURL"]),
str(ann["service-name"]),
"remoteinterface-name is unused",
ann["nickname"].encode("utf-8"),
str(ann["my-version"]),
str(ann["oldest-supported"]),
)
return ann_t
def sign_to_foolscap(ann, sk):
# return (bytes, None, None) or (bytes, sig-str, pubkey-str). A future
# HTTP-based serialization will use JSON({msg:b64(JSON(msg).utf8),
# sig:v0-b64(sig), pubkey:v0-b64(pubkey)}) .
# return (bytes, sig-str, pubkey-str). A future HTTP-based serialization
# will use JSON({msg:b64(JSON(msg).utf8), sig:v0-b64(sig),
# pubkey:v0-b64(pubkey)}) .
msg = simplejson.dumps(ann).encode("utf-8")
if sk:
sig = "v0-"+base32.b2a(sk.sign(msg))
vk_bytes = sk.get_verifying_key_bytes()
ann_t = (msg, sig, "v0-"+base32.b2a(vk_bytes))
else:
ann_t = (msg, None, None)
return ann_t
class UnknownKeyError(Exception):
@ -144,8 +93,8 @@ class AnnouncementDescriptor:
self.service_name = ann_d["service-name"]
self.version = ann_d.get("my-version", "")
self.nickname = ann_d.get("nickname", u"")
(service_name, key_s, tubid_s) = index
self.serverid = key_s or tubid_s
(service_name, key_s) = index
self.serverid = key_s
furl = ann_d.get("anonymous-storage-FURL")
if furl:
self.connection_hints = rrefutil.connection_hints_for_furl(furl)

View File

@ -1,27 +1,30 @@
from zope.interface import Interface
from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
from foolscap.api import StringConstraint, SetOf, DictOf, Any, \
RemoteInterface, Referenceable
from old import RIIntroducerSubscriberClient_v1
FURL = StringConstraint(1000)
# old introducer protocol (v1):
#
# Announcements are (FURL, service_name, remoteinterface_name,
# nickname, my_version, oldest_supported)
# the (FURL, service_name, remoteinterface_name) refer to the service being
# announced. The (nickname, my_version, oldest_supported) refer to the
# client as a whole. The my_version/oldest_supported strings can be parsed
# by an allmydata.util.version.Version instance, and then compared. The
# first goal is to make sure that nodes are not confused by speaking to an
# incompatible peer. The second goal is to enable the development of
# v2 protocol over foolscap: Announcements are 3-tuples of (msg, sig_vs,
# claimed_key_vs):
# * msg (bytes): UTF-8(json(ann_dict))
# * ann_dict has IntroducerClient-provided keys like "version", "nickname",
# "app-versions", "my-version", "oldest-supported", and "service-name".
# Plus service-specific keys like "anonymous-storage-FURL" and
# "permutation-seed-base32" (both for service="storage").
# * sig_vs (str): "v0-"+base32(signature(msg))
# * claimed_key_vs (str): "v0-"+base32(pubkey)
# (nickname, my_version, oldest_supported) refer to the client as a whole.
# The my_version/oldest_supported strings can be parsed by an
# allmydata.util.version.Version instance, and then compared. The first goal
# is to make sure that nodes are not confused by speaking to an incompatible
# peer. The second goal is to enable the development of
# backwards-compatibility code.
Announcement_v1 = TupleOf(FURL, str, str,
str, str, str)
# Note that old v1 clients (which are gone now) did not sign messages, so v2
# servers would deliver v2-format messages with sig_vs=claimed_key_vs=None.
# These days we should always get a signature and a pubkey.
# v2 protocol over foolscap: Announcements are 3-tuples of (bytes, str, str)
# or (bytes, none, none)
Announcement_v2 = Any()
class RIIntroducerSubscriberClient_v2(RemoteInterface):
@ -41,12 +44,8 @@ class RIIntroducerPublisherAndSubscriberService_v2(RemoteInterface):
__remote_name__ = "RIIntroducerPublisherAndSubscriberService_v2.tahoe.allmydata.com"
def get_version():
return DictOf(str, Any())
def publish(announcement=Announcement_v1):
return None
def publish_v2(announcement=Announcement_v2, canary=Referenceable):
return None
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
return None
def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2,
service_name=str, subscriber_info=SubscriberInfo):
"""Give me a subscriber reference, and I will call its announce_v2()

View File

@ -1,469 +0,0 @@
import time
from base64 import b32decode
from zope.interface import implements, Interface
from twisted.application import service
import allmydata
from allmydata.interfaces import InsufficientVersionError
from allmydata.util import log, idlib, rrefutil
from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
RemoteInterface, Referenceable, eventually, SturdyRef
from allmydata.introducer.common import SubscriberDescriptor, \
AnnouncementDescriptor
FURL = StringConstraint(1000)
# We keep a copy of the old introducer (both client and server) here to
# support compatibility tests. The old client is supposed to handle the new
# server, and new client is supposed to handle the old server.
# Announcements are (FURL, service_name, remoteinterface_name,
# nickname, my_version, oldest_supported)
# the (FURL, service_name, remoteinterface_name) refer to the service being
# announced. The (nickname, my_version, oldest_supported) refer to the
# client as a whole. The my_version/oldest_supported strings can be parsed
# by an allmydata.util.version.Version instance, and then compared. The
# first goal is to make sure that nodes are not confused by speaking to an
# incompatible peer. The second goal is to enable the development of
# backwards-compatibility code.
Announcement = TupleOf(FURL, str, str,
str, str, str)
class RIIntroducerSubscriberClient_v1(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
def announce(announcements=SetOf(Announcement)):
"""I accept announcements from the publisher."""
return None
# When Foolscap can handle multiple interfaces (Foolscap#17), the
# full-powered introducer will implement both RIIntroducerPublisher and
# RIIntroducerSubscriberService. Until then, we define
# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
# make everybody use that.
class RIIntroducerPublisher_v1(RemoteInterface):
"""To publish a service to the world, connect to me and give me your
announcement message. I will deliver a copy to all connected subscribers."""
__remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
def publish(announcement=Announcement):
# canary?
return None
class RIIntroducerSubscriberService_v1(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
"""Give me a subscriber reference, and I will call its new_peers()
method will any announcements that match the desired service name. I
will ignore duplicate subscriptions.
"""
return None
class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
__remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
def get_version():
return DictOf(str, Any())
def publish(announcement=Announcement):
return None
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
return None
class IIntroducerClient(Interface):
"""I provide service introduction facilities for a node. I help nodes
publish their services to the rest of the world, and I help them learn
about services available on other nodes."""
def publish(furl, service_name, remoteinterface_name):
"""Once you call this, I will tell the world that the Referenceable
available at FURL is available to provide a service named
SERVICE_NAME. The precise definition of the service being provided is
identified by the Foolscap 'remote interface name' in the last
parameter: this is supposed to be a globally-unique string that
identifies the RemoteInterface that is implemented."""
def subscribe_to(service_name, callback, *args, **kwargs):
"""Call this if you will eventually want to use services with the
given SERVICE_NAME. This will prompt me to subscribe to announcements
of those services. Your callback will be invoked with at least two
arguments: a serverid (binary string), and an announcement
dictionary, followed by any additional callback args/kwargs you give
me. I will run your callback for both new announcements and for
announcements that have changed, but you must be prepared to tolerate
duplicates.
The announcement dictionary that I give you will have the following
keys:
version: 0
service-name: str('storage')
FURL: str(furl)
remoteinterface-name: str(ri_name)
nickname: unicode
app-versions: {}
my-version: str
oldest-supported: str
Note that app-version will be an empty dictionary until #466 is done
and both the introducer and the remote client have been upgraded. For
current (native) server types, the serverid will always be equal to
the binary form of the FURL's tubid.
"""
def connected_to_introducer():
"""Returns a boolean, True if we are currently connected to the
introducer, False if not."""
class IntroducerClient_v1(service.Service, Referenceable):
implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)
def __init__(self, tub, introducer_furl,
nickname, my_version, oldest_supported):
self._tub = tub
self.introducer_furl = introducer_furl
assert type(nickname) is unicode
self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
self._my_version = my_version
self._oldest_supported = oldest_supported
self._published_announcements = set()
self._publisher = None
self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
self._subscribed_service_names = set()
self._subscriptions = set() # requests we've actually sent
# _current_announcements remembers one announcement per
# (servicename,serverid) pair. Anything that arrives with the same
# pair will displace the previous one. This stores unpacked
# announcement dictionaries, which can be compared for equality to
# distinguish re-announcement from updates. It also provides memory
# for clients who subscribe after startup.
self._current_announcements = {}
# hooks for unit tests
self._debug_counts = {
"inbound_message": 0,
"inbound_announcement": 0,
"wrong_service": 0,
"duplicate_announcement": 0,
"update": 0,
"new_announcement": 0,
"outbound_message": 0,
}
self._debug_outstanding = 0
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def startService(self):
service.Service.startService(self)
self._introducer_error = None
rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
self._introducer_reconnector = rc
def connect_failed(failure):
self.log("Initial Introducer connection failed: perhaps it's down",
level=log.WEIRD, failure=failure, umid="c5MqUQ")
d = self._tub.getReference(self.introducer_furl)
d.addErrback(connect_failed)
def _got_introducer(self, publisher):
self.log("connected to introducer, getting versions")
default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
{ },
"application-version": "unknown: no get_version()",
}
d = rrefutil.add_version_to_remote_reference(publisher, default)
d.addCallback(self._got_versioned_introducer)
d.addErrback(self._got_error)
def _got_error(self, f):
# TODO: for the introducer, perhaps this should halt the application
self._introducer_error = f # polled by tests
def _got_versioned_introducer(self, publisher):
self.log("got introducer version: %s" % (publisher.version,))
# we require a V1 introducer
needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
if needed not in publisher.version:
raise InsufficientVersionError(needed, publisher.version)
self._publisher = publisher
publisher.notifyOnDisconnect(self._disconnected)
self._maybe_publish()
self._maybe_subscribe()
def _disconnected(self):
self.log("bummer, we've lost our connection to the introducer")
self._publisher = None
self._subscriptions.clear()
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
return log.msg(*args, **kwargs)
def publish(self, furl, service_name, remoteinterface_name):
assert type(self._nickname_utf8) is str # we always send UTF-8
ann = (furl, service_name, remoteinterface_name,
self._nickname_utf8, self._my_version, self._oldest_supported)
self._published_announcements.add(ann)
self._maybe_publish()
def subscribe_to(self, service_name, cb, *args, **kwargs):
self._local_subscribers.append( (service_name,cb,args,kwargs) )
self._subscribed_service_names.add(service_name)
self._maybe_subscribe()
for (servicename,nodeid),ann_d in self._current_announcements.items():
if servicename == service_name:
eventually(cb, nodeid, ann_d)
def _maybe_subscribe(self):
if not self._publisher:
self.log("want to subscribe, but no introducer yet",
level=log.NOISY)
return
for service_name in self._subscribed_service_names:
if service_name not in self._subscriptions:
# there is a race here, but the subscription desk ignores
# duplicate requests.
self._subscriptions.add(service_name)
self._debug_outstanding += 1
d = self._publisher.callRemote("subscribe", self, service_name)
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err, format="server errored during subscribe",
facility="tahoe.introducer",
level=log.WEIRD, umid="2uMScQ")
def _maybe_publish(self):
if not self._publisher:
self.log("want to publish, but no introducer yet", level=log.NOISY)
return
# this re-publishes everything. The Introducer ignores duplicates
for ann in self._published_announcements:
self._debug_counts["outbound_message"] += 1
self._debug_outstanding += 1
d = self._publisher.callRemote("publish", ann)
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err,
format="server errored during publish %(ann)s",
ann=ann, facility="tahoe.introducer",
level=log.WEIRD, umid="xs9pVQ")
def remote_announce(self, announcements):
self.log("received %d announcements" % len(announcements))
self._debug_counts["inbound_message"] += 1
for ann in announcements:
try:
self._process_announcement(ann)
except:
log.err(format="unable to process announcement %(ann)s",
ann=ann)
# Don't let a corrupt announcement prevent us from processing
# the remaining ones. Don't return an error to the server,
# since they'd just ignore it anyways.
pass
def _process_announcement(self, ann):
self._debug_counts["inbound_announcement"] += 1
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
if service_name not in self._subscribed_service_names:
self.log("announcement for a service we don't care about [%s]"
% (service_name,), level=log.UNUSUAL, umid="dIpGNA")
self._debug_counts["wrong_service"] += 1
return
self.log("announcement for [%s]: %s" % (service_name, ann),
umid="BoKEag")
assert type(furl) is str
assert type(service_name) is str
assert type(ri_name) is str
assert type(nickname_utf8) is str
nickname = nickname_utf8.decode("utf-8")
assert type(nickname) is unicode
assert type(ver) is str
assert type(oldest) is str
nodeid = b32decode(SturdyRef(furl).tubID.upper())
nodeid_s = idlib.shortnodeid_b2a(nodeid)
ann_d = { "version": 0,
"service-name": service_name,
"FURL": furl,
"nickname": nickname,
"app-versions": {}, # need #466 and v2 introducer
"my-version": ver,
"oldest-supported": oldest,
}
index = (service_name, nodeid)
if self._current_announcements.get(index, None) == ann_d:
self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
service=service_name, nodeid=nodeid_s,
level=log.UNUSUAL, umid="B1MIdA")
self._debug_counts["duplicate_announcement"] += 1
return
if index in self._current_announcements:
self._debug_counts["update"] += 1
else:
self._debug_counts["new_announcement"] += 1
self._current_announcements[index] = ann_d
# note: we never forget an index, but we might update its value
for (service_name2,cb,args,kwargs) in self._local_subscribers:
if service_name2 == service_name:
eventually(cb, nodeid, ann_d, *args, **kwargs)
def connected_to_introducer(self):
return bool(self._publisher)
class IntroducerService_v1(service.MultiService, Referenceable):
implements(RIIntroducerPublisherAndSubscriberService_v1)
name = "introducer"
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
{ },
"application-version": str(allmydata.__full_version__),
}
def __init__(self, basedir="."):
service.MultiService.__init__(self)
self.introducer_url = None
# 'index' is (service_name, tubid)
self._announcements = {} # dict of index -> (announcement, timestamp)
self._subscribers = {} # [service_name]->[rref]->timestamp
self._debug_counts = {"inbound_message": 0,
"inbound_duplicate": 0,
"inbound_update": 0,
"outbound_message": 0,
"outbound_announcements": 0,
"inbound_subscribe": 0}
self._debug_outstanding = 0
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
return log.msg(*args, **kwargs)
def get_announcements(self, include_stub_clients=True):
announcements = []
for index, (ann_t, when) in self._announcements.items():
(furl, service_name, ri_name, nickname, ver, oldest) = ann_t
if service_name == "stub_client" and not include_stub_clients:
continue
ann_d = {"nickname": nickname.decode("utf-8", "replace"),
"my-version": ver,
"service-name": service_name,
"anonymous-storage-FURL": furl,
}
# the V2 introducer uses (service_name, key_s, tubid_s) as an
# index, so match that format for AnnouncementDescriptor
new_index = (index[0], None, idlib.nodeid_b2a(index[1]))
ad = AnnouncementDescriptor(when, new_index, None, ann_d)
announcements.append(ad)
return announcements
def get_subscribers(self):
s = []
for service_name, subscribers in self._subscribers.items():
for rref, when in subscribers.items():
tubid = rref.getRemoteTubID() or "?"
remote_address = rrefutil.stringify_remote_address(rref)
nickname, version, app_versions = u"?", u"?", {}
sd = SubscriberDescriptor(service_name, when,
nickname, version, app_versions,
remote_address, tubid)
s.append(sd)
return s
def remote_get_version(self):
return self.VERSION
def remote_publish(self, announcement):
try:
self._publish(announcement)
except:
log.err(format="Introducer.remote_publish failed on %(ann)s",
ann=announcement, level=log.UNUSUAL, umid="620rWA")
raise
def _publish(self, announcement):
self._debug_counts["inbound_message"] += 1
self.log("introducer: announcement published: %s" % (announcement,) )
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
#print "PUB", service_name, nickname_utf8
nodeid = b32decode(SturdyRef(furl).tubID.upper())
index = (service_name, nodeid)
if index in self._announcements:
(old_announcement, timestamp) = self._announcements[index]
if old_announcement == announcement:
self.log("but we already knew it, ignoring", level=log.NOISY)
self._debug_counts["inbound_duplicate"] += 1
return
else:
self.log("old announcement being updated", level=log.NOISY)
self._debug_counts["inbound_update"] += 1
self._announcements[index] = (announcement, time.time())
for s in self._subscribers.get(service_name, []):
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += 1
self._debug_outstanding += 1
d = s.callRemote("announce", set([announcement]))
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err,
format="subscriber errored on announcement %(ann)s",
ann=announcement, facility="tahoe.introducer",
level=log.UNUSUAL, umid="jfGMXQ")
def remote_subscribe(self, subscriber, service_name):
self.log("introducer: subscription[%s] request at %s" % (service_name,
subscriber))
self._debug_counts["inbound_subscribe"] += 1
if service_name not in self._subscribers:
self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name]
if subscriber in subscribers:
self.log("but they're already subscribed, ignoring",
level=log.UNUSUAL)
return
subscribers[subscriber] = time.time()
def _remove():
self.log("introducer: unsubscribing[%s] %s" % (service_name,
subscriber))
subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove)
announcements = set(
[ ann
for (sn2,nodeid),(ann,when) in self._announcements.items()
if sn2 == service_name] )
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += len(announcements)
self._debug_outstanding += 1
d = subscriber.callRemote("announce", announcements)
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err,
format="subscriber errored during subscribe %(anns)s",
anns=announcements, facility="tahoe.introducer",
level=log.UNUSUAL, umid="1XChxA")

View File

@ -9,9 +9,8 @@ from allmydata.util import log, rrefutil
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.introducer.interfaces import \
RIIntroducerPublisherAndSubscriberService_v2
from allmydata.introducer.common import convert_announcement_v1_to_v2, \
convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
from allmydata.introducer.common import unsign_from_foolscap, \
SubscriberDescriptor, AnnouncementDescriptor
class FurlFileConflictError(Exception):
pass
@ -63,42 +62,12 @@ class IntroducerNode(node.Node):
ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
self.add_service(ws)
class WrapV1SubscriberInV2Interface: # for_v1
"""I wrap a RemoteReference that points at an old v1 subscriber, enabling
it to be treated like a v2 subscriber.
"""
def __init__(self, original):
self.original = original # also used for tests
def __eq__(self, them):
return self.original == them
def __ne__(self, them):
return self.original != them
def __hash__(self):
return hash(self.original)
def getRemoteTubID(self):
return self.original.getRemoteTubID()
def getSturdyRef(self):
return self.original.getSturdyRef()
def getPeer(self):
return self.original.getPeer()
def getLocationHints(self):
return self.original.getLocationHints()
def callRemote(self, methname, *args, **kwargs):
m = getattr(self, "wrap_" + methname)
return m(*args, **kwargs)
def wrap_announce_v2(self, announcements):
anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements]
return self.original.callRemote("announce", set(anns_v1))
def notifyOnDisconnect(self, *args, **kwargs):
return self.original.notifyOnDisconnect(*args, **kwargs)
class IntroducerService(service.MultiService, Referenceable):
implements(RIIntroducerPublisherAndSubscriberService_v2)
name = "introducer"
# v1 is the original protocol, supported since 1.0 (but only advertised
# starting in 1.3). v2 is the new signed protocol, supported after 1.9
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
# v1 is the original protocol, added in 1.0 (but only advertised starting
# in 1.3), removed in 1.12. v2 is the new signed protocol, added in 1.10
VERSION = { #"http://allmydata.org/tahoe/protocols/introducer/v1": { },
"http://allmydata.org/tahoe/protocols/introducer/v2": { },
"application-version": str(allmydata.__full_version__),
}
@ -118,16 +87,11 @@ class IntroducerService(service.MultiService, Referenceable):
# self._subscribers is a dict mapping servicename to subscriptions
# 'subscriptions' is a dict mapping rref to a subscription
# 'subscription' is a tuple of (subscriber_info, timestamp)
# 'subscriber_info' is a dict, provided directly for v2 clients, or
# synthesized for v1 clients. The expected keys are:
# version, nickname, app-versions, my-version, oldest-supported
# 'subscriber_info' is a dict, provided directly by v2 clients. The
# expected keys are: version, nickname, app-versions, my-version,
# oldest-supported
self._subscribers = {}
# self._stub_client_announcements contains the information provided
# by v1 clients. We stash this so we can match it up with their
# subscriptions.
self._stub_client_announcements = {} # maps tubid to sinfo # for_v1
self._debug_counts = {"inbound_message": 0,
"inbound_duplicate": 0,
"inbound_no_seqnum": 0,
@ -136,7 +100,7 @@ class IntroducerService(service.MultiService, Referenceable):
"outbound_message": 0,
"outbound_announcements": 0,
"inbound_subscribe": 0}
self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface
self._debug_outstanding = 0
def _debug_retired(self, res):
self._debug_outstanding -= 1
@ -147,13 +111,10 @@ class IntroducerService(service.MultiService, Referenceable):
kwargs["facility"] = "tahoe.introducer.server"
return log.msg(*args, **kwargs)
def get_announcements(self, include_stub_clients=True):
def get_announcements(self):
"""Return a list of AnnouncementDescriptor for all announcements"""
announcements = []
for (index, (_, canary, ann, when)) in self._announcements.items():
if ann["service-name"] == "stub_client":
if not include_stub_clients:
continue
ad = AnnouncementDescriptor(when, index, canary, ann)
announcements.append(ad)
return announcements
@ -170,9 +131,6 @@ class IntroducerService(service.MultiService, Referenceable):
remote_address = rrefutil.stringify_remote_address(rref)
# these three assume subscriber_info["version"]==0, but
# should tolerate other versions
if not subscriber_info:
# V1 clients that haven't yet sent their stub_info data
subscriber_info = {}
nickname = subscriber_info.get("nickname", u"?")
version = subscriber_info.get("my-version", u"?")
app_versions = subscriber_info.get("app-versions", {})
@ -186,12 +144,6 @@ class IntroducerService(service.MultiService, Referenceable):
def remote_get_version(self):
return self.VERSION
def remote_publish(self, ann_t): # for_v1
lp = self.log("introducer: old (v1) announcement published: %s"
% (ann_t,), umid="6zGOIw")
ann_v2 = convert_announcement_v1_to_v2(ann_t)
return self.publish(ann_v2, None, lp)
def remote_publish_v2(self, ann_t, canary):
lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
return self.publish(ann_t, canary, lp)
@ -210,13 +162,9 @@ class IntroducerService(service.MultiService, Referenceable):
self.log("introducer: announcement published: %s" % (ann_t,),
umid="wKHgCw")
ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
index = make_index(ann, key)
service_name = str(ann["service-name"])
if service_name == "stub_client": # for_v1
self._attach_stub_client(ann, lp)
return
index = (service_name, key)
old = self._announcements.get(index)
if old:
(old_ann_t, canary, old_ann, timestamp) = old
@ -263,56 +211,6 @@ class IntroducerService(service.MultiService, Referenceable):
ann=ann_t, facility="tahoe.introducer",
level=log.UNUSUAL, umid="jfGMXQ")
def _attach_stub_client(self, ann, lp):
# There might be a v1 subscriber for whom this is a stub_client.
# We might have received the subscription before the stub_client
# announcement, in which case we now need to fix up the record in
# self._subscriptions .
# record it for later, in case the stub_client arrived before the
# subscription
subscriber_info = self._get_subscriber_info_from_ann(ann)
ann_tubid = get_tubid_string_from_ann(ann)
self._stub_client_announcements[ann_tubid] = subscriber_info
lp2 = self.log("stub_client announcement, "
"looking for matching subscriber",
parent=lp, level=log.NOISY, umid="BTywDg")
for sn in self._subscribers:
s = self._subscribers[sn]
for (subscriber, info) in s.items():
# we correlate these by looking for a subscriber whose tubid
# matches this announcement
sub_tubid = subscriber.getRemoteTubID()
if sub_tubid == ann_tubid:
self.log(format="found a match, nodeid=%(nodeid)s",
nodeid=sub_tubid,
level=log.NOISY, parent=lp2, umid="xsWs1A")
# found a match. Does it need info?
if not info[0]:
self.log(format="replacing info",
level=log.NOISY, parent=lp2, umid="m5kxwA")
# yup
s[subscriber] = (subscriber_info, info[1])
# and we don't remember or announce stub_clients beyond what we
# need to get the subscriber_info set up
def _get_subscriber_info_from_ann(self, ann): # for_v1
sinfo = { "version": ann["version"],
"nickname": ann["nickname"],
"app-versions": ann["app-versions"],
"my-version": ann["my-version"],
"oldest-supported": ann["oldest-supported"],
}
return sinfo
def remote_subscribe(self, subscriber, service_name): # for_v1
self.log("introducer: old (v1) subscription[%s] request at %s"
% (service_name, subscriber), umid="hJlGUg")
return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
service_name, None)
def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
self.log("introducer: subscription[%s] request at %s"
% (service_name, subscriber), umid="U3uzLg")
@ -328,14 +226,7 @@ class IntroducerService(service.MultiService, Referenceable):
level=log.UNUSUAL, umid="Sy9EfA")
return
if not subscriber_info: # for_v1
# v1 clients don't provide subscriber_info, but they should
# publish a 'stub client' record which contains the same
# information. If we've already received this, it will be in
# self._stub_client_announcements
tubid = subscriber.getRemoteTubID()
if tubid in self._stub_client_announcements:
subscriber_info = self._stub_client_announcements[tubid]
assert subscriber_info
subscribers[subscriber] = (subscriber_info, time.time())
def _remove():

View File

@ -11,13 +11,11 @@ from twisted.python.filepath import FilePath
from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
from twisted.application import service
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.client import IntroducerClient, \
WrapV2ClientInV1Interface
from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.server import IntroducerService, FurlFileConflictError
from allmydata.introducer.common import get_tubid_string_from_ann, \
get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
UnknownKeyError
from allmydata.introducer import old
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.web import introweb
@ -89,7 +87,6 @@ class ServiceMixin:
return d
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
def test_create(self):
ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
"my_version", "oldest_version", {}, fakeseq,
@ -100,57 +97,6 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
i = IntroducerService()
i.setServiceParent(self.parent)
def test_duplicate_publish(self):
i = IntroducerService()
self.failUnlessEqual(len(i.get_announcements()), 0)
self.failUnlessEqual(len(i.get_subscribers()), 0)
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
i.remote_publish(ann1)
self.failUnlessEqual(len(i.get_announcements()), 1)
self.failUnlessEqual(len(i.get_subscribers()), 0)
i.remote_publish(ann2)
self.failUnlessEqual(len(i.get_announcements()), 2)
self.failUnlessEqual(len(i.get_subscribers()), 0)
i.remote_publish(ann1b)
self.failUnlessEqual(len(i.get_announcements()), 2)
self.failUnlessEqual(len(i.get_subscribers()), 0)
def test_id_collision(self):
# test replacement case where tubid equals a keyid (one should
# not replace the other)
i = IntroducerService()
ic = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"my_version", "oldest_version", {}, fakeseq,
FilePath(self.mktemp()))
sk_s, vk_s = keyutil.make_keypair()
sk, _ignored = keyutil.parse_privkey(sk_s)
keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
ann_t = make_ann_t(ic, furl1, sk, 1)
i.remote_publish_v2(ann_t, Referenceable())
announcements = i.get_announcements()
self.failUnlessEqual(len(announcements), 1)
key1 = ("storage", "v0-"+keyid, None)
self.failUnlessEqual(announcements[0].index, key1)
ann1_out = announcements[0].announcement
self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
i.remote_publish(ann2)
announcements = i.get_announcements()
self.failUnlessEqual(len(announcements), 2)
key2 = ("storage", None, keyid)
wanted = [ad for ad in announcements if ad.index == key2]
self.failUnlessEqual(len(wanted), 1)
ann2_out = wanted[0].announcement
self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
def fakeseq():
return 1, "nonce"
@ -165,6 +111,7 @@ def make_ann(furl):
return ann
def make_ann_t(ic, furl, privkey, seqnum):
assert privkey
ann_d = ic.create_announcement_dict("storage", make_ann(furl))
ann_d["seqnum"] = seqnum
ann_d["nonce"] = "nonce"
@ -172,56 +119,6 @@ def make_ann_t(ic, furl, privkey, seqnum):
return ann_t
class Client(unittest.TestCase):
def test_duplicate_receive_v1(self):
ic = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"my_version", "oldest_version", {}, fakeseq,
FilePath(self.mktemp()))
announcements = []
ic.subscribe_to("storage",
lambda key_s,ann: announcements.append(ann))
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
ca = WrapV2ClientInV1Interface(ic)
ca.remote_announce([ann1])
d = fireEventually()
def _then(ign):
self.failUnlessEqual(len(announcements), 1)
self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
self.failUnlessEqual(announcements[0]["my-version"], "ver23")
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 0)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
# now send a duplicate announcement: this should not notify clients
ca.remote_announce([ann1])
return fireEventually()
d.addCallback(_then)
def _then2(ign):
self.failUnlessEqual(len(announcements), 1)
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 0)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
# and a replacement announcement: same FURL, new other stuff.
# Clients should be notified.
ca.remote_announce([ann1b])
return fireEventually()
d.addCallback(_then2)
def _then3(ign):
self.failUnlessEqual(len(announcements), 2)
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 1)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
# test that the other stuff changed
self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
d.addCallback(_then3)
return d
def test_duplicate_receive_v2(self):
ic1 = IntroducerClient(None,
"introducer.furl", u"my_nickname",
@ -330,45 +227,6 @@ class Client(unittest.TestCase):
d.addCallback(_then5)
return d
def test_id_collision(self):
# test replacement case where tubid equals a keyid (one should
# not replace the other)
ic = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"my_version", "oldest_version", {}, fakeseq,
FilePath(self.mktemp()))
announcements = []
ic.subscribe_to("storage",
lambda key_s,ann: announcements.append(ann))
sk_s, vk_s = keyutil.make_keypair()
sk, _ignored = keyutil.parse_privkey(sk_s)
keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
ann_t = make_ann_t(ic, furl1, sk, 1)
ic.remote_announce_v2([ann_t])
d = fireEventually()
def _then(ign):
# first announcement has been processed
self.failUnlessEqual(len(announcements), 1)
self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
furl1)
# now submit a second one, with a tubid that happens to look just
# like the pubkey-based serverid we just processed. They should
# not overlap.
ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
ca = WrapV2ClientInV1Interface(ic)
ca.remote_announce([ann2])
return fireEventually()
d.addCallback(_then)
def _then2(ign):
# if they overlapped, the second announcement would be ignored
self.failUnlessEqual(len(announcements), 2)
self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
furl2)
d.addCallback(_then2)
return d
class Server(unittest.TestCase):
def test_duplicate(self):
i = IntroducerService()
@ -515,14 +373,10 @@ class Queue(SystemTestMixin, unittest.TestCase):
return d
V1 = "v1"; V2 = "v2"
class SystemTest(SystemTestMixin, unittest.TestCase):
def do_system_test(self, server_version):
def do_system_test(self):
self.create_tub()
if server_version == V1:
introducer = old.IntroducerService_v1()
else:
introducer = IntroducerService()
introducer.setServiceParent(self.parent)
iff = os.path.join(self.basedir, "introducer.furl")
@ -545,6 +399,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
printable_serverids = {}
self.the_introducer = introducer
privkeys = {}
pubkeys = {}
expected_announcements = [0 for c in range(NUM_CLIENTS)]
for i in range(NUM_CLIENTS):
@ -558,62 +413,39 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
tub.setLocation("localhost:%d" % portnum)
log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
if i == 0:
c = old.IntroducerClient_v1(tub, self.introducer_furl,
NICKNAME % str(i),
"version", "oldest")
else:
c = IntroducerClient(tub, self.introducer_furl,
NICKNAME % str(i),
"version", "oldest",
{"component": "component-v1"}, fakeseq,
FilePath(self.mktemp()))
received_announcements[c] = {}
def got(key_s_or_tubid, ann, announcements, i):
if i == 0:
index = get_tubid_string_from_ann(ann)
else:
def got(key_s_or_tubid, ann, announcements):
index = key_s_or_tubid or get_tubid_string_from_ann(ann)
announcements[index] = ann
c.subscribe_to("storage", got, received_announcements[c], i)
c.subscribe_to("storage", got, received_announcements[c])
subscribing_clients.append(c)
expected_announcements[i] += 1 # all expect a 'storage' announcement
node_furl = tub.registerReference(Referenceable())
if i < NUM_STORAGE:
if i == 0:
c.publish(node_furl, "storage", "ri_name")
printable_serverids[i] = get_tubid_string(node_furl)
elif i == 1:
# sign the announcement
privkey_s, pubkey_s = keyutil.make_keypair()
privkey, _ignored = keyutil.parse_privkey(privkey_s)
privkeys[c] = privkey
privkeys[i] = privkey
pubkeys[i] = pubkey_s
if i < NUM_STORAGE:
# sign all announcements
c.publish("storage", make_ann(node_furl), privkey)
if server_version == V1:
printable_serverids[i] = get_tubid_string(node_furl)
else:
assert pubkey_s.startswith("pub-")
printable_serverids[i] = pubkey_s[len("pub-"):]
else:
c.publish("storage", make_ann(node_furl))
printable_serverids[i] = get_tubid_string(node_furl)
publishing_clients.append(c)
else:
# the last one does not publish anything
pass
if i == 0:
# users of the V1 client were required to publish a
# 'stub_client' record (somewhat after they published the
# 'storage' record), so the introducer could see their
# version. Match that behavior.
c.publish(node_furl, "stub_client", "stub_ri_name")
if i == 2:
# also publish something that nobody cares about
boring_furl = tub.registerReference(Referenceable())
c.publish("boring", make_ann(boring_furl))
c.publish("boring", make_ann(boring_furl), privkey)
c.setServiceParent(self.parent)
clients.append(c)
@ -661,23 +493,9 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
def _check1(res):
log.msg("doing _check1")
dc = self.the_introducer._debug_counts
if server_version == V1:
# each storage server publishes a record, and (after its
# 'subscribe' has been ACKed) also publishes a "stub_client".
# The non-storage client (which subscribes) also publishes a
# stub_client. There is also one "boring" service. The number
# of messages is higher, because the stub_clients aren't
# published until after we get the 'subscribe' ack (since we
# don't realize that we're dealing with a v1 server [which
# needs stub_clients] until then), and the act of publishing
# the stub_client causes us to re-send all previous
# announcements.
self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
NUM_STORAGE + NUM_CLIENTS + 1)
else:
# each storage server publishes a record. There is also one
# "stub_client" and one "boring"
self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
# "boring"
self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+1)
self.failUnlessEqual(dc["inbound_duplicate"], 0)
self.failUnlessEqual(dc["inbound_update"], 0)
self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
@ -701,32 +519,15 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
anns = received_announcements[c]
self.failUnlessEqual(len(anns), NUM_STORAGE)
nodeid0 = tubs[clients[0]].tubID
ann = anns[nodeid0]
serverid0 = printable_serverids[0]
ann = anns[serverid0]
nick = ann["nickname"]
self.failUnlessEqual(type(nick), unicode)
self.failUnlessEqual(nick, NICKNAME % "0")
if server_version == V1:
for c in publishing_clients:
cdc = c._debug_counts
expected = 1 # storage
if c is clients[2]:
expected += 1 # boring
if c is not clients[0]:
# the v2 client tries to call publish_v2, which fails
# because the server is v1. It then re-sends
# everything it has so far, plus a stub_client record
expected = 2*expected + 1
if c is clients[0]:
# we always tell v1 client to send stub_client
expected += 1
self.failUnlessEqual(cdc["outbound_message"], expected)
else:
for c in publishing_clients:
cdc = c._debug_counts
expected = 1
if c in [clients[0], # stub_client
clients[2], # boring
if c in [clients[2], # boring
]:
expected = 2
self.failUnlessEqual(cdc["outbound_message"], expected)
@ -734,8 +535,8 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
ir = introweb.IntroducerRoot(self.parent)
self.parent.nodeid = "NODEID"
text = ir.renderSynchronously().decode("utf-8")
self.failUnlessIn(NICKNAME % "0", text) # the v1 client
self.failUnlessIn(NICKNAME % "1", text) # a v2 client
self.failUnlessIn(NICKNAME % "0", text) # a v2 client
self.failUnlessIn(NICKNAME % "1", text) # another v2 client
for i in range(NUM_STORAGE):
self.failUnlessIn(printable_serverids[i], text,
(i,printable_serverids[i],text))
@ -825,9 +626,6 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
for k in c._debug_counts:
c._debug_counts[k] = 0
expected_announcements[i] += 1 # new 'storage' for everyone
if server_version == V1:
introducer = old.IntroducerService_v1()
else:
introducer = IntroducerService()
self.the_introducer = introducer
newfurl = self.central_tub.registerReference(self.the_introducer,
@ -861,17 +659,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
def test_system_v2_server(self):
self.basedir = "introducer/SystemTest/system_v2_server"
os.makedirs(self.basedir)
return self.do_system_test(V2)
return self.do_system_test()
test_system_v2_server.timeout = 480
# occasionally takes longer than 350s on "draco"
def test_system_v1_server(self):
self.basedir = "introducer/SystemTest/system_v1_server"
os.makedirs(self.basedir)
return self.do_system_test(V1)
test_system_v1_server.timeout = 480
# occasionally takes longer than 350s on "draco"
class FakeRemoteReference:
def notifyOnDisconnect(self, *args, **kwargs): pass
def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
@ -902,69 +693,7 @@ class ClientInfo(unittest.TestCase):
self.failUnlessEqual(s0.nickname, NICKNAME % u"v2")
self.failUnlessEqual(s0.version, "my_version")
def test_client_v1(self):
introducer = IntroducerService()
subscriber = FakeRemoteReference()
introducer.remote_subscribe(subscriber, "storage")
# the v1 subscribe interface had no subscriber_info: that was usually
# sent in a separate stub_client pseudo-announcement
subs = introducer.get_subscribers()
self.failUnlessEqual(len(subs), 1)
s0 = subs[0]
self.failUnlessEqual(s0.nickname, u"?") # not known yet
self.failUnlessEqual(s0.service_name, "storage")
# now submit the stub_client announcement
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
ann = (furl1, "stub_client", "RIStubClient",
(NICKNAME % u"v1").encode("utf-8"), "my_version", "oldest")
introducer.remote_publish(ann)
# the server should correlate the two
subs = introducer.get_subscribers()
self.failUnlessEqual(len(subs), 1)
s0 = subs[0]
self.failUnlessEqual(s0.service_name, "storage")
# v1 announcements do not contain app-versions
self.failUnlessEqual(s0.app_versions, {})
self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
self.failUnlessEqual(s0.version, "my_version")
# a subscription that arrives after the stub_client announcement
# should be correlated too
subscriber2 = FakeRemoteReference()
introducer.remote_subscribe(subscriber2, "thing2")
subs = introducer.get_subscribers()
self.failUnlessEqual(len(subs), 2)
s0 = [s for s in subs if s.service_name == "thing2"][0]
# v1 announcements do not contain app-versions
self.failUnlessEqual(s0.app_versions, {})
self.failUnlessEqual(s0.nickname, NICKNAME % u"v1")
self.failUnlessEqual(s0.version, "my_version")
class Announcements(unittest.TestCase):
def test_client_v2_unsigned(self):
introducer = IntroducerService()
tub = introducer_furl = None
app_versions = {"whizzy": "fizzy"}
client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
"my_version", "oldest", app_versions,
fakeseq, FilePath(self.mktemp()))
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
ann_s0 = make_ann_t(client_v2, furl1, None, 10)
canary0 = Referenceable()
introducer.remote_publish_v2(ann_s0, canary0)
a = introducer.get_announcements()
self.failUnlessEqual(len(a), 1)
self.failUnlessIdentical(a[0].canary, canary0)
self.failUnlessEqual(a[0].index, ("storage", None, tubid))
self.failUnlessEqual(a[0].announcement["app-versions"], app_versions)
self.failUnlessEqual(a[0].nickname, u"nick-v2")
self.failUnlessEqual(a[0].service_name, "storage")
self.failUnlessEqual(a[0].version, "my_version")
self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
def test_client_v2_signed(self):
introducer = IntroducerService()
tub = introducer_furl = None
@ -982,32 +711,13 @@ class Announcements(unittest.TestCase):
a = introducer.get_announcements()
self.failUnlessEqual(len(a), 1)
self.failUnlessIdentical(a[0].canary, canary0)
self.failUnlessEqual(a[0].index, ("storage", pks, None))
self.failUnlessEqual(a[0].index, ("storage", pks))
self.failUnlessEqual(a[0].announcement["app-versions"], app_versions)
self.failUnlessEqual(a[0].nickname, u"nick-v2")
self.failUnlessEqual(a[0].service_name, "storage")
self.failUnlessEqual(a[0].version, "my_version")
self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
def test_client_v1(self):
introducer = IntroducerService()
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
ann = (furl1, "storage", "RIStorage",
u"nick-v1".encode("utf-8"), "my_version", "oldest")
introducer.remote_publish(ann)
a = introducer.get_announcements()
self.failUnlessEqual(len(a), 1)
self.failUnlessEqual(a[0].index, ("storage", None, tubid))
self.failUnlessEqual(a[0].canary, None)
self.failUnlessEqual(a[0].announcement["app-versions"], {})
self.failUnlessEqual(a[0].nickname, u"nick-v1".encode("utf-8"))
self.failUnlessEqual(a[0].service_name, "storage")
self.failUnlessEqual(a[0].version, "my_version")
self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
def _load_cache(self, cache_filepath):
def construct_unicode(loader, node):
return node.value
@ -1170,7 +880,7 @@ class TooNewServer(IntroducerService):
}
class NonV1Server(SystemTestMixin, unittest.TestCase):
# if the 1.3.0 client connects to a server that doesn't provide the 'v1'
# if the client connects to a server that doesn't provide the 'v2'
# protocol, it is supposed to provide a useful error instead of a weird
# exception.

View File

@ -82,7 +82,7 @@ class IntroducerRoot(rend.Page):
for name in sorted(counts.keys()) ] )
def data_services(self, ctx, data):
services = self.introducer_service.get_announcements(False)
services = self.introducer_service.get_announcements()
services.sort(key=lambda ad: (ad.service_name, ad.nickname))
return services