new introducer: signed extensible dictionary-based messages! refs #466

This introduces new client and server halves to the Introducer (renaming the
old one with a _V1 suffix). Both have fallbacks to accomodate talking to a
different version: the publishing client switches on whether the server's
.get_version() advertises V2 support, the server switches on which
subscription method was invoked by the subscribing client.

The V2 protocol sends a three-tuple of (serialized announcement dictionary,
signature, pubkey) for each announcement. The V2 server dispatches messages
to subscribers according to the service-name, and throws errors for invalid
signatures, but does not otherwise examine the messages. The V2 receiver's
subscription callback will receive a (serverid, ann_dict) pair. The
'serverid' will be equal to the pubkey if all of the following are true:

  the originating client is V2, and was told a privkey to use
  the announcement went through a V2 server
  the signature is valid

If not, 'serverid' will be equal to the tubid portion of the announced FURL,
as was the case for V1 receivers.

Servers will create a keypair if one does not exist yet, stored in
private/server.privkey .

The signed announcement dictionary puts the server FURL in a key named
"anonymous-storage-FURL", which anticipates upcoming Accounting-related
changes in the server advertisements. It also provides a key named
"permutation-seed-base32" to tell clients what permutation seed to use. This
is computed at startup, using tubid if there are existing shares, otherwise
the pubkey, to retain share-order compatibility for existing servers.
This commit is contained in:
Brian Warner
2011-11-20 02:21:32 -08:00
parent 5ea8b698a5
commit bc21726dfd
17 changed files with 1870 additions and 452 deletions

View File

@ -1,12 +1,10 @@
import os, stat, time, weakref import os, stat, time, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node from allmydata import node
from zope.interface import implements from zope.interface import implements
from twisted.internet import reactor, defer from twisted.internet import reactor, defer
from twisted.application import service from twisted.application import service
from twisted.application.internet import TimerService from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from pycryptopp.publickey import rsa from pycryptopp.publickey import rsa
import allmydata import allmydata
@ -16,14 +14,13 @@ from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log from allmydata.util import hashutil, base32, pollmixin, log, keyutil
from allmydata.util.encodingutil import get_filesystem_encoding from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider from allmydata.stats import StatsProvider
from allmydata.history import History from allmydata.history import History
from allmydata.interfaces import IStatsProducer, RIStubClient, \ from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError from allmydata.node import OldConfigOptionError
@ -35,9 +32,6 @@ GiB=1024*MiB
TiB=1024*GiB TiB=1024*GiB
PiB=1024*TiB PiB=1024*TiB
class StubClient(Referenceable):
implements(RIStubClient)
def _make_secret(): def _make_secret():
return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n" return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
@ -174,7 +168,8 @@ class Client(node.Node, pollmixin.PollMixin):
ic = IntroducerClient(self.tub, self.introducer_furl, ic = IntroducerClient(self.tub, self.introducer_furl,
self.nickname, self.nickname,
str(allmydata.__full_version__), str(allmydata.__full_version__),
str(self.OLDEST_SUPPORTED_VERSION)) str(self.OLDEST_SUPPORTED_VERSION),
self.get_app_versions())
self.introducer_client = ic self.introducer_client = ic
# hold off on starting the IntroducerClient until our tub has been # hold off on starting the IntroducerClient until our tub has been
# started, so we'll have a useful address on our RemoteReference, so # started, so we'll have a useful address on our RemoteReference, so
@ -203,12 +198,46 @@ class Client(node.Node, pollmixin.PollMixin):
self.convergence = base32.a2b(convergence_s) self.convergence = base32.a2b(convergence_s)
self._secret_holder = SecretHolder(lease_secret, self.convergence) self._secret_holder = SecretHolder(lease_secret, self.convergence)
def _maybe_create_server_key(self):
# we only create the key once. On all subsequent runs, we re-use the
# existing key
def _make_key():
sk_vs,vk_vs = keyutil.make_keypair()
return sk_vs+"\n"
sk_vs = self.get_or_create_private_config("server.privkey", _make_key)
sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
self.write_config("server.pubkey", vk_vs+"\n")
self._server_key = sk
def _init_permutation_seed(self, ss):
seed = self.get_config_from_file("permutation-seed")
if not seed:
have_shares = ss.have_shares()
if have_shares:
# if the server has shares but not a recorded
# permutation-seed, then it has been around since pre-#466
# days, and the clients who uploaded those shares used our
# TubID as a permutation-seed. We should keep using that same
# seed to keep the shares in the same place in the permuted
# ring, so those clients don't have to perform excessive
# searches.
seed = base32.b2a(self.nodeid)
else:
# otherwise, we're free to use the more natural seed of our
# pubkey-based serverid
vk_bytes = self._server_key.get_verifying_key_bytes()
seed = base32.b2a(vk_bytes)
self.write_config("permutation-seed", seed+"\n")
return seed.strip()
def init_storage(self): def init_storage(self):
# should we run a storage server (and publish it for others to use)? # should we run a storage server (and publish it for others to use)?
if not self.get_config("storage", "enabled", True, boolean=True): if not self.get_config("storage", "enabled", True, boolean=True):
return return
readonly = self.get_config("storage", "readonly", False, boolean=True) readonly = self.get_config("storage", "readonly", False, boolean=True)
self._maybe_create_server_key()
storedir = os.path.join(self.basedir, self.STOREDIR) storedir = os.path.join(self.basedir, self.STOREDIR)
data = self.get_config("storage", "reserved_space", None) data = self.get_config("storage", "reserved_space", None)
@ -262,8 +291,10 @@ class Client(node.Node, pollmixin.PollMixin):
def _publish(res): def _publish(res):
furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding()) furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
furl = self.tub.registerReference(ss, furlFile=furl_file) furl = self.tub.registerReference(ss, furlFile=furl_file)
ri_name = RIStorageServer.__remote_name__ ann = {"anonymous-storage-FURL": furl,
self.introducer_client.publish(furl, "storage", ri_name) "permutation-seed-base32": self._init_permutation_seed(ss),
}
self.introducer_client.publish("storage", ann, self._server_key)
d.addCallback(_publish) d.addCallback(_publish)
d.addErrback(log.err, facility="tahoe.init", d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="aLGBKw") level=log.BAD, umid="aLGBKw")
@ -281,7 +312,6 @@ class Client(node.Node, pollmixin.PollMixin):
self.terminator.setServiceParent(self) self.terminator.setServiceParent(self)
self.add_service(Uploader(helper_furl, self.stats_provider, self.add_service(Uploader(helper_furl, self.stats_provider,
self.history)) self.history))
self.init_stub_client()
self.init_blacklist() self.init_blacklist()
self.init_nodemaker() self.init_nodemaker()
@ -321,20 +351,6 @@ class Client(node.Node, pollmixin.PollMixin):
def get_storage_broker(self): def get_storage_broker(self):
return self.storage_broker return self.storage_broker
def init_stub_client(self):
def _publish(res):
# we publish an empty object so that the introducer can count how
# many clients are connected and see what versions they're
# running.
sc = StubClient()
furl = self.tub.registerReference(sc)
ri_name = RIStubClient.__remote_name__
self.introducer_client.publish(furl, "stub_client", ri_name)
d = self.when_tub_ready()
d.addCallback(_publish)
d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="OEHq3g")
def init_blacklist(self): def init_blacklist(self):
fn = os.path.join(self.basedir, "access.blacklist") fn = os.path.join(self.basedir, "access.blacklist")
self.blacklist = Blacklist(fn) self.blacklist = Blacklist(fn)

View File

@ -30,14 +30,6 @@ WriteEnablerSecret = Hash # used to protect mutable bucket modifications
LeaseRenewSecret = Hash # used to protect bucket lease renewal requests LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
class RIStubClient(RemoteInterface):
"""Each client publishes a service announcement for a dummy object called
the StubClient. This object doesn't actually offer any services, but the
announcement helps the Introducer keep track of which clients are
subscribed (so the grid admin can keep track of things like the size of
the grid and the client versions in use. This is the (empty)
RemoteInterface for the StubClient."""
class RIBucketWriter(RemoteInterface): class RIBucketWriter(RemoteInterface):
""" Objects of this kind live on the server side. """ """ Objects of this kind live on the server side. """
def write(offset=Offset, data=ShareData): def write(offset=Offset, data=ShareData):

View File

@ -1,29 +1,76 @@
from base64 import b32decode import time
from zope.interface import implements from zope.interface import implements
from twisted.application import service from twisted.application import service
from foolscap.api import Referenceable, SturdyRef, eventually from foolscap.api import Referenceable, eventually, RemoteInterface
from allmydata.interfaces import InsufficientVersionError from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \ from allmydata.introducer.interfaces import IIntroducerClient, \
IIntroducerClient RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
from allmydata.util import log, idlib from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
make_index, get_tubid_string_from_ann, get_tubid_string
from allmydata.util import log
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.keyutil import BadSignatureError
class WrapV2ClientInV1Interface(Referenceable): # for_v1
"""I wrap a v2 IntroducerClient to make it look like a v1 client, so it
can be attached to an old server."""
implements(RIIntroducerSubscriberClient_v1)
def __init__(self, original):
self.original = original
def remote_announce(self, announcements):
lp = self.original.log("received %d announcements (v1)" %
len(announcements))
anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)
for ann_v1 in announcements])
return self.original.got_announcements(anns_v1, lp)
def remote_set_encoding_parameters(self, parameters):
self.original.remote_set_encoding_parameters(parameters)
class RIStubClient(RemoteInterface): # for_v1
"""Each client publishes a service announcement for a dummy object called
the StubClient. This object doesn't actually offer any services, but the
announcement helps the Introducer keep track of which clients are
subscribed (so the grid admin can keep track of things like the size of
the grid and the client versions in use. This is the (empty)
RemoteInterface for the StubClient."""
class StubClient(Referenceable): # for_v1
implements(RIStubClient)
V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
class IntroducerClient(service.Service, Referenceable): class IntroducerClient(service.Service, Referenceable):
implements(RIIntroducerSubscriberClient, IIntroducerClient) implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)
def __init__(self, tub, introducer_furl, def __init__(self, tub, introducer_furl,
nickname, my_version, oldest_supported): nickname, my_version, oldest_supported,
app_versions):
self._tub = tub self._tub = tub
self.introducer_furl = introducer_furl self.introducer_furl = introducer_furl
assert type(nickname) is unicode assert type(nickname) is unicode
self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8 self._nickname = nickname
self._my_version = my_version self._my_version = my_version
self._oldest_supported = oldest_supported self._oldest_supported = oldest_supported
self._app_versions = app_versions
self._published_announcements = set() self._my_subscriber_info = { "version": 0,
"nickname": self._nickname,
"app-versions": self._app_versions,
"my-version": self._my_version,
"oldest-supported": self._oldest_supported,
}
self._stub_client = None # for_v1
self._stub_client_furl = None
self._published_announcements = {}
self._canary = Referenceable()
self._publisher = None self._publisher = None
@ -33,10 +80,11 @@ class IntroducerClient(service.Service, Referenceable):
# _current_announcements remembers one announcement per # _current_announcements remembers one announcement per
# (servicename,serverid) pair. Anything that arrives with the same # (servicename,serverid) pair. Anything that arrives with the same
# pair will displace the previous one. This stores unpacked # pair will displace the previous one. This stores tuples of
# announcement dictionaries, which can be compared for equality to # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
# distinguish re-announcement from updates. It also provides memory # dicts can be compared for equality to distinguish re-announcement
# for clients who subscribe after startup. # from updates. It also provides memory for clients who subscribe
# after startup.
self._current_announcements = {} self._current_announcements = {}
self.encoding_parameters = None self.encoding_parameters = None
@ -51,6 +99,11 @@ class IntroducerClient(service.Service, Referenceable):
"new_announcement": 0, "new_announcement": 0,
"outbound_message": 0, "outbound_message": 0,
} }
self._debug_outstanding = 0
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def startService(self): def startService(self):
service.Service.startService(self) service.Service.startService(self)
@ -79,10 +132,9 @@ class IntroducerClient(service.Service, Referenceable):
def _got_versioned_introducer(self, publisher): def _got_versioned_introducer(self, publisher):
self.log("got introducer version: %s" % (publisher.version,)) self.log("got introducer version: %s" % (publisher.version,))
# we require a V1 introducer # we require an introducer that speaks at least one of (V1, V2)
needed = "http://allmydata.org/tahoe/protocols/introducer/v1" if not (V1 in publisher.version or V2 in publisher.version):
if needed not in publisher.version: raise InsufficientVersionError("V1 or V2", publisher.version)
raise InsufficientVersionError(needed, publisher.version)
self._publisher = publisher self._publisher = publisher
publisher.notifyOnDisconnect(self._disconnected) publisher.notifyOnDisconnect(self._disconnected)
self._maybe_publish() self._maybe_publish()
@ -95,24 +147,17 @@ class IntroducerClient(service.Service, Referenceable):
def log(self, *args, **kwargs): def log(self, *args, **kwargs):
if "facility" not in kwargs: if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer" kwargs["facility"] = "tahoe.introducer.client"
return log.msg(*args, **kwargs) return log.msg(*args, **kwargs)
def publish(self, furl, service_name, remoteinterface_name):
assert type(self._nickname_utf8) is str # we always send UTF-8
ann = (furl, service_name, remoteinterface_name,
self._nickname_utf8, self._my_version, self._oldest_supported)
self._published_announcements.add(ann)
self._maybe_publish()
def subscribe_to(self, service_name, cb, *args, **kwargs): def subscribe_to(self, service_name, cb, *args, **kwargs):
self._local_subscribers.append( (service_name,cb,args,kwargs) ) self._local_subscribers.append( (service_name,cb,args,kwargs) )
self._subscribed_service_names.add(service_name) self._subscribed_service_names.add(service_name)
self._maybe_subscribe() self._maybe_subscribe()
for (servicename,nodeid),ann_d in self._current_announcements.items(): for index,(ann,key_s,when) in self._current_announcements.items():
servicename = index[0]
if servicename == service_name: if servicename == service_name:
eventually(cb, nodeid, ann_d) eventually(cb, key_s, ann, *args, **kwargs)
def _maybe_subscribe(self): def _maybe_subscribe(self):
if not self._publisher: if not self._publisher:
@ -120,96 +165,160 @@ class IntroducerClient(service.Service, Referenceable):
level=log.NOISY) level=log.NOISY)
return return
for service_name in self._subscribed_service_names: for service_name in self._subscribed_service_names:
if service_name not in self._subscriptions: if service_name in self._subscriptions:
# there is a race here, but the subscription desk ignores continue
# duplicate requests. self._subscriptions.add(service_name)
self._subscriptions.add(service_name) if V2 in self._publisher.version:
d = self._publisher.callRemote("subscribe", self, service_name) self._debug_outstanding += 1
d.addErrback(trap_deadref) d = self._publisher.callRemote("subscribe_v2",
d.addErrback(log.err, format="server errored during subscribe", self, service_name,
facility="tahoe.introducer", self._my_subscriber_info)
level=log.WEIRD, umid="2uMScQ") d.addBoth(self._debug_retired)
else:
d = self._subscribe_handle_v1(service_name) # for_v1
d.addErrback(log.err, facility="tahoe.introducer.client",
level=log.WEIRD, umid="2uMScQ")
def _subscribe_handle_v1(self, service_name): # for_v1
# they don't speak V2: must be a v1 introducer. Fall back to the v1
# 'subscribe' method, using a client adapter.
ca = WrapV2ClientInV1Interface(self)
self._debug_outstanding += 1
d = self._publisher.callRemote("subscribe", ca, service_name)
d.addBoth(self._debug_retired)
# We must also publish an empty 'stub_client' object, so the
# introducer can count how many clients are connected and see what
# versions they're running.
if not self._stub_client_furl:
self._stub_client = sc = StubClient()
self._stub_client_furl = self._tub.registerReference(sc)
def _publish_stub_client(ignored):
furl = self._stub_client_furl
self.publish("stub_client",
{ "anonymous-storage-FURL": furl,
"permutation-seed-base32": get_tubid_string(furl),
})
d.addCallback(_publish_stub_client)
return d
def create_announcement(self, service_name, ann, signing_key):
full_ann = { "version": 0,
"nickname": self._nickname,
"app-versions": self._app_versions,
"my-version": self._my_version,
"oldest-supported": self._oldest_supported,
"service-name": service_name,
}
full_ann.update(ann)
return sign_to_foolscap(full_ann, signing_key)
def publish(self, service_name, ann, signing_key=None):
ann_t = self.create_announcement(service_name, ann, signing_key)
self._published_announcements[service_name] = ann_t
self._maybe_publish()
def _maybe_publish(self): def _maybe_publish(self):
if not self._publisher: if not self._publisher:
self.log("want to publish, but no introducer yet", level=log.NOISY) self.log("want to publish, but no introducer yet", level=log.NOISY)
return return
# this re-publishes everything. The Introducer ignores duplicates # this re-publishes everything. The Introducer ignores duplicates
for ann in self._published_announcements: for ann_t in self._published_announcements.values():
self._debug_counts["outbound_message"] += 1 self._debug_counts["outbound_message"] += 1
d = self._publisher.callRemote("publish", ann) if V2 in self._publisher.version:
d.addErrback(trap_deadref) self._debug_outstanding += 1
d.addErrback(log.err, d = self._publisher.callRemote("publish_v2", ann_t,
format="server errored during publish %(ann)s", self._canary)
ann=ann, facility="tahoe.introducer", d.addBoth(self._debug_retired)
else:
d = self._handle_v1_publisher(ann_t) # for_v1
d.addErrback(log.err, ann_t=ann_t,
facility="tahoe.introducer.client",
level=log.WEIRD, umid="xs9pVQ") level=log.WEIRD, umid="xs9pVQ")
def _handle_v1_publisher(self, ann_t): # for_v1
# they don't speak V2, so fall back to the old 'publish' method
# (which takes an unsigned tuple of bytestrings)
self.log("falling back to publish_v1",
level=log.UNUSUAL, umid="9RCT1A")
ann_v1 = convert_announcement_v2_to_v1(ann_t)
self._debug_outstanding += 1
d = self._publisher.callRemote("publish", ann_v1)
d.addBoth(self._debug_retired)
return d
def remote_announce(self, announcements): def remote_announce_v2(self, announcements):
self.log("received %d announcements" % len(announcements)) lp = self.log("received %d announcements (v2)" % len(announcements))
return self.got_announcements(announcements, lp)
def got_announcements(self, announcements, lp=None):
# this is the common entry point for both v1 and v2 announcements
self._debug_counts["inbound_message"] += 1 self._debug_counts["inbound_message"] += 1
for ann in announcements: for ann_t in announcements:
try: try:
self._process_announcement(ann) # this might raise UnknownKeyError or bad-sig error
except: ann, key_s = unsign_from_foolscap(ann_t)
log.err(format="unable to process announcement %(ann)s", # key is "v0-base32abc123"
ann=ann) except BadSignatureError:
# Don't let a corrupt announcement prevent us from processing self.log("bad signature on inbound announcement: %s" % (ann_t,),
# the remaining ones. Don't return an error to the server, parent=lp, level=log.WEIRD, umid="ZAU15Q")
# since they'd just ignore it anyways. # process other announcements that arrived with the bad one
pass continue
def _process_announcement(self, ann): self._process_announcement(ann, key_s)
def _process_announcement(self, ann, key_s):
self._debug_counts["inbound_announcement"] += 1 self._debug_counts["inbound_announcement"] += 1
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann service_name = str(ann["service-name"])
if service_name not in self._subscribed_service_names: if service_name not in self._subscribed_service_names:
self.log("announcement for a service we don't care about [%s]" self.log("announcement for a service we don't care about [%s]"
% (service_name,), level=log.UNUSUAL, umid="dIpGNA") % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
self._debug_counts["wrong_service"] += 1 self._debug_counts["wrong_service"] += 1
return return
self.log("announcement for [%s]: %s" % (service_name, ann), # for ASCII values, simplejson might give us unicode *or* bytes
umid="BoKEag") if "nickname" in ann and isinstance(ann["nickname"], str):
assert type(furl) is str ann["nickname"] = unicode(ann["nickname"])
assert type(service_name) is str nick_s = ann.get("nickname",u"").encode("utf-8")
assert type(ri_name) is str lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
assert type(nickname_utf8) is str nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")
nickname = nickname_utf8.decode("utf-8")
assert type(nickname) is unicode
assert type(ver) is str
assert type(oldest) is str
nodeid = b32decode(SturdyRef(furl).tubID.upper()) # how do we describe this node in the logs?
nodeid_s = idlib.shortnodeid_b2a(nodeid) desc_bits = []
if key_s:
desc_bits.append("serverid=" + key_s[:20])
if "anonymous-storage-FURL" in ann:
tubid_s = get_tubid_string_from_ann(ann)
desc_bits.append("tubid=" + tubid_s[:8])
description = "/".join(desc_bits)
ann_d = { "version": 0, # the index is used to track duplicates
"service-name": service_name, index = make_index(ann, key_s)
"FURL": furl, # is this announcement a duplicate?
"nickname": nickname, if (index in self._current_announcements
"app-versions": {}, # need #466 and v2 introducer and self._current_announcements[index][0] == ann):
"my-version": ver, self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
"oldest-supported": oldest, service=service_name, description=description,
} parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
index = (service_name, nodeid)
if self._current_announcements.get(index, None) == ann_d:
self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
service=service_name, nodeid=nodeid_s,
level=log.UNUSUAL, umid="B1MIdA")
self._debug_counts["duplicate_announcement"] += 1 self._debug_counts["duplicate_announcement"] += 1
return return
# does it update an existing one?
if index in self._current_announcements: if index in self._current_announcements:
self._debug_counts["update"] += 1 self._debug_counts["update"] += 1
self.log("replacing old announcement: %s" % (ann,),
parent=lp2, level=log.NOISY, umid="wxwgIQ")
else: else:
self._debug_counts["new_announcement"] += 1 self._debug_counts["new_announcement"] += 1
self.log("new announcement[%s]" % service_name,
parent=lp2, level=log.NOISY)
self._current_announcements[index] = ann_d self._current_announcements[index] = (ann, key_s, time.time())
# note: we never forget an index, but we might update its value # note: we never forget an index, but we might update its value
for (service_name2,cb,args,kwargs) in self._local_subscribers: for (service_name2,cb,args,kwargs) in self._local_subscribers:
if service_name2 == service_name: if service_name2 == service_name:
eventually(cb, nodeid, ann_d, *args, **kwargs) eventually(cb, key_s, ann, *args, **kwargs)
def remote_set_encoding_parameters(self, parameters): def remote_set_encoding_parameters(self, parameters):
self.encoding_parameters = parameters self.encoding_parameters = parameters

View File

@ -0,0 +1,92 @@
import re, simplejson
from allmydata.util import keyutil, base32
def make_index(ann, key_s):
"""Return something that can be used as an index (e.g. a tuple of
strings), such that two messages that refer to the same 'thing' will have
the same index. This is a tuple of (service-name, signing-key, None) for
signed announcements, or (service-name, None, tubid) for unsigned
announcements."""
service_name = str(ann["service-name"])
if key_s:
return (service_name, key_s, None)
else:
tubid = get_tubid_string_from_ann(ann)
return (service_name, None, tubid)
def get_tubid_string_from_ann(ann):
return get_tubid_string(str(ann.get("anonymous-storage-FURL")
or ann.get("FURL")))
def get_tubid_string(furl):
m = re.match(r'pb://(\w+)@', furl)
assert m
return m.group(1).lower()
def convert_announcement_v1_to_v2(ann_t):
(furl, service_name, ri_name, nickname, ver, oldest) = ann_t
assert type(furl) is str
assert type(service_name) is str
# ignore ri_name
assert type(nickname) is str
assert type(ver) is str
assert type(oldest) is str
ann = {"version": 0,
"nickname": nickname.decode("utf-8"),
"app-versions": {},
"my-version": ver,
"oldest-supported": oldest,
"service-name": service_name,
"anonymous-storage-FURL": furl,
"permutation-seed-base32": get_tubid_string(furl),
}
msg = simplejson.dumps(ann).encode("utf-8")
return (msg, None, None)
def convert_announcement_v2_to_v1(ann_v2):
(msg, sig, pubkey) = ann_v2
ann = simplejson.loads(msg)
assert ann["version"] == 0
ann_t = (str(ann["anonymous-storage-FURL"]),
str(ann["service-name"]),
"remoteinterface-name is unused",
ann["nickname"].encode("utf-8"),
str(ann["my-version"]),
str(ann["oldest-supported"]),
)
return ann_t
def sign_to_foolscap(ann, sk):
# return (bytes, None, None) or (bytes, sig-str, pubkey-str). A future
# HTTP-based serialization will use JSON({msg:b64(JSON(msg).utf8),
# sig:v0-b64(sig), pubkey:v0-b64(pubkey)}) .
msg = simplejson.dumps(ann).encode("utf-8")
if sk:
sig = "v0-"+base32.b2a(sk.sign(msg))
vk_bytes = sk.get_verifying_key_bytes()
ann_t = (msg, sig, "v0-"+base32.b2a(vk_bytes))
else:
ann_t = (msg, None, None)
return ann_t
class UnknownKeyError(Exception):
pass
def unsign_from_foolscap(ann_t):
(msg, sig_vs, claimed_key_vs) = ann_t
key_vs = None
if sig_vs and claimed_key_vs:
if not sig_vs.startswith("v0-"):
raise UnknownKeyError("only v0- signatures recognized")
if not claimed_key_vs.startswith("v0-"):
raise UnknownKeyError("only v0- keys recognized")
claimed_key = keyutil.parse_pubkey("pub-"+claimed_key_vs)
sig_bytes = base32.a2b(keyutil.remove_prefix(sig_vs, "v0-"))
claimed_key.verify(sig_bytes, msg)
key_vs = claimed_key_vs
ann = simplejson.loads(msg.decode("utf-8"))
return (ann, key_vs)

View File

@ -1,9 +1,12 @@
from zope.interface import Interface from zope.interface import Interface
from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \ from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
RemoteInterface RemoteInterface, Referenceable
from old import RIIntroducerSubscriberClient_v1
FURL = StringConstraint(1000) FURL = StringConstraint(1000)
# old introducer protocol (v1):
#
# Announcements are (FURL, service_name, remoteinterface_name, # Announcements are (FURL, service_name, remoteinterface_name,
# nickname, my_version, oldest_supported) # nickname, my_version, oldest_supported)
# the (FURL, service_name, remoteinterface_name) refer to the service being # the (FURL, service_name, remoteinterface_name) refer to the service being
@ -14,13 +17,17 @@ FURL = StringConstraint(1000)
# incompatible peer. The second goal is to enable the development of # incompatible peer. The second goal is to enable the development of
# backwards-compatibility code. # backwards-compatibility code.
Announcement = TupleOf(FURL, str, str, Announcement_v1 = TupleOf(FURL, str, str,
str, str, str) str, str, str)
class RIIntroducerSubscriberClient(RemoteInterface): # v2 protocol over foolscap: Announcements are 3-tuples of (bytes, str, str)
__remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com" # or (bytes, none, none)
Announcement_v2 = Any()
def announce(announcements=SetOf(Announcement)): class RIIntroducerSubscriberClient_v2(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberClient_v2.tahoe.allmydata.com"
def announce_v2(announcements=SetOf(Announcement_v2)):
"""I accept announcements from the publisher.""" """I accept announcements from the publisher."""
return None return None
@ -41,38 +48,29 @@ class RIIntroducerSubscriberClient(RemoteInterface):
""" """
return None return None
# When Foolscap can handle multiple interfaces (Foolscap#17), the SubscriberInfo = DictOf(str, Any())
# full-powered introducer will implement both RIIntroducerPublisher and
# RIIntroducerSubscriberService. Until then, we define
# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
# make everybody use that.
class RIIntroducerPublisher(RemoteInterface): class RIIntroducerPublisherAndSubscriberService_v2(RemoteInterface):
"""To publish a service to the world, connect to me and give me your """To publish a service to the world, connect to me and give me your
announcement message. I will deliver a copy to all connected subscribers.""" announcement message. I will deliver a copy to all connected subscribers.
__remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com" To hear about services, connect to me and subscribe to a specific
service_name."""
def publish(announcement=Announcement): __remote_name__ = "RIIntroducerPublisherAndSubscriberService_v2.tahoe.allmydata.com"
# canary?
return None
class RIIntroducerSubscriberService(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
"""Give me a subscriber reference, and I will call its new_peers()
method will any announcements that match the desired service name. I
will ignore duplicate subscriptions.
"""
return None
class RIIntroducerPublisherAndSubscriberService(RemoteInterface):
__remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
def get_version(): def get_version():
return DictOf(str, Any()) return DictOf(str, Any())
def publish(announcement=Announcement): def publish(announcement=Announcement_v1):
return None return None
def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str): def publish_v2(announcement=Announcement_v2, canary=Referenceable):
return None
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
return None
def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2,
service_name=str, subscriber_info=SubscriberInfo):
"""Give me a subscriber reference, and I will call its announce_v2()
method with any announcements that match the desired service name. I
will ignore duplicate subscriptions. The subscriber_info dictionary
tells me about the subscriber, and is used for diagnostic/status
displays."""
return None return None
class IIntroducerClient(Interface): class IIntroducerClient(Interface):
@ -80,41 +78,47 @@ class IIntroducerClient(Interface):
publish their services to the rest of the world, and I help them learn publish their services to the rest of the world, and I help them learn
about services available on other nodes.""" about services available on other nodes."""
def publish(furl, service_name, remoteinterface_name): def publish(service_name, ann, signing_key=None):
"""Once you call this, I will tell the world that the Referenceable """Publish the given announcement dictionary (which must be
available at FURL is available to provide a service named JSON-serializable), plus some additional keys, to the world.
SERVICE_NAME. The precise definition of the service being provided is
identified by the Foolscap 'remote interface name' in the last Each announcement is characterized by a (service_name, serverid)
parameter: this is supposed to be a globally-unique string that pair. When the server sees two announcements with the same pair, the
identifies the RemoteInterface that is implemented.""" later one will replace the earlier one. The serverid is derived from
the signing_key, if present, otherwise it is derived from the
'anonymous-storage-FURL' key.
If signing_key= is set to an instance of SigningKey, it will be
used to sign the announcement."""
def subscribe_to(service_name, callback, *args, **kwargs): def subscribe_to(service_name, callback, *args, **kwargs):
"""Call this if you will eventually want to use services with the """Call this if you will eventually want to use services with the
given SERVICE_NAME. This will prompt me to subscribe to announcements given SERVICE_NAME. This will prompt me to subscribe to announcements
of those services. Your callback will be invoked with at least two of those services. Your callback will be invoked with at least two
arguments: a serverid (binary string), and an announcement arguments: a pubkey and an announcement dictionary, followed by any
dictionary, followed by any additional callback args/kwargs you give additional callback args/kwargs you gave me. The pubkey will be None
me. I will run your callback for both new announcements and for unless the announcement was signed by the corresponding pubkey, in
which case it will be a printable string like 'v0-base32..'.
I will run your callback for both new announcements and for
announcements that have changed, but you must be prepared to tolerate announcements that have changed, but you must be prepared to tolerate
duplicates. duplicates.
The announcement dictionary that I give you will have the following The announcement that I give you comes from some other client. It
keys: will be a JSON-serializable dictionary which (by convention) is
expected to have at least the following keys:
version: 0 version: 0
service-name: str('storage')
FURL: str(furl)
remoteinterface-name: str(ri_name)
nickname: unicode nickname: unicode
app-versions: {} app-versions: {}
my-version: str my-version: str
oldest-supported: str oldest-supported: str
Note that app-version will be an empty dictionary until #466 is done service-name: str('storage')
and both the introducer and the remote client have been upgraded. For anonymous-storage-FURL: str(furl)
current (native) server types, the serverid will always be equal to
the binary form of the FURL's tubid. Note that app-version will be an empty dictionary if either the
publishing client or the Introducer are running older code.
""" """
def connected_to_introducer(): def connected_to_introducer():

View File

@ -0,0 +1,463 @@
import time
from base64 import b32decode
from zope.interface import implements, Interface
from twisted.application import service
import allmydata
from allmydata.interfaces import InsufficientVersionError
from allmydata.util import log, idlib, rrefutil
from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
RemoteInterface, Referenceable, eventually, SturdyRef
FURL = StringConstraint(1000)
# We keep a copy of the old introducer (both client and server) here to
# support compatibility tests. The old client is supposed to handle the new
# server, and new client is supposed to handle the old server.
# Announcements are (FURL, service_name, remoteinterface_name,
# nickname, my_version, oldest_supported)
# the (FURL, service_name, remoteinterface_name) refer to the service being
# announced. The (nickname, my_version, oldest_supported) refer to the
# client as a whole. The my_version/oldest_supported strings can be parsed
# by an allmydata.util.version.Version instance, and then compared. The
# first goal is to make sure that nodes are not confused by speaking to an
# incompatible peer. The second goal is to enable the development of
# backwards-compatibility code.
Announcement = TupleOf(FURL, str, str,
str, str, str)
class RIIntroducerSubscriberClient_v1(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
def announce(announcements=SetOf(Announcement)):
"""I accept announcements from the publisher."""
return None
def set_encoding_parameters(parameters=(int, int, int)):
"""Advise the client of the recommended k-of-n encoding parameters
for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
is the total number of shares that will be created for any given
file, while 'k' is the number of shares that must be retrieved to
recover that file, and 'desired' is the minimum number of shares that
must be placed before the uploader will consider its job a success.
n/k is the expansion ratio, while k determines the robustness.
Introducers should specify 'n' according to the expected size of the
grid (there is no point to producing more shares than there are
peers), and k according to the desired reliability-vs-overhead goals.
Note that setting k=1 is equivalent to simple replication.
"""
return None
# When Foolscap can handle multiple interfaces (Foolscap#17), the
# full-powered introducer will implement both RIIntroducerPublisher and
# RIIntroducerSubscriberService. Until then, we define
# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
# make everybody use that.
class RIIntroducerPublisher_v1(RemoteInterface):
"""To publish a service to the world, connect to me and give me your
announcement message. I will deliver a copy to all connected subscribers."""
__remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
def publish(announcement=Announcement):
# canary?
return None
class RIIntroducerSubscriberService_v1(RemoteInterface):
__remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
"""Give me a subscriber reference, and I will call its new_peers()
method will any announcements that match the desired service name. I
will ignore duplicate subscriptions.
"""
return None
class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
__remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
def get_version():
return DictOf(str, Any())
def publish(announcement=Announcement):
return None
def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
return None
class IIntroducerClient(Interface):
"""I provide service introduction facilities for a node. I help nodes
publish their services to the rest of the world, and I help them learn
about services available on other nodes."""
def publish(furl, service_name, remoteinterface_name):
"""Once you call this, I will tell the world that the Referenceable
available at FURL is available to provide a service named
SERVICE_NAME. The precise definition of the service being provided is
identified by the Foolscap 'remote interface name' in the last
parameter: this is supposed to be a globally-unique string that
identifies the RemoteInterface that is implemented."""
def subscribe_to(service_name, callback, *args, **kwargs):
"""Call this if you will eventually want to use services with the
given SERVICE_NAME. This will prompt me to subscribe to announcements
of those services. Your callback will be invoked with at least two
arguments: a serverid (binary string), and an announcement
dictionary, followed by any additional callback args/kwargs you give
me. I will run your callback for both new announcements and for
announcements that have changed, but you must be prepared to tolerate
duplicates.
The announcement dictionary that I give you will have the following
keys:
version: 0
service-name: str('storage')
FURL: str(furl)
remoteinterface-name: str(ri_name)
nickname: unicode
app-versions: {}
my-version: str
oldest-supported: str
Note that app-version will be an empty dictionary until #466 is done
and both the introducer and the remote client have been upgraded. For
current (native) server types, the serverid will always be equal to
the binary form of the FURL's tubid.
"""
def connected_to_introducer():
"""Returns a boolean, True if we are currently connected to the
introducer, False if not."""
class IntroducerClient_v1(service.Service, Referenceable):
implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)
def __init__(self, tub, introducer_furl,
nickname, my_version, oldest_supported):
self._tub = tub
self.introducer_furl = introducer_furl
assert type(nickname) is unicode
self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
self._my_version = my_version
self._oldest_supported = oldest_supported
self._published_announcements = set()
self._publisher = None
self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
self._subscribed_service_names = set()
self._subscriptions = set() # requests we've actually sent
# _current_announcements remembers one announcement per
# (servicename,serverid) pair. Anything that arrives with the same
# pair will displace the previous one. This stores unpacked
# announcement dictionaries, which can be compared for equality to
# distinguish re-announcement from updates. It also provides memory
# for clients who subscribe after startup.
self._current_announcements = {}
self.encoding_parameters = None
# hooks for unit tests
self._debug_counts = {
"inbound_message": 0,
"inbound_announcement": 0,
"wrong_service": 0,
"duplicate_announcement": 0,
"update": 0,
"new_announcement": 0,
"outbound_message": 0,
}
self._debug_outstanding = 0
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def startService(self):
service.Service.startService(self)
self._introducer_error = None
rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
self._introducer_reconnector = rc
def connect_failed(failure):
self.log("Initial Introducer connection failed: perhaps it's down",
level=log.WEIRD, failure=failure, umid="c5MqUQ")
d = self._tub.getReference(self.introducer_furl)
d.addErrback(connect_failed)
def _got_introducer(self, publisher):
self.log("connected to introducer, getting versions")
default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
{ },
"application-version": "unknown: no get_version()",
}
d = rrefutil.add_version_to_remote_reference(publisher, default)
d.addCallback(self._got_versioned_introducer)
d.addErrback(self._got_error)
def _got_error(self, f):
# TODO: for the introducer, perhaps this should halt the application
self._introducer_error = f # polled by tests
def _got_versioned_introducer(self, publisher):
self.log("got introducer version: %s" % (publisher.version,))
# we require a V1 introducer
needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
if needed not in publisher.version:
raise InsufficientVersionError(needed, publisher.version)
self._publisher = publisher
publisher.notifyOnDisconnect(self._disconnected)
self._maybe_publish()
self._maybe_subscribe()
def _disconnected(self):
self.log("bummer, we've lost our connection to the introducer")
self._publisher = None
self._subscriptions.clear()
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
return log.msg(*args, **kwargs)
def publish(self, furl, service_name, remoteinterface_name):
assert type(self._nickname_utf8) is str # we always send UTF-8
ann = (furl, service_name, remoteinterface_name,
self._nickname_utf8, self._my_version, self._oldest_supported)
self._published_announcements.add(ann)
self._maybe_publish()
def subscribe_to(self, service_name, cb, *args, **kwargs):
self._local_subscribers.append( (service_name,cb,args,kwargs) )
self._subscribed_service_names.add(service_name)
self._maybe_subscribe()
for (servicename,nodeid),ann_d in self._current_announcements.items():
if servicename == service_name:
eventually(cb, nodeid, ann_d)
def _maybe_subscribe(self):
if not self._publisher:
self.log("want to subscribe, but no introducer yet",
level=log.NOISY)
return
for service_name in self._subscribed_service_names:
if service_name not in self._subscriptions:
# there is a race here, but the subscription desk ignores
# duplicate requests.
self._subscriptions.add(service_name)
self._debug_outstanding += 1
d = self._publisher.callRemote("subscribe", self, service_name)
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err, format="server errored during subscribe",
facility="tahoe.introducer",
level=log.WEIRD, umid="2uMScQ")
def _maybe_publish(self):
if not self._publisher:
self.log("want to publish, but no introducer yet", level=log.NOISY)
return
# this re-publishes everything. The Introducer ignores duplicates
for ann in self._published_announcements:
self._debug_counts["outbound_message"] += 1
self._debug_outstanding += 1
d = self._publisher.callRemote("publish", ann)
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err,
format="server errored during publish %(ann)s",
ann=ann, facility="tahoe.introducer",
level=log.WEIRD, umid="xs9pVQ")
def remote_announce(self, announcements):
self.log("received %d announcements" % len(announcements))
self._debug_counts["inbound_message"] += 1
for ann in announcements:
try:
self._process_announcement(ann)
except:
log.err(format="unable to process announcement %(ann)s",
ann=ann)
# Don't let a corrupt announcement prevent us from processing
# the remaining ones. Don't return an error to the server,
# since they'd just ignore it anyways.
pass
def _process_announcement(self, ann):
self._debug_counts["inbound_announcement"] += 1
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
if service_name not in self._subscribed_service_names:
self.log("announcement for a service we don't care about [%s]"
% (service_name,), level=log.UNUSUAL, umid="dIpGNA")
self._debug_counts["wrong_service"] += 1
return
self.log("announcement for [%s]: %s" % (service_name, ann),
umid="BoKEag")
assert type(furl) is str
assert type(service_name) is str
assert type(ri_name) is str
assert type(nickname_utf8) is str
nickname = nickname_utf8.decode("utf-8")
assert type(nickname) is unicode
assert type(ver) is str
assert type(oldest) is str
nodeid = b32decode(SturdyRef(furl).tubID.upper())
nodeid_s = idlib.shortnodeid_b2a(nodeid)
ann_d = { "version": 0,
"service-name": service_name,
"FURL": furl,
"nickname": nickname,
"app-versions": {}, # need #466 and v2 introducer
"my-version": ver,
"oldest-supported": oldest,
}
index = (service_name, nodeid)
if self._current_announcements.get(index, None) == ann_d:
self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
service=service_name, nodeid=nodeid_s,
level=log.UNUSUAL, umid="B1MIdA")
self._debug_counts["duplicate_announcement"] += 1
return
if index in self._current_announcements:
self._debug_counts["update"] += 1
else:
self._debug_counts["new_announcement"] += 1
self._current_announcements[index] = ann_d
# note: we never forget an index, but we might update its value
for (service_name2,cb,args,kwargs) in self._local_subscribers:
if service_name2 == service_name:
eventually(cb, nodeid, ann_d, *args, **kwargs)
def remote_set_encoding_parameters(self, parameters):
self.encoding_parameters = parameters
def connected_to_introducer(self):
return bool(self._publisher)
class IntroducerService_v1(service.MultiService, Referenceable):
implements(RIIntroducerPublisherAndSubscriberService_v1)
name = "introducer"
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
{ },
"application-version": str(allmydata.__full_version__),
}
def __init__(self, basedir="."):
service.MultiService.__init__(self)
self.introducer_url = None
# 'index' is (service_name, tubid)
self._announcements = {} # dict of index -> (announcement, timestamp)
self._subscribers = {} # dict of (rref->timestamp) dicts
self._debug_counts = {"inbound_message": 0,
"inbound_duplicate": 0,
"inbound_update": 0,
"outbound_message": 0,
"outbound_announcements": 0,
"inbound_subscribe": 0}
self._debug_outstanding = 0
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
return log.msg(*args, **kwargs)
def get_announcements(self):
return self._announcements
def get_subscribers(self):
return self._subscribers
def remote_get_version(self):
return self.VERSION
def remote_publish(self, announcement):
try:
self._publish(announcement)
except:
log.err(format="Introducer.remote_publish failed on %(ann)s",
ann=announcement, level=log.UNUSUAL, umid="620rWA")
raise
def _publish(self, announcement):
self._debug_counts["inbound_message"] += 1
self.log("introducer: announcement published: %s" % (announcement,) )
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
#print "PUB", service_name, nickname_utf8
nodeid = b32decode(SturdyRef(furl).tubID.upper())
index = (service_name, nodeid)
if index in self._announcements:
(old_announcement, timestamp) = self._announcements[index]
if old_announcement == announcement:
self.log("but we already knew it, ignoring", level=log.NOISY)
self._debug_counts["inbound_duplicate"] += 1
return
else:
self.log("old announcement being updated", level=log.NOISY)
self._debug_counts["inbound_update"] += 1
self._announcements[index] = (announcement, time.time())
for s in self._subscribers.get(service_name, []):
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += 1
self._debug_outstanding += 1
d = s.callRemote("announce", set([announcement]))
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err,
format="subscriber errored on announcement %(ann)s",
ann=announcement, facility="tahoe.introducer",
level=log.UNUSUAL, umid="jfGMXQ")
def remote_subscribe(self, subscriber, service_name):
self.log("introducer: subscription[%s] request at %s" % (service_name,
subscriber))
self._debug_counts["inbound_subscribe"] += 1
if service_name not in self._subscribers:
self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name]
if subscriber in subscribers:
self.log("but they're already subscribed, ignoring",
level=log.UNUSUAL)
return
subscribers[subscriber] = time.time()
def _remove():
self.log("introducer: unsubscribing[%s] %s" % (service_name,
subscriber))
subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove)
announcements = set(
[ ann
for (sn2,nodeid),(ann,when) in self._announcements.items()
if sn2 == service_name] )
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += len(announcements)
self._debug_outstanding += 1
d = subscriber.callRemote("announce", announcements)
d.addBoth(self._debug_retired)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err,
format="subscriber errored during subscribe %(anns)s",
anns=announcements, facility="tahoe.introducer",
level=log.UNUSUAL, umid="1XChxA")

View File

@ -1,14 +1,16 @@
import time, os.path import time, os.path
from base64 import b32decode
from zope.interface import implements from zope.interface import implements
from twisted.application import service from twisted.application import service
from foolscap.api import Referenceable, SturdyRef from foolscap.api import Referenceable
import allmydata import allmydata
from allmydata import node from allmydata import node
from allmydata.util import log, rrefutil from allmydata.util import log
from allmydata.introducer.interfaces import \ from allmydata.introducer.interfaces import \
RIIntroducerPublisherAndSubscriberService RIIntroducerPublisherAndSubscriberService_v2
from allmydata.introducer.common import convert_announcement_v1_to_v2, \
convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
get_tubid_string_from_ann
class IntroducerNode(node.Node): class IntroducerNode(node.Node):
PORTNUMFILE = "introducer.port" PORTNUMFILE = "introducer.port"
@ -31,14 +33,15 @@ class IntroducerNode(node.Node):
def _publish(res): def _publish(res):
self.introducer_url = self.tub.registerReference(introducerservice, self.introducer_url = self.tub.registerReference(introducerservice,
"introducer") "introducer")
self.log(" introducer is at %s" % self.introducer_url) self.log(" introducer is at %s" % self.introducer_url,
umid="qF2L9A")
self.write_config("introducer.furl", self.introducer_url + "\n") self.write_config("introducer.furl", self.introducer_url + "\n")
d.addCallback(_publish) d.addCallback(_publish)
d.addErrback(log.err, facility="tahoe.init", d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="UaNs9A") level=log.BAD, umid="UaNs9A")
def init_web(self, webport): def init_web(self, webport):
self.log("init_web(webport=%s)", args=(webport,)) self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")
from allmydata.webish import IntroducerWebishServer from allmydata.webish import IntroducerWebishServer
nodeurl_path = os.path.join(self.basedir, "node.url") nodeurl_path = os.path.join(self.basedir, "node.url")
@ -47,105 +50,260 @@ class IntroducerNode(node.Node):
ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir) ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
self.add_service(ws) self.add_service(ws)
class WrapV1SubscriberInV2Interface: # for_v1
"""I wrap a RemoteReference that points at an old v1 subscriber, enabling
it to be treated like a v2 subscriber.
"""
def __init__(self, original):
self.original = original
def __eq__(self, them):
return self.original == them
def __ne__(self, them):
return self.original != them
def __hash__(self):
return hash(self.original)
def getRemoteTubID(self):
return self.original.getRemoteTubID()
def getSturdyRef(self):
return self.original.getSturdyRef()
def getPeer(self):
return self.original.getPeer()
def callRemote(self, methname, *args, **kwargs):
m = getattr(self, "wrap_" + methname)
return m(*args, **kwargs)
def wrap_announce_v2(self, announcements):
anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements]
return self.original.callRemote("announce", set(anns_v1))
def wrap_set_encoding_parameters(self, parameters):
# note: unused
return self.original.callRemote("set_encoding_parameters", parameters)
def notifyOnDisconnect(self, *args, **kwargs):
return self.original.notifyOnDisconnect(*args, **kwargs)
class IntroducerService(service.MultiService, Referenceable): class IntroducerService(service.MultiService, Referenceable):
implements(RIIntroducerPublisherAndSubscriberService) implements(RIIntroducerPublisherAndSubscriberService_v2)
name = "introducer" name = "introducer"
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": # v1 is the original protocol, supported since 1.0 (but only advertised
{ }, # starting in 1.3). v2 is the new signed protocol, supported after 1.9
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
"http://allmydata.org/tahoe/protocols/introducer/v2": { },
"application-version": str(allmydata.__full_version__), "application-version": str(allmydata.__full_version__),
} }
def __init__(self, basedir="."): def __init__(self, basedir="."):
service.MultiService.__init__(self) service.MultiService.__init__(self)
self.introducer_url = None self.introducer_url = None
# 'index' is (service_name, tubid) # 'index' is (service_name, key_s, tubid), where key_s or tubid is
self._announcements = {} # dict of index -> (announcement, timestamp) # None
self._subscribers = {} # dict of (rref->timestamp) dicts self._announcements = {} # dict of index ->
# (ann_t, canary, ann, timestamp)
# ann (the announcement dictionary) is cleaned up: nickname is always
# unicode, servicename is always ascii, etc, even though
# simplejson.loads sometimes returns either
# self._subscribers is a dict mapping servicename to subscriptions
# 'subscriptions' is a dict mapping rref to a subscription
# 'subscription' is a tuple of (subscriber_info, timestamp)
# 'subscriber_info' is a dict, provided directly for v2 clients, or
# synthesized for v1 clients. The expected keys are:
# version, nickname, app-versions, my-version, oldest-supported
self._subscribers = {}
# self._stub_client_announcements contains the information provided
# by v1 clients. We stash this so we can match it up with their
# subscriptions.
self._stub_client_announcements = {} # maps tubid to sinfo # for_v1
self._debug_counts = {"inbound_message": 0, self._debug_counts = {"inbound_message": 0,
"inbound_duplicate": 0, "inbound_duplicate": 0,
"inbound_update": 0, "inbound_update": 0,
"outbound_message": 0, "outbound_message": 0,
"outbound_announcements": 0, "outbound_announcements": 0,
"inbound_subscribe": 0} "inbound_subscribe": 0}
self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface
def _debug_retired(self, res):
self._debug_outstanding -= 1
return res
def log(self, *args, **kwargs): def log(self, *args, **kwargs):
if "facility" not in kwargs: if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer" kwargs["facility"] = "tahoe.introducer.server"
return log.msg(*args, **kwargs) return log.msg(*args, **kwargs)
def get_announcements(self): def get_announcements(self):
return self._announcements return self._announcements
def get_subscribers(self): def get_subscribers(self):
return self._subscribers """Return a list of (service_name, when, subscriber_info, rref) for
all subscribers. subscriber_info is a dict with the following keys:
version, nickname, app-versions, my-version, oldest-supported"""
s = []
for service_name, subscriptions in self._subscribers.items():
for rref,(subscriber_info,when) in subscriptions.items():
s.append( (service_name, when, subscriber_info, rref) )
return s
def remote_get_version(self): def remote_get_version(self):
return self.VERSION return self.VERSION
def remote_publish(self, announcement): def remote_publish(self, ann_t): # for_v1
lp = self.log("introducer: old (v1) announcement published: %s"
% (ann_t,), umid="6zGOIw")
ann_v2 = convert_announcement_v1_to_v2(ann_t)
return self.publish(ann_v2, None, lp)
def remote_publish_v2(self, ann_t, canary):
lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
return self.publish(ann_t, canary, lp)
def publish(self, ann_t, canary, lp):
try: try:
self._publish(announcement) self._publish(ann_t, canary, lp)
except: except:
log.err(format="Introducer.remote_publish failed on %(ann)s", log.err(format="Introducer.remote_publish failed on %(ann)s",
ann=announcement, level=log.UNUSUAL, umid="620rWA") ann=ann_t,
level=log.UNUSUAL, parent=lp, umid="620rWA")
raise raise
def _publish(self, announcement): def _publish(self, ann_t, canary, lp):
self._debug_counts["inbound_message"] += 1 self._debug_counts["inbound_message"] += 1
self.log("introducer: announcement published: %s" % (announcement,) ) self.log("introducer: announcement published: %s" % (ann_t,),
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement umid="wKHgCw")
ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
index = make_index(ann, key)
nodeid = b32decode(SturdyRef(furl).tubID.upper()) service_name = str(ann["service-name"])
index = (service_name, nodeid) if service_name == "stub_client": # for_v1
self._attach_stub_client(ann, lp)
return
if index in self._announcements: old = self._announcements.get(index)
(old_announcement, timestamp) = self._announcements[index] if old:
if old_announcement == announcement: (old_ann_t, canary, old_ann, timestamp) = old
self.log("but we already knew it, ignoring", level=log.NOISY) if old_ann == ann:
self.log("but we already knew it, ignoring", level=log.NOISY,
umid="myxzLw")
self._debug_counts["inbound_duplicate"] += 1 self._debug_counts["inbound_duplicate"] += 1
return return
else: else:
self.log("old announcement being updated", level=log.NOISY) self.log("old announcement being updated", level=log.NOISY,
umid="304r9g")
self._debug_counts["inbound_update"] += 1 self._debug_counts["inbound_update"] += 1
self._announcements[index] = (announcement, time.time()) self._announcements[index] = (ann_t, canary, ann, time.time())
#if canary:
# canary.notifyOnDisconnect ...
# use a CanaryWatcher? with cw.is_connected()?
# actually we just want foolscap to give rref.is_connected(), since
# this is only for the status display
for s in self._subscribers.get(service_name, []): for s in self._subscribers.get(service_name, []):
self._debug_counts["outbound_message"] += 1 self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += 1 self._debug_counts["outbound_announcements"] += 1
d = s.callRemote("announce", set([announcement])) self._debug_outstanding += 1
d.addErrback(rrefutil.trap_deadref) d = s.callRemote("announce_v2", set([ann_t]))
d.addBoth(self._debug_retired)
d.addErrback(log.err, d.addErrback(log.err,
format="subscriber errored on announcement %(ann)s", format="subscriber errored on announcement %(ann)s",
ann=announcement, facility="tahoe.introducer", ann=ann_t, facility="tahoe.introducer",
level=log.UNUSUAL, umid="jfGMXQ") level=log.UNUSUAL, umid="jfGMXQ")
def remote_subscribe(self, subscriber, service_name): def _attach_stub_client(self, ann, lp):
self.log("introducer: subscription[%s] request at %s" % (service_name, # There might be a v1 subscriber for whom this is a stub_client.
subscriber)) # We might have received the subscription before the stub_client
# announcement, in which case we now need to fix up the record in
# self._subscriptions .
# record it for later, in case the stub_client arrived before the
# subscription
subscriber_info = self._get_subscriber_info_from_ann(ann)
ann_tubid = get_tubid_string_from_ann(ann)
self._stub_client_announcements[ann_tubid] = subscriber_info
lp2 = self.log("stub_client announcement, "
"looking for matching subscriber",
parent=lp, level=log.NOISY, umid="BTywDg")
for sn in self._subscribers:
s = self._subscribers[sn]
for (subscriber, info) in s.items():
# we correlate these by looking for a subscriber whose tubid
# matches this announcement
sub_tubid = subscriber.getRemoteTubID()
if sub_tubid == ann_tubid:
self.log(format="found a match, nodeid=%(nodeid)s",
nodeid=sub_tubid,
level=log.NOISY, parent=lp2, umid="xsWs1A")
# found a match. Does it need info?
if not info[0]:
self.log(format="replacing info",
level=log.NOISY, parent=lp2, umid="m5kxwA")
# yup
s[subscriber] = (subscriber_info, info[1])
# and we don't remember or announce stub_clients beyond what we
# need to get the subscriber_info set up
def _get_subscriber_info_from_ann(self, ann): # for_v1
sinfo = { "version": ann["version"],
"nickname": ann["nickname"],
"app-versions": ann["app-versions"],
"my-version": ann["my-version"],
"oldest-supported": ann["oldest-supported"],
}
return sinfo
def remote_subscribe(self, subscriber, service_name): # for_v1
self.log("introducer: old (v1) subscription[%s] request at %s"
% (service_name, subscriber), umid="hJlGUg")
return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
service_name, None)
def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
self.log("introducer: subscription[%s] request at %s"
% (service_name, subscriber), umid="U3uzLg")
return self.add_subscriber(subscriber, service_name, subscriber_info)
def add_subscriber(self, subscriber, service_name, subscriber_info):
self._debug_counts["inbound_subscribe"] += 1 self._debug_counts["inbound_subscribe"] += 1
if service_name not in self._subscribers: if service_name not in self._subscribers:
self._subscribers[service_name] = {} self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name] subscribers = self._subscribers[service_name]
if subscriber in subscribers: if subscriber in subscribers:
self.log("but they're already subscribed, ignoring", self.log("but they're already subscribed, ignoring",
level=log.UNUSUAL) level=log.UNUSUAL, umid="Sy9EfA")
return return
subscribers[subscriber] = time.time()
if not subscriber_info: # for_v1
# v1 clients don't provide subscriber_info, but they should
# publish a 'stub client' record which contains the same
# information. If we've already received this, it will be in
# self._stub_client_announcements
tubid = subscriber.getRemoteTubID()
if tubid in self._stub_client_announcements:
subscriber_info = self._stub_client_announcements[tubid]
subscribers[subscriber] = (subscriber_info, time.time())
def _remove(): def _remove():
self.log("introducer: unsubscribing[%s] %s" % (service_name, self.log("introducer: unsubscribing[%s] %s" % (service_name,
subscriber)) subscriber),
umid="vYGcJg")
subscribers.pop(subscriber, None) subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove) subscriber.notifyOnDisconnect(_remove)
announcements = set( # now tell them about any announcements they're interested in
[ ann announcements = set( [ ann_t
for (sn2,nodeid),(ann,when) in self._announcements.items() for idx,(ann_t,canary,ann,when)
if sn2 == service_name] ) in self._announcements.items()
if idx[0] == service_name] )
self._debug_counts["outbound_message"] += 1 if announcements:
self._debug_counts["outbound_announcements"] += len(announcements) self._debug_counts["outbound_message"] += 1
d = subscriber.callRemote("announce", announcements) self._debug_counts["outbound_announcements"] += len(announcements)
d.addErrback(rrefutil.trap_deadref) self._debug_outstanding += 1
d.addErrback(log.err, d = subscriber.callRemote("announce_v2", announcements)
format="subscriber errored during subscribe %(anns)s", d.addBoth(self._debug_retired)
anns=announcements, facility="tahoe.introducer", d.addErrback(log.err,
level=log.UNUSUAL, umid="mtZepQ") format="subscriber errored during subscribe %(anns)s",
anns=announcements, facility="tahoe.introducer",
level=log.UNUSUAL, umid="mtZepQ")
return d

View File

@ -195,6 +195,19 @@ class Node(service.MultiService):
# TODO: merge this with allmydata.get_package_versions # TODO: merge this with allmydata.get_package_versions
return dict(app_versions.versions) return dict(app_versions.versions)
def get_config_from_file(self, name, required=False):
"""Get the (string) contents of a config file, or None if the file
did not exist. If required=True, raise an exception rather than
returning None. Any leading or trailing whitespace will be stripped
from the data."""
fn = os.path.join(self.basedir, name)
try:
return fileutil.read(fn).strip()
except EnvironmentError:
if not required:
return None
raise
def write_private_config(self, name, value): def write_private_config(self, name, value):
"""Write the (string) contents of a private config file (which is a """Write the (string) contents of a private config file (which is a
config file that resides within the subdirectory named 'private'), and config file that resides within the subdirectory named 'private'), and

View File

@ -99,6 +99,11 @@ class StorageServer(service.MultiService, Referenceable):
def __repr__(self): def __repr__(self):
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),) return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
def have_shares(self):
# quick test to decide if we need to commit to an implicit
# permutation-seed or if we should use a new one
return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))
def add_bucket_counter(self): def add_bucket_counter(self):
statefile = os.path.join(self.storedir, "bucket_counter.state") statefile = os.path.join(self.storedir, "bucket_counter.state")
self.bucket_counter = BucketCountingCrawler(self, statefile) self.bucket_counter = BucketCountingCrawler(self, statefile)

View File

@ -29,11 +29,11 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
# 6: implement other sorts of IStorageClient classes: S3, etc # 6: implement other sorts of IStorageClient classes: S3, etc
import time import re, time
from zope.interface import implements, Interface from zope.interface import implements, Interface
from foolscap.api import eventually from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker from allmydata.interfaces import IStorageBroker
from allmydata.util import idlib, log from allmydata.util import log, base32
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import sha1 from allmydata.util.hashutil import sha1
@ -74,8 +74,8 @@ class StorageFarmBroker:
self.introducer_client = None self.introducer_client = None
# these two are used in unit tests # these two are used in unit tests
def test_add_rref(self, serverid, rref): def test_add_rref(self, serverid, rref, ann):
s = NativeStorageServer(serverid, {}) s = NativeStorageServer(serverid, ann.copy())
s.rref = rref s.rref = rref
self.servers[serverid] = s self.servers[serverid] = s
@ -86,21 +86,23 @@ class StorageFarmBroker:
self.introducer_client = ic = introducer_client self.introducer_client = ic = introducer_client
ic.subscribe_to("storage", self._got_announcement) ic.subscribe_to("storage", self._got_announcement)
def _got_announcement(self, serverid, ann_d): def _got_announcement(self, key_s, ann):
precondition(isinstance(serverid, str), serverid) if key_s is not None:
precondition(len(serverid) == 20, serverid) precondition(isinstance(key_s, str), key_s)
assert ann_d["service-name"] == "storage" precondition(key_s.startswith("v0-"), key_s)
assert ann["service-name"] == "storage"
s = NativeStorageServer(key_s, ann)
serverid = s.get_serverid()
old = self.servers.get(serverid) old = self.servers.get(serverid)
if old: if old:
if old.get_announcement() == ann_d: if old.get_announcement() == ann:
return # duplicate return # duplicate
# replacement # replacement
del self.servers[serverid] del self.servers[serverid]
old.stop_connecting() old.stop_connecting()
# now we forget about them and start using the new one # now we forget about them and start using the new one
dsc = NativeStorageServer(serverid, ann_d) self.servers[serverid] = s
self.servers[serverid] = dsc s.start_connecting(self.tub, self._trigger_connections)
dsc.start_connecting(self.tub, self._trigger_connections)
# the descriptor will manage their own Reconnector, and each time we # the descriptor will manage their own Reconnector, and each time we
# need servers, we'll ask them if they're connected or not. # need servers, we'll ask them if they're connected or not.
@ -173,13 +175,25 @@ class NativeStorageServer:
"application-version": "unknown: no get_version()", "application-version": "unknown: no get_version()",
} }
def __init__(self, serverid, ann_d, min_shares=1): def __init__(self, key_s, ann, min_shares=1):
self.serverid = serverid self.key_s = key_s
self._tubid = serverid self.announcement = ann
self.announcement = ann_d
self.min_shares = min_shares self.min_shares = min_shares
self.serverid_s = idlib.shortnodeid_b2a(self.serverid) assert "anonymous-storage-FURL" in ann, ann
furl = str(ann["anonymous-storage-FURL"])
m = re.match(r'pb://(\w+)@', furl)
assert m, furl
tubid_s = m.group(1).lower()
self._tubid = base32.a2b(tubid_s)
assert "permutation-seed-base32" in ann, ann
ps = base32.a2b(str(ann["permutation-seed-base32"]))
self._permutation_seed = ps
name = key_s or tubid_s
self._long_description = name
self._short_description = name[:8] # TODO: decide who adds []
self.announcement_time = time.time() self.announcement_time = time.time()
self.last_connect_time = None self.last_connect_time = None
self.last_loss_time = None self.last_loss_time = None
@ -191,17 +205,17 @@ class NativeStorageServer:
def __repr__(self): def __repr__(self):
return "<NativeStorageServer for %s>" % self.get_name() return "<NativeStorageServer for %s>" % self.get_name()
def get_serverid(self): def get_serverid(self):
return self._tubid return self._tubid # XXX replace with self.key_s
def get_permutation_seed(self): def get_permutation_seed(self):
return self._tubid return self._permutation_seed
def get_version(self): def get_version(self):
if self.rref: if self.rref:
return self.rref.version return self.rref.version
return None return None
def get_name(self): # keep methodname short def get_name(self): # keep methodname short
return self.serverid_s return self._short_description
def get_longname(self): def get_longname(self):
return idlib.nodeid_b2a(self._tubid) return self._long_description
def get_lease_seed(self): def get_lease_seed(self):
return self._tubid return self._tubid
def get_foolscap_write_enabler_seed(self): def get_foolscap_write_enabler_seed(self):
@ -221,7 +235,7 @@ class NativeStorageServer:
return self.announcement_time return self.announcement_time
def start_connecting(self, tub, trigger_cb): def start_connecting(self, tub, trigger_cb):
furl = self.announcement["FURL"] furl = str(self.announcement["anonymous-storage-FURL"])
self._trigger_cb = trigger_cb self._trigger_cb = trigger_cb
self._reconnector = tub.connectTo(furl, self._got_connection) self._reconnector = tub.connectTo(furl, self._got_connection)

View File

@ -22,15 +22,16 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
for (peerid, nickname) in [("\x00"*20, "peer-0"), for (peerid, nickname) in [("\x00"*20, "peer-0"),
("\xff"*20, "peer-f"), ("\xff"*20, "peer-f"),
("\x11"*20, "peer-11")] : ("\x11"*20, "peer-11")] :
ann_d = { "version": 0, ann = { "version": 0,
"service-name": "storage", "service-name": "storage",
"FURL": "fake furl", "anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"nickname": unicode(nickname), "permutation-seed-base32": "",
"app-versions": {}, # need #466 and v2 introducer "nickname": unicode(nickname),
"my-version": "ver", "app-versions": {}, # need #466 and v2 introducer
"oldest-supported": "oldest", "my-version": "ver",
} "oldest-supported": "oldest",
s = NativeStorageServer(peerid, ann_d) }
s = NativeStorageServer(peerid, ann)
sb.test_add_server(peerid, s) sb.test_add_server(peerid, s)
c = FakeClient() c = FakeClient()
c.storage_broker = sb c.storage_broker = sb

View File

@ -136,12 +136,14 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0) self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
def _permute(self, sb, key): def _permute(self, sb, key):
return [ s.get_serverid() for s in sb.get_servers_for_psi(key) ] return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
def test_permute(self): def test_permute(self):
sb = StorageFarmBroker(None, True) sb = StorageFarmBroker(None, True)
for k in ["%d" % i for i in range(5)]: for k in ["%d" % i for i in range(5)]:
sb.test_add_rref(k, "rref") ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"permutation-seed-base32": base32.b2a(k) }
sb.test_add_rref(k, "rref", ann)
self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2']) self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3']) self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3'])

View File

@ -1,6 +1,7 @@
import os, re import os, re
from base64 import b32decode from base64 import b32decode
import simplejson
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer from twisted.internet import defer
@ -9,11 +10,16 @@ from twisted.python import log
from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
from twisted.application import service from twisted.application import service
from allmydata.interfaces import InsufficientVersionError from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.client import IntroducerClient from allmydata.introducer.client import IntroducerClient, \
WrapV2ClientInV1Interface
from allmydata.introducer.server import IntroducerService from allmydata.introducer.server import IntroducerService
from allmydata.introducer.common import get_tubid_string_from_ann, \
get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
UnknownKeyError
from allmydata.introducer import old
# test compatibility with old introducer .tac files # test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode from allmydata.introducer import IntroducerNode
from allmydata.util import pollmixin from allmydata.util import pollmixin, keyutil
import allmydata.test.common_util as testutil import allmydata.test.common_util as testutil
class LoggingMultiService(service.MultiService): class LoggingMultiService(service.MultiService):
@ -47,14 +53,14 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
def test_create(self): def test_create(self):
ic = IntroducerClient(None, "introducer.furl", u"my_nickname", ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
"my_version", "oldest_version") "my_version", "oldest_version", {})
self.failUnless(isinstance(ic, IntroducerClient)) self.failUnless(isinstance(ic, IntroducerClient))
def test_listen(self): def test_listen(self):
i = IntroducerService() i = IntroducerService()
i.setServiceParent(self.parent) i.setServiceParent(self.parent)
def test_duplicate(self): def test_duplicate_publish(self):
i = IntroducerService() i = IntroducerService()
self.failUnlessEqual(len(i.get_announcements()), 0) self.failUnlessEqual(len(i.get_announcements()), 0)
self.failUnlessEqual(len(i.get_subscribers()), 0) self.failUnlessEqual(len(i.get_subscribers()), 0)
@ -73,6 +79,223 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
self.failUnlessEqual(len(i.get_announcements()), 2) self.failUnlessEqual(len(i.get_announcements()), 2)
self.failUnlessEqual(len(i.get_subscribers()), 0) self.failUnlessEqual(len(i.get_subscribers()), 0)
def test_id_collision(self):
# test replacement case where tubid equals a keyid (one should
# not replace the other)
i = IntroducerService()
ic = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"my_version", "oldest_version", {})
sk_s, vk_s = keyutil.make_keypair()
sk, _ignored = keyutil.parse_privkey(sk_s)
keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
i.remote_publish_v2(ann_t, Referenceable())
announcements = i.get_announcements()
self.failUnlessEqual(len(announcements), 1)
key1 = ("storage", "v0-"+keyid, None)
self.failUnless(key1 in announcements)
(ign, ign, ann1_out, ign) = announcements[key1]
self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
i.remote_publish(ann2)
self.failUnlessEqual(len(announcements), 2)
key2 = ("storage", None, keyid)
self.failUnless(key2 in announcements)
(ign, ign, ann2_out, ign) = announcements[key2]
self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
def make_ann(furl):
ann = { "anonymous-storage-FURL": furl,
"permutation-seed-base32": get_tubid_string(furl) }
return ann
def make_ann_t(ic, furl, privkey):
return ic.create_announcement("storage", make_ann(furl), privkey)
class Client(unittest.TestCase):
def test_duplicate_receive_v1(self):
ic = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"my_version", "oldest_version", {})
announcements = []
ic.subscribe_to("storage",
lambda key_s,ann: announcements.append(ann))
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
ca = WrapV2ClientInV1Interface(ic)
ca.remote_announce([ann1])
d = fireEventually()
def _then(ign):
self.failUnlessEqual(len(announcements), 1)
self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
self.failUnlessEqual(announcements[0]["my-version"], "ver23")
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 0)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
# now send a duplicate announcement: this should not notify clients
ca.remote_announce([ann1])
return fireEventually()
d.addCallback(_then)
def _then2(ign):
self.failUnlessEqual(len(announcements), 1)
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 0)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
# and a replacement announcement: same FURL, new other stuff.
# Clients should be notified.
ca.remote_announce([ann1b])
return fireEventually()
d.addCallback(_then2)
def _then3(ign):
self.failUnlessEqual(len(announcements), 2)
self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
self.failUnlessEqual(ic._debug_counts["update"], 1)
self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
# test that the other stuff changed
self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
d.addCallback(_then3)
return d
def test_duplicate_receive_v2(self):
ic1 = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"ver23", "oldest_version", {})
# we use a second client just to create a different-looking
# announcement
ic2 = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"ver24","oldest_version",{})
announcements = []
def _received(key_s, ann):
announcements.append( (key_s, ann) )
ic1.subscribe_to("storage", _received)
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"
privkey_s, pubkey_vs = keyutil.make_keypair()
privkey, _ignored = keyutil.parse_privkey(privkey_s)
pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")
# ann1: ic1, furl1
# ann1a: ic1, furl1a (same SturdyRef, different connection hints)
# ann1b: ic2, furl1
# ann2: ic2, furl2
self.ann1 = make_ann_t(ic1, furl1, privkey)
self.ann1a = make_ann_t(ic1, furl1a, privkey)
self.ann1b = make_ann_t(ic2, furl1, privkey)
self.ann2 = make_ann_t(ic2, furl2, privkey)
ic1.remote_announce_v2([self.ann1]) # queues eventual-send
d = fireEventually()
def _then1(ign):
self.failUnlessEqual(len(announcements), 1)
key_s,ann = announcements[0]
self.failUnlessEqual(key_s, pubkey_s)
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
self.failUnlessEqual(ann["my-version"], "ver23")
d.addCallback(_then1)
# now send a duplicate announcement. This should not fire the
# subscriber
d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
d.addCallback(fireEventually)
def _then2(ign):
self.failUnlessEqual(len(announcements), 1)
d.addCallback(_then2)
# and a replacement announcement: same FURL, new other stuff. The
# subscriber *should* be fired.
d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
d.addCallback(fireEventually)
def _then3(ign):
self.failUnlessEqual(len(announcements), 2)
key_s,ann = announcements[-1]
self.failUnlessEqual(key_s, pubkey_s)
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
self.failUnlessEqual(ann["my-version"], "ver24")
d.addCallback(_then3)
# and a replacement announcement with a different FURL (it uses
# different connection hints)
d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
d.addCallback(fireEventually)
def _then4(ign):
self.failUnlessEqual(len(announcements), 3)
key_s,ann = announcements[-1]
self.failUnlessEqual(key_s, pubkey_s)
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
self.failUnlessEqual(ann["my-version"], "ver23")
d.addCallback(_then4)
# now add a new subscription, which should be called with the
# backlog. The introducer only records one announcement per index, so
# the backlog will only have the latest message.
announcements2 = []
def _received2(key_s, ann):
announcements2.append( (key_s, ann) )
d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
d.addCallback(fireEventually)
def _then5(ign):
self.failUnlessEqual(len(announcements2), 1)
key_s,ann = announcements2[-1]
self.failUnlessEqual(key_s, pubkey_s)
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
self.failUnlessEqual(ann["my-version"], "ver23")
d.addCallback(_then5)
return d
def test_id_collision(self):
# test replacement case where tubid equals a keyid (one should
# not replace the other)
ic = IntroducerClient(None,
"introducer.furl", u"my_nickname",
"my_version", "oldest_version", {})
announcements = []
ic.subscribe_to("storage",
lambda key_s,ann: announcements.append(ann))
sk_s, vk_s = keyutil.make_keypair()
sk, _ignored = keyutil.parse_privkey(sk_s)
keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
ic.remote_announce_v2([ann_t])
d = fireEventually()
def _then(ign):
# first announcement has been processed
self.failUnlessEqual(len(announcements), 1)
self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
furl1)
# now submit a second one, with a tubid that happens to look just
# like the pubkey-based serverid we just processed. They should
# not overlap.
ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
ca = WrapV2ClientInV1Interface(ic)
ca.remote_announce([ann2])
return fireEventually()
d.addCallback(_then)
def _then2(ign):
# if they overlapped, the second announcement would be ignored
self.failUnlessEqual(len(announcements), 2)
self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
furl2)
d.addCallback(_then2)
return d
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin): class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
def create_tub(self, portnum=0): def create_tub(self, portnum=0):
@ -88,36 +311,86 @@ class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
assert self.central_portnum == portnum assert self.central_portnum == portnum
tub.setLocation("localhost:%d" % self.central_portnum) tub.setLocation("localhost:%d" % self.central_portnum)
class Queue(SystemTestMixin, unittest.TestCase):
def test_queue_until_connected(self):
self.basedir = "introducer/QueueUntilConnected/queued"
os.makedirs(self.basedir)
self.create_tub()
introducer = IntroducerService()
introducer.setServiceParent(self.parent)
iff = os.path.join(self.basedir, "introducer.furl")
ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
tub2 = Tub()
tub2.setServiceParent(self.parent)
c = IntroducerClient(tub2, ifurl,
u"nickname", "version", "oldest", {})
furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
sk_s, vk_s = keyutil.make_keypair()
sk, _ignored = keyutil.parse_privkey(sk_s)
d = introducer.disownServiceParent()
def _offline(ign):
# now that the introducer server is offline, create a client and
# publish some messages
c.setServiceParent(self.parent) # this starts the reconnector
c.publish("storage", make_ann(furl1), sk)
introducer.setServiceParent(self.parent) # restart the server
# now wait for the messages to be delivered
def _got_announcement():
return bool(introducer.get_announcements())
return self.poll(_got_announcement)
d.addCallback(_offline)
def _done(ign):
v = list(introducer.get_announcements().values())[0]
(ign, ign, ann1_out, ign) = v
self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
d.addCallback(_done)
# now let the ack get back
def _wait_until_idle(ign):
def _idle():
if c._debug_outstanding:
return False
if introducer._debug_outstanding:
return False
return True
return self.poll(_idle)
d.addCallback(_wait_until_idle)
return d
V1 = "v1"; V2 = "v2"
class SystemTest(SystemTestMixin, unittest.TestCase): class SystemTest(SystemTestMixin, unittest.TestCase):
def test_system(self): def do_system_test(self, server_version):
self.basedir = "introducer/SystemTest/system"
os.makedirs(self.basedir)
return self.do_system_test(IntroducerService)
test_system.timeout = 480 # occasionally takes longer than 350s on "draco"
def do_system_test(self, create_introducer):
self.create_tub() self.create_tub()
introducer = create_introducer() if server_version == V1:
introducer = old.IntroducerService_v1()
else:
introducer = IntroducerService()
introducer.setServiceParent(self.parent) introducer.setServiceParent(self.parent)
iff = os.path.join(self.basedir, "introducer.furl") iff = os.path.join(self.basedir, "introducer.furl")
tub = self.central_tub tub = self.central_tub
ifurl = self.central_tub.registerReference(introducer, furlFile=iff) ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
self.introducer_furl = ifurl self.introducer_furl = ifurl
NUMCLIENTS = 5 # we have 5 clients who publish themselves as storage servers, and a
# we have 5 clients who publish themselves, and an extra one does # sixth which does which not. All 6 clients subscriber to hear about
# which not. When the connections are fully established, all six nodes # storage. When the connections are fully established, all six nodes
# should have 5 connections each. # should have 5 connections each.
NUM_STORAGE = 5
NUM_CLIENTS = 6
clients = [] clients = []
tubs = {} tubs = {}
received_announcements = {} received_announcements = {}
NUM_SERVERS = NUMCLIENTS
subscribing_clients = [] subscribing_clients = []
publishing_clients = [] publishing_clients = []
privkeys = {}
expected_announcements = [0 for c in range(NUM_CLIENTS)]
for i in range(NUMCLIENTS+1): for i in range(NUM_CLIENTS):
tub = Tub() tub = Tub()
#tub.setOption("logLocalFailures", True) #tub.setOption("logLocalFailures", True)
#tub.setOption("logRemoteFailures", True) #tub.setOption("logRemoteFailures", True)
@ -128,62 +401,171 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
tub.setLocation("localhost:%d" % portnum) tub.setLocation("localhost:%d" % portnum)
log.msg("creating client %d: %s" % (i, tub.getShortTubID())) log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i, if i == 0:
"version", "oldest") c = old.IntroducerClient_v1(tub, self.introducer_furl,
u"nickname-%d" % i,
"version", "oldest")
else:
c = IntroducerClient(tub, self.introducer_furl,
u"nickname-%d" % i,
"version", "oldest",
{"component": "component-v1"})
received_announcements[c] = {} received_announcements[c] = {}
def got(serverid, ann_d, announcements): def got(key_s_or_tubid, ann, announcements, i):
announcements[serverid] = ann_d if i == 0:
c.subscribe_to("storage", got, received_announcements[c]) index = get_tubid_string_from_ann(ann)
else:
index = key_s_or_tubid or get_tubid_string_from_ann(ann)
announcements[index] = ann
c.subscribe_to("storage", got, received_announcements[c], i)
subscribing_clients.append(c) subscribing_clients.append(c)
expected_announcements[i] += 1 # all expect a 'storage' announcement
if i < NUMCLIENTS: node_furl = tub.registerReference(Referenceable())
node_furl = tub.registerReference(Referenceable()) if i < NUM_STORAGE:
c.publish(node_furl, "storage", "ri_name") if i == 0:
c.publish(node_furl, "storage", "ri_name")
elif i == 1:
# sign the announcement
privkey_s, pubkey_s = keyutil.make_keypair()
privkey, _ignored = keyutil.parse_privkey(privkey_s)
privkeys[c] = privkey
c.publish("storage", make_ann(node_furl), privkey)
else:
c.publish("storage", make_ann(node_furl))
publishing_clients.append(c) publishing_clients.append(c)
# the last one does not publish anything else:
# the last one does not publish anything
pass
if i == 0:
# users of the V1 client were required to publish a
# 'stub_client' record (somewhat after they published the
# 'storage' record), so the introducer could see their
# version. Match that behavior.
c.publish(node_furl, "stub_client", "stub_ri_name")
if i == 2:
# also publish something that nobody cares about
boring_furl = tub.registerReference(Referenceable())
c.publish("boring", make_ann(boring_furl))
c.setServiceParent(self.parent) c.setServiceParent(self.parent)
clients.append(c) clients.append(c)
tubs[c] = tub tubs[c] = tub
def _wait_for_all_connections():
for c in subscribing_clients: def _wait_for_connected(ign):
if len(received_announcements[c]) < NUM_SERVERS: def _connected():
for c in clients:
if not c.connected_to_introducer():
return False
return True
return self.poll(_connected)
# we watch the clients to determine when the system has settled down.
# Then we can look inside the server to assert things about its
# state.
def _wait_for_expected_announcements(ign):
def _got_expected_announcements():
for i,c in enumerate(subscribing_clients):
if len(received_announcements[c]) < expected_announcements[i]:
return False
return True
return self.poll(_got_expected_announcements)
# before shutting down any Tub, we'd like to know that there are no
# messages outstanding
def _wait_until_idle(ign):
def _idle():
for c in subscribing_clients + publishing_clients:
if c._debug_outstanding:
return False
if introducer._debug_outstanding:
return False return False
return True return True
d = self.poll(_wait_for_all_connections) return self.poll(_idle)
d = defer.succeed(None)
d.addCallback(_wait_for_connected)
d.addCallback(_wait_for_expected_announcements)
d.addCallback(_wait_until_idle)
def _check1(res): def _check1(res):
log.msg("doing _check1") log.msg("doing _check1")
dc = introducer._debug_counts dc = introducer._debug_counts
self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS) if server_version == V1:
self.failUnlessEqual(dc["inbound_duplicate"], 0) # each storage server publishes a record, and (after its
# 'subscribe' has been ACKed) also publishes a "stub_client".
# The non-storage client (which subscribes) also publishes a
# stub_client. There is also one "boring" service. The number
# of messages is higher, because the stub_clients aren't
# published until after we get the 'subscribe' ack (since we
# don't realize that we're dealing with a v1 server [which
# needs stub_clients] until then), and the act of publishing
# the stub_client causes us to re-send all previous
# announcements.
self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
NUM_STORAGE + NUM_CLIENTS + 1)
else:
# each storage server publishes a record. There is also one
# "stub_client" and one "boring"
self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
self.failUnlessEqual(dc["inbound_duplicate"], 0)
self.failUnlessEqual(dc["inbound_update"], 0) self.failUnlessEqual(dc["inbound_update"], 0)
self.failUnless(dc["outbound_message"]) self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
# the number of outbound messages is tricky.. I think it depends
# upon a race between the publish and the subscribe messages.
self.failUnless(dc["outbound_message"] > 0)
# each client subscribes to "storage", and each server publishes
self.failUnlessEqual(dc["outbound_announcements"],
NUM_STORAGE*NUM_CLIENTS)
for c in clients:
self.failUnless(c.connected_to_introducer())
for c in subscribing_clients: for c in subscribing_clients:
cdc = c._debug_counts cdc = c._debug_counts
self.failUnless(cdc["inbound_message"]) self.failUnless(cdc["inbound_message"])
self.failUnlessEqual(cdc["inbound_announcement"], self.failUnlessEqual(cdc["inbound_announcement"],
NUM_SERVERS) NUM_STORAGE)
self.failUnlessEqual(cdc["wrong_service"], 0) self.failUnlessEqual(cdc["wrong_service"], 0)
self.failUnlessEqual(cdc["duplicate_announcement"], 0) self.failUnlessEqual(cdc["duplicate_announcement"], 0)
self.failUnlessEqual(cdc["update"], 0) self.failUnlessEqual(cdc["update"], 0)
self.failUnlessEqual(cdc["new_announcement"], self.failUnlessEqual(cdc["new_announcement"],
NUM_SERVERS) NUM_STORAGE)
anns = received_announcements[c] anns = received_announcements[c]
self.failUnlessEqual(len(anns), NUM_SERVERS) self.failUnlessEqual(len(anns), NUM_STORAGE)
nodeid0 = b32decode(tubs[clients[0]].tubID.upper()) nodeid0 = tubs[clients[0]].tubID
ann_d = anns[nodeid0] ann = anns[nodeid0]
nick = ann_d["nickname"] nick = ann["nickname"]
self.failUnlessEqual(type(nick), unicode) self.failUnlessEqual(type(nick), unicode)
self.failUnlessEqual(nick, u"nickname-0") self.failUnlessEqual(nick, u"nickname-0")
for c in publishing_clients: if server_version == V1:
cdc = c._debug_counts for c in publishing_clients:
self.failUnlessEqual(cdc["outbound_message"], 1) cdc = c._debug_counts
expected = 1 # storage
if c is clients[2]:
expected += 1 # boring
if c is not clients[0]:
# the v2 client tries to call publish_v2, which fails
# because the server is v1. It then re-sends
# everything it has so far, plus a stub_client record
expected = 2*expected + 1
if c is clients[0]:
# we always tell v1 client to send stub_client
expected += 1
self.failUnlessEqual(cdc["outbound_message"], expected)
else:
for c in publishing_clients:
cdc = c._debug_counts
expected = 1
if c in [clients[0], # stub_client
clients[2], # boring
]:
expected = 2
self.failUnlessEqual(cdc["outbound_message"], expected)
log.msg("_check1 done")
d.addCallback(_check1) d.addCallback(_check1)
# force an introducer reconnect, by shutting down the Tub it's using # force an introducer reconnect, by shutting down the Tub it's using
@ -196,67 +578,54 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub")) d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
def _wait_for_introducer_loss(): def _wait_for_introducer_loss(ign):
for c in clients: def _introducer_lost():
if c.connected_to_introducer(): for c in clients:
return False if c.connected_to_introducer():
return True return False
d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) return True
return self.poll(_introducer_lost)
d.addCallback(_wait_for_introducer_loss)
def _restart_introducer_tub(_ign): def _restart_introducer_tub(_ign):
log.msg("restarting introducer's Tub") log.msg("restarting introducer's Tub")
# reset counters
dc = introducer._debug_counts for i in range(NUM_CLIENTS):
self.expected_count = dc["inbound_message"] + NUM_SERVERS c = subscribing_clients[i]
self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1 for k in c._debug_counts:
introducer._debug0 = dc["outbound_message"] c._debug_counts[k] = 0
for c in subscribing_clients: for k in introducer._debug_counts:
cdc = c._debug_counts introducer._debug_counts[k] = 0
c._debug0 = cdc["inbound_message"] expected_announcements[i] += 1 # new 'storage' for everyone
self.create_tub(self.central_portnum) self.create_tub(self.central_portnum)
newfurl = self.central_tub.registerReference(introducer, newfurl = self.central_tub.registerReference(introducer,
furlFile=iff) furlFile=iff)
assert newfurl == self.introducer_furl assert newfurl == self.introducer_furl
d.addCallback(_restart_introducer_tub) d.addCallback(_restart_introducer_tub)
def _wait_for_introducer_reconnect(): d.addCallback(_wait_for_connected)
# wait until: d.addCallback(_wait_for_expected_announcements)
# all clients are connected d.addCallback(_wait_until_idle)
# the introducer has received publish messages from all of them d.addCallback(lambda _ign: log.msg(" reconnected"))
# the introducer has received subscribe messages from all of them
# the introducer has sent (duplicate) announcements to all of them
# all clients have received (duplicate) announcements
dc = introducer._debug_counts
for c in clients:
if not c.connected_to_introducer():
return False
if dc["inbound_message"] < self.expected_count:
return False
if dc["inbound_subscribe"] < self.expected_subscribe_count:
return False
for c in subscribing_clients:
cdc = c._debug_counts
if cdc["inbound_message"] < c._debug0+1:
return False
return True
d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect))
# TODO: publish something while the introducer is offline, then
# confirm it gets delivered when the connection is reestablished
def _check2(res): def _check2(res):
log.msg("doing _check2") log.msg("doing _check2")
# assert that the introducer sent out new messages, one per # assert that the introducer sent out new messages, one per
# subscriber # subscriber
dc = introducer._debug_counts dc = introducer._debug_counts
self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS) self.failUnlessEqual(dc["outbound_announcements"],
self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS) NUM_STORAGE*NUM_CLIENTS)
self.failUnlessEqual(dc["inbound_update"], 0) self.failUnless(dc["outbound_message"] > 0)
self.failUnlessEqual(dc["outbound_message"], self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
introducer._debug0 + len(subscribing_clients))
for c in clients:
self.failUnless(c.connected_to_introducer())
for c in subscribing_clients: for c in subscribing_clients:
cdc = c._debug_counts cdc = c._debug_counts
self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS) self.failUnlessEqual(cdc["inbound_message"], 1)
self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
self.failUnlessEqual(cdc["new_announcement"], 0)
self.failUnlessEqual(cdc["wrong_service"], 0)
self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
d.addCallback(_check2) d.addCallback(_check2)
# Then force an introducer restart, by shutting down the Tub, # Then force an introducer restart, by shutting down the Tub,
@ -267,71 +636,211 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
d.addCallback(lambda _ign: log.msg("shutting down introducer")) d.addCallback(lambda _ign: log.msg("shutting down introducer"))
d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) d.addCallback(_wait_for_introducer_loss)
d.addCallback(lambda _ign: log.msg("introducer lost"))
def _restart_introducer(_ign): def _restart_introducer(_ign):
log.msg("restarting introducer") log.msg("restarting introducer")
self.create_tub(self.central_portnum) self.create_tub(self.central_portnum)
# reset counters
for c in subscribing_clients: for i in range(NUM_CLIENTS):
# record some counters for later comparison. Stash the values c = subscribing_clients[i]
# on the client itself, because I'm lazy. for k in c._debug_counts:
cdc = c._debug_counts c._debug_counts[k] = 0
c._debug1 = cdc["inbound_announcement"] expected_announcements[i] += 1 # new 'storage' for everyone
c._debug2 = cdc["inbound_message"] if server_version == V1:
c._debug3 = cdc["new_announcement"] introducer = old.IntroducerService_v1()
newintroducer = create_introducer() else:
self.expected_message_count = NUM_SERVERS introducer = IntroducerService()
self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients) newfurl = self.central_tub.registerReference(introducer,
self.expected_subscribe_count = len(subscribing_clients)
newfurl = self.central_tub.registerReference(newintroducer,
furlFile=iff) furlFile=iff)
assert newfurl == self.introducer_furl assert newfurl == self.introducer_furl
d.addCallback(_restart_introducer) d.addCallback(_restart_introducer)
def _wait_for_introducer_reconnect2():
# wait until: d.addCallback(_wait_for_connected)
# all clients are connected d.addCallback(_wait_for_expected_announcements)
# the introducer has received publish messages from all of them d.addCallback(_wait_until_idle)
# the introducer has received subscribe messages from all of them
# the introducer has sent announcements for everybody to everybody
# all clients have received all the (duplicate) announcements
# at that point, the system should be quiescent
dc = introducer._debug_counts
for c in clients:
if not c.connected_to_introducer():
return False
if dc["inbound_message"] < self.expected_message_count:
return False
if dc["outbound_announcements"] < self.expected_announcement_count:
return False
if dc["inbound_subscribe"] < self.expected_subscribe_count:
return False
for c in subscribing_clients:
cdc = c._debug_counts
if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS:
return False
return True
d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2))
def _check3(res): def _check3(res):
log.msg("doing _check3") log.msg("doing _check3")
for c in clients: dc = introducer._debug_counts
self.failUnless(c.connected_to_introducer()) self.failUnlessEqual(dc["outbound_announcements"],
NUM_STORAGE*NUM_CLIENTS)
self.failUnless(dc["outbound_message"] > 0)
self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
for c in subscribing_clients: for c in subscribing_clients:
cdc = c._debug_counts cdc = c._debug_counts
self.failUnless(cdc["inbound_announcement"] > c._debug1) self.failUnless(cdc["inbound_message"] > 0)
self.failUnless(cdc["inbound_message"] > c._debug2) self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
# there should have been no new announcements self.failUnlessEqual(cdc["new_announcement"], 0)
self.failUnlessEqual(cdc["new_announcement"], c._debug3) self.failUnlessEqual(cdc["wrong_service"], 0)
# and the right number of duplicate ones. There were self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
# NUM_SERVERS from the servertub restart, and there should be
# another NUM_SERVERS now
self.failUnlessEqual(cdc["duplicate_announcement"],
2*NUM_SERVERS)
d.addCallback(_check3) d.addCallback(_check3)
return d return d
def test_system_v2_server(self):
self.basedir = "introducer/SystemTest/system_v2_server"
os.makedirs(self.basedir)
return self.do_system_test(V2)
test_system_v2_server.timeout = 480
# occasionally takes longer than 350s on "draco"
def test_system_v1_server(self):
self.basedir = "introducer/SystemTest/system_v1_server"
os.makedirs(self.basedir)
return self.do_system_test(V1)
test_system_v1_server.timeout = 480
# occasionally takes longer than 350s on "draco"
class FakeRemoteReference:
def notifyOnDisconnect(self, *args, **kwargs): pass
def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
class ClientInfo(unittest.TestCase):
def test_client_v2(self):
introducer = IntroducerService()
tub = introducer_furl = None
app_versions = {"whizzy": "fizzy"}
client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
"my_version", "oldest", app_versions)
#furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
#ann_s = make_ann_t(client_v2, furl1, None)
#introducer.remote_publish_v2(ann_s, Referenceable())
subscriber = FakeRemoteReference()
introducer.remote_subscribe_v2(subscriber, "storage",
client_v2._my_subscriber_info)
s = introducer.get_subscribers()
self.failUnlessEqual(len(s), 1)
sn, when, si, rref = s[0]
self.failUnlessIdentical(rref, subscriber)
self.failUnlessEqual(sn, "storage")
self.failUnlessEqual(si["version"], 0)
self.failUnlessEqual(si["oldest-supported"], "oldest")
self.failUnlessEqual(si["app-versions"], app_versions)
self.failUnlessEqual(si["nickname"], u"nick-v2")
self.failUnlessEqual(si["my-version"], "my_version")
def test_client_v1(self):
introducer = IntroducerService()
subscriber = FakeRemoteReference()
introducer.remote_subscribe(subscriber, "storage")
# the v1 subscribe interface had no subscriber_info: that was usually
# sent in a separate stub_client pseudo-announcement
s = introducer.get_subscribers()
self.failUnlessEqual(len(s), 1)
sn, when, si, rref = s[0]
# rref will be a WrapV1SubscriberInV2Interface around the real
# subscriber
self.failUnlessIdentical(rref.original, subscriber)
self.failUnlessEqual(si, None) # not known yet
self.failUnlessEqual(sn, "storage")
# now submit the stub_client announcement
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
ann = (furl1, "stub_client", "RIStubClient",
u"nick-v1".encode("utf-8"), "my_version", "oldest")
introducer.remote_publish(ann)
# the server should correlate the two
s = introducer.get_subscribers()
self.failUnlessEqual(len(s), 1)
sn, when, si, rref = s[0]
self.failUnlessIdentical(rref.original, subscriber)
self.failUnlessEqual(sn, "storage")
self.failUnlessEqual(si["version"], 0)
self.failUnlessEqual(si["oldest-supported"], "oldest")
# v1 announcements do not contain app-versions
self.failUnlessEqual(si["app-versions"], {})
self.failUnlessEqual(si["nickname"], u"nick-v1")
self.failUnlessEqual(si["my-version"], "my_version")
# a subscription that arrives after the stub_client announcement
# should be correlated too
subscriber2 = FakeRemoteReference()
introducer.remote_subscribe(subscriber2, "thing2")
s = introducer.get_subscribers()
subs = dict([(sn, (si,rref)) for sn, when, si, rref in s])
self.failUnlessEqual(len(subs), 2)
(si,rref) = subs["thing2"]
self.failUnlessIdentical(rref.original, subscriber2)
self.failUnlessEqual(si["version"], 0)
self.failUnlessEqual(si["oldest-supported"], "oldest")
# v1 announcements do not contain app-versions
self.failUnlessEqual(si["app-versions"], {})
self.failUnlessEqual(si["nickname"], u"nick-v1")
self.failUnlessEqual(si["my-version"], "my_version")
class Announcements(unittest.TestCase):
def test_client_v2_unsigned(self):
introducer = IntroducerService()
tub = introducer_furl = None
app_versions = {"whizzy": "fizzy"}
client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
"my_version", "oldest", app_versions)
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
ann_s0 = make_ann_t(client_v2, furl1, None)
canary0 = Referenceable()
introducer.remote_publish_v2(ann_s0, canary0)
a = introducer.get_announcements()
self.failUnlessEqual(len(a), 1)
(index, (ann_s, canary, ann, when)) = a.items()[0]
self.failUnlessIdentical(canary, canary0)
self.failUnlessEqual(index, ("storage", None, tubid))
self.failUnlessEqual(ann["app-versions"], app_versions)
self.failUnlessEqual(ann["nickname"], u"nick-v2")
self.failUnlessEqual(ann["service-name"], "storage")
self.failUnlessEqual(ann["my-version"], "my_version")
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
def test_client_v2_signed(self):
introducer = IntroducerService()
tub = introducer_furl = None
app_versions = {"whizzy": "fizzy"}
client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
"my_version", "oldest", app_versions)
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
sk_s, vk_s = keyutil.make_keypair()
sk, _ignored = keyutil.parse_privkey(sk_s)
pks = keyutil.remove_prefix(vk_s, "pub-")
ann_t0 = make_ann_t(client_v2, furl1, sk)
canary0 = Referenceable()
introducer.remote_publish_v2(ann_t0, canary0)
a = introducer.get_announcements()
self.failUnlessEqual(len(a), 1)
(index, (ann_s, canary, ann, when)) = a.items()[0]
self.failUnlessIdentical(canary, canary0)
self.failUnlessEqual(index, ("storage", pks, None))
self.failUnlessEqual(ann["app-versions"], app_versions)
self.failUnlessEqual(ann["nickname"], u"nick-v2")
self.failUnlessEqual(ann["service-name"], "storage")
self.failUnlessEqual(ann["my-version"], "my_version")
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
def test_client_v1(self):
introducer = IntroducerService()
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
ann = (furl1, "storage", "RIStorage",
u"nick-v1".encode("utf-8"), "my_version", "oldest")
introducer.remote_publish(ann)
a = introducer.get_announcements()
self.failUnlessEqual(len(a), 1)
(index, (ann_s, canary, ann, when)) = a.items()[0]
self.failUnlessEqual(canary, None)
self.failUnlessEqual(index, ("storage", None, tubid))
self.failUnlessEqual(ann["app-versions"], {})
self.failUnlessEqual(ann["nickname"], u"nick-v1".encode("utf-8"))
self.failUnlessEqual(ann["service-name"], "storage")
self.failUnlessEqual(ann["my-version"], "my_version")
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
class TooNewServer(IntroducerService): class TooNewServer(IntroducerService):
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999": VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
{ }, { },
@ -359,10 +868,10 @@ class NonV1Server(SystemTestMixin, unittest.TestCase):
tub.setLocation("localhost:%d" % portnum) tub.setLocation("localhost:%d" % portnum)
c = IntroducerClient(tub, self.introducer_furl, c = IntroducerClient(tub, self.introducer_furl,
u"nickname-client", "version", "oldest") u"nickname-client", "version", "oldest", {})
announcements = {} announcements = {}
def got(serverid, ann_d): def got(key_s, ann):
announcements[serverid] = ann_d announcements[key_s] = ann
c.subscribe_to("storage", got) c.subscribe_to("storage", got)
c.setServiceParent(self.parent) c.setServiceParent(self.parent)
@ -374,7 +883,8 @@ class NonV1Server(SystemTestMixin, unittest.TestCase):
d = self.poll(_got_bad) d = self.poll(_got_bad)
def _done(res): def _done(res):
self.failUnless(c._introducer_error) self.failUnless(c._introducer_error)
self.failUnless(c._introducer_error.check(InsufficientVersionError)) self.failUnless(c._introducer_error.check(InsufficientVersionError),
c._introducer_error)
d.addCallback(_done) d.addCallback(_done)
return d return d
@ -388,3 +898,44 @@ class DecodeFurl(unittest.TestCase):
nodeid = b32decode(m.group(1).upper()) nodeid = b32decode(m.group(1).upper())
self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d") self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
class Signatures(unittest.TestCase):
def test_sign(self):
ann = {"key1": "value1"}
sk_s,vk_s = keyutil.make_keypair()
sk,ignored = keyutil.parse_privkey(sk_s)
ann_t = sign_to_foolscap(ann, sk)
(msg, sig, key) = ann_t
self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes
self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), ann)
self.failUnless(sig.startswith("v0-"))
self.failUnless(key.startswith("v0-"))
(ann2,key2) = unsign_from_foolscap(ann_t)
self.failUnlessEqual(ann2, ann)
self.failUnlessEqual("pub-"+key2, vk_s)
# bad signature
bad_ann = {"key1": "value2"}
bad_msg = simplejson.dumps(bad_ann).encode("utf-8")
self.failUnlessRaises(keyutil.BadSignatureError,
unsign_from_foolscap, (bad_msg,sig,key))
# sneaky bad signature should be ignored
(ann2,key2) = unsign_from_foolscap( (bad_msg,None,key) )
self.failUnlessEqual(key2, None)
self.failUnlessEqual(ann2, bad_ann)
# unrecognized signatures
self.failUnlessRaises(UnknownKeyError,
unsign_from_foolscap, (bad_msg,"v999-sig",key))
self.failUnlessRaises(UnknownKeyError,
unsign_from_foolscap, (bad_msg,sig,"v999-key"))
# add tests of StorageFarmBroker: if it receives duplicate announcements, it
# should leave the Reconnector in place, also if it receives
# same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
# should tear down the Reconnector and make a new one. This behavior used to
# live in the IntroducerClient, and thus used to be tested by test_introducer
# copying more tests from old branch:
# then also add Upgrade test

View File

@ -228,7 +228,9 @@ def make_storagebroker(s=None, num_peers=10):
storage_broker = StorageFarmBroker(None, True) storage_broker = StorageFarmBroker(None, True)
for peerid in peerids: for peerid in peerids:
fss = FakeStorageServer(peerid, s) fss = FakeStorageServer(peerid, s)
storage_broker.test_add_rref(peerid, fss) ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
"permutation-seed-base32": base32.b2a(peerid) }
storage_broker.test_add_rref(peerid, fss, ann)
return storage_broker return storage_broker
def make_nodemaker(s=None, num_peers=10): def make_nodemaker(s=None, num_peers=10):

View File

@ -783,7 +783,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
newappverstr = "%s: %s" % (allmydata.__appname__, altverstr) newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)
self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res)) self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res) self.failUnless("Announcement Summary: storage: 5" in res)
self.failUnless("Subscription Summary: storage: 5" in res) self.failUnless("Subscription Summary: storage: 5" in res)
self.failUnless("tahoe.css" in res) self.failUnless("tahoe.css" in res)
except unittest.FailTest: except unittest.FailTest:
@ -804,9 +804,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
self.failUnlessEqual(data["subscription_summary"], self.failUnlessEqual(data["subscription_summary"],
{"storage": 5}) {"storage": 5})
self.failUnlessEqual(data["announcement_summary"], self.failUnlessEqual(data["announcement_summary"],
{"storage": 5, "stub_client": 5}) {"storage": 5})
self.failUnlessEqual(data["announcement_distinct_hosts"], self.failUnlessEqual(data["announcement_distinct_hosts"],
{"storage": 1, "stub_client": 1}) {"storage": 1})
except unittest.FailTest: except unittest.FailTest:
print print
print "GET %s?t=json output was:" % self.introweb_url print "GET %s?t=json output was:" % self.introweb_url

View File

@ -11,7 +11,7 @@ import allmydata # for __full_version__
from allmydata import uri, monitor, client from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
from allmydata.util import log from allmydata.util import log, base32
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.test.no_network import GridTestMixin from allmydata.test.no_network import GridTestMixin
@ -197,7 +197,9 @@ class FakeClient:
for fakeid in range(self.num_servers) ] for fakeid in range(self.num_servers) ]
self.storage_broker = StorageFarmBroker(None, permute_peers=True) self.storage_broker = StorageFarmBroker(None, permute_peers=True)
for (serverid, rref) in servers: for (serverid, rref) in servers:
self.storage_broker.test_add_rref(serverid, rref) ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
"permutation-seed-base32": base32.b2a(serverid) }
self.storage_broker.test_add_rref(serverid, rref, ann)
self.last_servers = [s[1] for s in servers] self.last_servers = [s[1] for s in servers]
def log(self, *args, **kwargs): def log(self, *args, **kwargs):

View File

@ -34,15 +34,20 @@ class IntroducerRoot(rend.Page):
def render_JSON(self, ctx): def render_JSON(self, ctx):
res = {} res = {}
clients = self.introducer_service.get_subscribers()
subscription_summary = dict([ (name, len(clients[name])) counts = {}
for name in clients ]) subscribers = self.introducer_service.get_subscribers()
res["subscription_summary"] = subscription_summary for (service_name, ign, ign, ign) in subscribers:
if service_name not in counts:
counts[service_name] = 0
counts[service_name] += 1
res["subscription_summary"] = counts
announcement_summary = {} announcement_summary = {}
service_hosts = {} service_hosts = {}
for (ann,when) in self.introducer_service.get_announcements().values(): for a in self.introducer_service.get_announcements().values():
(furl, service_name, ri_name, nickname, ver, oldest) = ann (_, _, ann, when) = a
service_name = ann["service-name"]
if service_name not in announcement_summary: if service_name not in announcement_summary:
announcement_summary[service_name] = 0 announcement_summary[service_name] = 0
announcement_summary[service_name] += 1 announcement_summary[service_name] += 1
@ -55,6 +60,7 @@ class IntroducerRoot(rend.Page):
# enough: when multiple services are run on a single host, # enough: when multiple services are run on a single host,
# they're usually either configured with the same addresses, # they're usually either configured with the same addresses,
# or setLocationAutomatically picks up the same interfaces. # or setLocationAutomatically picks up the same interfaces.
furl = ann["anonymous-storage-FURL"]
locations = SturdyRef(furl).getTubRef().getLocations() locations = SturdyRef(furl).getTubRef().getLocations()
# list of tuples, ("ipv4", host, port) # list of tuples, ("ipv4", host, port)
host = frozenset([hint[1] host = frozenset([hint[1]
@ -79,8 +85,9 @@ class IntroducerRoot(rend.Page):
def render_announcement_summary(self, ctx, data): def render_announcement_summary(self, ctx, data):
services = {} services = {}
for (ann,when) in self.introducer_service.get_announcements().values(): for a in self.introducer_service.get_announcements().values():
(furl, service_name, ri_name, nickname, ver, oldest) = ann (_, _, ann, when) = a
service_name = ann["service-name"]
if service_name not in services: if service_name not in services:
services[service_name] = 0 services[service_name] = 0
services[service_name] += 1 services[service_name] += 1
@ -90,65 +97,52 @@ class IntroducerRoot(rend.Page):
for service_name in service_names]) for service_name in service_names])
def render_client_summary(self, ctx, data): def render_client_summary(self, ctx, data):
counts = {}
clients = self.introducer_service.get_subscribers() clients = self.introducer_service.get_subscribers()
service_names = clients.keys() for (service_name, ign, ign, ign) in clients:
service_names.sort() if service_name not in counts:
return ", ".join(["%s: %d" % (service_name, len(clients[service_name])) counts[service_name] = 0
for service_name in service_names]) counts[service_name] += 1
return ", ".join([ "%s: %d" % (name, counts[name])
for name in sorted(counts.keys()) ] )
def data_services(self, ctx, data): def data_services(self, ctx, data):
introsvc = self.introducer_service introsvc = self.introducer_service
ann = [(since,a) services = []
for (a,since) in introsvc.get_announcements().values() for a in introsvc.get_announcements().values():
if a[1] != "stub_client"] (_, _, ann, when) = a
ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) ) if ann["service-name"] == "stub_client":
return ann continue
services.append( (when, ann) )
services.sort(key=lambda x: (x[1]["service-name"], x[1]["nickname"]))
# this used to be:
#services.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
# service_name was the primary key, then the whole tuple (starting
# with the furl) was the secondary key
return services
def render_service_row(self, ctx, (since,announcement)): def render_service_row(self, ctx, (since,ann)):
(furl, service_name, ri_name, nickname, ver, oldest) = announcement sr = SturdyRef(ann["anonymous-storage-FURL"])
sr = SturdyRef(furl)
nodeid = sr.tubID nodeid = sr.tubID
advertised = self.show_location_hints(sr) advertised = self.show_location_hints(sr)
ctx.fillSlots("peerid", nodeid) ctx.fillSlots("peerid", nodeid)
ctx.fillSlots("nickname", nickname) ctx.fillSlots("nickname", ann["nickname"])
ctx.fillSlots("advertised", " ".join(advertised)) ctx.fillSlots("advertised", " ".join(advertised))
ctx.fillSlots("connected", "?") ctx.fillSlots("connected", "?")
TIME_FORMAT = "%H:%M:%S %d-%b-%Y" TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
ctx.fillSlots("announced", ctx.fillSlots("announced",
time.strftime(TIME_FORMAT, time.localtime(since))) time.strftime(TIME_FORMAT, time.localtime(since)))
ctx.fillSlots("version", ver) ctx.fillSlots("version", ann["my-version"])
ctx.fillSlots("service_name", service_name) ctx.fillSlots("service_name", ann["service-name"])
return ctx.tag return ctx.tag
def data_subscribers(self, ctx, data): def data_subscribers(self, ctx, data):
# use the "stub_client" announcements to get information per nodeid return self.introducer_service.get_subscribers()
clients = {}
for (ann,when) in self.introducer_service.get_announcements().values():
if ann[1] != "stub_client":
continue
(furl, service_name, ri_name, nickname, ver, oldest) = ann
sr = SturdyRef(furl)
nodeid = sr.tubID
clients[nodeid] = ann
# then we actually provide information per subscriber
s = []
introsvc = self.introducer_service
for service_name, subscribers in introsvc.get_subscribers().items():
for (rref, timestamp) in subscribers.items():
sr = rref.getSturdyRef()
nodeid = sr.tubID
ann = clients.get(nodeid)
s.append( (service_name, rref, timestamp, ann) )
s.sort()
return s
def render_subscriber_row(self, ctx, s): def render_subscriber_row(self, ctx, s):
(service_name, rref, since, ann) = s (service_name, since, info, rref) = s
nickname = "?" nickname = info.get("nickname", "?")
version = "?" version = info.get("my-version", "?")
if ann:
(furl, service_name_2, ri_name, nickname, version, oldest) = ann
sr = rref.getSturdyRef() sr = rref.getSturdyRef()
# if the subscriber didn't do Tub.setLocation, nodeid will be None # if the subscriber didn't do Tub.setLocation, nodeid will be None