big rework of introducer client: change local API, split division of responsibilites better, remove old-code testing, improve error logging

This commit is contained in:
Brian Warner 2009-06-22 19:10:47 -07:00
parent 546266c806
commit 8df15e9f30
22 changed files with 714 additions and 490 deletions

View File

@ -6,7 +6,6 @@ from zope.interface import implements
from twisted.internet import reactor
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from foolscap.logging import log
from pycryptopp.publickey import rsa
import allmydata
@ -18,7 +17,7 @@ from allmydata.immutable.filenode import FileNode, LiteralFileNode
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, cachedir
from allmydata.util import hashutil, base32, pollmixin, cachedir, log
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.uri import LiteralFileURI
@ -128,8 +127,6 @@ class Client(node.Node, pollmixin.PollMixin):
d = self.when_tub_ready()
def _start_introducer_client(res):
ic.setServiceParent(self)
# nodes that want to upload and download will need storage servers
ic.subscribe_to("storage")
d.addCallback(_start_introducer_client)
d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="URyI5w")
@ -235,9 +232,11 @@ class Client(node.Node, pollmixin.PollMixin):
def init_client_storage_broker(self):
# create a StorageFarmBroker object, for use by Uploader/Downloader
# (and everybody else who wants to use storage servers)
self.storage_broker = sb = storage_client.StorageFarmBroker()
sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
self.storage_broker = sb
# load static server specifications from tahoe.cfg, if any
# load static server specifications from tahoe.cfg, if any.
# Not quite ready yet.
#if self.config.has_section("client-server-selection"):
# server_params = {} # maps serverid to dict of parameters
# for (name, value) in self.config.items("client-server-selection"):
@ -390,8 +389,7 @@ class Client(node.Node, pollmixin.PollMixin):
temporary test network and need to know when it is safe to proceed
with an upload or download."""
def _check():
current_clients = list(self.storage_broker.get_all_serverids())
return len(current_clients) >= num_clients
return len(self.storage_broker.get_all_servers()) >= num_clients
d = self.poll(_check, 0.5)
d.addCallback(lambda res: None)
return d

View File

@ -70,10 +70,10 @@ class ControlServer(Referenceable, service.Service):
# phase to take more than 10 seconds. Expect worst-case latency to be
# 300ms.
results = {}
conns = self.parent.introducer_client.get_all_connections_for("storage")
everyone = [(peerid,rref) for (peerid, service_name, rref) in conns]
sb = self.parent.get_storage_broker()
everyone = sb.get_all_servers()
num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
everyone = everyone * num_pings
everyone = list(everyone) * num_pings
d = self._do_one_ping(None, everyone, results)
return d
def _do_one_ping(self, res, everyone_left, results):

View File

@ -8,9 +8,10 @@ from foolscap.api import DeadReferenceError, RemoteException, eventually
from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
from allmydata.interfaces import IDownloadTarget, IDownloader, \
IFileURI, IVerifierURI, \
IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
IStorageBroker, NotEnoughSharesError, \
IStorageBroker, NotEnoughSharesError, NoServersError, \
UnableToFetchCriticalDownloadDataError
from allmydata.immutable import layout
from allmydata.monitor import Monitor
@ -747,7 +748,10 @@ class CiphertextDownloader(log.PrefixingLogMixin):
def _get_all_shareholders(self):
dl = []
sb = self._storage_broker
for (peerid,ss) in sb.get_servers_for_index(self._storage_index):
servers = sb.get_servers_for_index(self._storage_index)
if not servers:
raise NoServersError("broker gave us no servers!")
for (peerid,ss) in servers:
self.log(format="sending DYHB to [%(peerid)s]",
peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, umid="rT03hg")

View File

@ -360,13 +360,56 @@ class IStorageBroker(Interface):
"""
def get_all_serverids():
"""
@return: iterator of serverid strings
@return: frozenset of serverid strings
"""
def get_nickname_for_serverid(serverid):
"""
@return: unicode nickname, or None
"""
# methods moved from IntroducerClient, need review
def get_all_connections():
"""Return a frozenset of (nodeid, service_name, rref) tuples, one for
each active connection we've established to a remote service. This is
mostly useful for unit tests that need to wait until a certain number
of connections have been made."""
def get_all_connectors():
"""Return a dict that maps from (nodeid, service_name) to a
RemoteServiceConnector instance for all services that we are actively
trying to connect to. Each RemoteServiceConnector has the following
public attributes::
service_name: the type of service provided, like 'storage'
announcement_time: when we first heard about this service
last_connect_time: when we last established a connection
last_loss_time: when we last lost a connection
version: the peer's version, from the most recent connection
oldest_supported: the peer's oldest supported version, same
rref: the RemoteReference, if connected, otherwise None
remote_host: the IAddress, if connected, otherwise None
This method is intended for monitoring interfaces, such as a web page
which describes connecting and connected peers.
"""
def get_all_peerids():
"""Return a frozenset of all peerids to whom we have a connection (to
one or more services) established. Mostly useful for unit tests."""
def get_all_connections_for(service_name):
"""Return a frozenset of (nodeid, service_name, rref) tuples, one
for each active connection that provides the given SERVICE_NAME."""
def get_permuted_peers(service_name, key):
"""Returns an ordered list of (peerid, rref) tuples, selecting from
the connections that provide SERVICE_NAME, using a hash-based
permutation keyed by KEY. This randomizes the service list in a
repeatable way, to distribute load over many peers.
"""
# hm, we need a solution for forward references in schemas
FileNode_ = Any() # TODO: foolscap needs constraints on copyables

View File

@ -1,108 +1,13 @@
import re, time, sha
from base64 import b32decode
from zope.interface import implements
from twisted.application import service
from foolscap.api import Referenceable
from foolscap.api import Referenceable, SturdyRef, eventually
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
IIntroducerClient
from allmydata.util import log, idlib
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.introducer.common import make_index
class RemoteServiceConnector:
"""I hold information about a peer service that we want to connect to. If
we are connected, I hold the RemoteReference, the peer's address, and the
peer's version information. I remember information about when we were
last connected to the peer too, even if we aren't currently connected.
@ivar announcement_time: when we first heard about this service
@ivar last_connect_time: when we last established a connection
@ivar last_loss_time: when we last lost a connection
@ivar version: the peer's version, from the most recent announcement
@ivar oldest_supported: the peer's oldest supported version, same
@ivar nickname: the peer's self-reported nickname, same
@ivar rref: the RemoteReference, if connected, otherwise None
@ivar remote_host: the IAddress, if connected, otherwise None
"""
VERSION_DEFAULTS = {
"storage": { "http://allmydata.org/tahoe/protocols/storage/v1" :
{ "maximum-immutable-share-size": 2**32,
"tolerates-immutable-read-overrun": False,
"delete-mutable-shares-with-zero-length-writev": False,
},
"application-version": "unknown: no get_version()",
},
"stub_client": { },
}
def __init__(self, announcement, tub, ic):
self._tub = tub
self._announcement = announcement
self._ic = ic
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
self._furl = furl
m = re.match(r'pb://(\w+)@', furl)
assert m
self._nodeid = b32decode(m.group(1).upper())
self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
self.service_name = service_name
self.log("attempting to connect to %s" % self._nodeid_s)
self.announcement_time = time.time()
self.last_loss_time = None
self.rref = None
self.remote_host = None
self.last_connect_time = None
self.version = ver
self.oldest_supported = oldest
self.nickname = nickname
def log(self, *args, **kwargs):
return self._ic.log(*args, **kwargs)
def startConnecting(self):
self._reconnector = self._tub.connectTo(self._furl, self._got_service)
def stopConnecting(self):
self._reconnector.stopConnecting()
def _got_service(self, rref):
self.log("got connection to %s, getting versions" % self._nodeid_s)
default = self.VERSION_DEFAULTS.get(self.service_name, {})
d = add_version_to_remote_reference(rref, default)
d.addCallback(self._got_versioned_service)
def _got_versioned_service(self, rref):
self.log("connected to %s, version %s" % (self._nodeid_s, rref.version))
self.last_connect_time = time.time()
self.remote_host = rref.tracker.broker.transport.getPeer()
self.rref = rref
self._ic.add_connection(self._nodeid, self.service_name, rref)
rref.notifyOnDisconnect(self._lost, rref)
def _lost(self, rref):
self.log("lost connection to %s" % self._nodeid_s)
self.last_loss_time = time.time()
self.rref = None
self.remote_host = None
self._ic.remove_connection(self._nodeid, self.service_name, rref)
def reset(self):
self._reconnector.reset()
from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
class IntroducerClient(service.Service, Referenceable):
@ -113,32 +18,40 @@ class IntroducerClient(service.Service, Referenceable):
self._tub = tub
self.introducer_furl = introducer_furl
self._nickname = nickname.encode("utf-8")
assert type(nickname) is unicode
self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
self._my_version = my_version
self._oldest_supported = oldest_supported
self._published_announcements = set()
self._publisher = None
self._connected = False
self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
self._subscribed_service_names = set()
self._subscriptions = set() # requests we've actually sent
self._received_announcements = set()
# TODO: this set will grow without bound, until the node is restarted
# we only accept one announcement per (peerid+service_name) pair.
# This insures that an upgraded host replace their previous
# announcement. It also means that each peer must have their own Tub
# (no sharing), which is slightly weird but consistent with the rest
# of the Tahoe codebase.
self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
# self._connections is a set of (peerid, service_name, rref) tuples
self._connections = set()
# _current_announcements remembers one announcement per
# (servicename,serverid) pair. Anything that arrives with the same
# pair will displace the previous one. This stores unpacked
# announcement dictionaries, which can be compared for equality to
# distinguish re-announcement from updates. It also provides memory
# for clients who subscribe after startup.
self._current_announcements = {}
self.counter = 0 # incremented each time we change state, for tests
self.encoding_parameters = None
# hooks for unit tests
self._debug_counts = {
"inbound_message": 0,
"inbound_announcement": 0,
"wrong_service": 0,
"duplicate_announcement": 0,
"update": 0,
"new_announcement": 0,
"outbound_message": 0,
}
def startService(self):
service.Service.startService(self)
self._introducer_error = None
@ -170,7 +83,6 @@ class IntroducerClient(service.Service, Referenceable):
needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
if needed not in publisher.version:
raise InsufficientVersionError(needed, publisher.version)
self._connected = True
self._publisher = publisher
publisher.notifyOnDisconnect(self._disconnected)
self._maybe_publish()
@ -178,16 +90,9 @@ class IntroducerClient(service.Service, Referenceable):
def _disconnected(self):
self.log("bummer, we've lost our connection to the introducer")
self._connected = False
self._publisher = None
self._subscriptions.clear()
def stopService(self):
service.Service.stopService(self)
self._introducer_reconnector.stopConnecting()
for rsc in self._connectors.itervalues():
rsc.stopConnecting()
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
@ -195,14 +100,19 @@ class IntroducerClient(service.Service, Referenceable):
def publish(self, furl, service_name, remoteinterface_name):
assert type(self._nickname_utf8) is str # we always send UTF-8
ann = (furl, service_name, remoteinterface_name,
self._nickname, self._my_version, self._oldest_supported)
self._nickname_utf8, self._my_version, self._oldest_supported)
self._published_announcements.add(ann)
self._maybe_publish()
def subscribe_to(self, service_name):
def subscribe_to(self, service_name, cb, *args, **kwargs):
self._local_subscribers.append( (service_name,cb,args,kwargs) )
self._subscribed_service_names.add(service_name)
self._maybe_subscribe()
for (servicename,nodeid),ann_d in self._current_announcements.items():
if servicename == service_name:
eventually(cb, nodeid, ann_d)
def _maybe_subscribe(self):
if not self._publisher:
@ -215,7 +125,9 @@ class IntroducerClient(service.Service, Referenceable):
# duplicate requests.
self._subscriptions.add(service_name)
d = self._publisher.callRemote("subscribe", self, service_name)
d.addErrback(log.err, facility="tahoe.introducer",
d.addErrback(trap_deadref)
d.addErrback(log.err, format="server errored during subscribe",
facility="tahoe.introducer",
level=log.WEIRD, umid="2uMScQ")
def _maybe_publish(self):
@ -224,100 +136,83 @@ class IntroducerClient(service.Service, Referenceable):
return
# this re-publishes everything. The Introducer ignores duplicates
for ann in self._published_announcements:
self._debug_counts["outbound_message"] += 1
d = self._publisher.callRemote("publish", ann)
d.addErrback(log.err, facility="tahoe.introducer",
d.addErrback(trap_deadref)
d.addErrback(log.err,
format="server errored during publish %(ann)s",
ann=ann, facility="tahoe.introducer",
level=log.WEIRD, umid="xs9pVQ")
def remote_announce(self, announcements):
self.log("received %d announcements" % len(announcements))
self._debug_counts["inbound_message"] += 1
for ann in announcements:
self.log("received %d announcements" % len(announcements))
(furl, service_name, ri_name, nickname, ver, oldest) = ann
if service_name not in self._subscribed_service_names:
self.log("announcement for a service we don't care about [%s]"
% (service_name,), level=log.UNUSUAL, umid="dIpGNA")
continue
if ann in self._received_announcements:
self.log("ignoring old announcement: %s" % (ann,),
level=log.NOISY)
continue
self.log("new announcement[%s]: %s" % (service_name, ann))
self._received_announcements.add(ann)
self._new_announcement(ann)
try:
self._process_announcement(ann)
except:
log.err(format="unable to process announcement %(ann)s",
ann=ann)
# Don't let a corrupt announcement prevent us from processing
# the remaining ones. Don't return an error to the server,
# since they'd just ignore it anyways.
pass
def _new_announcement(self, announcement):
# this will only be called for new announcements
index = make_index(announcement)
if index in self._connectors:
self.log("replacing earlier announcement", level=log.NOISY)
self._connectors[index].stopConnecting()
rsc = RemoteServiceConnector(announcement, self._tub, self)
self._connectors[index] = rsc
rsc.startConnecting()
def _process_announcement(self, ann):
self._debug_counts["inbound_announcement"] += 1
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
if service_name not in self._subscribed_service_names:
self.log("announcement for a service we don't care about [%s]"
% (service_name,), level=log.UNUSUAL, umid="dIpGNA")
self._debug_counts["wrong_service"] += 1
return
self.log("announcement for [%s]: %s" % (service_name, ann),
umid="BoKEag")
assert type(furl) is str
assert type(service_name) is str
assert type(ri_name) is str
assert type(nickname_utf8) is str
nickname = nickname_utf8.decode("utf-8")
assert type(nickname) is unicode
assert type(ver) is str
assert type(oldest) is str
def add_connection(self, nodeid, service_name, rref):
self._connections.add( (nodeid, service_name, rref) )
self.counter += 1
# when one connection is established, reset the timers on all others,
# to trigger a reconnection attempt in one second. This is intended
# to accelerate server connections when we've been offline for a
# while. The goal is to avoid hanging out for a long time with
# connections to only a subset of the servers, which would increase
# the chances that we'll put shares in weird places (and not update
# existing shares of mutable files). See #374 for more details.
for rsc in self._connectors.values():
rsc.reset()
nodeid = b32decode(SturdyRef(furl).tubID.upper())
nodeid_s = idlib.shortnodeid_b2a(nodeid)
def remove_connection(self, nodeid, service_name, rref):
self._connections.discard( (nodeid, service_name, rref) )
self.counter += 1
ann_d = { "version": 0,
"service-name": service_name,
"FURL": furl,
"nickname": nickname,
"app-versions": {}, # need #466 and v2 introducer
"my-version": ver,
"oldest-supported": oldest,
}
def get_all_connections(self):
return frozenset(self._connections)
index = (service_name, nodeid)
if self._current_announcements.get(index, None) == ann_d:
self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
service=service_name, nodeid=nodeid_s,
level=log.UNUSUAL, umid="B1MIdA")
self._debug_counts["duplicate_announcement"] += 1
return
if index in self._current_announcements:
self._debug_counts["update"] += 1
else:
self._debug_counts["new_announcement"] += 1
def get_all_connectors(self):
return self._connectors.copy()
self._current_announcements[index] = ann_d
# note: we never forget an index, but we might update its value
def get_all_peerids(self):
return frozenset([peerid
for (peerid, service_name, rref)
in self._connections])
def get_nickname_for_peerid(self, peerid):
for k in self._connectors:
(peerid0, svcname0) = k
if peerid0 == peerid:
rsc = self._connectors[k]
return rsc.nickname
return None
def get_all_connections_for(self, service_name):
return frozenset([c
for c in self._connections
if c[1] == service_name])
def get_peers(self, service_name):
"""Return a set of (peerid, versioned-rref) tuples."""
return frozenset([(peerid, r) for (peerid, servname, r) in self._connections if servname == service_name])
def get_permuted_peers(self, service_name, key):
"""Return an ordered list of (peerid, versioned-rref) tuples."""
servers = self.get_peers(service_name)
return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
for (service_name2,cb,args,kwargs) in self._local_subscribers:
if service_name2 == service_name:
eventually(cb, nodeid, ann_d, *args, **kwargs)
def remote_set_encoding_parameters(self, parameters):
self.encoding_parameters = parameters
def connected_to_introducer(self):
return self._connected
def debug_disconnect_from_peerid(self, victim_nodeid):
# for unit tests: locate and sever all connections to the given
# peerid.
for (nodeid, service_name, rref) in self._connections:
if nodeid == victim_nodeid:
rref.tracker.broker.transport.loseConnection()
return bool(self._publisher)

View File

@ -1,11 +0,0 @@
import re
from base64 import b32decode
def make_index(announcement):
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
m = re.match(r'pb://(\w+)@', furl)
assert m
nodeid = b32decode(m.group(1).upper())
return (nodeid, service_name)

View File

@ -88,53 +88,33 @@ class IIntroducerClient(Interface):
parameter: this is supposed to be a globally-unique string that
identifies the RemoteInterface that is implemented."""
def subscribe_to(service_name):
def subscribe_to(service_name, callback, *args, **kwargs):
"""Call this if you will eventually want to use services with the
given SERVICE_NAME. This will prompt me to subscribe to announcements
of those services. You can pick up the announcements later by calling
get_all_connections_for() or get_permuted_peers().
"""
of those services. Your callback will be invoked with at least two
arguments: a serverid (binary string), and an announcement
dictionary, followed by any additional callback args/kwargs you give
me. I will run your callback for both new announcements and for
announcements that have changed, but you must be prepared to tolerate
duplicates.
def get_all_connections():
"""Return a frozenset of (nodeid, service_name, rref) tuples, one for
each active connection we've established to a remote service. This is
mostly useful for unit tests that need to wait until a certain number
of connections have been made."""
The announcement dictionary that I give you will have the following
keys:
def get_all_connectors():
"""Return a dict that maps from (nodeid, service_name) to a
RemoteServiceConnector instance for all services that we are actively
trying to connect to. Each RemoteServiceConnector has the following
public attributes::
version: 0
service-name: str('storage')
service_name: the type of service provided, like 'storage'
announcement_time: when we first heard about this service
last_connect_time: when we last established a connection
last_loss_time: when we last lost a connection
FURL: str(furl)
remoteinterface-name: str(ri_name)
nickname: unicode
app-versions: {}
my-version: str
oldest-supported: str
version: the peer's version, from the most recent connection
oldest_supported: the peer's oldest supported version, same
rref: the RemoteReference, if connected, otherwise None
remote_host: the IAddress, if connected, otherwise None
This method is intended for monitoring interfaces, such as a web page
which describes connecting and connected peers.
"""
def get_all_peerids():
"""Return a frozenset of all peerids to whom we have a connection (to
one or more services) established. Mostly useful for unit tests."""
def get_all_connections_for(service_name):
"""Return a frozenset of (nodeid, service_name, rref) tuples, one
for each active connection that provides the given SERVICE_NAME."""
def get_permuted_peers(service_name, key):
"""Returns an ordered list of (peerid, rref) tuples, selecting from
the connections that provide SERVICE_NAME, using a hash-based
permutation keyed by KEY. This randomizes the service list in a
repeatable way, to distribute load over many peers.
Note that app-version will be an empty dictionary until #466 is done
and both the introducer and the remote client have been upgraded. For
current (native) server types, the serverid will always be equal to
the binary form of the FURL's tubid.
"""
def connected_to_introducer():

View File

@ -11,7 +11,13 @@ from foolscap.api import Referenceable
from allmydata.util import log, idlib
from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
IIntroducerClient, RIIntroducerPublisherAndSubscriberService
from allmydata.introducer.common import make_index
def make_index(announcement):
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
m = re.match(r'pb://(\w+)@', furl)
assert m
nodeid = b32decode(m.group(1).upper())
return (nodeid, service_name)
class RemoteServiceConnector:
"""I hold information about a peer service that we want to connect to. If

View File

@ -1,14 +1,14 @@
import time, os.path
from base64 import b32decode
from zope.interface import implements
from twisted.application import service
from foolscap.api import Referenceable
from foolscap.api import Referenceable, SturdyRef
import allmydata
from allmydata import node
from allmydata.util import log
from allmydata.util import log, rrefutil
from allmydata.introducer.interfaces import \
RIIntroducerPublisherAndSubscriberService
from allmydata.introducer.common import make_index
class IntroducerNode(node.Node):
PORTNUMFILE = "introducer.port"
@ -55,9 +55,15 @@ class IntroducerService(service.MultiService, Referenceable):
def __init__(self, basedir="."):
service.MultiService.__init__(self)
self.introducer_url = None
# 'index' is (tubid, service_name)
# 'index' is (service_name, tubid)
self._announcements = {} # dict of index -> (announcement, timestamp)
self._subscribers = {} # dict of (rref->timestamp) dicts
self._debug_counts = {"inbound_message": 0,
"inbound_duplicate": 0,
"inbound_update": 0,
"outbound_message": 0,
"outbound_announcements": 0,
"inbound_subscribe": 0}
def log(self, *args, **kwargs):
if "facility" not in kwargs:
@ -73,23 +79,46 @@ class IntroducerService(service.MultiService, Referenceable):
return self.VERSION
def remote_publish(self, announcement):
try:
self._publish(announcement)
except:
log.err(format="Introducer.remote_publish failed on %(ann)s",
ann=announcement, level=log.UNUSUAL, umid="620rWA")
raise
def _publish(self, announcement):
self._debug_counts["inbound_message"] += 1
self.log("introducer: announcement published: %s" % (announcement,) )
index = make_index(announcement)
(furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
nodeid = b32decode(SturdyRef(furl).tubID.upper())
index = (service_name, nodeid)
if index in self._announcements:
(old_announcement, timestamp) = self._announcements[index]
if old_announcement == announcement:
self.log("but we already knew it, ignoring", level=log.NOISY)
self._debug_counts["inbound_duplicate"] += 1
return
else:
self.log("old announcement being updated", level=log.NOISY)
self._debug_counts["inbound_update"] += 1
self._announcements[index] = (announcement, time.time())
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
for s in self._subscribers.get(service_name, []):
s.callRemote("announce", set([announcement]))
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += 1
d = s.callRemote("announce", set([announcement]))
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err,
format="subscriber errored on announcement %(ann)s",
ann=announcement, facility="tahoe.introducer",
level=log.UNUSUAL, umid="jfGMXQ")
def remote_subscribe(self, subscriber, service_name):
self.log("introducer: subscription[%s] request at %s" % (service_name,
subscriber))
self._debug_counts["inbound_subscribe"] += 1
if service_name not in self._subscribers:
self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name]
@ -104,11 +133,16 @@ class IntroducerService(service.MultiService, Referenceable):
subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove)
announcements = set( [ ann
for idx,(ann,when) in self._announcements.items()
if idx[1] == service_name] )
announcements = set(
[ ann
for (sn2,nodeid),(ann,when) in self._announcements.items()
if sn2 == service_name] )
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += len(announcements)
d = subscriber.callRemote("announce", announcements)
d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
d.addErrback(rrefutil.trap_deadref)
d.addErrback(log.err,
format="subscriber errored during subscribe %(anns)s",
anns=announcements, facility="tahoe.introducer",
level=log.UNUSUAL, umid="mtZepQ")

View File

@ -62,6 +62,7 @@ class Node(service.MultiService):
nickname_utf8 = self.get_config("node", "nickname", "<unspecified>")
self.nickname = nickname_utf8.decode("utf-8")
assert type(self.nickname) is unicode
self.init_tempdir()
self.create_tub()

View File

@ -6,21 +6,50 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
# roadmap:
#
# implement ServerFarm, change Client to create it, change
# uploader/servermap to get rrefs from it. ServerFarm calls
# IntroducerClient.subscribe_to .
# 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
# create it, change uploader/servermap to get rrefs from it. ServerFarm calls
# IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
# to clients. webapi status pages call broker.get_info_about_serverid.
#
# implement NativeStorageClient, change Tahoe2PeerSelector to use it. All
# NativeStorageClients come from the introducer
# 2: move get_info methods to the descriptor, webapi status pages call
# broker.get_descriptor_for_serverid().get_info
#
# change web/check_results.py to get NativeStorageClients from check results,
# ask it for a nickname (instead of using client.get_nickname_for_serverid)
# 3?later?: store descriptors in UploadResults/etc instead of serverids,
# webapi status pages call descriptor.get_info and don't use storage_broker
# or Client
#
# implement tahoe.cfg scanner, create static NativeStorageClients
# 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
# optional. This closes #467
#
# 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
# clients. Clients stop doing callRemote(), use NativeStorageClient methods
# instead (which might do something else, i.e. http or whatever). The
# introducer and tahoe.cfg only create NativeStorageClients for now.
#
# 6: implement other sorts of IStorageClient classes: S3, etc
import sha
from zope.interface import implements
import sha, time
from zope.interface import implements, Interface
from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker
from allmydata.util import idlib, log
from allmydata.util.rrefutil import add_version_to_remote_reference
# who is responsible for de-duplication?
# both?
# IC remembers the unpacked announcements it receives, to provide for late
# subscribers and to remove duplicates
# if a client subscribes after startup, will they receive old announcements?
# yes
# who will be responsible for signature checking?
# make it be IntroducerClient, so they can push the filter outwards and
# reduce inbound network traffic
# what should the interface between StorageFarmBroker and IntroducerClient
# look like?
# don't pass signatures: only pass validated blessed-objects
class StorageFarmBroker:
implements(IStorageBroker)
@ -30,16 +59,57 @@ class StorageFarmBroker:
I'm also responsible for subscribing to the IntroducerClient to find out
about new servers as they are announced by the Introducer.
"""
def __init__(self, permute_peers=True):
def __init__(self, tub, permute_peers):
self.tub = tub
assert permute_peers # False not implemented yet
self.servers = {} # serverid -> StorageClient instance
self.permute_peers = permute_peers
# self.descriptors maps serverid -> IServerDescriptor, and keeps
# track of all the storage servers that we've heard about. Each
# descriptor manages its own Reconnector, and will give us a
# RemoteReference when we ask them for it.
self.descriptors = {}
# self.servers are statically configured from unit tests
self.test_servers = {} # serverid -> rref
self.introducer_client = None
def add_server(self, serverid, s):
self.servers[serverid] = s
# these two are used in unit tests
def test_add_server(self, serverid, rref):
self.test_servers[serverid] = rref
def test_add_descriptor(self, serverid, dsc):
self.descriptors[serverid] = dsc
def use_introducer(self, introducer_client):
self.introducer_client = ic = introducer_client
ic.subscribe_to("storage")
ic.subscribe_to("storage", self._got_announcement)
def _got_announcement(self, serverid, ann_d):
assert ann_d["service-name"] == "storage"
old = self.descriptors.get(serverid)
if old:
if old.get_announcement() == ann_d:
return # duplicate
# replacement
del self.descriptors[serverid]
old.stop_connecting()
# now we forget about them and start using the new one
dsc = NativeStorageClientDescriptor(serverid, ann_d)
self.descriptors[serverid] = dsc
dsc.start_connecting(self.tub, self._trigger_connections)
# the descriptor will manage their own Reconnector, and each time we
# need servers, we'll ask them if they're connected or not.
def _trigger_connections(self):
# when one connection is established, reset the timers on all others,
# to trigger a reconnection attempt in one second. This is intended
# to accelerate server connections when we've been offline for a
# while. The goal is to avoid hanging out for a long time with
# connections to only a subset of the servers, which would increase
# the chances that we'll put shares in weird places (and not update
# existing shares of mutable files). See #374 for more details.
for dsc in self.descriptors.values():
dsc.try_to_connect()
def get_servers_for_index(self, peer_selection_index):
# first cut: return a list of (peerid, versioned-rref) tuples
@ -51,34 +121,141 @@ class StorageFarmBroker:
def get_all_servers(self):
# return a frozenset of (peerid, versioned-rref) tuples
servers = {}
for serverid,server in self.servers.items():
servers[serverid] = server
if self.introducer_client:
ic = self.introducer_client
for serverid,server in ic.get_peers("storage"):
servers[serverid] = server
for serverid,rref in self.test_servers.items():
servers[serverid] = rref
for serverid,dsc in self.descriptors.items():
rref = dsc.get_rref()
if rref:
servers[serverid] = rref
return frozenset(servers.items())
def get_all_serverids(self):
for serverid in self.servers:
yield serverid
if self.introducer_client:
for serverid,server in self.introducer_client.get_peers("storage"):
yield serverid
serverids = set()
serverids.update(self.test_servers.keys())
serverids.update(self.descriptors.keys())
return frozenset(serverids)
def get_all_descriptors(self):
return sorted(self.descriptors.values(),
key=lambda dsc: dsc.get_serverid())
def get_nickname_for_serverid(self, serverid):
if serverid in self.servers:
return self.servers[serverid].nickname
if self.introducer_client:
return self.introducer_client.get_nickname_for_peerid(serverid)
if serverid in self.descriptors:
return self.descriptors[serverid].get_nickname()
return None
class NativeStorageClient:
def __init__(self, serverid, furl, nickname, min_shares=1):
class IServerDescriptor(Interface):
def start_connecting(tub, trigger_cb):
pass
def get_nickname():
pass
def get_rref():
pass
class NativeStorageClientDescriptor:
"""I hold information about a storage server that we want to connect to.
If we are connected, I hold the RemoteReference, their host address, and
the their version information. I remember information about when we were
last connected too, even if we aren't currently connected.
@ivar announcement_time: when we first heard about this service
@ivar last_connect_time: when we last established a connection
@ivar last_loss_time: when we last lost a connection
@ivar version: the server's versiondict, from the most recent announcement
@ivar nickname: the server's self-reported nickname (unicode), same
@ivar rref: the RemoteReference, if connected, otherwise None
@ivar remote_host: the IAddress, if connected, otherwise None
"""
implements(IServerDescriptor)
VERSION_DEFAULTS = {
"http://allmydata.org/tahoe/protocols/storage/v1" :
{ "maximum-immutable-share-size": 2**32,
"tolerates-immutable-read-overrun": False,
"delete-mutable-shares-with-zero-length-writev": False,
},
"application-version": "unknown: no get_version()",
}
def __init__(self, serverid, ann_d, min_shares=1):
self.serverid = serverid
self.furl = furl
self.nickname = nickname
self.announcement = ann_d
self.min_shares = min_shares
self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
self.announcement_time = time.time()
self.last_connect_time = None
self.last_loss_time = None
self.remote_host = None
self.rref = None
self._reconnector = None
self._trigger_cb = None
def get_serverid(self):
return self.serverid
def get_nickname(self):
return self.announcement["nickname"].decode("utf-8")
def get_announcement(self):
return self.announcement
def get_remote_host(self):
return self.remote_host
def get_last_connect_time(self):
return self.last_connect_time
def get_last_loss_time(self):
return self.last_loss_time
def get_announcement_time(self):
return self.announcement_time
def start_connecting(self, tub, trigger_cb):
furl = self.announcement["FURL"]
self._trigger_cb = trigger_cb
self._reconnector = tub.connectTo(furl, self._got_connection)
def _got_connection(self, rref):
lp = log.msg(format="got connection to %(serverid)s, getting versions",
serverid=self.serverid_s,
facility="tahoe.storage_broker", umid="coUECQ")
if self._trigger_cb:
eventually(self._trigger_cb)
default = self.VERSION_DEFAULTS
d = add_version_to_remote_reference(rref, default)
d.addCallback(self._got_versioned_service, lp)
d.addErrback(log.err, format="storageclient._got_connection",
serverid=self.serverid_s, umid="Sdq3pg")
def _got_versioned_service(self, rref, lp):
log.msg(format="%(serverid)s provided version info %(version)s",
serverid=self.serverid_s, version=rref.version,
facility="tahoe.storage_broker", umid="SWmJYg",
level=log.NOISY, parent=lp)
self.last_connect_time = time.time()
self.remote_host = rref.getPeer()
self.rref = rref
rref.notifyOnDisconnect(self._lost)
def get_rref(self):
return self.rref
def _lost(self):
log.msg(format="lost connection to %(serverid)s",
serverid=self.serverid_s,
facility="tahoe.storage_broker", umid="zbRllw")
self.last_loss_time = time.time()
self.rref = None
self.remote_host = None
def stop_connecting(self):
# used when this descriptor has been superceded by another
self._reconnector.stopConnecting()
def try_to_connect(self):
# used when the broker wants us to hurry up
self._reconnector.reset()
class UnknownServerTypeError(Exception):
pass

View File

@ -533,10 +533,10 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
def _check_connections(self):
for c in self.clients:
ic = c.introducer_client
if not ic.connected_to_introducer():
if not c.connected_to_introducer():
return False
if len(ic.get_all_peerids()) != self.numclients:
sb = c.get_storage_broker()
if len(sb.get_all_servers()) != self.numclients:
return False
return True

View File

@ -25,7 +25,6 @@ from allmydata import uri as tahoe_uri
from allmydata.client import Client
from allmydata.storage.server import StorageServer, storage_index_to_dir
from allmydata.util import fileutil, idlib, hashutil
from allmydata.introducer.client import RemoteServiceConnector
from allmydata.test.common_web import HTTPClientGETFactory
from allmydata.interfaces import IStorageBroker
@ -93,17 +92,13 @@ class LocalWrapper:
def dontNotifyOnDisconnect(self, marker):
del self.disconnectors[marker]
def wrap(original, service_name):
def wrap_storage_server(original):
# Much of the upload/download code uses rref.version (which normally
# comes from rrefutil.add_version_to_remote_reference). To avoid using a
# network, we want a LocalWrapper here. Try to satisfy all these
# constraints at the same time.
wrapper = LocalWrapper(original)
try:
version = original.remote_get_version()
except AttributeError:
version = RemoteServiceConnector.VERSION_DEFAULTS[service_name]
wrapper.version = version
wrapper.version = original.remote_get_version()
return wrapper
class NoNetworkStorageBroker:
@ -220,7 +215,7 @@ class NoNetworkGrid(service.MultiService):
ss.setServiceParent(middleman)
serverid = ss.my_nodeid
self.servers_by_number[i] = ss
self.servers_by_id[serverid] = wrap(ss, "storage")
self.servers_by_id[serverid] = wrap_storage_server(ss)
self.all_servers = frozenset(self.servers_by_id.items())
for c in self.clients:
c._servers = self.all_servers

View File

@ -3,7 +3,7 @@ import simplejson
from twisted.trial import unittest
from allmydata import check_results, uri
from allmydata.web import check_results as web_check_results
from allmydata.storage_client import StorageFarmBroker, NativeStorageClient
from allmydata.storage_client import StorageFarmBroker, NativeStorageClientDescriptor
from common_web import WebRenderingMixin
class FakeClient:
@ -13,12 +13,20 @@ class FakeClient:
class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
def create_fake_client(self):
sb = StorageFarmBroker()
sb = StorageFarmBroker(None, True)
for (peerid, nickname) in [("\x00"*20, "peer-0"),
("\xff"*20, "peer-f"),
("\x11"*20, "peer-11")] :
n = NativeStorageClient(peerid, None, nickname)
sb.add_server(peerid, n)
ann_d = { "version": 0,
"service-name": "storage",
"FURL": "fake furl",
"nickname": unicode(nickname),
"app-versions": {}, # need #466 and v2 introducer
"my-version": "ver",
"oldest-supported": "oldest",
}
dsc = NativeStorageClientDescriptor(peerid, ann_d)
sb.test_add_descriptor(peerid, dsc)
c = FakeClient()
c.storage_broker = sb
return c

View File

@ -146,13 +146,13 @@ class Basic(unittest.TestCase):
for (peerid,rref) in sb.get_servers_for_index(key) ]
def test_permute(self):
sb = StorageFarmBroker()
sb = StorageFarmBroker(None, True)
for k in ["%d" % i for i in range(5)]:
sb.add_server(k, None)
sb.test_add_server(k, None)
self.failUnlessEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
self.failUnlessEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
sb.servers = {}
sb.test_servers.clear()
self.failUnlessEqual(self._permute(sb, "one"), [])
def test_versions(self):

View File

@ -63,7 +63,7 @@ class FakeClient(service.MultiService):
"max_segment_size": 1*MiB,
}
stats_provider = None
storage_broker = StorageFarmBroker()
storage_broker = StorageFarmBroker(None, True)
def log(self, *args, **kwargs):
return log.msg(*args, **kwargs)
def get_encoding_parameters(self):

View File

@ -11,16 +11,12 @@ from twisted.application import service
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.server import IntroducerService
from allmydata.introducer.common import make_index
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.introducer import old
from allmydata.util import idlib, pollmixin
from allmydata.util import pollmixin
import common_util as testutil
class FakeNode(Referenceable):
pass
class LoggingMultiService(service.MultiService):
def log(self, msg, **kw):
log.msg(msg, **kw)
@ -51,7 +47,7 @@ class ServiceMixin:
class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
def test_create(self):
ic = IntroducerClient(None, "introducer.furl", "my_nickname",
ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
"my_version", "oldest_version")
def test_listen(self):
@ -79,33 +75,35 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
def setUp(self):
ServiceMixin.setUp(self)
self.central_tub = tub = Tub()
def create_tub(self, portnum=0):
tubfile = os.path.join(self.basedir, "tub.pem")
self.central_tub = tub = Tub(certFile=tubfile)
#tub.setOption("logLocalFailures", True)
#tub.setOption("logRemoteFailures", True)
tub.setOption("expose-remote-exception-types", False)
tub.setServiceParent(self.parent)
l = tub.listenOn("tcp:0")
portnum = l.getPortnum()
tub.setLocation("localhost:%d" % portnum)
l = tub.listenOn("tcp:%d" % portnum)
self.central_portnum = l.getPortnum()
if portnum != 0:
assert self.central_portnum == portnum
tub.setLocation("localhost:%d" % self.central_portnum)
class SystemTest(SystemTestMixin, unittest.TestCase):
def test_system(self):
i = IntroducerService()
i.setServiceParent(self.parent)
self.introducer_furl = self.central_tub.registerReference(i)
return self.do_system_test()
self.basedir = "introducer/SystemTest/system"
os.makedirs(self.basedir)
return self.do_system_test(IntroducerService)
test_system.timeout = 480 # occasionally takes longer than 350s on "draco"
def test_system_oldserver(self):
i = old.IntroducerService_V1()
i.setServiceParent(self.parent)
self.introducer_furl = self.central_tub.registerReference(i)
return self.do_system_test()
def do_system_test(self):
def do_system_test(self, create_introducer):
self.create_tub()
introducer = create_introducer()
introducer.setServiceParent(self.parent)
iff = os.path.join(self.basedir, "introducer.furl")
tub = self.central_tub
ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
self.introducer_furl = ifurl
NUMCLIENTS = 5
# we have 5 clients who publish themselves, and an extra one does
@ -114,6 +112,11 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
clients = []
tubs = {}
received_announcements = {}
NUM_SERVERS = NUMCLIENTS
subscribing_clients = []
publishing_clients = []
for i in range(NUMCLIENTS+1):
tub = Tub()
#tub.setOption("logLocalFailures", True)
@ -124,101 +127,75 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
portnum = l.getPortnum()
tub.setLocation("localhost:%d" % portnum)
n = FakeNode()
log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
client_class = IntroducerClient
if i == 0:
client_class = old.IntroducerClient_V1
c = client_class(tub, self.introducer_furl,
"nickname-%d" % i, "version", "oldest")
if i < NUMCLIENTS:
node_furl = tub.registerReference(n)
c.publish(node_furl, "storage", "ri_name")
# the last one does not publish anything
c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i,
"version", "oldest")
received_announcements[c] = ra = {}
def got(serverid, ann_d, announcements):
announcements[serverid] = ann_d
c.subscribe_to("storage", got, received_announcements[c])
subscribing_clients.append(c)
c.subscribe_to("storage")
if i < NUMCLIENTS:
node_furl = tub.registerReference(Referenceable())
c.publish(node_furl, "storage", "ri_name")
publishing_clients.append(c)
# the last one does not publish anything
c.setServiceParent(self.parent)
clients.append(c)
tubs[c] = tub
def _wait_for_all_connections():
for c in clients:
if len(c.get_all_connections()) < NUMCLIENTS:
for c in subscribing_clients:
if len(received_announcements[c]) < NUM_SERVERS:
return False
return True
d = self.poll(_wait_for_all_connections)
def _check1(res):
log.msg("doing _check1")
dc = introducer._debug_counts
self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS)
self.failUnlessEqual(dc["inbound_duplicate"], 0)
self.failUnlessEqual(dc["inbound_update"], 0)
self.failUnless(dc["outbound_message"])
for c in clients:
self.failUnless(c.connected_to_introducer())
self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
self.failUnlessEqual(len(c.get_all_connections_for("storage")),
NUMCLIENTS)
for c in subscribing_clients:
cdc = c._debug_counts
self.failUnless(cdc["inbound_message"])
self.failUnlessEqual(cdc["inbound_announcement"],
NUM_SERVERS)
self.failUnlessEqual(cdc["wrong_service"], 0)
self.failUnlessEqual(cdc["duplicate_announcement"], 0)
self.failUnlessEqual(cdc["update"], 0)
self.failUnlessEqual(cdc["new_announcement"],
NUM_SERVERS)
anns = received_announcements[c]
self.failUnlessEqual(len(anns), NUM_SERVERS)
nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
self.failUnlessEqual(c.get_nickname_for_peerid(nodeid0),
"nickname-0")
ann_d = anns[nodeid0]
nick = ann_d["nickname"]
self.failUnlessEqual(type(nick), unicode)
self.failUnlessEqual(nick, u"nickname-0")
for c in publishing_clients:
cdc = c._debug_counts
self.failUnlessEqual(cdc["outbound_message"], 1)
d.addCallback(_check1)
origin_c = clients[0]
def _disconnect_somebody_else(res):
# now disconnect somebody's connection to someone else
current_counter = origin_c.counter
victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
log.msg(" disconnecting %s->%s" %
(tubs[origin_c].tubID,
idlib.shortnodeid_b2a(victim_nodeid)))
origin_c.debug_disconnect_from_peerid(victim_nodeid)
log.msg(" did disconnect")
# force an introducer reconnect, by shutting down the Tub it's using
# and starting a new Tub (with the old introducer). Everybody should
# reconnect and republish, but the introducer should ignore the
# republishes as duplicates. However, because the server doesn't know
# what each client does and does not know, it will send them a copy
# of the current announcement table anyway.
# then wait until something changes, which ought to be them
# noticing the loss
def _compare():
return current_counter != origin_c.counter
return self.poll(_compare)
d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
d.addCallback(_disconnect_somebody_else)
# and wait for them to reconnect
d.addCallback(lambda res: self.poll(_wait_for_all_connections))
def _check2(res):
log.msg("doing _check2")
for c in clients:
self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
d.addCallback(_check2)
def _disconnect_yourself(res):
# now disconnect somebody's connection to themselves.
current_counter = origin_c.counter
victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
log.msg(" disconnecting %s->%s" %
(tubs[origin_c].tubID,
idlib.shortnodeid_b2a(victim_nodeid)))
origin_c.debug_disconnect_from_peerid(victim_nodeid)
log.msg(" did disconnect from self")
def _compare():
return current_counter != origin_c.counter
return self.poll(_compare)
d.addCallback(_disconnect_yourself)
d.addCallback(lambda res: self.poll(_wait_for_all_connections))
def _check3(res):
log.msg("doing _check3")
for c in clients:
self.failUnlessEqual(len(c.get_all_connections_for("storage")),
NUMCLIENTS)
d.addCallback(_check3)
def _shutdown_introducer(res):
# now shut down the introducer. We do this by shutting down the
# tub it's using. Nobody's connections (to each other) should go
# down. All clients should notice the loss, and no other errors
# should occur.
log.msg("shutting down the introducer")
return self.central_tub.disownServiceParent()
d.addCallback(_shutdown_introducer)
def _wait_for_introducer_loss():
for c in clients:
if c.connected_to_introducer():
@ -226,13 +203,134 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
return True
d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
def _check4(res):
log.msg("doing _check4")
def _restart_introducer_tub(_ign):
log.msg("restarting introducer's Tub")
# note: old.Server doesn't have this count
dc = introducer._debug_counts
self.expected_count = dc["inbound_message"] + NUM_SERVERS
self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1
introducer._debug0 = dc["outbound_message"]
for c in subscribing_clients:
cdc = c._debug_counts
c._debug0 = cdc["inbound_message"]
self.create_tub(self.central_portnum)
newfurl = self.central_tub.registerReference(introducer,
furlFile=iff)
assert newfurl == self.introducer_furl
d.addCallback(_restart_introducer_tub)
def _wait_for_introducer_reconnect():
# wait until:
# all clients are connected
# the introducer has received publish messages from all of them
# the introducer has received subscribe messages from all of them
# the introducer has sent (duplicate) announcements to all of them
# all clients have received (duplicate) announcements
dc = introducer._debug_counts
for c in clients:
self.failUnlessEqual(len(c.get_all_connections_for("storage")),
NUMCLIENTS)
self.failIf(c.connected_to_introducer())
d.addCallback(_check4)
if not c.connected_to_introducer():
return False
if dc["inbound_message"] < self.expected_count:
return False
if dc["inbound_subscribe"] < self.expected_subscribe_count:
return False
for c in subscribing_clients:
cdc = c._debug_counts
if cdc["inbound_message"] < c._debug0+1:
return False
return True
d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect))
def _check2(res):
log.msg("doing _check2")
# assert that the introducer sent out new messages, one per
# subscriber
dc = introducer._debug_counts
self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS)
self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS)
self.failUnlessEqual(dc["inbound_update"], 0)
self.failUnlessEqual(dc["outbound_message"],
introducer._debug0 + len(subscribing_clients))
for c in clients:
self.failUnless(c.connected_to_introducer())
for c in subscribing_clients:
cdc = c._debug_counts
self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS)
d.addCallback(_check2)
# Then force an introducer restart, by shutting down the Tub,
# destroying the old introducer, and starting a new Tub+Introducer.
# Everybody should reconnect and republish, and the (new) introducer
# will distribute the new announcements, but the clients should
# ignore the republishes as duplicates.
d.addCallback(lambda _ign: log.msg("shutting down introducer"))
d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
def _restart_introducer(_ign):
log.msg("restarting introducer")
self.create_tub(self.central_portnum)
for c in subscribing_clients:
# record some counters for later comparison. Stash the values
# on the client itself, because I'm lazy.
cdc = c._debug_counts
c._debug1 = cdc["inbound_announcement"]
c._debug2 = cdc["inbound_message"]
c._debug3 = cdc["new_announcement"]
newintroducer = create_introducer()
self.expected_message_count = NUM_SERVERS
self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients)
self.expected_subscribe_count = len(subscribing_clients)
newfurl = self.central_tub.registerReference(newintroducer,
furlFile=iff)
assert newfurl == self.introducer_furl
d.addCallback(_restart_introducer)
def _wait_for_introducer_reconnect2():
# wait until:
# all clients are connected
# the introducer has received publish messages from all of them
# the introducer has received subscribe messages from all of them
# the introducer has sent announcements for everybody to everybody
# all clients have received all the (duplicate) announcements
# at that point, the system should be quiescent
dc = introducer._debug_counts
for c in clients:
if not c.connected_to_introducer():
return False
if dc["inbound_message"] < self.expected_message_count:
return False
if dc["outbound_announcements"] < self.expected_announcement_count:
return False
if dc["inbound_subscribe"] < self.expected_subscribe_count:
return False
for c in subscribing_clients:
cdc = c._debug_counts
if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS:
return False
return True
d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2))
def _check3(res):
log.msg("doing _check3")
for c in clients:
self.failUnless(c.connected_to_introducer())
for c in subscribing_clients:
cdc = c._debug_counts
self.failUnless(cdc["inbound_announcement"] > c._debug1)
self.failUnless(cdc["inbound_message"] > c._debug2)
# there should have been no new announcements
self.failUnlessEqual(cdc["new_announcement"], c._debug3)
# and the right number of duplicate ones. There were
# NUM_SERVERS from the servertub restart, and there should be
# another NUM_SERVERS now
self.failUnlessEqual(cdc["duplicate_announcement"],
2*NUM_SERVERS)
d.addCallback(_check3)
return d
class TooNewServer(IntroducerService):
@ -247,6 +345,9 @@ class NonV1Server(SystemTestMixin, unittest.TestCase):
# exception.
def test_failure(self):
self.basedir = "introducer/NonV1Server/failure"
os.makedirs(self.basedir)
self.create_tub()
i = TooNewServer()
i.setServiceParent(self.parent)
self.introducer_furl = self.central_tub.registerReference(i)
@ -258,10 +359,12 @@ class NonV1Server(SystemTestMixin, unittest.TestCase):
portnum = l.getPortnum()
tub.setLocation("localhost:%d" % portnum)
n = FakeNode()
c = IntroducerClient(tub, self.introducer_furl,
"nickname-client", "version", "oldest")
c.subscribe_to("storage")
u"nickname-client", "version", "oldest")
announcements = {}
def got(serverid, ann_d):
announcements[serverid] = ann_d
c.subscribe_to("storage", got)
c.setServiceParent(self.parent)
@ -283,7 +386,7 @@ class Index(unittest.TestCase):
ann = ('pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i',
'storage', 'RIStorageServer.tahoe.allmydata.com',
'plancha', 'allmydata-tahoe/1.4.1', '1.0.0')
(nodeid, service_name) = make_index(ann)
(nodeid, service_name) = old.make_index(ann)
self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
self.failUnlessEqual(service_name, "storage")

View File

@ -174,19 +174,19 @@ class FakeClient:
peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(self._num_peers)]
self.nodeid = "fakenodeid"
self.storage_broker = StorageFarmBroker()
self.storage_broker = StorageFarmBroker(None, True)
for peerid in peerids:
fss = FakeStorageServer(peerid, self._storage)
self.storage_broker.add_server(peerid, fss)
self.storage_broker.test_add_server(peerid, fss)
def get_storage_broker(self):
return self.storage_broker
def debug_break_connection(self, peerid):
self.storage_broker.servers[peerid].broken = True
self.storage_broker.test_servers[peerid].broken = True
def debug_remove_connection(self, peerid):
self.storage_broker.servers.pop(peerid)
self.storage_broker.test_servers.pop(peerid)
def debug_get_connection(self, peerid):
return self.storage_broker.servers[peerid]
return self.storage_broker.test_servers[peerid]
def get_encoding_parameters(self):
return {"k": 3, "n": 10}
@ -1569,7 +1569,7 @@ class MultipleEncodings(unittest.TestCase):
sharemap = {}
sb = self._client.get_storage_broker()
for i,peerid in enumerate(sb.get_all_serverids()):
for peerid in sorted(sb.get_all_serverids()):
peerid_s = shortnodeid_b2a(peerid)
for shnum in self._shares1.get(peerid, {}):
if shnum < len(places):
@ -1794,13 +1794,13 @@ class LessFakeClient(FakeClient):
self._num_peers = num_peers
peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(self._num_peers)]
self.storage_broker = StorageFarmBroker()
self.storage_broker = StorageFarmBroker(None, True)
for peerid in peerids:
peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
make_dirs(peerdir)
ss = StorageServer(peerdir, peerid)
lw = LocalWrapper(ss)
self.storage_broker.add_server(peerid, lw)
self.storage_broker.test_add_server(peerid, lw)
self.nodeid = "fakenodeid"

View File

@ -73,10 +73,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
def _check(extra_node):
self.extra_node = extra_node
for c in self.clients:
all_peerids = list(c.get_storage_broker().get_all_serverids())
all_peerids = c.get_storage_broker().get_all_serverids()
self.failUnlessEqual(len(all_peerids), self.numclients+1)
sb = c.storage_broker
permuted_peers = list(sb.get_servers_for_index("a"))
permuted_peers = sb.get_servers_for_index("a")
self.failUnlessEqual(len(permuted_peers), self.numclients+1)
d.addCallback(_check)
@ -108,10 +108,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
d = self.set_up_nodes()
def _check_connections(res):
for c in self.clients:
all_peerids = list(c.get_storage_broker().get_all_serverids())
all_peerids = c.get_storage_broker().get_all_serverids()
self.failUnlessEqual(len(all_peerids), self.numclients)
sb = c.storage_broker
permuted_peers = list(sb.get_servers_for_index("a"))
permuted_peers = sb.get_servers_for_index("a")
self.failUnlessEqual(len(permuted_peers), self.numclients)
d.addCallback(_check_connections)

View File

@ -173,9 +173,9 @@ class FakeClient:
else:
peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
for fakeid in range(self.num_servers) ]
self.storage_broker = StorageFarmBroker()
self.storage_broker = StorageFarmBroker(None, permute_peers=True)
for (serverid, server) in peers:
self.storage_broker.add_server(serverid, server)
self.storage_broker.test_add_server(serverid, server)
self.last_peers = [p[1] for p in peers]
def log(self, *args, **kwargs):

View File

@ -31,14 +31,6 @@ from allmydata.test.common_web import HTTPClientGETFactory, \
timeout = 480 # Most of these take longer than 240 seconds on Francois's arm box.
class FakeIntroducerClient:
def get_all_connectors(self):
return {}
def get_all_connections_for(self, service_name):
return frozenset()
def get_all_peerids(self):
return frozenset()
class FakeStatsProvider:
def get_stats(self):
stats = {'stats': {}, 'counters': {}}
@ -55,7 +47,7 @@ class FakeClient(service.MultiService):
'zfec': "fake",
}
introducer_furl = "None"
introducer_client = FakeIntroducerClient()
_all_upload_status = [upload.UploadStatus()]
_all_download_status = [download.DownloadStatus()]
_all_mapupdate_statuses = [servermap.UpdateStatus()]
@ -67,7 +59,7 @@ class FakeClient(service.MultiService):
def connected_to_introducer(self):
return False
storage_broker = StorageFarmBroker()
storage_broker = StorageFarmBroker(None, permute_peers=True)
def get_storage_broker(self):
return self.storage_broker

View File

@ -238,30 +238,24 @@ class Root(rend.Page):
return "no"
def data_known_storage_servers(self, ctx, data):
ic = self.client.introducer_client
servers = [c
for c in ic.get_all_connectors().values()
if c.service_name == "storage"]
return len(servers)
sb = self.client.get_storage_broker()
return len(sb.get_all_serverids())
def data_connected_storage_servers(self, ctx, data):
ic = self.client.introducer_client
return len(ic.get_all_connections_for("storage"))
sb = self.client.get_storage_broker()
return len(sb.get_all_servers())
def data_services(self, ctx, data):
ic = self.client.introducer_client
c = [ (service_name, nodeid, rsc)
for (nodeid, service_name), rsc
in ic.get_all_connectors().items() ]
c.sort()
return c
sb = self.client.get_storage_broker()
return sb.get_all_descriptors()
def render_service_row(self, ctx, descriptor):
nodeid = descriptor.get_serverid()
def render_service_row(self, ctx, data):
(service_name, nodeid, rsc) = data
ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid))
ctx.fillSlots("nickname", rsc.nickname)
if rsc.rref:
rhost = rsc.remote_host
ctx.fillSlots("nickname", descriptor.get_nickname())
rhost = descriptor.get_remote_host()
if rhost:
if nodeid == self.client.nodeid:
rhost_s = "(loopback)"
elif isinstance(rhost, address.IPv4Address):
@ -269,19 +263,24 @@ class Root(rend.Page):
else:
rhost_s = str(rhost)
connected = "Yes: to " + rhost_s
since = rsc.last_connect_time
since = descriptor.get_last_connect_time()
else:
connected = "No"
since = rsc.last_loss_time
since = descriptor.get_last_loss_time()
announced = descriptor.get_announcement_time()
announcement = descriptor.get_announcement()
version = announcement["version"]
service_name = announcement["service-name"]
TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
ctx.fillSlots("connected", connected)
ctx.fillSlots("connected-bool", not not rsc.rref)
ctx.fillSlots("since", time.strftime(TIME_FORMAT, time.localtime(since)))
ctx.fillSlots("connected-bool", bool(rhost))
ctx.fillSlots("since", time.strftime(TIME_FORMAT,
time.localtime(since)))
ctx.fillSlots("announced", time.strftime(TIME_FORMAT,
time.localtime(rsc.announcement_time)))
ctx.fillSlots("version", rsc.version)
ctx.fillSlots("service_name", rsc.service_name)
time.localtime(announced)))
ctx.fillSlots("version", version)
ctx.fillSlots("service_name", service_name)
return ctx.tag