mirror of
synced 2025-03-10 22:43:52 +00:00
new introducer: signed extensible dictionary-based messages! refs #466
This introduces new client and server halves to the Introducer (renaming the old one with a _V1 suffix). Both have fallbacks to accomodate talking to a different version: the publishing client switches on whether the server's .get_version() advertises V2 support, the server switches on which subscription method was invoked by the subscribing client. The V2 protocol sends a three-tuple of (serialized announcement dictionary, signature, pubkey) for each announcement. The V2 server dispatches messages to subscribers according to the service-name, and throws errors for invalid signatures, but does not otherwise examine the messages. The V2 receiver's subscription callback will receive a (serverid, ann_dict) pair. The 'serverid' will be equal to the pubkey if all of the following are true: the originating client is V2, and was told a privkey to use the announcement went through a V2 server the signature is valid If not, 'serverid' will be equal to the tubid portion of the announced FURL, as was the case for V1 receivers. Servers will create a keypair if one does not exist yet, stored in private/server.privkey . The signed announcement dictionary puts the server FURL in a key named "anonymous-storage-FURL", which anticipates upcoming Accounting-related changes in the server advertisements. It also provides a key named "permutation-seed-base32" to tell clients what permutation seed to use. This is computed at startup, using tubid if there are existing shares, otherwise the pubkey, to retain share-order compatibility for existing servers.
This commit is contained in:
@ -1,12 +1,10 @@
import os, stat, time, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node
from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from pycryptopp.publickey import rsa
import allmydata
@ -16,14 +14,13 @@ from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log
from allmydata.util import hashutil, base32, pollmixin, log, keyutil
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, RIStubClient, \
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError
@ -35,9 +32,6 @@ GiB=1024*MiB
class StubClient(Referenceable):
def _make_secret():
return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
@ -174,7 +168,8 @@ class Client(node.Node, pollmixin.PollMixin):
ic = IntroducerClient(self.tub, self.introducer_furl,
self.introducer_client = ic
# hold off on starting the IntroducerClient until our tub has been
# started, so we'll have a useful address on our RemoteReference, so
@ -203,12 +198,46 @@ class Client(node.Node, pollmixin.PollMixin):
self.convergence = base32.a2b(convergence_s)
self._secret_holder = SecretHolder(lease_secret, self.convergence)
def _maybe_create_server_key(self):
# we only create the key once. On all subsequent runs, we re-use the
# existing key
def _make_key():
sk_vs,vk_vs = keyutil.make_keypair()
return sk_vs+"\n"
sk_vs = self.get_or_create_private_config("server.privkey", _make_key)
sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
self.write_config("server.pubkey", vk_vs+"\n")
self._server_key = sk
def _init_permutation_seed(self, ss):
seed = self.get_config_from_file("permutation-seed")
if not seed:
have_shares = ss.have_shares()
if have_shares:
# if the server has shares but not a recorded
# permutation-seed, then it has been around since pre-#466
# days, and the clients who uploaded those shares used our
# TubID as a permutation-seed. We should keep using that same
# seed to keep the shares in the same place in the permuted
# ring, so those clients don't have to perform excessive
# searches.
seed = base32.b2a(self.nodeid)
# otherwise, we're free to use the more natural seed of our
# pubkey-based serverid
vk_bytes = self._server_key.get_verifying_key_bytes()
seed = base32.b2a(vk_bytes)
self.write_config("permutation-seed", seed+"\n")
return seed.strip()
def init_storage(self):
# should we run a storage server (and publish it for others to use)?
if not self.get_config("storage", "enabled", True, boolean=True):
readonly = self.get_config("storage", "readonly", False, boolean=True)
storedir = os.path.join(self.basedir, self.STOREDIR)
data = self.get_config("storage", "reserved_space", None)
@ -262,8 +291,10 @@ class Client(node.Node, pollmixin.PollMixin):
def _publish(res):
furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
furl = self.tub.registerReference(ss, furlFile=furl_file)
ri_name = RIStorageServer.__remote_name__
self.introducer_client.publish(furl, "storage", ri_name)
ann = {"anonymous-storage-FURL": furl,
"permutation-seed-base32": self._init_permutation_seed(ss),
self.introducer_client.publish("storage", ann, self._server_key)
d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="aLGBKw")
@ -281,7 +312,6 @@ class Client(node.Node, pollmixin.PollMixin):
self.add_service(Uploader(helper_furl, self.stats_provider,
@ -321,20 +351,6 @@ class Client(node.Node, pollmixin.PollMixin):
def get_storage_broker(self):
return self.storage_broker
def init_stub_client(self):
def _publish(res):
# we publish an empty object so that the introducer can count how
# many clients are connected and see what versions they're
# running.
sc = StubClient()
furl = self.tub.registerReference(sc)
ri_name = RIStubClient.__remote_name__
self.introducer_client.publish(furl, "stub_client", ri_name)
d = self.when_tub_ready()
d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="OEHq3g")
def init_blacklist(self):
fn = os.path.join(self.basedir, "access.blacklist")
self.blacklist = Blacklist(fn)
@ -30,14 +30,6 @@ WriteEnablerSecret = Hash # used to protect mutable bucket modifications
LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
class RIStubClient(RemoteInterface):
"""Each client publishes a service announcement for a dummy object called
the StubClient. This object doesn't actually offer any services, but the
announcement helps the Introducer keep track of which clients are
subscribed (so the grid admin can keep track of things like the size of
the grid and the client versions in use. This is the (empty)
RemoteInterface for the StubClient."""
class RIBucketWriter(RemoteInterface):
""" Objects of this kind live on the server side. """
def write(offset=Offset, data=ShareData):
@ -1,29 +1,76 @@
from base64 import b32decode
import time
from zope.interface import implements
from twisted.application import service
from foolscap.api import Referenceable, SturdyRef, eventually
from foolscap.api import Referenceable, eventually, RemoteInterface
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
from allmydata.util import log, idlib
from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
from allmydata.introducer.interfaces import IIntroducerClient, \
RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
make_index, get_tubid_string_from_ann, get_tubid_string
from allmydata.util import log
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.keyutil import BadSignatureError
class WrapV2ClientInV1Interface(Referenceable): # for_v1
"""I wrap a v2 IntroducerClient to make it look like a v1 client, so it
can be attached to an old server."""
def __init__(self, original):
self.original = original
def remote_announce(self, announcements):
lp = self.original.log("received %d announcements (v1)" %
anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)
for ann_v1 in announcements])
return self.original.got_announcements(anns_v1, lp)
def remote_set_encoding_parameters(self, parameters):
class RIStubClient(RemoteInterface): # for_v1
"""Each client publishes a service announcement for a dummy object called
the StubClient. This object doesn't actually offer any services, but the
announcement helps the Introducer keep track of which clients are
subscribed (so the grid admin can keep track of things like the size of
the grid and the client versions in use. This is the (empty)
RemoteInterface for the StubClient."""
class StubClient(Referenceable): # for_v1
V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
class IntroducerClient(service.Service, Referenceable):
implements(RIIntroducerSubscriberClient, IIntroducerClient)
implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)
def __init__(self, tub, introducer_furl,
nickname, my_version, oldest_supported):
nickname, my_version, oldest_supported,
self._tub = tub
self.introducer_furl = introducer_furl
assert type(nickname) is unicode
self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
self._nickname = nickname
self._my_version = my_version
self._oldest_supported = oldest_supported
self._app_versions = app_versions
self._published_announcements = set()
self._my_subscriber_info = { "version": 0,
"nickname": self._nickname,
"app-versions": self._app_versions,
"my-version": self._my_version,
"oldest-supported": self._oldest_supported,
self._stub_client = None # for_v1
self._stub_client_furl = None
self._published_announcements = {}
self._canary = Referenceable()
self._publisher = None
@ -33,10 +80,11 @@ class IntroducerClient(service.Service, Referenceable):
# _current_announcements remembers one announcement per
# (servicename,serverid) pair. Anything that arrives with the same
# pair will displace the previous one. This stores unpacked
# announcement dictionaries, which can be compared for equality to
# distinguish re-announcement from updates. It also provides memory
# for clients who subscribe after startup.
# pair will displace the previous one. This stores tuples of
# (unpacked announcement dictionary, verifyingkey, rxtime). The ann
# dicts can be compared for equality to distinguish re-announcement
# from updates. It also provides memory for clients who subscribe
# after startup.
self._current_announcements = {}
self.encoding_parameters = None
@ -51,6 +99,11 @@ class IntroducerClient(service.Service, Referenceable):
"new_announcement": 0,
"outbound_message": 0,
self._debug_outstanding = 0
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def startService(self):
@ -79,10 +132,9 @@ class IntroducerClient(service.Service, Referenceable):
def _got_versioned_introducer(self, publisher):
self.log("got introducer version: %s" % (publisher.version,))
# we require a V1 introducer
needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
if needed not in publisher.version:
raise InsufficientVersionError(needed, publisher.version)
# we require an introducer that speaks at least one of (V1, V2)
if not (V1 in publisher.version or V2 in publisher.version):
raise InsufficientVersionError("V1 or V2", publisher.version)
self._publisher = publisher
@ -95,24 +147,17 @@ class IntroducerClient(service.Service, Referenceable):
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
kwargs["facility"] = "tahoe.introducer.client"
return log.msg(*args, **kwargs)
def publish(self, furl, service_name, remoteinterface_name):
assert type(self._nickname_utf8) is str # we always send UTF-8
ann = (furl, service_name, remoteinterface_name,
self._nickname_utf8, self._my_version, self._oldest_supported)
def subscribe_to(self, service_name, cb, *args, **kwargs):
self._local_subscribers.append( (service_name,cb,args,kwargs) )
for (servicename,nodeid),ann_d in self._current_announcements.items():
for index,(ann,key_s,when) in self._current_announcements.items():
servicename = index[0]
if servicename == service_name:
eventually(cb, nodeid, ann_d)
eventually(cb, key_s, ann, *args, **kwargs)
def _maybe_subscribe(self):
if not self._publisher:
@ -120,96 +165,160 @@ class IntroducerClient(service.Service, Referenceable):
for service_name in self._subscribed_service_names:
if service_name not in self._subscriptions:
# there is a race here, but the subscription desk ignores
# duplicate requests.
d = self._publisher.callRemote("subscribe", self, service_name)
d.addErrback(log.err, format="server errored during subscribe",
level=log.WEIRD, umid="2uMScQ")
if service_name in self._subscriptions:
if V2 in self._publisher.version:
self._debug_outstanding += 1
d = self._publisher.callRemote("subscribe_v2",
self, service_name,
d = self._subscribe_handle_v1(service_name) # for_v1
d.addErrback(log.err, facility="tahoe.introducer.client",
level=log.WEIRD, umid="2uMScQ")
def _subscribe_handle_v1(self, service_name): # for_v1
# they don't speak V2: must be a v1 introducer. Fall back to the v1
# 'subscribe' method, using a client adapter.
ca = WrapV2ClientInV1Interface(self)
self._debug_outstanding += 1
d = self._publisher.callRemote("subscribe", ca, service_name)
# We must also publish an empty 'stub_client' object, so the
# introducer can count how many clients are connected and see what
# versions they're running.
if not self._stub_client_furl:
self._stub_client = sc = StubClient()
self._stub_client_furl = self._tub.registerReference(sc)
def _publish_stub_client(ignored):
furl = self._stub_client_furl
{ "anonymous-storage-FURL": furl,
"permutation-seed-base32": get_tubid_string(furl),
return d
def create_announcement(self, service_name, ann, signing_key):
full_ann = { "version": 0,
"nickname": self._nickname,
"app-versions": self._app_versions,
"my-version": self._my_version,
"oldest-supported": self._oldest_supported,
"service-name": service_name,
return sign_to_foolscap(full_ann, signing_key)
def publish(self, service_name, ann, signing_key=None):
ann_t = self.create_announcement(service_name, ann, signing_key)
self._published_announcements[service_name] = ann_t
def _maybe_publish(self):
if not self._publisher:
self.log("want to publish, but no introducer yet", level=log.NOISY)
# this re-publishes everything. The Introducer ignores duplicates
for ann in self._published_announcements:
for ann_t in self._published_announcements.values():
self._debug_counts["outbound_message"] += 1
d = self._publisher.callRemote("publish", ann)
format="server errored during publish %(ann)s",
ann=ann, facility="tahoe.introducer",
if V2 in self._publisher.version:
self._debug_outstanding += 1
d = self._publisher.callRemote("publish_v2", ann_t,
d = self._handle_v1_publisher(ann_t) # for_v1
d.addErrback(log.err, ann_t=ann_t,
level=log.WEIRD, umid="xs9pVQ")
def _handle_v1_publisher(self, ann_t): # for_v1
# they don't speak V2, so fall back to the old 'publish' method
# (which takes an unsigned tuple of bytestrings)
self.log("falling back to publish_v1",
level=log.UNUSUAL, umid="9RCT1A")
ann_v1 = convert_announcement_v2_to_v1(ann_t)
self._debug_outstanding += 1
d = self._publisher.callRemote("publish", ann_v1)
return d
def remote_announce(self, announcements):
self.log("received %d announcements" % len(announcements))
def remote_announce_v2(self, announcements):
lp = self.log("received %d announcements (v2)" % len(announcements))
return self.got_announcements(announcements, lp)
def got_announcements(self, announcements, lp=None):
# this is the common entry point for both v1 and v2 announcements
self._debug_counts["inbound_message"] += 1
for ann in announcements:
for ann_t in announcements:
log.err(format="unable to process announcement %(ann)s",
# Don't let a corrupt announcement prevent us from processing
# the remaining ones. Don't return an error to the server,
# since they'd just ignore it anyways.
# this might raise UnknownKeyError or bad-sig error
ann, key_s = unsign_from_foolscap(ann_t)
# key is "v0-base32abc123"
except BadSignatureError:
self.log("bad signature on inbound announcement: %s" % (ann_t,),
parent=lp, level=log.WEIRD, umid="ZAU15Q")
# process other announcements that arrived with the bad one
def _process_announcement(self, ann):
self._process_announcement(ann, key_s)
def _process_announcement(self, ann, key_s):
self._debug_counts["inbound_announcement"] += 1
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
service_name = str(ann["service-name"])
if service_name not in self._subscribed_service_names:
self.log("announcement for a service we don't care about [%s]"
% (service_name,), level=log.UNUSUAL, umid="dIpGNA")
self._debug_counts["wrong_service"] += 1
self.log("announcement for [%s]: %s" % (service_name, ann),
assert type(furl) is str
assert type(service_name) is str
assert type(ri_name) is str
assert type(nickname_utf8) is str
nickname = nickname_utf8.decode("utf-8")
assert type(nickname) is unicode
assert type(ver) is str
assert type(oldest) is str
# for ASCII values, simplejson might give us unicode *or* bytes
if "nickname" in ann and isinstance(ann["nickname"], str):
ann["nickname"] = unicode(ann["nickname"])
nick_s = ann.get("nickname",u"").encode("utf-8")
lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")
nodeid = b32decode(SturdyRef(furl).tubID.upper())
nodeid_s = idlib.shortnodeid_b2a(nodeid)
# how do we describe this node in the logs?
desc_bits = []
if key_s:
desc_bits.append("serverid=" + key_s[:20])
if "anonymous-storage-FURL" in ann:
tubid_s = get_tubid_string_from_ann(ann)
desc_bits.append("tubid=" + tubid_s[:8])
description = "/".join(desc_bits)
ann_d = { "version": 0,
"service-name": service_name,
# the index is used to track duplicates
index = make_index(ann, key_s)
"FURL": furl,
"nickname": nickname,
"app-versions": {}, # need #466 and v2 introducer
"my-version": ver,
"oldest-supported": oldest,
index = (service_name, nodeid)
if self._current_announcements.get(index, None) == ann_d:
self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
service=service_name, nodeid=nodeid_s,
level=log.UNUSUAL, umid="B1MIdA")
# is this announcement a duplicate?
if (index in self._current_announcements
and self._current_announcements[index][0] == ann):
self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
service=service_name, description=description,
parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
self._debug_counts["duplicate_announcement"] += 1
# does it update an existing one?
if index in self._current_announcements:
self._debug_counts["update"] += 1
self.log("replacing old announcement: %s" % (ann,),
parent=lp2, level=log.NOISY, umid="wxwgIQ")
self._debug_counts["new_announcement"] += 1
self.log("new announcement[%s]" % service_name,
parent=lp2, level=log.NOISY)
self._current_announcements[index] = ann_d
self._current_announcements[index] = (ann, key_s, time.time())
# note: we never forget an index, but we might update its value
for (service_name2,cb,args,kwargs) in self._local_subscribers:
if service_name2 == service_name:
eventually(cb, nodeid, ann_d, *args, **kwargs)
eventually(cb, key_s, ann, *args, **kwargs)
def remote_set_encoding_parameters(self, parameters):
self.encoding_parameters = parameters
Normal file
Normal file
@ -0,0 +1,92 @@
import re, simplejson
from allmydata.util import keyutil, base32
def make_index(ann, key_s):
"""Return something that can be used as an index (e.g. a tuple of
strings), such that two messages that refer to the same 'thing' will have
the same index. This is a tuple of (service-name, signing-key, None) for
signed announcements, or (service-name, None, tubid) for unsigned
service_name = str(ann["service-name"])
if key_s:
return (service_name, key_s, None)
tubid = get_tubid_string_from_ann(ann)
return (service_name, None, tubid)
def get_tubid_string_from_ann(ann):
return get_tubid_string(str(ann.get("anonymous-storage-FURL")
or ann.get("FURL")))
def get_tubid_string(furl):
m = re.match(r'pb://(\w+)@', furl)
assert m
return m.group(1).lower()
def convert_announcement_v1_to_v2(ann_t):
(furl, service_name, ri_name, nickname, ver, oldest) = ann_t
assert type(furl) is str
assert type(service_name) is str
# ignore ri_name
assert type(nickname) is str
assert type(ver) is str
assert type(oldest) is str
ann = {"version": 0,
"nickname": nickname.decode("utf-8"),
"app-versions": {},
"my-version": ver,
"oldest-supported": oldest,
"service-name": service_name,
"anonymous-storage-FURL": furl,
"permutation-seed-base32": get_tubid_string(furl),
msg = simplejson.dumps(ann).encode("utf-8")
return (msg, None, None)
def convert_announcement_v2_to_v1(ann_v2):
(msg, sig, pubkey) = ann_v2
ann = simplejson.loads(msg)
assert ann["version"] == 0
ann_t = (str(ann["anonymous-storage-FURL"]),
"remoteinterface-name is unused",
return ann_t
def sign_to_foolscap(ann, sk):
# return (bytes, None, None) or (bytes, sig-str, pubkey-str). A future
# HTTP-based serialization will use JSON({msg:b64(JSON(msg).utf8),
# sig:v0-b64(sig), pubkey:v0-b64(pubkey)}) .
msg = simplejson.dumps(ann).encode("utf-8")
if sk:
sig = "v0-"+base32.b2a(sk.sign(msg))
vk_bytes = sk.get_verifying_key_bytes()
ann_t = (msg, sig, "v0-"+base32.b2a(vk_bytes))
ann_t = (msg, None, None)
return ann_t
class UnknownKeyError(Exception):
def unsign_from_foolscap(ann_t):
(msg, sig_vs, claimed_key_vs) = ann_t
key_vs = None
if sig_vs and claimed_key_vs:
if not sig_vs.startswith("v0-"):
raise UnknownKeyError("only v0- signatures recognized")
if not claimed_key_vs.startswith("v0-"):
raise UnknownKeyError("only v0- keys recognized")
claimed_key = keyutil.parse_pubkey("pub-"+claimed_key_vs)
sig_bytes = base32.a2b(keyutil.remove_prefix(sig_vs, "v0-"))
claimed_key.verify(sig_bytes, msg)
key_vs = claimed_key_vs
ann = simplejson.loads(msg.decode("utf-8"))
return (ann, key_vs)
@ -1,9 +1,12 @@
from zope.interface import Interface
from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
RemoteInterface, Referenceable
from old import RIIntroducerSubscriberClient_v1
FURL = StringConstraint(1000)
# old introducer protocol (v1):
# Announcements are (FURL, service_name, remoteinterface_name,
# nickname, my_version, oldest_supported)
# the (FURL, service_name, remoteinterface_name) refer to the service being
@ -14,13 +17,17 @@ FURL = StringConstraint(1000)
# incompatible peer. The second goal is to enable the development of
# backwards-compatibility code.
Announcement = TupleOf(FURL, str, str,
str, str, str)
Announcement_v1 = TupleOf(FURL, str, str,
str, str, str)
class RIIntroducerSubscriberClient(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
# v2 protocol over foolscap: Announcements are 3-tuples of (bytes, str, str)
# or (bytes, none, none)
Announcement_v2 = Any()
def announce(announcements=SetOf(Announcement)):
class RIIntroducerSubscriberClient_v2(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberClient_v2.tahoe.allmydata.com"
def announce_v2(announcements=SetOf(Announcement_v2)):
"""I accept announcements from the publisher."""
return None
@ -41,38 +48,29 @@ class RIIntroducerSubscriberClient(RemoteInterface):
return None
# When Foolscap can handle multiple interfaces (Foolscap#17), the
# full-powered introducer will implement both RIIntroducerPublisher and
# RIIntroducerSubscriberService. Until then, we define
# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
# make everybody use that.
SubscriberInfo = DictOf(str, Any())
class RIIntroducerPublisher(RemoteInterface):
class RIIntroducerPublisherAndSubscriberService_v2(RemoteInterface):
"""To publish a service to the world, connect to me and give me your
announcement message. I will deliver a copy to all connected subscribers."""
__remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
def publish(announcement=Announcement):
# canary?
return None
class RIIntroducerSubscriberService(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
"""Give me a subscriber reference, and I will call its new_peers()
method will any announcements that match the desired service name. I
will ignore duplicate subscriptions.
return None
class RIIntroducerPublisherAndSubscriberService(RemoteInterface):
__remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
announcement message. I will deliver a copy to all connected subscribers.
To hear about services, connect to me and subscribe to a specific
__remote_name__ = "RIIntroducerPublisherAndSubscriberService_v2.tahoe.allmydata.com"
def get_version():
return DictOf(str, Any())
def publish(announcement=Announcement):
def publish(announcement=Announcement_v1):
return None
def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
def publish_v2(announcement=Announcement_v2, canary=Referenceable):
return None
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
return None
def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2,
service_name=str, subscriber_info=SubscriberInfo):
"""Give me a subscriber reference, and I will call its announce_v2()
method with any announcements that match the desired service name. I
will ignore duplicate subscriptions. The subscriber_info dictionary
tells me about the subscriber, and is used for diagnostic/status
return None
class IIntroducerClient(Interface):
@ -80,41 +78,47 @@ class IIntroducerClient(Interface):
publish their services to the rest of the world, and I help them learn
about services available on other nodes."""
def publish(furl, service_name, remoteinterface_name):
"""Once you call this, I will tell the world that the Referenceable
available at FURL is available to provide a service named
SERVICE_NAME. The precise definition of the service being provided is
identified by the Foolscap 'remote interface name' in the last
parameter: this is supposed to be a globally-unique string that
identifies the RemoteInterface that is implemented."""
def publish(service_name, ann, signing_key=None):
"""Publish the given announcement dictionary (which must be
JSON-serializable), plus some additional keys, to the world.
Each announcement is characterized by a (service_name, serverid)
pair. When the server sees two announcements with the same pair, the
later one will replace the earlier one. The serverid is derived from
the signing_key, if present, otherwise it is derived from the
'anonymous-storage-FURL' key.
If signing_key= is set to an instance of SigningKey, it will be
used to sign the announcement."""
def subscribe_to(service_name, callback, *args, **kwargs):
"""Call this if you will eventually want to use services with the
given SERVICE_NAME. This will prompt me to subscribe to announcements
of those services. Your callback will be invoked with at least two
arguments: a serverid (binary string), and an announcement
dictionary, followed by any additional callback args/kwargs you give
me. I will run your callback for both new announcements and for
arguments: a pubkey and an announcement dictionary, followed by any
additional callback args/kwargs you gave me. The pubkey will be None
unless the announcement was signed by the corresponding pubkey, in
which case it will be a printable string like 'v0-base32..'.
I will run your callback for both new announcements and for
announcements that have changed, but you must be prepared to tolerate
The announcement dictionary that I give you will have the following
The announcement that I give you comes from some other client. It
will be a JSON-serializable dictionary which (by convention) is
expected to have at least the following keys:
version: 0
service-name: str('storage')
FURL: str(furl)
remoteinterface-name: str(ri_name)
nickname: unicode
app-versions: {}
my-version: str
oldest-supported: str
Note that app-version will be an empty dictionary until #466 is done
and both the introducer and the remote client have been upgraded. For
current (native) server types, the serverid will always be equal to
the binary form of the FURL's tubid.
service-name: str('storage')
anonymous-storage-FURL: str(furl)
Note that app-version will be an empty dictionary if either the
publishing client or the Introducer are running older code.
def connected_to_introducer():
Normal file
Normal file
@ -0,0 +1,463 @@
import time
from base64 import b32decode
from zope.interface import implements, Interface
from twisted.application import service
import allmydata
from allmydata.interfaces import InsufficientVersionError
from allmydata.util import log, idlib, rrefutil
from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
RemoteInterface, Referenceable, eventually, SturdyRef
FURL = StringConstraint(1000)
# We keep a copy of the old introducer (both client and server) here to
# support compatibility tests. The old client is supposed to handle the new
# server, and new client is supposed to handle the old server.
# Announcements are (FURL, service_name, remoteinterface_name,
# nickname, my_version, oldest_supported)
# the (FURL, service_name, remoteinterface_name) refer to the service being
# announced. The (nickname, my_version, oldest_supported) refer to the
# client as a whole. The my_version/oldest_supported strings can be parsed
# by an allmydata.util.version.Version instance, and then compared. The
# first goal is to make sure that nodes are not confused by speaking to an
# incompatible peer. The second goal is to enable the development of
# backwards-compatibility code.
Announcement = TupleOf(FURL, str, str,
str, str, str)
class RIIntroducerSubscriberClient_v1(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
def announce(announcements=SetOf(Announcement)):
"""I accept announcements from the publisher."""
return None
def set_encoding_parameters(parameters=(int, int, int)):
"""Advise the client of the recommended k-of-n encoding parameters
for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
is the total number of shares that will be created for any given
file, while 'k' is the number of shares that must be retrieved to
recover that file, and 'desired' is the minimum number of shares that
must be placed before the uploader will consider its job a success.
n/k is the expansion ratio, while k determines the robustness.
Introducers should specify 'n' according to the expected size of the
grid (there is no point to producing more shares than there are
peers), and k according to the desired reliability-vs-overhead goals.
Note that setting k=1 is equivalent to simple replication.
return None
# When Foolscap can handle multiple interfaces (Foolscap#17), the
# full-powered introducer will implement both RIIntroducerPublisher and
# RIIntroducerSubscriberService. Until then, we define
# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
# make everybody use that.
class RIIntroducerPublisher_v1(RemoteInterface):
"""To publish a service to the world, connect to me and give me your
announcement message. I will deliver a copy to all connected subscribers."""
__remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
def publish(announcement=Announcement):
# canary?
return None
class RIIntroducerSubscriberService_v1(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
"""Give me a subscriber reference, and I will call its new_peers()
method will any announcements that match the desired service name. I
will ignore duplicate subscriptions.
return None
class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
__remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
def get_version():
return DictOf(str, Any())
def publish(announcement=Announcement):
return None
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
return None
class IIntroducerClient(Interface):
"""I provide service introduction facilities for a node. I help nodes
publish their services to the rest of the world, and I help them learn
about services available on other nodes."""
def publish(furl, service_name, remoteinterface_name):
"""Once you call this, I will tell the world that the Referenceable
available at FURL is available to provide a service named
SERVICE_NAME. The precise definition of the service being provided is
identified by the Foolscap 'remote interface name' in the last
parameter: this is supposed to be a globally-unique string that
identifies the RemoteInterface that is implemented."""
def subscribe_to(service_name, callback, *args, **kwargs):
"""Call this if you will eventually want to use services with the
given SERVICE_NAME. This will prompt me to subscribe to announcements
of those services. Your callback will be invoked with at least two
arguments: a serverid (binary string), and an announcement
dictionary, followed by any additional callback args/kwargs you give
me. I will run your callback for both new announcements and for
announcements that have changed, but you must be prepared to tolerate
The announcement dictionary that I give you will have the following
version: 0
service-name: str('storage')
FURL: str(furl)
remoteinterface-name: str(ri_name)
nickname: unicode
app-versions: {}
my-version: str
oldest-supported: str
Note that app-version will be an empty dictionary until #466 is done
and both the introducer and the remote client have been upgraded. For
current (native) server types, the serverid will always be equal to
the binary form of the FURL's tubid.
def connected_to_introducer():
"""Returns a boolean, True if we are currently connected to the
introducer, False if not."""
class IntroducerClient_v1(service.Service, Referenceable):
implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)
def __init__(self, tub, introducer_furl,
nickname, my_version, oldest_supported):
self._tub = tub
self.introducer_furl = introducer_furl
assert type(nickname) is unicode
self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
self._my_version = my_version
self._oldest_supported = oldest_supported
self._published_announcements = set()
self._publisher = None
self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
self._subscribed_service_names = set()
self._subscriptions = set() # requests we've actually sent
# _current_announcements remembers one announcement per
# (servicename,serverid) pair. Anything that arrives with the same
# pair will displace the previous one. This stores unpacked
# announcement dictionaries, which can be compared for equality to
# distinguish re-announcement from updates. It also provides memory
# for clients who subscribe after startup.
self._current_announcements = {}
self.encoding_parameters = None
# hooks for unit tests
self._debug_counts = {
"inbound_message": 0,
"inbound_announcement": 0,
"wrong_service": 0,
"duplicate_announcement": 0,
"update": 0,
"new_announcement": 0,
"outbound_message": 0,
self._debug_outstanding = 0
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def startService(self):
self._introducer_error = None
rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
self._introducer_reconnector = rc
def connect_failed(failure):
self.log("Initial Introducer connection failed: perhaps it's down",
level=log.WEIRD, failure=failure, umid="c5MqUQ")
d = self._tub.getReference(self.introducer_furl)
def _got_introducer(self, publisher):
self.log("connected to introducer, getting versions")
default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
{ },
"application-version": "unknown: no get_version()",
d = rrefutil.add_version_to_remote_reference(publisher, default)
def _got_error(self, f):
# TODO: for the introducer, perhaps this should halt the application
self._introducer_error = f # polled by tests
def _got_versioned_introducer(self, publisher):
self.log("got introducer version: %s" % (publisher.version,))
# we require a V1 introducer
needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
if needed not in publisher.version:
raise InsufficientVersionError(needed, publisher.version)
self._publisher = publisher
def _disconnected(self):
self.log("bummer, we've lost our connection to the introducer")
self._publisher = None
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
return log.msg(*args, **kwargs)
def publish(self, furl, service_name, remoteinterface_name):
assert type(self._nickname_utf8) is str # we always send UTF-8
ann = (furl, service_name, remoteinterface_name,
self._nickname_utf8, self._my_version, self._oldest_supported)
def subscribe_to(self, service_name, cb, *args, **kwargs):
self._local_subscribers.append( (service_name,cb,args,kwargs) )
for (servicename,nodeid),ann_d in self._current_announcements.items():
if servicename == service_name:
eventually(cb, nodeid, ann_d)
def _maybe_subscribe(self):
if not self._publisher:
self.log("want to subscribe, but no introducer yet",
for service_name in self._subscribed_service_names:
if service_name not in self._subscriptions:
# there is a race here, but the subscription desk ignores
# duplicate requests.
self._debug_outstanding += 1
d = self._publisher.callRemote("subscribe", self, service_name)
d.addErrback(log.err, format="server errored during subscribe",
level=log.WEIRD, umid="2uMScQ")
def _maybe_publish(self):
if not self._publisher:
self.log("want to publish, but no introducer yet", level=log.NOISY)
# this re-publishes everything. The Introducer ignores duplicates
for ann in self._published_announcements:
self._debug_counts["outbound_message"] += 1
self._debug_outstanding += 1
d = self._publisher.callRemote("publish", ann)
format="server errored during publish %(ann)s",
ann=ann, facility="tahoe.introducer",
level=log.WEIRD, umid="xs9pVQ")
def remote_announce(self, announcements):
self.log("received %d announcements" % len(announcements))
self._debug_counts["inbound_message"] += 1
for ann in announcements:
log.err(format="unable to process announcement %(ann)s",
# Don't let a corrupt announcement prevent us from processing
# the remaining ones. Don't return an error to the server,
# since they'd just ignore it anyways.
def _process_announcement(self, ann):
self._debug_counts["inbound_announcement"] += 1
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
if service_name not in self._subscribed_service_names:
self.log("announcement for a service we don't care about [%s]"
% (service_name,), level=log.UNUSUAL, umid="dIpGNA")
self._debug_counts["wrong_service"] += 1
self.log("announcement for [%s]: %s" % (service_name, ann),
assert type(furl) is str
assert type(service_name) is str
assert type(ri_name) is str
assert type(nickname_utf8) is str
nickname = nickname_utf8.decode("utf-8")
assert type(nickname) is unicode
assert type(ver) is str
assert type(oldest) is str
nodeid = b32decode(SturdyRef(furl).tubID.upper())
nodeid_s = idlib.shortnodeid_b2a(nodeid)
ann_d = { "version": 0,
"service-name": service_name,
"FURL": furl,
"nickname": nickname,
"app-versions": {}, # need #466 and v2 introducer
"my-version": ver,
"oldest-supported": oldest,
index = (service_name, nodeid)
if self._current_announcements.get(index, None) == ann_d:
self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
service=service_name, nodeid=nodeid_s,
level=log.UNUSUAL, umid="B1MIdA")
self._debug_counts["duplicate_announcement"] += 1
if index in self._current_announcements:
self._debug_counts["update"] += 1
self._debug_counts["new_announcement"] += 1
self._current_announcements[index] = ann_d
# note: we never forget an index, but we might update its value
for (service_name2,cb,args,kwargs) in self._local_subscribers:
if service_name2 == service_name:
eventually(cb, nodeid, ann_d, *args, **kwargs)
def remote_set_encoding_parameters(self, parameters):
self.encoding_parameters = parameters
def connected_to_introducer(self):
return bool(self._publisher)
class IntroducerService_v1(service.MultiService, Referenceable):
name = "introducer"
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
{ },
"application-version": str(allmydata.__full_version__),
def __init__(self, basedir="."):
self.introducer_url = None
# 'index' is (service_name, tubid)
self._announcements = {} # dict of index -> (announcement, timestamp)
self._subscribers = {} # dict of (rref->timestamp) dicts
self._debug_counts = {"inbound_message": 0,
"inbound_duplicate": 0,
"inbound_update": 0,
"outbound_message": 0,
"outbound_announcements": 0,
"inbound_subscribe": 0}
self._debug_outstanding = 0
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
return log.msg(*args, **kwargs)
def get_announcements(self):
return self._announcements
def get_subscribers(self):
return self._subscribers
def remote_get_version(self):
return self.VERSION
def remote_publish(self, announcement):
log.err(format="Introducer.remote_publish failed on %(ann)s",
ann=announcement, level=log.UNUSUAL, umid="620rWA")
def _publish(self, announcement):
self._debug_counts["inbound_message"] += 1
self.log("introducer: announcement published: %s" % (announcement,) )
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
#print "PUB", service_name, nickname_utf8
nodeid = b32decode(SturdyRef(furl).tubID.upper())
index = (service_name, nodeid)
if index in self._announcements:
(old_announcement, timestamp) = self._announcements[index]
if old_announcement == announcement:
self.log("but we already knew it, ignoring", level=log.NOISY)
self._debug_counts["inbound_duplicate"] += 1
self.log("old announcement being updated", level=log.NOISY)
self._debug_counts["inbound_update"] += 1
self._announcements[index] = (announcement, time.time())
for s in self._subscribers.get(service_name, []):
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += 1
self._debug_outstanding += 1
d = s.callRemote("announce", set([announcement]))
format="subscriber errored on announcement %(ann)s",
ann=announcement, facility="tahoe.introducer",
level=log.UNUSUAL, umid="jfGMXQ")
def remote_subscribe(self, subscriber, service_name):
self.log("introducer: subscription[%s] request at %s" % (service_name,
self._debug_counts["inbound_subscribe"] += 1
if service_name not in self._subscribers:
self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name]
if subscriber in subscribers:
self.log("but they're already subscribed, ignoring",
subscribers[subscriber] = time.time()
def _remove():
self.log("introducer: unsubscribing[%s] %s" % (service_name,
subscribers.pop(subscriber, None)
announcements = set(
[ ann
for (sn2,nodeid),(ann,when) in self._announcements.items()
if sn2 == service_name] )
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += len(announcements)
self._debug_outstanding += 1
d = subscriber.callRemote("announce", announcements)
format="subscriber errored during subscribe %(anns)s",
anns=announcements, facility="tahoe.introducer",
level=log.UNUSUAL, umid="1XChxA")
@ -1,14 +1,16 @@
import time, os.path
from base64 import b32decode
from zope.interface import implements
from twisted.application import service
from foolscap.api import Referenceable, SturdyRef
from foolscap.api import Referenceable
import allmydata
from allmydata import node
from allmydata.util import log, rrefutil
from allmydata.util import log
from allmydata.introducer.interfaces import \
from allmydata.introducer.common import convert_announcement_v1_to_v2, \
convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
class IntroducerNode(node.Node):
PORTNUMFILE = "introducer.port"
@ -31,14 +33,15 @@ class IntroducerNode(node.Node):
def _publish(res):
self.introducer_url = self.tub.registerReference(introducerservice,
self.log(" introducer is at %s" % self.introducer_url)
self.log(" introducer is at %s" % self.introducer_url,
self.write_config("introducer.furl", self.introducer_url + "\n")
d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="UaNs9A")
def init_web(self, webport):
self.log("init_web(webport=%s)", args=(webport,))
self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")
from allmydata.webish import IntroducerWebishServer
nodeurl_path = os.path.join(self.basedir, "node.url")
@ -47,105 +50,260 @@ class IntroducerNode(node.Node):
ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
class WrapV1SubscriberInV2Interface: # for_v1
"""I wrap a RemoteReference that points at an old v1 subscriber, enabling
it to be treated like a v2 subscriber.
def __init__(self, original):
self.original = original
def __eq__(self, them):
return self.original == them
def __ne__(self, them):
return self.original != them
def __hash__(self):
return hash(self.original)
def getRemoteTubID(self):
return self.original.getRemoteTubID()
def getSturdyRef(self):
return self.original.getSturdyRef()
def getPeer(self):
return self.original.getPeer()
def callRemote(self, methname, *args, **kwargs):
m = getattr(self, "wrap_" + methname)
return m(*args, **kwargs)
def wrap_announce_v2(self, announcements):
anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements]
return self.original.callRemote("announce", set(anns_v1))
def wrap_set_encoding_parameters(self, parameters):
# note: unused
return self.original.callRemote("set_encoding_parameters", parameters)
def notifyOnDisconnect(self, *args, **kwargs):
return self.original.notifyOnDisconnect(*args, **kwargs)
class IntroducerService(service.MultiService, Referenceable):
name = "introducer"
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
{ },
# v1 is the original protocol, supported since 1.0 (but only advertised
# starting in 1.3). v2 is the new signed protocol, supported after 1.9
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
"http://allmydata.org/tahoe/protocols/introducer/v2": { },
"application-version": str(allmydata.__full_version__),
def __init__(self, basedir="."):
self.introducer_url = None
# 'index' is (service_name, tubid)
self._announcements = {} # dict of index -> (announcement, timestamp)
self._subscribers = {} # dict of (rref->timestamp) dicts
# 'index' is (service_name, key_s, tubid), where key_s or tubid is
# None
self._announcements = {} # dict of index ->
# (ann_t, canary, ann, timestamp)
# ann (the announcement dictionary) is cleaned up: nickname is always
# unicode, servicename is always ascii, etc, even though
# simplejson.loads sometimes returns either
# self._subscribers is a dict mapping servicename to subscriptions
# 'subscriptions' is a dict mapping rref to a subscription
# 'subscription' is a tuple of (subscriber_info, timestamp)
# 'subscriber_info' is a dict, provided directly for v2 clients, or
# synthesized for v1 clients. The expected keys are:
# version, nickname, app-versions, my-version, oldest-supported
self._subscribers = {}
# self._stub_client_announcements contains the information provided
# by v1 clients. We stash this so we can match it up with their
# subscriptions.
self._stub_client_announcements = {} # maps tubid to sinfo # for_v1
self._debug_counts = {"inbound_message": 0,
"inbound_duplicate": 0,
"inbound_update": 0,
"outbound_message": 0,
"outbound_announcements": 0,
"inbound_subscribe": 0}
self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
kwargs["facility"] = "tahoe.introducer.server"
return log.msg(*args, **kwargs)
def get_announcements(self):
return self._announcements
def get_subscribers(self):
return self._subscribers
"""Return a list of (service_name, when, subscriber_info, rref) for
all subscribers. subscriber_info is a dict with the following keys:
version, nickname, app-versions, my-version, oldest-supported"""
s = []
for service_name, subscriptions in self._subscribers.items():
for rref,(subscriber_info,when) in subscriptions.items():
s.append( (service_name, when, subscriber_info, rref) )
return s
def remote_get_version(self):
return self.VERSION
def remote_publish(self, announcement):
def remote_publish(self, ann_t): # for_v1
lp = self.log("introducer: old (v1) announcement published: %s"
% (ann_t,), umid="6zGOIw")
ann_v2 = convert_announcement_v1_to_v2(ann_t)
return self.publish(ann_v2, None, lp)
def remote_publish_v2(self, ann_t, canary):
lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
return self.publish(ann_t, canary, lp)
def publish(self, ann_t, canary, lp):
self._publish(ann_t, canary, lp)
log.err(format="Introducer.remote_publish failed on %(ann)s",
ann=announcement, level=log.UNUSUAL, umid="620rWA")
level=log.UNUSUAL, parent=lp, umid="620rWA")
def _publish(self, announcement):
def _publish(self, ann_t, canary, lp):
self._debug_counts["inbound_message"] += 1
self.log("introducer: announcement published: %s" % (announcement,) )
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
self.log("introducer: announcement published: %s" % (ann_t,),
ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
index = make_index(ann, key)
nodeid = b32decode(SturdyRef(furl).tubID.upper())
index = (service_name, nodeid)
service_name = str(ann["service-name"])
if service_name == "stub_client": # for_v1
self._attach_stub_client(ann, lp)
if index in self._announcements:
(old_announcement, timestamp) = self._announcements[index]
if old_announcement == announcement:
self.log("but we already knew it, ignoring", level=log.NOISY)
old = self._announcements.get(index)
if old:
(old_ann_t, canary, old_ann, timestamp) = old
if old_ann == ann:
self.log("but we already knew it, ignoring", level=log.NOISY,
self._debug_counts["inbound_duplicate"] += 1
self.log("old announcement being updated", level=log.NOISY)
self.log("old announcement being updated", level=log.NOISY,
self._debug_counts["inbound_update"] += 1
self._announcements[index] = (announcement, time.time())
self._announcements[index] = (ann_t, canary, ann, time.time())
#if canary:
# canary.notifyOnDisconnect ...
# use a CanaryWatcher? with cw.is_connected()?
# actually we just want foolscap to give rref.is_connected(), since
# this is only for the status display
for s in self._subscribers.get(service_name, []):
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += 1
d = s.callRemote("announce", set([announcement]))
self._debug_outstanding += 1
d = s.callRemote("announce_v2", set([ann_t]))
format="subscriber errored on announcement %(ann)s",
ann=announcement, facility="tahoe.introducer",
ann=ann_t, facility="tahoe.introducer",
level=log.UNUSUAL, umid="jfGMXQ")
def remote_subscribe(self, subscriber, service_name):
self.log("introducer: subscription[%s] request at %s" % (service_name,
def _attach_stub_client(self, ann, lp):
# There might be a v1 subscriber for whom this is a stub_client.
# We might have received the subscription before the stub_client
# announcement, in which case we now need to fix up the record in
# self._subscriptions .
# record it for later, in case the stub_client arrived before the
# subscription
subscriber_info = self._get_subscriber_info_from_ann(ann)
ann_tubid = get_tubid_string_from_ann(ann)
self._stub_client_announcements[ann_tubid] = subscriber_info
lp2 = self.log("stub_client announcement, "
"looking for matching subscriber",
parent=lp, level=log.NOISY, umid="BTywDg")
for sn in self._subscribers:
s = self._subscribers[sn]
for (subscriber, info) in s.items():
# we correlate these by looking for a subscriber whose tubid
# matches this announcement
sub_tubid = subscriber.getRemoteTubID()
if sub_tubid == ann_tubid:
self.log(format="found a match, nodeid=%(nodeid)s",
level=log.NOISY, parent=lp2, umid="xsWs1A")
# found a match. Does it need info?
if not info[0]:
self.log(format="replacing info",
level=log.NOISY, parent=lp2, umid="m5kxwA")
# yup
s[subscriber] = (subscriber_info, info[1])
# and we don't remember or announce stub_clients beyond what we
# need to get the subscriber_info set up
def _get_subscriber_info_from_ann(self, ann): # for_v1
sinfo = { "version": ann["version"],
"nickname": ann["nickname"],
"app-versions": ann["app-versions"],
"my-version": ann["my-version"],
"oldest-supported": ann["oldest-supported"],
return sinfo
def remote_subscribe(self, subscriber, service_name): # for_v1
self.log("introducer: old (v1) subscription[%s] request at %s"
% (service_name, subscriber), umid="hJlGUg")
return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
service_name, None)
def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
self.log("introducer: subscription[%s] request at %s"
% (service_name, subscriber), umid="U3uzLg")
return self.add_subscriber(subscriber, service_name, subscriber_info)
def add_subscriber(self, subscriber, service_name, subscriber_info):
self._debug_counts["inbound_subscribe"] += 1
if service_name not in self._subscribers:
self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name]
if subscriber in subscribers:
self.log("but they're already subscribed, ignoring",
level=log.UNUSUAL, umid="Sy9EfA")
subscribers[subscriber] = time.time()
if not subscriber_info: # for_v1
# v1 clients don't provide subscriber_info, but they should
# publish a 'stub client' record which contains the same
# information. If we've already received this, it will be in
# self._stub_client_announcements
tubid = subscriber.getRemoteTubID()
if tubid in self._stub_client_announcements:
subscriber_info = self._stub_client_announcements[tubid]
subscribers[subscriber] = (subscriber_info, time.time())
def _remove():
self.log("introducer: unsubscribing[%s] %s" % (service_name,
subscribers.pop(subscriber, None)
announcements = set(
[ ann
for (sn2,nodeid),(ann,when) in self._announcements.items()
if sn2 == service_name] )
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += len(announcements)
d = subscriber.callRemote("announce", announcements)
format="subscriber errored during subscribe %(anns)s",
anns=announcements, facility="tahoe.introducer",
level=log.UNUSUAL, umid="mtZepQ")
# now tell them about any announcements they're interested in
announcements = set( [ ann_t
for idx,(ann_t,canary,ann,when)
in self._announcements.items()
if idx[0] == service_name] )
if announcements:
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += len(announcements)
self._debug_outstanding += 1
d = subscriber.callRemote("announce_v2", announcements)
format="subscriber errored during subscribe %(anns)s",
anns=announcements, facility="tahoe.introducer",
level=log.UNUSUAL, umid="mtZepQ")
return d
@ -195,6 +195,19 @@ class Node(service.MultiService):
# TODO: merge this with allmydata.get_package_versions
return dict(app_versions.versions)
def get_config_from_file(self, name, required=False):
"""Get the (string) contents of a config file, or None if the file
did not exist. If required=True, raise an exception rather than
returning None. Any leading or trailing whitespace will be stripped
from the data."""
fn = os.path.join(self.basedir, name)
return fileutil.read(fn).strip()
except EnvironmentError:
if not required:
return None
def write_private_config(self, name, value):
"""Write the (string) contents of a private config file (which is a
config file that resides within the subdirectory named 'private'), and
@ -99,6 +99,11 @@ class StorageServer(service.MultiService, Referenceable):
def __repr__(self):
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
def have_shares(self):
# quick test to decide if we need to commit to an implicit
# permutation-seed or if we should use a new one
return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))
def add_bucket_counter(self):
statefile = os.path.join(self.storedir, "bucket_counter.state")
self.bucket_counter = BucketCountingCrawler(self, statefile)
@ -29,11 +29,11 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
# 6: implement other sorts of IStorageClient classes: S3, etc
import time
import re, time
from zope.interface import implements, Interface
from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker
from allmydata.util import idlib, log
from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import sha1
@ -74,8 +74,8 @@ class StorageFarmBroker:
self.introducer_client = None
# these two are used in unit tests
def test_add_rref(self, serverid, rref):
s = NativeStorageServer(serverid, {})
def test_add_rref(self, serverid, rref, ann):
s = NativeStorageServer(serverid, ann.copy())
s.rref = rref
self.servers[serverid] = s
@ -86,21 +86,23 @@ class StorageFarmBroker:
self.introducer_client = ic = introducer_client
ic.subscribe_to("storage", self._got_announcement)
def _got_announcement(self, serverid, ann_d):
precondition(isinstance(serverid, str), serverid)
precondition(len(serverid) == 20, serverid)
assert ann_d["service-name"] == "storage"
def _got_announcement(self, key_s, ann):
if key_s is not None:
precondition(isinstance(key_s, str), key_s)
precondition(key_s.startswith("v0-"), key_s)
assert ann["service-name"] == "storage"
s = NativeStorageServer(key_s, ann)
serverid = s.get_serverid()
old = self.servers.get(serverid)
if old:
if old.get_announcement() == ann_d:
if old.get_announcement() == ann:
return # duplicate
# replacement
del self.servers[serverid]
# now we forget about them and start using the new one
dsc = NativeStorageServer(serverid, ann_d)
self.servers[serverid] = dsc
dsc.start_connecting(self.tub, self._trigger_connections)
self.servers[serverid] = s
s.start_connecting(self.tub, self._trigger_connections)
# the descriptor will manage their own Reconnector, and each time we
# need servers, we'll ask them if they're connected or not.
@ -173,13 +175,25 @@ class NativeStorageServer:
"application-version": "unknown: no get_version()",
def __init__(self, serverid, ann_d, min_shares=1):
self.serverid = serverid
self._tubid = serverid
self.announcement = ann_d
def __init__(self, key_s, ann, min_shares=1):
self.key_s = key_s
self.announcement = ann
self.min_shares = min_shares
self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
assert "anonymous-storage-FURL" in ann, ann
furl = str(ann["anonymous-storage-FURL"])
m = re.match(r'pb://(\w+)@', furl)
assert m, furl
tubid_s = m.group(1).lower()
self._tubid = base32.a2b(tubid_s)
assert "permutation-seed-base32" in ann, ann
ps = base32.a2b(str(ann["permutation-seed-base32"]))
self._permutation_seed = ps
name = key_s or tubid_s
self._long_description = name
self._short_description = name[:8] # TODO: decide who adds []
self.announcement_time = time.time()
self.last_connect_time = None
self.last_loss_time = None
@ -191,17 +205,17 @@ class NativeStorageServer:
def __repr__(self):
return "<NativeStorageServer for %s>" % self.get_name()
def get_serverid(self):
return self._tubid
return self._tubid # XXX replace with self.key_s
def get_permutation_seed(self):
return self._tubid
return self._permutation_seed
def get_version(self):
if self.rref:
return self.rref.version
return None
def get_name(self): # keep methodname short
return self.serverid_s
return self._short_description
def get_longname(self):
return idlib.nodeid_b2a(self._tubid)
return self._long_description
def get_lease_seed(self):
return self._tubid
def get_foolscap_write_enabler_seed(self):
@ -221,7 +235,7 @@ class NativeStorageServer:
return self.announcement_time
def start_connecting(self, tub, trigger_cb):
furl = self.announcement["FURL"]
furl = str(self.announcement["anonymous-storage-FURL"])
self._trigger_cb = trigger_cb
self._reconnector = tub.connectTo(furl, self._got_connection)
@ -22,15 +22,16 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
for (peerid, nickname) in [("\x00"*20, "peer-0"),
("\xff"*20, "peer-f"),
("\x11"*20, "peer-11")] :
ann_d = { "version": 0,
"service-name": "storage",
"FURL": "fake furl",
"nickname": unicode(nickname),
"app-versions": {}, # need #466 and v2 introducer
"my-version": "ver",
"oldest-supported": "oldest",
s = NativeStorageServer(peerid, ann_d)
ann = { "version": 0,
"service-name": "storage",
"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"permutation-seed-base32": "",
"nickname": unicode(nickname),
"app-versions": {}, # need #466 and v2 introducer
"my-version": "ver",
"oldest-supported": "oldest",
s = NativeStorageServer(peerid, ann)
sb.test_add_server(peerid, s)
c = FakeClient()
c.storage_broker = sb
@ -136,12 +136,14 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
def _permute(self, sb, key):
return [ s.get_serverid() for s in sb.get_servers_for_psi(key) ]
return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
def test_permute(self):
sb = StorageFarmBroker(None, True)
for k in ["%d" % i for i in range(5)]:
sb.test_add_rref(k, "rref")
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"permutation-seed-base32": base32.b2a(k) }
sb.test_add_rref(k, "rref", ann)
self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
@ -1,6 +1,7 @@
import os, re
from base64 import b32decode
import simplejson
from twisted.trial import unittest
from twisted.internet import defer
@ -9,11 +10,16 @@ from twisted.python import log
from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
from twisted.application import service
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.client import IntroducerClient, \
from allmydata.introducer.server import IntroducerService
from allmydata.introducer.common import get_tubid_string_from_ann, \
get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
from allmydata.introducer import old
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.util import pollmixin
from allmydata.util import pollmixin, keyutil
import allmydata.test.common_util as testutil
class LoggingMultiService(service.MultiService):
@ -47,14 +53,14 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
def test_create(self):
ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
"my_version", "oldest_version")
"my_version", "oldest_version", {})
self.failUnless(isinstance(ic, IntroducerClient))
def test_listen(self):
i = IntroducerService()
def test_duplicate(self):
def test_duplicate_publish(self):
i = IntroducerService()
self.failUnlessEqual(len(i.get_announcements()), 0)
self.failUnlessEqual(len(i.get_subscribers()), 0)
@ -73,6 +79,223 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
self.failUnlessEqual(len(i.get_announcements()), 2)
self.failUnlessEqual(len(i.get_subscribers()), 0)
def test_id_collision(self):
# test replacement case where tubid equals a keyid (one should
# not replace the other)
i = IntroducerService()
ic = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"my_version", "oldest_version", {})
sk_s, vk_s = keyutil.make_keypair()
sk, _ignored = keyutil.parse_privkey(sk_s)
keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
furl1 = "pb://onug64tu@" # base32("short")
ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
i.remote_publish_v2(ann_t, Referenceable())
announcements = i.get_announcements()
self.failUnlessEqual(len(announcements), 1)
key1 = ("storage", "v0-"+keyid, None)
self.failUnless(key1 in announcements)
(ign, ign, ann1_out, ign) = announcements[key1]
self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
furl2 = "pb://%s@" % keyid
ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
self.failUnlessEqual(len(announcements), 2)
key2 = ("storage", None, keyid)
self.failUnless(key2 in announcements)
(ign, ign, ann2_out, ign) = announcements[key2]
self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
def make_ann(furl):
ann = { "anonymous-storage-FURL": furl,
"permutation-seed-base32": get_tubid_string(furl) }
return ann
def make_ann_t(ic, furl, privkey):
return ic.create_announcement("storage", make_ann(furl), privkey)
class Client(unittest.TestCase):
def test_duplicate_receive_v1(self):
ic = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"my_version", "oldest_version", {})
announcements = []
lambda key_s,ann: announcements.append(ann))
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@"
ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
ca = WrapV2ClientInV1Interface(ic)
d = fireEventually()
def _then(ign):
self.failUnlessEqual(len(announcements), 1)
self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
self.failUnlessEqual(announcements[0]["my-version"], "ver23")
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 0)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
# now send a duplicate announcement: this should not notify clients
return fireEventually()
def _then2(ign):
self.failUnlessEqual(len(announcements), 1)
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 0)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
# and a replacement announcement: same FURL, new other stuff.
# Clients should be notified.
return fireEventually()
def _then3(ign):
self.failUnlessEqual(len(announcements), 2)
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 1)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
# test that the other stuff changed
self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
return d
def test_duplicate_receive_v2(self):
ic1 = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"ver23", "oldest_version", {})
# we use a second client just to create a different-looking
# announcement
ic2 = IntroducerClient(None,
"introducer.furl", u"my_nickname",
announcements = []
def _received(key_s, ann):
announcements.append( (key_s, ann) )
ic1.subscribe_to("storage", _received)
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@"
furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@"
furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@"
privkey_s, pubkey_vs = keyutil.make_keypair()
privkey, _ignored = keyutil.parse_privkey(privkey_s)
pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")
# ann1: ic1, furl1
# ann1a: ic1, furl1a (same SturdyRef, different connection hints)
# ann1b: ic2, furl1
# ann2: ic2, furl2
self.ann1 = make_ann_t(ic1, furl1, privkey)
self.ann1a = make_ann_t(ic1, furl1a, privkey)
self.ann1b = make_ann_t(ic2, furl1, privkey)
self.ann2 = make_ann_t(ic2, furl2, privkey)
ic1.remote_announce_v2([self.ann1]) # queues eventual-send
d = fireEventually()
def _then1(ign):
self.failUnlessEqual(len(announcements), 1)
key_s,ann = announcements[0]
self.failUnlessEqual(key_s, pubkey_s)
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
self.failUnlessEqual(ann["my-version"], "ver23")
# now send a duplicate announcement. This should not fire the
# subscriber
d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
def _then2(ign):
self.failUnlessEqual(len(announcements), 1)
# and a replacement announcement: same FURL, new other stuff. The
# subscriber *should* be fired.
d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
def _then3(ign):
self.failUnlessEqual(len(announcements), 2)
key_s,ann = announcements[-1]
self.failUnlessEqual(key_s, pubkey_s)
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
self.failUnlessEqual(ann["my-version"], "ver24")
# and a replacement announcement with a different FURL (it uses
# different connection hints)
d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
def _then4(ign):
self.failUnlessEqual(len(announcements), 3)
key_s,ann = announcements[-1]
self.failUnlessEqual(key_s, pubkey_s)
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
self.failUnlessEqual(ann["my-version"], "ver23")
# now add a new subscription, which should be called with the
# backlog. The introducer only records one announcement per index, so
# the backlog will only have the latest message.
announcements2 = []
def _received2(key_s, ann):
announcements2.append( (key_s, ann) )
d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
def _then5(ign):
self.failUnlessEqual(len(announcements2), 1)
key_s,ann = announcements2[-1]
self.failUnlessEqual(key_s, pubkey_s)
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
self.failUnlessEqual(ann["my-version"], "ver23")
return d
def test_id_collision(self):
# test replacement case where tubid equals a keyid (one should
# not replace the other)
ic = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"my_version", "oldest_version", {})
announcements = []
lambda key_s,ann: announcements.append(ann))
sk_s, vk_s = keyutil.make_keypair()
sk, _ignored = keyutil.parse_privkey(sk_s)
keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
furl1 = "pb://onug64tu@" # base32("short")
furl2 = "pb://%s@" % keyid
ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
d = fireEventually()
def _then(ign):
# first announcement has been processed
self.failUnlessEqual(len(announcements), 1)
# now submit a second one, with a tubid that happens to look just
# like the pubkey-based serverid we just processed. They should
# not overlap.
ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
ca = WrapV2ClientInV1Interface(ic)
return fireEventually()
def _then2(ign):
# if they overlapped, the second announcement would be ignored
self.failUnlessEqual(len(announcements), 2)
return d
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
def create_tub(self, portnum=0):
@ -88,36 +311,86 @@ class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
assert self.central_portnum == portnum
tub.setLocation("localhost:%d" % self.central_portnum)
class Queue(SystemTestMixin, unittest.TestCase):
def test_queue_until_connected(self):
self.basedir = "introducer/QueueUntilConnected/queued"
introducer = IntroducerService()
iff = os.path.join(self.basedir, "introducer.furl")
ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
tub2 = Tub()
c = IntroducerClient(tub2, ifurl,
u"nickname", "version", "oldest", {})
furl1 = "pb://onug64tu@" # base32("short")
sk_s, vk_s = keyutil.make_keypair()
sk, _ignored = keyutil.parse_privkey(sk_s)
d = introducer.disownServiceParent()
def _offline(ign):
# now that the introducer server is offline, create a client and
# publish some messages
c.setServiceParent(self.parent) # this starts the reconnector
c.publish("storage", make_ann(furl1), sk)
introducer.setServiceParent(self.parent) # restart the server
# now wait for the messages to be delivered
def _got_announcement():
return bool(introducer.get_announcements())
return self.poll(_got_announcement)
def _done(ign):
v = list(introducer.get_announcements().values())[0]
(ign, ign, ann1_out, ign) = v
self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
# now let the ack get back
def _wait_until_idle(ign):
def _idle():
if c._debug_outstanding:
return False
if introducer._debug_outstanding:
return False
return True
return self.poll(_idle)
return d
V1 = "v1"; V2 = "v2"
class SystemTest(SystemTestMixin, unittest.TestCase):
def test_system(self):
self.basedir = "introducer/SystemTest/system"
return self.do_system_test(IntroducerService)
test_system.timeout = 480 # occasionally takes longer than 350s on "draco"
def do_system_test(self, create_introducer):
def do_system_test(self, server_version):
introducer = create_introducer()
if server_version == V1:
introducer = old.IntroducerService_v1()
introducer = IntroducerService()
iff = os.path.join(self.basedir, "introducer.furl")
tub = self.central_tub
ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
self.introducer_furl = ifurl
# we have 5 clients who publish themselves, and an extra one does
# which not. When the connections are fully established, all six nodes
# we have 5 clients who publish themselves as storage servers, and a
# sixth which does which not. All 6 clients subscriber to hear about
# storage. When the connections are fully established, all six nodes
# should have 5 connections each.
clients = []
tubs = {}
received_announcements = {}
subscribing_clients = []
publishing_clients = []
privkeys = {}
expected_announcements = [0 for c in range(NUM_CLIENTS)]
for i in range(NUMCLIENTS+1):
for i in range(NUM_CLIENTS):
tub = Tub()
#tub.setOption("logLocalFailures", True)
#tub.setOption("logRemoteFailures", True)
@ -128,62 +401,171 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
tub.setLocation("localhost:%d" % portnum)
log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i,
"version", "oldest")
if i == 0:
c = old.IntroducerClient_v1(tub, self.introducer_furl,
u"nickname-%d" % i,
"version", "oldest")
c = IntroducerClient(tub, self.introducer_furl,
u"nickname-%d" % i,
"version", "oldest",
{"component": "component-v1"})
received_announcements[c] = {}
def got(serverid, ann_d, announcements):
announcements[serverid] = ann_d
c.subscribe_to("storage", got, received_announcements[c])
def got(key_s_or_tubid, ann, announcements, i):
if i == 0:
index = get_tubid_string_from_ann(ann)
index = key_s_or_tubid or get_tubid_string_from_ann(ann)
announcements[index] = ann
c.subscribe_to("storage", got, received_announcements[c], i)
expected_announcements[i] += 1 # all expect a 'storage' announcement
node_furl = tub.registerReference(Referenceable())
c.publish(node_furl, "storage", "ri_name")
node_furl = tub.registerReference(Referenceable())
if i == 0:
c.publish(node_furl, "storage", "ri_name")
elif i == 1:
# sign the announcement
privkey_s, pubkey_s = keyutil.make_keypair()
privkey, _ignored = keyutil.parse_privkey(privkey_s)
privkeys[c] = privkey
c.publish("storage", make_ann(node_furl), privkey)
c.publish("storage", make_ann(node_furl))
# the last one does not publish anything
# the last one does not publish anything
if i == 0:
# users of the V1 client were required to publish a
# 'stub_client' record (somewhat after they published the
# 'storage' record), so the introducer could see their
# version. Match that behavior.
c.publish(node_furl, "stub_client", "stub_ri_name")
if i == 2:
# also publish something that nobody cares about
boring_furl = tub.registerReference(Referenceable())
c.publish("boring", make_ann(boring_furl))
tubs[c] = tub
def _wait_for_all_connections():
for c in subscribing_clients:
if len(received_announcements[c]) < NUM_SERVERS:
def _wait_for_connected(ign):
def _connected():
for c in clients:
if not c.connected_to_introducer():
return False
return True
return self.poll(_connected)
# we watch the clients to determine when the system has settled down.
# Then we can look inside the server to assert things about its
# state.
def _wait_for_expected_announcements(ign):
def _got_expected_announcements():
for i,c in enumerate(subscribing_clients):
if len(received_announcements[c]) < expected_announcements[i]:
return False
return True
return self.poll(_got_expected_announcements)
# before shutting down any Tub, we'd like to know that there are no
# messages outstanding
def _wait_until_idle(ign):
def _idle():
for c in subscribing_clients + publishing_clients:
if c._debug_outstanding:
return False
if introducer._debug_outstanding:
return False
return True
d = self.poll(_wait_for_all_connections)
return True
return self.poll(_idle)
d = defer.succeed(None)
def _check1(res):
log.msg("doing _check1")
dc = introducer._debug_counts
self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS)
self.failUnlessEqual(dc["inbound_duplicate"], 0)
if server_version == V1:
# each storage server publishes a record, and (after its
# 'subscribe' has been ACKed) also publishes a "stub_client".
# The non-storage client (which subscribes) also publishes a
# stub_client. There is also one "boring" service. The number
# of messages is higher, because the stub_clients aren't
# published until after we get the 'subscribe' ack (since we
# don't realize that we're dealing with a v1 server [which
# needs stub_clients] until then), and the act of publishing
# the stub_client causes us to re-send all previous
# announcements.
self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
# each storage server publishes a record. There is also one
# "stub_client" and one "boring"
self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
self.failUnlessEqual(dc["inbound_duplicate"], 0)
self.failUnlessEqual(dc["inbound_update"], 0)
self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
# the number of outbound messages is tricky.. I think it depends
# upon a race between the publish and the subscribe messages.
self.failUnless(dc["outbound_message"] > 0)
# each client subscribes to "storage", and each server publishes
for c in clients:
for c in subscribing_clients:
cdc = c._debug_counts
self.failUnlessEqual(cdc["wrong_service"], 0)
self.failUnlessEqual(cdc["duplicate_announcement"], 0)
self.failUnlessEqual(cdc["update"], 0)
anns = received_announcements[c]
self.failUnlessEqual(len(anns), NUM_SERVERS)
self.failUnlessEqual(len(anns), NUM_STORAGE)
nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
ann_d = anns[nodeid0]
nick = ann_d["nickname"]
nodeid0 = tubs[clients[0]].tubID
ann = anns[nodeid0]
nick = ann["nickname"]
self.failUnlessEqual(type(nick), unicode)
self.failUnlessEqual(nick, u"nickname-0")
for c in publishing_clients:
cdc = c._debug_counts
self.failUnlessEqual(cdc["outbound_message"], 1)
if server_version == V1:
for c in publishing_clients:
cdc = c._debug_counts
expected = 1 # storage
if c is clients[2]:
expected += 1 # boring
if c is not clients[0]:
# the v2 client tries to call publish_v2, which fails
# because the server is v1. It then re-sends
# everything it has so far, plus a stub_client record
expected = 2*expected + 1
if c is clients[0]:
# we always tell v1 client to send stub_client
expected += 1
self.failUnlessEqual(cdc["outbound_message"], expected)
for c in publishing_clients:
cdc = c._debug_counts
expected = 1
if c in [clients[0], # stub_client
clients[2], # boring
expected = 2
self.failUnlessEqual(cdc["outbound_message"], expected)
log.msg("_check1 done")
# force an introducer reconnect, by shutting down the Tub it's using
@ -196,67 +578,54 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
def _wait_for_introducer_loss():
for c in clients:
if c.connected_to_introducer():
return False
return True
d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
def _wait_for_introducer_loss(ign):
def _introducer_lost():
for c in clients:
if c.connected_to_introducer():
return False
return True
return self.poll(_introducer_lost)
def _restart_introducer_tub(_ign):
log.msg("restarting introducer's Tub")
dc = introducer._debug_counts
self.expected_count = dc["inbound_message"] + NUM_SERVERS
self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1
introducer._debug0 = dc["outbound_message"]
for c in subscribing_clients:
cdc = c._debug_counts
c._debug0 = cdc["inbound_message"]
# reset counters
for i in range(NUM_CLIENTS):
c = subscribing_clients[i]
for k in c._debug_counts:
c._debug_counts[k] = 0
for k in introducer._debug_counts:
introducer._debug_counts[k] = 0
expected_announcements[i] += 1 # new 'storage' for everyone
newfurl = self.central_tub.registerReference(introducer,
assert newfurl == self.introducer_furl
def _wait_for_introducer_reconnect():
# wait until:
# all clients are connected
# the introducer has received publish messages from all of them
# the introducer has received subscribe messages from all of them
# the introducer has sent (duplicate) announcements to all of them
# all clients have received (duplicate) announcements
dc = introducer._debug_counts
for c in clients:
if not c.connected_to_introducer():
return False
if dc["inbound_message"] < self.expected_count:
return False
if dc["inbound_subscribe"] < self.expected_subscribe_count:
return False
for c in subscribing_clients:
cdc = c._debug_counts
if cdc["inbound_message"] < c._debug0+1:
return False
return True
d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect))
d.addCallback(lambda _ign: log.msg(" reconnected"))
# TODO: publish something while the introducer is offline, then
# confirm it gets delivered when the connection is reestablished
def _check2(res):
log.msg("doing _check2")
# assert that the introducer sent out new messages, one per
# subscriber
dc = introducer._debug_counts
self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS)
self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS)
self.failUnlessEqual(dc["inbound_update"], 0)
introducer._debug0 + len(subscribing_clients))
for c in clients:
self.failUnless(dc["outbound_message"] > 0)
self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
for c in subscribing_clients:
cdc = c._debug_counts
self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS)
self.failUnlessEqual(cdc["inbound_message"], 1)
self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
self.failUnlessEqual(cdc["new_announcement"], 0)
self.failUnlessEqual(cdc["wrong_service"], 0)
self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
# Then force an introducer restart, by shutting down the Tub,
@ -267,71 +636,211 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
d.addCallback(lambda _ign: log.msg("shutting down introducer"))
d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
d.addCallback(lambda _ign: log.msg("introducer lost"))
def _restart_introducer(_ign):
log.msg("restarting introducer")
for c in subscribing_clients:
# record some counters for later comparison. Stash the values
# on the client itself, because I'm lazy.
cdc = c._debug_counts
c._debug1 = cdc["inbound_announcement"]
c._debug2 = cdc["inbound_message"]
c._debug3 = cdc["new_announcement"]
newintroducer = create_introducer()
self.expected_message_count = NUM_SERVERS
self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients)
self.expected_subscribe_count = len(subscribing_clients)
newfurl = self.central_tub.registerReference(newintroducer,
# reset counters
for i in range(NUM_CLIENTS):
c = subscribing_clients[i]
for k in c._debug_counts:
c._debug_counts[k] = 0
expected_announcements[i] += 1 # new 'storage' for everyone
if server_version == V1:
introducer = old.IntroducerService_v1()
introducer = IntroducerService()
newfurl = self.central_tub.registerReference(introducer,
assert newfurl == self.introducer_furl
def _wait_for_introducer_reconnect2():
# wait until:
# all clients are connected
# the introducer has received publish messages from all of them
# the introducer has received subscribe messages from all of them
# the introducer has sent announcements for everybody to everybody
# all clients have received all the (duplicate) announcements
# at that point, the system should be quiescent
dc = introducer._debug_counts
for c in clients:
if not c.connected_to_introducer():
return False
if dc["inbound_message"] < self.expected_message_count:
return False
if dc["outbound_announcements"] < self.expected_announcement_count:
return False
if dc["inbound_subscribe"] < self.expected_subscribe_count:
return False
for c in subscribing_clients:
cdc = c._debug_counts
if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS:
return False
return True
d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2))
def _check3(res):
log.msg("doing _check3")
for c in clients:
dc = introducer._debug_counts
self.failUnless(dc["outbound_message"] > 0)
self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
for c in subscribing_clients:
cdc = c._debug_counts
self.failUnless(cdc["inbound_announcement"] > c._debug1)
self.failUnless(cdc["inbound_message"] > c._debug2)
# there should have been no new announcements
self.failUnlessEqual(cdc["new_announcement"], c._debug3)
# and the right number of duplicate ones. There were
# NUM_SERVERS from the servertub restart, and there should be
# another NUM_SERVERS now
self.failUnless(cdc["inbound_message"] > 0)
self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
self.failUnlessEqual(cdc["new_announcement"], 0)
self.failUnlessEqual(cdc["wrong_service"], 0)
self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
return d
def test_system_v2_server(self):
self.basedir = "introducer/SystemTest/system_v2_server"
return self.do_system_test(V2)
test_system_v2_server.timeout = 480
# occasionally takes longer than 350s on "draco"
def test_system_v1_server(self):
self.basedir = "introducer/SystemTest/system_v1_server"
return self.do_system_test(V1)
test_system_v1_server.timeout = 480
# occasionally takes longer than 350s on "draco"
class FakeRemoteReference:
def notifyOnDisconnect(self, *args, **kwargs): pass
def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
class ClientInfo(unittest.TestCase):
def test_client_v2(self):
introducer = IntroducerService()
tub = introducer_furl = None
app_versions = {"whizzy": "fizzy"}
client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
"my_version", "oldest", app_versions)
#furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@"
#ann_s = make_ann_t(client_v2, furl1, None)
#introducer.remote_publish_v2(ann_s, Referenceable())
subscriber = FakeRemoteReference()
introducer.remote_subscribe_v2(subscriber, "storage",
s = introducer.get_subscribers()
self.failUnlessEqual(len(s), 1)
sn, when, si, rref = s[0]
self.failUnlessIdentical(rref, subscriber)
self.failUnlessEqual(sn, "storage")
self.failUnlessEqual(si["version"], 0)
self.failUnlessEqual(si["oldest-supported"], "oldest")
self.failUnlessEqual(si["app-versions"], app_versions)
self.failUnlessEqual(si["nickname"], u"nick-v2")
self.failUnlessEqual(si["my-version"], "my_version")
def test_client_v1(self):
introducer = IntroducerService()
subscriber = FakeRemoteReference()
introducer.remote_subscribe(subscriber, "storage")
# the v1 subscribe interface had no subscriber_info: that was usually
# sent in a separate stub_client pseudo-announcement
s = introducer.get_subscribers()
self.failUnlessEqual(len(s), 1)
sn, when, si, rref = s[0]
# rref will be a WrapV1SubscriberInV2Interface around the real
# subscriber
self.failUnlessIdentical(rref.original, subscriber)
self.failUnlessEqual(si, None) # not known yet
self.failUnlessEqual(sn, "storage")
# now submit the stub_client announcement
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@"
ann = (furl1, "stub_client", "RIStubClient",
u"nick-v1".encode("utf-8"), "my_version", "oldest")
# the server should correlate the two
s = introducer.get_subscribers()
self.failUnlessEqual(len(s), 1)
sn, when, si, rref = s[0]
self.failUnlessIdentical(rref.original, subscriber)
self.failUnlessEqual(sn, "storage")
self.failUnlessEqual(si["version"], 0)
self.failUnlessEqual(si["oldest-supported"], "oldest")
# v1 announcements do not contain app-versions
self.failUnlessEqual(si["app-versions"], {})
self.failUnlessEqual(si["nickname"], u"nick-v1")
self.failUnlessEqual(si["my-version"], "my_version")
# a subscription that arrives after the stub_client announcement
# should be correlated too
subscriber2 = FakeRemoteReference()
introducer.remote_subscribe(subscriber2, "thing2")
s = introducer.get_subscribers()
subs = dict([(sn, (si,rref)) for sn, when, si, rref in s])
self.failUnlessEqual(len(subs), 2)
(si,rref) = subs["thing2"]
self.failUnlessIdentical(rref.original, subscriber2)
self.failUnlessEqual(si["version"], 0)
self.failUnlessEqual(si["oldest-supported"], "oldest")
# v1 announcements do not contain app-versions
self.failUnlessEqual(si["app-versions"], {})
self.failUnlessEqual(si["nickname"], u"nick-v1")
self.failUnlessEqual(si["my-version"], "my_version")
class Announcements(unittest.TestCase):
def test_client_v2_unsigned(self):
introducer = IntroducerService()
tub = introducer_furl = None
app_versions = {"whizzy": "fizzy"}
client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
"my_version", "oldest", app_versions)
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@"
tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
ann_s0 = make_ann_t(client_v2, furl1, None)
canary0 = Referenceable()
introducer.remote_publish_v2(ann_s0, canary0)
a = introducer.get_announcements()
self.failUnlessEqual(len(a), 1)
(index, (ann_s, canary, ann, when)) = a.items()[0]
self.failUnlessIdentical(canary, canary0)
self.failUnlessEqual(index, ("storage", None, tubid))
self.failUnlessEqual(ann["app-versions"], app_versions)
self.failUnlessEqual(ann["nickname"], u"nick-v2")
self.failUnlessEqual(ann["service-name"], "storage")
self.failUnlessEqual(ann["my-version"], "my_version")
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
def test_client_v2_signed(self):
introducer = IntroducerService()
tub = introducer_furl = None
app_versions = {"whizzy": "fizzy"}
client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
"my_version", "oldest", app_versions)
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@"
sk_s, vk_s = keyutil.make_keypair()
sk, _ignored = keyutil.parse_privkey(sk_s)
pks = keyutil.remove_prefix(vk_s, "pub-")
ann_t0 = make_ann_t(client_v2, furl1, sk)
canary0 = Referenceable()
introducer.remote_publish_v2(ann_t0, canary0)
a = introducer.get_announcements()
self.failUnlessEqual(len(a), 1)
(index, (ann_s, canary, ann, when)) = a.items()[0]
self.failUnlessIdentical(canary, canary0)
self.failUnlessEqual(index, ("storage", pks, None))
self.failUnlessEqual(ann["app-versions"], app_versions)
self.failUnlessEqual(ann["nickname"], u"nick-v2")
self.failUnlessEqual(ann["service-name"], "storage")
self.failUnlessEqual(ann["my-version"], "my_version")
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
def test_client_v1(self):
introducer = IntroducerService()
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@"
tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
ann = (furl1, "storage", "RIStorage",
u"nick-v1".encode("utf-8"), "my_version", "oldest")
a = introducer.get_announcements()
self.failUnlessEqual(len(a), 1)
(index, (ann_s, canary, ann, when)) = a.items()[0]
self.failUnlessEqual(canary, None)
self.failUnlessEqual(index, ("storage", None, tubid))
self.failUnlessEqual(ann["app-versions"], {})
self.failUnlessEqual(ann["nickname"], u"nick-v1".encode("utf-8"))
self.failUnlessEqual(ann["service-name"], "storage")
self.failUnlessEqual(ann["my-version"], "my_version")
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
class TooNewServer(IntroducerService):
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
{ },
@ -359,10 +868,10 @@ class NonV1Server(SystemTestMixin, unittest.TestCase):
tub.setLocation("localhost:%d" % portnum)
c = IntroducerClient(tub, self.introducer_furl,
u"nickname-client", "version", "oldest")
u"nickname-client", "version", "oldest", {})
announcements = {}
def got(serverid, ann_d):
announcements[serverid] = ann_d
def got(key_s, ann):
announcements[key_s] = ann
c.subscribe_to("storage", got)
@ -374,7 +883,8 @@ class NonV1Server(SystemTestMixin, unittest.TestCase):
d = self.poll(_got_bad)
def _done(res):
return d
@ -388,3 +898,44 @@ class DecodeFurl(unittest.TestCase):
nodeid = b32decode(m.group(1).upper())
self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
class Signatures(unittest.TestCase):
def test_sign(self):
ann = {"key1": "value1"}
sk_s,vk_s = keyutil.make_keypair()
sk,ignored = keyutil.parse_privkey(sk_s)
ann_t = sign_to_foolscap(ann, sk)
(msg, sig, key) = ann_t
self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes
self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), ann)
(ann2,key2) = unsign_from_foolscap(ann_t)
self.failUnlessEqual(ann2, ann)
self.failUnlessEqual("pub-"+key2, vk_s)
# bad signature
bad_ann = {"key1": "value2"}
bad_msg = simplejson.dumps(bad_ann).encode("utf-8")
unsign_from_foolscap, (bad_msg,sig,key))
# sneaky bad signature should be ignored
(ann2,key2) = unsign_from_foolscap( (bad_msg,None,key) )
self.failUnlessEqual(key2, None)
self.failUnlessEqual(ann2, bad_ann)
# unrecognized signatures
unsign_from_foolscap, (bad_msg,"v999-sig",key))
unsign_from_foolscap, (bad_msg,sig,"v999-key"))
# add tests of StorageFarmBroker: if it receives duplicate announcements, it
# should leave the Reconnector in place, also if it receives
# same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
# should tear down the Reconnector and make a new one. This behavior used to
# live in the IntroducerClient, and thus used to be tested by test_introducer
# copying more tests from old branch:
# then also add Upgrade test
@ -228,7 +228,9 @@ def make_storagebroker(s=None, num_peers=10):
storage_broker = StorageFarmBroker(None, True)
for peerid in peerids:
fss = FakeStorageServer(peerid, s)
storage_broker.test_add_rref(peerid, fss)
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
"permutation-seed-base32": base32.b2a(peerid) }
storage_broker.test_add_rref(peerid, fss, ann)
return storage_broker
def make_nodemaker(s=None, num_peers=10):
@ -783,7 +783,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)
self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
self.failUnless("Announcement Summary: storage: 5" in res)
self.failUnless("Subscription Summary: storage: 5" in res)
self.failUnless("tahoe.css" in res)
except unittest.FailTest:
@ -804,9 +804,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
{"storage": 5})
{"storage": 5, "stub_client": 5})
{"storage": 5})
{"storage": 1, "stub_client": 1})
{"storage": 1})
except unittest.FailTest:
print "GET %s?t=json output was:" % self.introweb_url
@ -11,7 +11,7 @@ import allmydata # for __full_version__
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
from allmydata.util import log
from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.test.no_network import GridTestMixin
@ -197,7 +197,9 @@ class FakeClient:
for fakeid in range(self.num_servers) ]
self.storage_broker = StorageFarmBroker(None, permute_peers=True)
for (serverid, rref) in servers:
self.storage_broker.test_add_rref(serverid, rref)
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
"permutation-seed-base32": base32.b2a(serverid) }
self.storage_broker.test_add_rref(serverid, rref, ann)
self.last_servers = [s[1] for s in servers]
def log(self, *args, **kwargs):
@ -34,15 +34,20 @@ class IntroducerRoot(rend.Page):
def render_JSON(self, ctx):
res = {}
clients = self.introducer_service.get_subscribers()
subscription_summary = dict([ (name, len(clients[name]))
for name in clients ])
res["subscription_summary"] = subscription_summary
counts = {}
subscribers = self.introducer_service.get_subscribers()
for (service_name, ign, ign, ign) in subscribers:
if service_name not in counts:
counts[service_name] = 0
counts[service_name] += 1
res["subscription_summary"] = counts
announcement_summary = {}
service_hosts = {}
for (ann,when) in self.introducer_service.get_announcements().values():
(furl, service_name, ri_name, nickname, ver, oldest) = ann
for a in self.introducer_service.get_announcements().values():
(_, _, ann, when) = a
service_name = ann["service-name"]
if service_name not in announcement_summary:
announcement_summary[service_name] = 0
announcement_summary[service_name] += 1
@ -55,6 +60,7 @@ class IntroducerRoot(rend.Page):
# enough: when multiple services are run on a single host,
# they're usually either configured with the same addresses,
# or setLocationAutomatically picks up the same interfaces.
furl = ann["anonymous-storage-FURL"]
locations = SturdyRef(furl).getTubRef().getLocations()
# list of tuples, ("ipv4", host, port)
host = frozenset([hint[1]
@ -79,8 +85,9 @@ class IntroducerRoot(rend.Page):
def render_announcement_summary(self, ctx, data):
services = {}
for (ann,when) in self.introducer_service.get_announcements().values():
(furl, service_name, ri_name, nickname, ver, oldest) = ann
for a in self.introducer_service.get_announcements().values():
(_, _, ann, when) = a
service_name = ann["service-name"]
if service_name not in services:
services[service_name] = 0
services[service_name] += 1
@ -90,65 +97,52 @@ class IntroducerRoot(rend.Page):
for service_name in service_names])
def render_client_summary(self, ctx, data):
counts = {}
clients = self.introducer_service.get_subscribers()
service_names = clients.keys()
return ", ".join(["%s: %d" % (service_name, len(clients[service_name]))
for service_name in service_names])
for (service_name, ign, ign, ign) in clients:
if service_name not in counts:
counts[service_name] = 0
counts[service_name] += 1
return ", ".join([ "%s: %d" % (name, counts[name])
for name in sorted(counts.keys()) ] )
def data_services(self, ctx, data):
introsvc = self.introducer_service
ann = [(since,a)
for (a,since) in introsvc.get_announcements().values()
if a[1] != "stub_client"]
ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
return ann
services = []
for a in introsvc.get_announcements().values():
(_, _, ann, when) = a
if ann["service-name"] == "stub_client":
services.append( (when, ann) )
services.sort(key=lambda x: (x[1]["service-name"], x[1]["nickname"]))
# this used to be:
#services.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
# service_name was the primary key, then the whole tuple (starting
# with the furl) was the secondary key
return services
def render_service_row(self, ctx, (since,announcement)):
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
sr = SturdyRef(furl)
def render_service_row(self, ctx, (since,ann)):
sr = SturdyRef(ann["anonymous-storage-FURL"])
nodeid = sr.tubID
advertised = self.show_location_hints(sr)
ctx.fillSlots("peerid", nodeid)
ctx.fillSlots("nickname", nickname)
ctx.fillSlots("nickname", ann["nickname"])
ctx.fillSlots("advertised", " ".join(advertised))
ctx.fillSlots("connected", "?")
TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
time.strftime(TIME_FORMAT, time.localtime(since)))
ctx.fillSlots("version", ver)
ctx.fillSlots("service_name", service_name)
ctx.fillSlots("version", ann["my-version"])
ctx.fillSlots("service_name", ann["service-name"])
return ctx.tag
def data_subscribers(self, ctx, data):
# use the "stub_client" announcements to get information per nodeid
clients = {}
for (ann,when) in self.introducer_service.get_announcements().values():
if ann[1] != "stub_client":
(furl, service_name, ri_name, nickname, ver, oldest) = ann
sr = SturdyRef(furl)
nodeid = sr.tubID
clients[nodeid] = ann
# then we actually provide information per subscriber
s = []
introsvc = self.introducer_service
for service_name, subscribers in introsvc.get_subscribers().items():
for (rref, timestamp) in subscribers.items():
sr = rref.getSturdyRef()
nodeid = sr.tubID
ann = clients.get(nodeid)
s.append( (service_name, rref, timestamp, ann) )
return s
return self.introducer_service.get_subscribers()
def render_subscriber_row(self, ctx, s):
(service_name, rref, since, ann) = s
nickname = "?"
version = "?"
if ann:
(furl, service_name_2, ri_name, nickname, version, oldest) = ann
(service_name, since, info, rref) = s
nickname = info.get("nickname", "?")
version = info.get("my-version", "?")
sr = rref.getSturdyRef()
# if the subscriber didn't do Tub.setLocation, nodeid will be None
Reference in New Issue
Block a user