Various cleanups, fixes and improvements

Squashed all commits that were meejah's between
30d68fb499f300a393fa0ced5980229f4bb6efda
and
33c268ed3a8c63a809f4403e307ecc13d848b1ab
on the branch meejah:1382.markberger-rewrite-rebase.6, as per review.
This commit is contained in:
meejah 2017-02-14 16:36:57 -07:00
parent 19c5bbb43b
commit 05f48c3601
21 changed files with 733 additions and 420 deletions

View File

@ -120,8 +120,6 @@ We calculate share placement like so:
shares, where an edge exists between an arbitrary readonly server S and an
arbitrary share T if and only if S currently holds T.
^--- all passed in to the Happiness_Upload ctor
3. Calculate a maximum matching graph of G1 (a set of S->T edges that has or
is-tied-for the highest "happiness score"). There is a clever efficient
algorithm for this, named "Ford-Fulkerson". There may be more than one
@ -130,8 +128,6 @@ We calculate share placement like so:
maps shares to servers, where each share appears at most once, and each
server appears at most once.
^-- is this the "readonly_mappings"
4. Construct a bipartite graph G2 of readwrite servers to pre-existing
shares. Then remove any edge (from G2) that uses a server or a share found
in M1. Let an edge exist between server S and share T if and only if S
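For context on step 3 above: the matching it describes is ordinary maximum bipartite matching between servers and shares. The sketch below is illustrative only; it is not the code from happiness_upload.py (which builds a flow network and runs Ford-Fulkerson), and the server/share names are made up. It just shows the augmenting-path idea behind the same computation.

def maximum_matching(edges):
    """
    edges: dict mapping a server to the set of shares it could hold.
    Returns a dict mapping share -> server describing one maximum matching.
    """
    matched = {}  # share -> server

    def _try_assign(server, seen):
        for share in edges.get(server, ()):
            if share in seen:
                continue
            seen.add(share)
            # claim the share if it is free, or try to re-home its owner
            if share not in matched or _try_assign(matched[share], seen):
                matched[share] = server
                return True
        return False

    for server in edges:
        _try_assign(server, set())
    return matched

# tiny example: two servers, two shares
print(maximum_matching({"server_a": {"sh0", "sh1"}, "server_b": {"sh1"}}))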

View File

@ -1,57 +0,0 @@
# -*- coding: utf-8 -*-
from twisted.trial import unittest
from hypothesis import given
from hypothesis.strategies import text, sets
from allmydata.immutable import happiness_upload
@given(
sets(elements=text(min_size=1), min_size=4, max_size=4),
sets(elements=text(min_size=1), min_size=4),
)
def test_hypothesis_unhappy(peers, shares):
"""
similar to test_unhappy we test that the resulting happiness is
always 4 since the size of peers is 4.
"""
# https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets
# hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source]
readonly_peers = set()
peers_to_shares = {}
places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
happiness = happiness_upload.calculate_happiness(places)
assert set(places.keys()) == shares
assert happiness == 4
@given(
sets(elements=text(min_size=1), min_size=1, max_size=10),
# can we make a readonly_peers that's a subset of ^
sets(elements=text(min_size=1), min_size=1, max_size=20),
)
def test_more_hypothesis(peers, shares):
"""
similar to test_unhappy we test that the resulting happiness is
always either the number of peers or the number of shares
whichever is smaller.
"""
# https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets
# hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source]
# XXX would be nice to paramaterize these by hypothesis too
readonly_peers = set()
peers_to_shares = {}
places = happiness_upload.share_placement(peers, readonly_peers, set(list(shares)), peers_to_shares)
happiness = happiness_upload.calculate_happiness(places)
# every share should get placed
assert set(places.keys()) == shares
# we should only use peers that exist
assert set(places.values()).issubset(peers)
# if we have more shares than peers, happiness is at most # of
# peers; if we have fewer shares than peers happiness is capped at
# # of peers.
assert happiness == min(len(peers), len(shares))

View File

@ -0,0 +1,53 @@
import sys
import time
import shutil
from os import mkdir, unlink, listdir
from os.path import join, exists
from twisted.internet import defer, reactor, task
from twisted.internet.error import ProcessTerminated
import util
import pytest
@pytest.inlineCallbacks
def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, request):
# hmm, for some reason this still gets storage enabled ...
process = yield util._create_node(
reactor, request, temp_dir, introducer_furl, flog_gatherer, "edna",
web_port="tcp:9983:interface=localhost",
storage=False,
needed=3,
happy=10,
total=10,
)
node_dir = join(temp_dir, 'edna')
print("waiting 5 seconds unil we're maybe ready")
yield task.deferLater(reactor, 5, lambda: None)
# upload a file, which should fail because we don't have
# enough storage servers (happiness is set to 10)
proto = util._CollectOutputProtocol()
transport = reactor.spawnProcess(
proto,
sys.executable,
[
sys.executable, '-m', 'allmydata.scripts.runner',
'-d', node_dir,
'put', __file__,
]
)
try:
yield proto.done
assert False, "should raise exception"
except Exception as e:
assert isinstance(e, ProcessTerminated)
output = proto.output.getvalue()
assert "shares could be placed on only" in output

View File

@ -132,7 +132,12 @@ def _run_node(reactor, node_dir, request, magic_text):
return protocol.magic_seen
def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, name, web_port, storage=True, magic_text=None):
def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, name, web_port,
storage=True,
magic_text=None,
needed=2,
happy=3,
total=4):
"""
Helper to create a single node, run it and return the instance
spawnProcess returned (ITransport)
@ -161,10 +166,11 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
sys.executable,
args,
)
pytest.blockon(done_proto.done)
created_d = done_proto.done
with open(join(node_dir, 'tahoe.cfg'), 'w') as f:
f.write('''
def created(_):
with open(join(node_dir, 'tahoe.cfg'), 'w') as f:
f.write('''
[node]
nickname = %(name)s
web.port = %(web_port)s
@ -174,18 +180,28 @@ log_gatherer.furl = %(log_furl)s
[client]
# Which services should this client connect to?
introducer.furl = %(furl)s
shares.needed = 2
shares.happy = 3
shares.total = 4
shares.needed = %(needed)d
shares.happy = %(happy)d
shares.total = %(total)d
''' % {
'name': name,
'furl': introducer_furl,
'web_port': web_port,
'log_furl': flog_gatherer,
'needed': needed,
'happy': happy,
'total': total,
})
created_d.addCallback(created)
else:
created_d = defer.succeed(None)
return _run_node(reactor, node_dir, request, magic_text)
d = Deferred()
d.callback(None)
d.addCallback(lambda _: created_d)
d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text))
return d
def await_file_contents(path, contents, timeout=15):

View File

@ -655,6 +655,6 @@ class Client(node.Node, pollmixin.PollMixin):
return self.nodemaker.create_mutable_file(contents, keysize,
version=version)
def upload(self, uploadable):
def upload(self, uploadable, reactor=None):
uploader = self.getServiceNamed("uploader")
return uploader.upload(uploadable)
return uploader.upload(uploadable, reactor=reactor)

View File

@ -72,6 +72,7 @@ class ControlServer(Referenceable, service.Service):
f.close()
uploader = self.parent.getServiceNamed("uploader")
u = upload.FileName(filename, convergence=convergence)
# XXX should pass reactor arg
d = uploader.upload(u)
d.addCallback(lambda results: results.get_uri())
def _done(uri):

View File

@ -599,6 +599,7 @@ class DirectoryNode(object):
name = normalize(namex)
if self.is_readonly():
return defer.fail(NotWriteableError())
# XXX should pass reactor arg
d = self._uploader.upload(uploadable, progress=progress)
d.addCallback(lambda results:
self._create_and_validate_node(results.get_uri(), None,

View File

@ -724,12 +724,16 @@ class Checker(log.PrefixingLogMixin):
def _check_server_shares(self, s):
"""Return a deferred which eventually fires with a tuple of
(set(sharenum), server, set(), set(), responded) showing all the
shares claimed to be served by this server. In case the server is
disconnected then it fires with (set(), server, set(), set(), False)
(a server disconnecting when we ask it for buckets is the same, for
our purposes, as a server that says it has none, except that we want
to track and report whether or not each server responded.)"""
(set(sharenum), server, set(corrupt), set(incompatible),
responded) showing all the shares claimed to be served by this
server. In case the server is disconnected then it fires with
(set(), server, set(), set(), False) (a server disconnecting
when we ask it for buckets is the same, for our purposes, as a
server that says it has none, except that we want to track and
report whether or not each server responded.)
see also _verify_server_shares()
"""
def _curry_empty_corrupted(res):
buckets, responded = res
return (set(buckets), s, set(), set(), responded)

View File

@ -63,7 +63,6 @@ class ShareFinder:
if not self._started:
si = self.verifycap.storage_index
servers = self._storage_broker.get_servers_for_psi(si)
servers.sort(key=lambda s: s.get_serverid())
self._servers = iter(servers)
self._started = True

View File

@ -18,9 +18,12 @@ from common import COMPLETE, CORRUPT, DEAD, BADSEGNUM
class LayoutInvalid(Exception):
pass
class DataUnavailable(Exception):
pass
class Share:
"""I represent a single instance of a single share (e.g. I reference the
shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).

View File

@ -122,7 +122,7 @@ class Encoder(object):
assert not self._codec
k, happy, n, segsize = params
self.required_shares = k
self.servers_of_happiness = happy
self.min_happiness = happy
self.num_shares = n
self.segment_size = segsize
self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
@ -180,7 +180,7 @@ class Encoder(object):
if name == "storage_index":
return self._storage_index
elif name == "share_counts":
return (self.required_shares, self.servers_of_happiness,
return (self.required_shares, self.min_happiness,
self.num_shares)
elif name == "num_segments":
return self.num_segments
@ -503,17 +503,17 @@ class Encoder(object):
self.log("they weren't in our list of landlords", parent=ln,
level=log.WEIRD, umid="TQGFRw")
happiness = happinessutil.servers_of_happiness(self.servermap)
if happiness < self.servers_of_happiness:
if happiness < self.min_happiness:
peerids = set(happinessutil.shares_by_server(self.servermap).keys())
msg = happinessutil.failure_message(len(peerids),
self.required_shares,
self.servers_of_happiness,
self.min_happiness,
happiness)
msg = "%s: %s" % (msg, why)
raise UploadUnhappinessError(msg)
self.log("but we can still continue with %s shares, we'll be happy "
"with at least %s" % (happiness,
self.servers_of_happiness),
self.min_happiness),
parent=ln)
def _gather_responses(self, dl):

View File

@ -318,10 +318,10 @@ def _flow_network(peerIndices, shareIndices):
def share_placement(peers, readonly_peers, shares, peers_to_shares):
"""
Generates the allocations the upload should make, based on the given
information. We construct a dictionary of 'share_num' -> set(server_ids)
and return it to the caller. Each share should be placed on each server
in the corresponding set. Existing allocations appear as placements
because attempting to place an existing allocation will renew the share.
information. We construct a dictionary of 'share_num' ->
'server_id' and return it to the caller. Existing allocations
appear as placements because attempting to place an existing
allocation will renew the share.
For more information on the algorithm this class implements, refer to
docs/specifications/servers-of-happiness.rst
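A minimal usage sketch of the function documented above (mirroring what the hypothesis tests elsewhere in this commit do; the peer and share names here are made up):

from allmydata.immutable import happiness_upload

peers = {"peer0", "peer1", "peer2"}
readonly_peers = set()   # no read-only servers in this toy example
shares = {"share0", "share1", "share2"}
peers_to_shares = {}     # no pre-existing placements

places = happiness_upload.share_placement(
    peers, readonly_peers, shares, peers_to_shares)
print(places)  # e.g. {'share0': 'peer2', 'share1': 'peer0', 'share2': 'peer1'}
print(happiness_upload.calculate_happiness(places))  # 3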

View File

@ -61,6 +61,7 @@ class Repairer(log.PrefixingLogMixin):
# (http://tahoe-lafs.org/trac/tahoe-lafs/ticket/1212)
happy = 0
self._encodingparams = (k, happy, N, segsize)
# XXX should pass a reactor to this
ul = upload.CHKUploader(self._storage_broker, self._secret_holder)
return ul.start(self) # I am the IEncryptedUploadable
d.addCallback(_got_segsize)

View File

@ -9,6 +9,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \
file_cancel_secret_hash, bucket_renewal_secret_hash, \
bucket_cancel_secret_hash, plaintext_hasher, \
storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata.util.deferredutil import timeout_call
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
@ -117,7 +118,7 @@ EXTENSION_SIZE = 1000
def pretty_print_shnum_to_servers(s):
return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
class ServerTracker:
class ServerTracker(object):
def __init__(self, server,
sharesize, blocksize, num_segments, num_share_hashes,
storage_index,
@ -202,46 +203,39 @@ def str_shareloc(shnum, bucketwriter):
return "%s: %s" % (shnum, bucketwriter.get_servername(),)
class PeerSelector():
implements(IPeerSelector)
@implementer(IPeerSelector)
class PeerSelector(object):
def __init__(self, num_segments, total_shares, needed_shares, servers_of_happiness):
def __init__(self, num_segments, total_shares, needed_shares, min_happiness):
self.num_segments = num_segments
self.total_shares = total_shares
self.needed_shares = needed_shares
self.min_happiness = servers_of_happiness
self.min_happiness = min_happiness
self.existing_shares = {}
self.confirmed_allocations = {}
self.peers = set()
self.full_peers = set()
self.readonly_peers = set()
self.bad_peers = set()
def add_peer_with_share(self, peerid, shnum):
if peerid in self.existing_shares.keys():
try:
self.existing_shares[peerid].add(shnum)
else:
except KeyError:
self.existing_shares[peerid] = set([shnum])
def confirm_share_allocation(self, shnum, peer):
self.confirmed_allocations.setdefault(shnum, set()).add(peer)
def get_allocations(self):
return self.confirmed_allocations
def add_peer(self, peerid):
self.peers.add(peerid)
def mark_full_peer(self, peerid):
self.full_peers.add(peerid)
def mark_readonly_peer(self, peerid):
self.readonly_peers.add(peerid)
self.peers.remove(peerid)
def mark_bad_peer(self, peerid):
if peerid in self.peers:
self.peers.remove(peerid)
self.bad_peers.add(peerid)
elif peerid in self.full_peers:
self.full_peers.remove(peerid)
elif peerid in self.readonly_peers:
self.readonly_peers.remove(peerid)
self.bad_peers.add(peerid)
def get_sharemap_of_preexisting_shares(self):
@ -251,40 +245,100 @@ class PeerSelector():
preexisting.add(share, server)
return preexisting
def get_tasks(self):
def get_share_placements(self):
shares = set(range(self.total_shares))
self.happiness_mappings = share_placement(self.peers, self.full_peers, shares, self.existing_shares)
self.happiness_mappings = share_placement(self.peers, self.readonly_peers, shares, self.existing_shares)
self.happiness = calculate_happiness(self.happiness_mappings)
return self.happiness_mappings
def is_healthy(self):
return self.min_happiness <= self.happiness
class _QueryStatistics(object):
def __init__(self):
self.total = 0
self.good = 0
self.bad = 0
self.full = 0
self.error = 0
self.contacted = 0
def __str__(self):
return "QueryStatistics(total={} good={} bad={} full={} " \
"error={} contacted={})".format(
self.total,
self.good,
self.bad,
self.full,
self.error,
self.contacted,
)
class Tahoe2ServerSelector(log.PrefixingLogMixin):
peer_selector_class = PeerSelector
def __init__(self, upload_id, logparent=None, upload_status=None):
def __init__(self, upload_id, logparent=None, upload_status=None, reactor=None):
self.upload_id = upload_id
self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
# Servers that are working normally, but full.
self.full_count = 0
self.error_count = 0
self.num_servers_contacted = 0
self._query_stats = _QueryStatistics()
self.last_failure_msg = None
self._status = IUploadStatus(upload_status)
log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
self.log("starting", level=log.OPERATIONAL)
if reactor is None:
from twisted.internet import reactor
self._reactor = reactor
def __repr__(self):
return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
def _create_trackers(self, candidate_servers, allocated_size,
file_renewal_secret, file_cancel_secret, create_server_tracker):
# filter the list of servers according to which ones can accommodate
# this request. This excludes older servers (which used a 4-byte size
# field) from getting large shares (for files larger than about
# 12GiB). See #439 for details.
def _get_maxsize(server):
v0 = server.get_rref().version
v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
for server in candidate_servers:
self.peer_selector.add_peer(server.get_serverid())
writeable_servers = [
server for server in candidate_servers
if _get_maxsize(server) >= allocated_size
]
readonly_servers = set(candidate_servers) - set(writeable_servers)
for server in readonly_servers:
self.peer_selector.mark_readonly_peer(server.get_serverid())
def _make_trackers(servers):
trackers = []
for s in servers:
seed = s.get_lease_seed()
renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
st = create_server_tracker(s, renew, cancel)
trackers.append(st)
return trackers
write_trackers = _make_trackers(writeable_servers)
# We don't try to allocate shares to these servers, since they've
# said that they're incapable of storing shares of the size that we'd
# want to store. We ask them about existing shares for this storage
# index, which we want to know about for accurate
# servers_of_happiness accounting, then we forget about them.
readonly_trackers = _make_trackers(readonly_servers)
return readonly_trackers, write_trackers
@defer.inlineCallbacks
def get_shareholders(self, storage_broker, secret_holder,
storage_index, share_size, block_size,
num_segments, total_shares, needed_shares,
servers_of_happiness):
min_happiness):
"""
@return: (upload_trackers, already_serverids), where upload_trackers
is a set of ServerTracker instances that have agreed to hold
@ -294,14 +348,17 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
already have the share.
"""
# re-initialize statistics
self._query_status = _QueryStatistics()
if self._status:
self._status.set_status("Contacting Servers..")
self.peer_selector = self.peer_selector_class(num_segments, total_shares,
needed_shares, servers_of_happiness)
self.peer_selector = PeerSelector(num_segments, total_shares,
needed_shares, min_happiness)
self.total_shares = total_shares
self.servers_of_happiness = servers_of_happiness
self.min_happiness = min_happiness
self.needed_shares = needed_shares
self.homeless_shares = set(range(total_shares))
@ -326,6 +383,17 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
num_share_hashes, EXTENSION_SIZE)
allocated_size = wbp.get_allocated_size()
# decide upon the renewal/cancel secrets, to include them in the
# allocate_buckets query.
file_renewal_secret = file_renewal_secret_hash(
secret_holder.get_renewal_secret(),
storage_index,
)
file_cancel_secret = file_cancel_secret_hash(
secret_holder.get_cancel_secret(),
storage_index,
)
# see docs/specifications/servers-of-happiness.rst
# 0. Start with an ordered list of servers. Maybe *2N* of them.
#
@ -334,108 +402,186 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
if not all_servers:
raise NoServersError("client gave us zero servers")
# filter the list of servers according to which ones can accommodate
# this request. This excludes older servers (which used a 4-byte size
# field) from getting large shares (for files larger than about
# 12GiB). See #439 for details.
def _get_maxsize(server):
v0 = server.get_rref().version
v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
def _create_server_tracker(server, renew, cancel):
return ServerTracker(
server, share_size, block_size, num_segments, num_share_hashes,
storage_index, renew, cancel,
)
candidate_servers = all_servers[:2*total_shares]
for server in candidate_servers:
self.peer_selector.add_peer(server.get_serverid())
writeable_servers = [server for server in candidate_servers
if _get_maxsize(server) >= allocated_size]
readonly_servers = set(candidate_servers) - set(writeable_servers)
for server in readonly_servers:
self.peer_selector.mark_full_peer(server.get_serverid())
# decide upon the renewal/cancel secrets, to include them in the
# allocate_buckets query.
client_renewal_secret = secret_holder.get_renewal_secret()
client_cancel_secret = secret_holder.get_cancel_secret()
file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
storage_index)
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
storage_index)
def _make_trackers(servers):
trackers = []
for s in servers:
seed = s.get_lease_seed()
renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
st = ServerTracker(s,
share_size, block_size,
num_segments, num_share_hashes,
storage_index,
renew, cancel)
trackers.append(st)
return trackers
# We assign each servers/trackers into one three lists. They all
# start in the "first pass" list. During the first pass, as we ask
# each one to hold a share, we move their tracker to the "second
# pass" list, until the first-pass list is empty. Then during the
# second pass, as we ask each to hold more shares, we move their
# tracker to the "next pass" list, until the second-pass list is
# empty. Then we move everybody from the next-pass list back to the
# second-pass list and repeat the "second" pass (really the third,
# fourth, etc pass), until all shares are assigned, or we've run out
# of potential servers.
write_trackers = _make_trackers(writeable_servers)
# We don't try to allocate shares to these servers, since they've
# said that they're incapable of storing shares of the size that we'd
# want to store. We ask them about existing shares for this storage
# index, which we want to know about for accurate
# servers_of_happiness accounting, then we forget about them.
readonly_trackers = _make_trackers(readonly_servers)
readonly_trackers, write_trackers = self._create_trackers(
all_servers[:(2 * total_shares)],
allocated_size,
file_renewal_secret,
file_cancel_secret,
_create_server_tracker,
)
# see docs/specifications/servers-of-happiness.rst
# 1. Query all servers for existing shares.
#
# The spec doesn't say what to do for timeouts/errors. This
# adds a timeout to each request, and rejects any that reply
# with an error (i.e. they are simply removed from the list)
# We now ask servers that can't hold any new shares about existing
# shares that they might have for our SI. Once this is done, we
# start placing the shares that we haven't already accounted
# for.
ds = []
if self._status and readonly_trackers:
self._status.set_status("Contacting readonly servers to find "
"any existing shares")
self._status.set_status(
"Contacting readonly servers to find any existing shares"
)
# in the "pre servers-of-happiness" code, it was a little
# ambiguous whether "merely asking" counted as a "query" or
# not, because "allocate_buckets" with nothing to allocate was
# used to "ask" a write-able server what it held. Now we count
# "actual allocation queries" only, because those are the only
# things that actually affect what the server does.
for tracker in readonly_trackers:
assert isinstance(tracker, ServerTracker)
d = tracker.ask_about_existing_shares()
d = timeout_call(self._reactor, tracker.ask_about_existing_shares(), 15)
d.addBoth(self._handle_existing_response, tracker)
ds.append(d)
self.num_servers_contacted += 1
self.query_count += 1
self.log("asking server %s for any existing shares" %
(tracker.get_name(),), level=log.NOISY)
for tracker in write_trackers:
assert isinstance(tracker, ServerTracker)
d = tracker.query(set())
d = timeout_call(self._reactor, tracker.ask_about_existing_shares(), 15)
def timed_out(f, tracker):
# print("TIMEOUT {}: {}".format(tracker, f))
write_trackers.remove(tracker)
readonly_trackers.append(tracker)
return f
d.addErrback(timed_out, tracker)
d.addBoth(self._handle_existing_write_response, tracker, set())
ds.append(d)
self.num_servers_contacted += 1
self.query_count += 1
self.log("asking server %s for any existing shares" %
(tracker.get_name(),), level=log.NOISY)
self.trackers = write_trackers + readonly_trackers
trackers = set(write_trackers) | set(readonly_trackers)
dl = defer.DeferredList(ds)
dl.addCallback(lambda ign: self._calculate_tasks())
dl.addCallback(lambda ign: self._request_another_allocation())
return dl
# these will always be (True, None) because errors are handled
# in the _handle_existing_write_response etc callbacks
yield defer.DeferredList(ds)
# okay, we've queried the 2N servers, time to get the share
# placements and attempt to actually place the shares (or
# renew them on read-only servers). We want to run the loop
# below *at least once* because even read-only servers won't
# renew their shares until "allocate_buckets" is called (via
# tracker.query())
def _calculate_tasks(self):
self.tasks = self.peer_selector.get_tasks()
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/778#comment:48
# min_happiness will be 0 for the repairer, so we set current
# effective_happiness to less than zero so this loop runs at
# least once for the repairer...
def _bad_server(fail, tracker):
self.last_failure_msg = fail
return False # will mark it readonly
def _make_readonly(tracker):
# print("making {} read-only".format(tracker.get_serverid()))
try:
write_trackers.remove(tracker)
except ValueError:
pass
# XXX can we just use a set() or does order matter?
if tracker not in readonly_trackers:
readonly_trackers.append(tracker)
return None
# so we *always* want to run this loop at least once, even if
# we only have read-only servers -- because asking them to
# allocate buckets renews those shares they already have. For
# subsequent loops, we give up if we've achieved happiness OR
# if we have zero writable servers left
last_happiness = None
effective_happiness = -1
while effective_happiness < min_happiness and \
(last_happiness is None or len(write_trackers)):
errors_before = self._query_stats.bad
self._share_placements = self.peer_selector.get_share_placements()
placements = []
for tracker in trackers:
shares_to_ask = self._allocation_for(tracker)
# if we already tried to upload share X to this very
# same server in a previous iteration, we should *not*
# ask again. If we *do* ask, there's no real harm, but
# the server will respond with an empty dict and that
# confuses our statistics. However, if the server is a
# readonly server, we *do* want to ask so it refreshes
# the share.
if shares_to_ask != set(tracker.buckets.keys()) or tracker in readonly_trackers:
self._query_stats.total += 1
self._query_stats.contacted += 1
d = timeout_call(self._reactor, tracker.query(shares_to_ask), 15)
d.addBoth(self._buckets_allocated, tracker, shares_to_ask)
d.addErrback(lambda f, tr: _bad_server(f, tr), tracker)
d.addCallback(lambda x, tr: _make_readonly(tr) if not x else x, tracker)
placements.append(d)
yield defer.DeferredList(placements)
merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
effective_happiness = servers_of_happiness(merged)
if effective_happiness == last_happiness:
# print("effective happiness still {}".format(last_happiness))
# we haven't improved over the last iteration; give up
break;
if errors_before == self._query_stats.bad:
if False: print("no more errors; break")
break;
last_happiness = effective_happiness
# print("write trackers left: {}".format(len(write_trackers)))
# note: peer_selector.get_allocations() only maps "things we
# uploaded in the above loop" and specifically does *not*
# include any pre-existing shares on read-only servers .. but
# we *do* want to count those shares towards total happiness.
# no more servers. If we haven't placed enough shares, we fail.
# XXX note: sometimes the loop above doesn't run even once, and
# so 'merged' must be (re-)computed here.
merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
effective_happiness = servers_of_happiness(merged)
# print("placements completed {} vs {}".format(effective_happiness, min_happiness))
# for k, v in merged.items():
# print(" {} -> {}".format(k, v))
if effective_happiness < min_happiness:
msg = failure_message(
peer_count=len(self.serverids_with_shares),
k=self.needed_shares,
happy=min_happiness,
effective_happy=effective_happiness,
)
msg = ("server selection failed for %s: %s (%s), merged=%s" %
(self, msg, self._get_progress_message(),
pretty_print_shnum_to_servers(merged)))
if self.last_failure_msg:
msg += " (%s)" % (self.last_failure_msg,)
self.log(msg, level=log.UNUSUAL)
self._failed(msg) # raises UploadUnhappinessError
return
# we placed (or already had) enough to be happy, so we're done
if self._status:
self._status.set_status("Placed all shares")
msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
"self.use_trackers: %s, self.preexisting_shares: %s") \
% (self, self._get_progress_message(),
pretty_print_shnum_to_servers(merged),
[', '.join([str_shareloc(k,v)
for k,v in st.buckets.iteritems()])
for st in self.use_trackers],
pretty_print_shnum_to_servers(self.preexisting_shares))
self.log(msg, level=log.OPERATIONAL)
defer.returnValue((self.use_trackers, self.peer_selector.get_sharemap_of_preexisting_shares()))
def _handle_existing_response(self, res, tracker):
"""
@ -447,8 +593,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
self.log("%s got error during existing shares check: %s"
% (tracker.get_name(), res), level=log.UNUSUAL)
self.peer_selector.mark_bad_peer(serverid)
self.error_count += 1
self.bad_query_count += 1
else:
buckets = res
if buckets:
@ -471,15 +615,12 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
self.log("%s got error during server selection: %s" % (tracker, res),
level=log.UNUSUAL)
self.homeless_shares |= shares_to_ask
msg = ("last failure (from %s) was: %s" % (tracker, res))
self.last_failure_msg = msg
else:
(alreadygot, allocated) = res
for share in alreadygot:
for share in res.keys():
self.peer_selector.add_peer_with_share(tracker.get_serverid(), share)
def _get_progress_message(self):
if not self.homeless_shares:
msg = "placed all %d shares, " % (self.total_shares)
@ -488,36 +629,34 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
(self.total_shares - len(self.homeless_shares),
self.total_shares,
len(self.homeless_shares)))
return (msg + "want to place shares on at least %d servers such that "
"any %d of them have enough shares to recover the file, "
"sent %d queries to %d servers, "
"%d queries placed some shares, %d placed none "
"(of which %d placed none due to the server being"
" full and %d placed none due to an error)" %
(self.servers_of_happiness, self.needed_shares,
self.query_count, self.num_servers_contacted,
self.good_query_count, self.bad_query_count,
self.full_count, self.error_count))
assert self._query_stats.bad == (self._query_stats.full + self._query_stats.error)
return (
msg + "want to place shares on at least {happy} servers such that "
"any {needed} of them have enough shares to recover the file, "
"sent {queries} queries to {servers} servers, "
"{good} queries placed some shares, {bad} placed none "
"(of which {full} placed none due to the server being"
" full and {error} placed none due to an error)".format(
happy=self.min_happiness,
needed=self.needed_shares,
queries=self._query_stats.total,
servers=self._query_stats.contacted,
good=self._query_stats.good,
bad=self._query_stats.bad,
full=self._query_stats.full,
error=self._query_stats.error
)
)
def _get_next_allocation(self):
def _allocation_for(self, tracker):
"""
Return the next share allocation that we need to make.
Specifically, I return a tuple (tracker, shares_to_ask), where
tracker is a ServerTracker instance and shares_to_ask is a set of
shares that we should store on that server. If there are no more
allocations to make, I return None.
Given a ServerTracker, return the set of shares that we should
store on that server.
"""
if len(self.trackers) == 0:
return None
tracker = self.trackers.pop(0)
# TODO: don't pre-convert all serverids to ServerTrackers
assert isinstance(tracker, ServerTracker)
shares_to_ask = set()
servermap = self.tasks
servermap = self._share_placements
for shnum, tracker_id in servermap.items():
if tracker_id == None:
continue
@ -531,81 +670,27 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
" %d shares left.."
% (tracker.get_name(),
len(self.homeless_shares)))
return (tracker, shares_to_ask)
return shares_to_ask
def _request_another_allocation(self):
def _buckets_allocated(self, res, tracker, shares_to_ask):
"""
see docs/specifications/servers-of-happiness.rst
10. If any placements from step 9 fail, mark the server as read-only. Go back
to step 2 (since we may discover a server is/has-become read-only, or has
failed, during step 9).
Internal helper. If this returns an error or False, the server
will be considered read-only for any future iterations.
"""
allocation = self._get_next_allocation()
if allocation is not None:
tracker, shares_to_ask = allocation
# see docs/specifications/servers-of-happiness.rst
# 8. Renew the shares on their respective servers from M1 and M2.
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask)
return d
else:
# no more servers. If we haven't placed enough shares, we fail.
merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
effective_happiness = servers_of_happiness(self.peer_selector.get_allocations())
if effective_happiness < self.servers_of_happiness:
msg = failure_message(
peer_count=len(self.serverids_with_shares),
k=self.needed_shares,
happy=self.servers_of_happiness,
effective_happy=effective_happiness,
)
msg = ("server selection failed for %s: %s (%s), merged=%s" %
(self, msg, self._get_progress_message(),
pretty_print_shnum_to_servers(merged)))
if self.last_failure_msg:
msg += " (%s)" % (self.last_failure_msg,)
self.log(msg, level=log.UNUSUAL)
return self._failed(msg)
else:
# we placed enough to be happy, so we're done
if self._status:
self._status.set_status("Placed all shares")
msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
"self.use_trackers: %s, self.preexisting_shares: %s") \
% (self, self._get_progress_message(),
pretty_print_shnum_to_servers(merged),
[', '.join([str_shareloc(k,v)
for k,v in st.buckets.iteritems()])
for st in self.use_trackers],
pretty_print_shnum_to_servers(self.preexisting_shares))
self.log(msg, level=log.OPERATIONAL)
return (self.use_trackers, self.peer_selector.get_sharemap_of_preexisting_shares())
def _got_response(self, res, tracker, shares_to_ask):
if isinstance(res, failure.Failure):
# This is unusual, and probably indicates a bug or a network
# problem.
self.log("%s got error during server selection: %s" % (tracker, res),
level=log.UNUSUAL)
self.error_count += 1
self.bad_query_count += 1
self._query_stats.error += 1
self._query_stats.bad += 1
self.homeless_shares |= shares_to_ask
if (self.trackers):
# there is still hope, so just loop
try:
self.peer_selector.mark_readonly_peer(tracker.get_serverid())
except KeyError:
pass
else:
# No more servers, so this upload might fail (it depends upon
# whether we've hit servers_of_happiness or not). Log the last
# failure we got: if a coding error causes all servers to fail
# in the same way, this allows the common failure to be seen
# by the uploader and should help with debugging
msg = ("last failure (from %s) was: %s" % (tracker, res))
self.last_failure_msg = msg
return res
else:
(alreadygot, allocated) = res
self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
@ -614,7 +699,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
level=log.NOISY)
progress = False
for s in alreadygot:
self.peer_selector.confirm_share_allocation(s, tracker.get_serverid())
self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
if s in self.homeless_shares:
self.homeless_shares.remove(s)
@ -627,8 +711,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
if allocated:
self.use_trackers.add(tracker)
progress = True
for s in allocated:
self.peer_selector.confirm_share_allocation(s, tracker.get_serverid())
if allocated or alreadygot:
self.serverids_with_shares.add(tracker.get_serverid())
@ -636,16 +718,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
not_yet_present = set(shares_to_ask) - set(alreadygot)
still_homeless = not_yet_present - set(allocated)
if progress:
# They accepted at least one of the shares that we asked
# them to accept, or they had a share that we didn't ask
# them to accept but that we hadn't placed yet, so this
# was a productive query
self.good_query_count += 1
else:
self.bad_query_count += 1
self.full_count += 1
if still_homeless:
# In networks with lots of space, this is very unusual and
# probably indicates an error. In networks with servers that
@ -660,10 +732,19 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
# Since they were unable to accept all of our requests, it is
# safe to assume that asking them again won't help.
# now loop
return self._request_another_allocation()
if progress:
# They accepted at least one of the shares that we asked
# them to accept, or they had a share that we didn't ask
# them to accept but that we hadn't placed yet, so this
# was a productive query
self._query_stats.good += 1
else:
# if we asked for some allocations, but the server
# didn't return any at all (i.e. empty dict) it must
# be full
self._query_stats.full += 1
self._query_stats.bad += 1
return progress
def _failed(self, msg):
"""
@ -955,10 +1036,9 @@ class UploadStatus(object):
def set_results(self, value):
self.results = value
class CHKUploader:
server_selector_class = Tahoe2ServerSelector
class CHKUploader(object):
def __init__(self, storage_broker, secret_holder, progress=None):
def __init__(self, storage_broker, secret_holder, progress=None, reactor=None):
# server_selector needs storage_broker and secret_holder
self._storage_broker = storage_broker
self._secret_holder = secret_holder
@ -969,6 +1049,7 @@ class CHKUploader:
self._upload_status.set_helper(False)
self._upload_status.set_active(True)
self._progress = progress
self._reactor = reactor
# locate_all_shareholders() will create the following attribute:
# self._server_trackers = {} # k: shnum, v: instance of ServerTracker
@ -1039,14 +1120,17 @@ class CHKUploader:
self._storage_index = storage_index
upload_id = si_b2a(storage_index)[:5]
self.log("using storage index %s" % upload_id)
server_selector = self.server_selector_class(upload_id,
self._log_number,
self._upload_status)
server_selector = Tahoe2ServerSelector(
upload_id,
self._log_number,
self._upload_status,
reactor=self._reactor,
)
share_size = encoder.get_param("share_size")
block_size = encoder.get_param("block_size")
num_segments = encoder.get_param("num_segments")
k,desired,n = encoder.get_param("share_counts")
k, desired, n = encoder.get_param("share_counts")
self._server_selection_started = time.time()
d = server_selector.get_shareholders(storage_broker, secret_holder,
@ -1625,7 +1709,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
return (self._helper_furl, bool(self._helper))
def upload(self, uploadable, progress=None):
def upload(self, uploadable, progress=None, reactor=None):
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
@ -1661,7 +1745,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
else:
storage_broker = self.parent.get_storage_broker()
secret_holder = self.parent._secret_holder
uploader = CHKUploader(storage_broker, secret_holder, progress=progress)
uploader = CHKUploader(storage_broker, secret_holder, progress=progress, reactor=reactor)
d2.addCallback(lambda x: uploader.start(eu))
self._all_uploads[uploader] = None

View File

@ -762,7 +762,7 @@ class IPeerSelector(Interface):
potential candidates for storing a file.
"""
def mark_full_peer(peerid):
def mark_readonly_peer(peerid):
"""
Mark the peer peerid as full. This means that any
peer-with-share relationships I know about for peerid remain
@ -777,31 +777,10 @@ class IPeerSelector(Interface):
with peerid, and will not attempt to assign it any more shares.
"""
def get_tasks():
def get_share_placements():
"""
Return a tuple of tasks to our caller.
Specifically, return (queries, placements), where queries and
allocations are both lists of things to do. Each query is a
request for our caller to ask a server about the shares it holds
for this upload; the results will be fed back into the
allocator. Each allocation is a request for some share or shares
to be placed on a server. Result may be None, in which case the
selector thinks that the share placement is as reliably or
correctly placed as it can be.
"""
def is_healthy():
"""
I return whether the share assignments I'm currently using
reflect a healthy file, based on my internal definitions.
"""
def needs_recomputation():
"""
I return True if the share assignments I last returned may have
become stale. This is a hint to the caller that they should call
get_share_assignments again.
Return the share-placement map (a dict) which maps shares to
server-ids
"""

View File

@ -142,6 +142,7 @@ class NodeMaker(object):
convergence = self.secret_holder.get_convergence_secret()
packed = pack_children(children, None, deep_immutable=True)
uploadable = Data(packed, convergence)
# XXX should pass reactor arg
d = self.uploader.upload(uploadable)
d.addCallback(lambda results:
self.create_from_cap(None, results.get_uri()))

View File

@ -360,7 +360,7 @@ class BalancingAct(GridTestMixin, unittest.TestCase):
shares_chart.setdefault(shnum, []).append(names[serverid])
return shares_chart
def _test_good_share_hosts(self):
def test_good_share_hosts(self):
self.basedir = "checker/BalancingAct/1115"
self.set_up_grid(num_servers=1)
c0 = self.g.clients[0]
@ -388,11 +388,9 @@ class BalancingAct(GridTestMixin, unittest.TestCase):
d.addCallback(add_three, i)
def _check_and_repair(_):
print("check_and_repair")
return self.imm.check_and_repair(Monitor())
def _check_counts(crr, shares_good, good_share_hosts):
prr = crr.get_post_repair_results()
print self._pretty_shares_chart(self.uri)
self.failUnlessEqual(prr.get_share_counter_good(), shares_good)
self.failUnlessEqual(prr.get_host_counter_good_shares(),
good_share_hosts)
@ -410,11 +408,16 @@ class BalancingAct(GridTestMixin, unittest.TestCase):
d.addCallback(_check_counts, 4, 5)
d.addCallback(lambda _: self.delete_shares_numbered(self.uri, [3]))
d.addCallback(_check_and_repair)
# XXX this isn't always true, "sometimes" the repairer happens
# to do better and place things so there are 5 happy
# servers. for example PYTHONHASHSEED=3 gets 5 happy whereas
# PYTHONHASHSEED=4 gets 4 happy
d.addCallback(_check_counts, 4, 4)
# it can happen that our uploader will choose, e.g., to upload
# to servers B, C, D, E .. which will mean that all 5 servers
# now contain our shares (and thus "respond").
def _check_happy(crr):
prr = crr.get_post_repair_results()
self.assertTrue(prr.get_host_counter_good_shares() >= 4)
return crr
d.addCallback(_check_happy)
d.addCallback(lambda _: all([self.g.break_server(sid)
for sid in self.g.get_all_serverids()]))
d.addCallback(_check_and_repair)

View File

@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
from twisted.trial import unittest
from hypothesis import given
from hypothesis.strategies import text, sets
from allmydata.immutable import happiness_upload
@ -25,6 +27,18 @@ class HappinessUtils(unittest.TestCase):
self.assertEqual(residual, [[1], [2], [3], []])
self.assertEqual(capacity, [[0, 1, 0, 0], [-1, 0, 1, 0], [0, -1, 0, 1], [0, 0, -1, 0]])
def test_trivial_maximum_graph(self):
self.assertEqual(
{},
happiness_upload._compute_maximum_graph([], {})
)
def test_trivial_flow_graph(self):
self.assertEqual(
[],
happiness_upload._servermap_flow_graph(set(), set(), {})
)
class Happiness(unittest.TestCase):
@ -40,10 +54,6 @@ class Happiness(unittest.TestCase):
places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
if False:
for k, v in places.items():
print(" {} -> {}".format(k, v))
self.assertEqual(
places,
{
@ -124,18 +134,16 @@ class Happiness(unittest.TestCase):
self.assertEqual(2, happiness)
# process just gets killed with anything like 200 (see
# test_upload.py)
def no_test_50(self):
peers = set(['peer{}'.format(x) for x in range(50)])
shares = set(['share{}'.format(x) for x in range(50)])
def test_100(self):
peers = set(['peer{}'.format(x) for x in range(100)])
shares = set(['share{}'.format(x) for x in range(100)])
readonly_peers = set()
peers_to_shares = dict()
places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
happiness = happiness_upload.calculate_happiness(places)
self.assertEqual(50, happiness)
self.assertEqual(100, happiness)
def test_redistribute(self):
"""
@ -209,3 +217,55 @@ class Happiness(unittest.TestCase):
places = happiness_upload.share_placement(peers, set(), shares, {})
self.assertEqual(places, dict())
class PlacementTests(unittest.TestCase):
@given(
sets(elements=text(min_size=1), min_size=4, max_size=4),
sets(elements=text(min_size=1), min_size=4),
)
def test_hypothesis_unhappy(self, peers, shares):
"""
Similar to test_unhappy, we test that the resulting happiness is
always 4, since the size of peers is 4.
"""
# https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets
# hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source]
readonly_peers = set()
peers_to_shares = {}
places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
happiness = happiness_upload.calculate_happiness(places)
assert set(places.keys()) == shares
assert happiness == 4
@given(
sets(elements=text(min_size=1), min_size=1, max_size=10),
# can we make a readonly_peers that's a subset of ^
sets(elements=text(min_size=1), min_size=1, max_size=20),
)
def test_more_hypothesis(self, peers, shares):
"""
Similar to test_unhappy, we test that the resulting happiness is
always either the number of peers or the number of shares,
whichever is smaller.
"""
# https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets
# hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source]
# XXX would be nice to parameterize these by hypothesis too
readonly_peers = set()
peers_to_shares = {}
places = happiness_upload.share_placement(peers, readonly_peers, set(list(shares)), peers_to_shares)
happiness = happiness_upload.calculate_happiness(places)
# every share should get placed
assert set(places.keys()) == shares
# we should only use peers that exist
assert set(places.values()).issubset(peers)
# if we have more shares than peers, happiness is at most the
# number of peers; if we have fewer shares than peers, happiness
# is capped at the number of shares.
assert happiness == min(len(peers), len(shares))

View File

@ -233,16 +233,12 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin,
done = []
d = self._set_up(False, "test_5_overdue_immutable")
def _reduce_max_outstanding_requests_and_download(ign):
# find all servers (it's a 2-tuple because of what
# self._hang() wants, but it only looks at the first one,
# which is the ID)
servers = [
(srv, None) for shn, srv, sharef in self.shares
]
# we sort the servers (by id) because that's what the
# download-finder is going to do, and we want to hang the
# first 5 servers which it will make requests to.
self._hang(sorted(servers)[:5])
# we need to hang the first 5 servers, so we have to
# figure out where the shares were placed.
si = uri.from_string(self.uri).get_storage_index()
placed = self.c0.storage_broker.get_servers_for_psi(si)
self._hang([(s.get_serverid(), s) for s in placed[:5]])
n = self.c0.create_node_from_uri(self.uri)
n._cnode._maybe_create_download_node()
self._sf = n._cnode._node._sharefinder

View File

@ -4,7 +4,7 @@ import os, shutil
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.internet import defer
from twisted.internet import defer, task
from foolscap.api import fireEventually
import allmydata # for __full_version__
@ -101,19 +101,26 @@ class SetDEPMixin:
self.node.encoding_params = p
class FakeStorageServer:
def __init__(self, mode):
def __init__(self, mode, reactor=None):
self.mode = mode
self.allocated = []
self.queries = 0
self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
{ "maximum-immutable-share-size": 2**32 - 1 },
"application-version": str(allmydata.__full_version__),
}
self._alloc_queries = 0
self._get_queries = 0
self.version = {
"http://allmydata.org/tahoe/protocols/storage/v1" :
{
"maximum-immutable-share-size": 2**32 - 1,
},
"application-version": str(allmydata.__full_version__),
}
if mode == "small":
self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
{ "maximum-immutable-share-size": 10 },
"application-version": str(allmydata.__full_version__),
}
self.version = {
"http://allmydata.org/tahoe/protocols/storage/v1" :
{
"maximum-immutable-share-size": 10,
},
"application-version": str(allmydata.__full_version__),
}
def callRemote(self, methname, *args, **kwargs):
@ -126,14 +133,16 @@ class FakeStorageServer:
def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
sharenums, share_size, canary):
#print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
# print "FakeStorageServer.allocate_buckets(num=%d, size=%d, mode=%s, queries=%d)" % (len(sharenums), share_size, self.mode, self._alloc_queries)
if self.mode == "timeout":
return defer.Deferred()
if self.mode == "first-fail":
if self.queries == 0:
if self._alloc_queries == 0:
raise ServerError
if self.mode == "second-fail":
if self.queries == 1:
if self._alloc_queries == 1:
raise ServerError
self.queries += 1
self._alloc_queries += 1
if self.mode == "full":
return (set(), {},)
elif self.mode == "already got them":
@ -146,6 +155,18 @@ class FakeStorageServer:
for shnum in sharenums]),
)
def get_buckets(self, storage_index, **kw):
# this should map shnum to a BucketReader but there isn't a
# handy FakeBucketReader and we don't actually read the shares
# back anyway (just the keys)
return {
shnum: None
for (si, shnum) in self.allocated
if si == storage_index
}
class FakeBucketWriter:
# a diagnostic version of storageserver.BucketWriter
def __init__(self, size):
@ -184,20 +205,23 @@ class FakeBucketWriter:
def remote_abort(self):
pass
class FakeClient:
DEFAULT_ENCODING_PARAMETERS = {"k":25,
"happy": 25,
"n": 100,
"max_segment_size": 1*MiB,
}
class FakeClient(object):
DEFAULT_ENCODING_PARAMETERS = {
"k":25,
"happy": 25,
"n": 100,
"max_segment_size": 1 * MiB,
}
def __init__(self, mode="good", num_servers=50):
def __init__(self, mode="good", num_servers=50, reactor=None):
self.num_servers = num_servers
self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
if type(mode) is str:
mode = dict([i,mode] for i in range(num_servers))
servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
for fakeid in range(self.num_servers) ]
servers = [
("%20d" % fakeid, FakeStorageServer(mode[fakeid], reactor=reactor))
for fakeid in range(self.num_servers)
]
self.storage_broker = StorageFarmBroker(permute_peers=True, tub_maker=None)
for (serverid, rref) in servers:
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
@ -248,15 +272,21 @@ SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
def upload_data(uploader, data, reactor=None):
u = upload.Data(data, convergence=None)
return uploader.upload(u)
def upload_filename(uploader, filename):
return uploader.upload(u, reactor=reactor)
def upload_filename(uploader, filename, reactor=None):
u = upload.FileName(filename, convergence=None)
return uploader.upload(u)
def upload_filehandle(uploader, fh):
return uploader.upload(u, reactor=reactor)
def upload_filehandle(uploader, fh, reactor=None):
u = upload.FileHandle(fh, convergence=None)
return uploader.upload(u)
return uploader.upload(u, reactor=reactor)
class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
def setUp(self):
@ -431,12 +461,103 @@ class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
"server selection failed",
upload_data, self.u, DATA)
def _check((f,)):
self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
# there should also be a 'last failure was' message
self.failUnlessIn("ServerError", str(f.value))
self.failUnlessIn("shares could be placed or found on only 10 server(s)", str(f.value))
d.addCallback(_check)
return d
def test_allocation_error_some(self):
self.make_node({
0: "good",
1: "good",
2: "good",
3: "good",
4: "good",
5: "first-fail",
6: "first-fail",
7: "first-fail",
8: "first-fail",
9: "first-fail",
})
self.set_encoding_parameters(3, 7, 10)
d = self.shouldFail(UploadUnhappinessError, "second_error_some",
"server selection failed",
upload_data, self.u, DATA)
def _check((f,)):
self.failUnlessIn("shares could be placed on only 5 server(s)", str(f.value))
d.addCallback(_check)
return d
def test_allocation_error_recovery(self):
self.make_node({
0: "good",
1: "good",
2: "good",
3: "good",
4: "second-fail",
5: "second-fail",
6: "first-fail",
7: "first-fail",
8: "first-fail",
9: "first-fail",
})
self.set_encoding_parameters(3, 7, 10)
# we placed shares on 0 through 5, which wasn't enough. so
# then we looped and only placed on 0-3 (because now 4-9 have
# all failed) ... so the error message should say we only
# placed on 6 servers (not 4) because those two shares *did*
# at some point succeed.
d = self.shouldFail(UploadUnhappinessError, "second_error_some",
"server selection failed",
upload_data, self.u, DATA)
def _check((f,)):
self.failUnlessIn("shares could be placed on only 6 server(s)", str(f.value))
d.addCallback(_check)
return d
def test_good_servers_stay_writable(self):
self.make_node({
0: "good",
1: "good",
2: "second-fail",
3: "second-fail",
4: "second-fail",
5: "first-fail",
6: "first-fail",
7: "first-fail",
8: "first-fail",
9: "first-fail",
})
self.set_encoding_parameters(3, 7, 10)
# we placed shares on 0 through 5, which wasn't enough. so
# then we looped and only placed on 0-3 (because now 4-9 have
# all failed) ... so the error message should say we only
# placed on 6 servers (not 4) because those two shares *did*
# at some point succeed.
d = self.shouldFail(UploadUnhappinessError, "good_servers_stay_writable",
"server selection failed",
upload_data, self.u, DATA)
def _check((f,)):
self.failUnlessIn("shares could be placed on only 5 server(s)", str(f.value))
d.addCallback(_check)
return d
def test_timeout(self):
clock = task.Clock()
self.make_node("timeout")
self.set_encoding_parameters(k=25, happy=1, n=50)
d = self.shouldFail(
UploadUnhappinessError, __name__,
"server selection failed",
upload_data, self.u, DATA, reactor=clock,
)
# XXX double-check; it's doing 3 iterations?
# XXX should only do 1!
clock.advance(15)
clock.advance(15)
return d
class FullServer(unittest.TestCase):
def setUp(self):
self.node = FakeClient(mode="full")
@ -495,7 +616,7 @@ class ServerSelection(unittest.TestCase):
for s in self.node.last_servers:
allocated = s.allocated
self.failUnlessEqual(len(allocated), 1)
self.failUnlessEqual(s.queries, 2)
self.failUnlessEqual(s._alloc_queries, 1)
d.addCallback(_check)
return d
@ -514,7 +635,7 @@ class ServerSelection(unittest.TestCase):
for s in self.node.last_servers:
allocated = s.allocated
self.failUnlessEqual(len(allocated), 2)
self.failUnlessEqual(s.queries, 2)
self.failUnlessEqual(s._alloc_queries, 1)
d.addCallback(_check)
return d
@ -535,10 +656,10 @@ class ServerSelection(unittest.TestCase):
allocated = s.allocated
self.failUnless(len(allocated) in (1,2), len(allocated))
if len(allocated) == 1:
self.failUnlessEqual(s.queries, 2)
self.failUnlessEqual(s._alloc_queries, 1)
got_one.append(s)
else:
self.failUnlessEqual(s.queries, 2)
self.failUnlessEqual(s._alloc_queries, 1)
got_two.append(s)
self.failUnlessEqual(len(got_one), 49)
self.failUnlessEqual(len(got_two), 1)
@ -562,7 +683,7 @@ class ServerSelection(unittest.TestCase):
for s in self.node.last_servers:
allocated = s.allocated
self.failUnlessEqual(len(allocated), 4)
self.failUnlessEqual(s.queries, 2)
self.failUnlessEqual(s._alloc_queries, 1)
d.addCallback(_check)
return d
@ -624,7 +745,7 @@ class ServerSelection(unittest.TestCase):
def _check(res):
servers_contacted = []
for s in self.node.last_servers:
if(s.queries != 0):
if(s._alloc_queries != 0):
servers_contacted.append(s)
self.failUnless(len(servers_contacted), 20)
d.addCallback(_check)
@ -723,16 +844,11 @@ def is_happy_enough(servertoshnums, h, k):
""" I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
if len(servertoshnums) < h:
return False
# print "servertoshnums: ", servertoshnums, h, k
for happysetcombo in combinations(servertoshnums.iterkeys(), h):
# print "happysetcombo: ", happysetcombo
for subsetcombo in combinations(happysetcombo, k):
shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
# print "subsetcombo: ", subsetcombo, ", shnums: ", shnums
if len(shnums) < k:
# print "NOT HAAPP{Y", shnums, k
return False
# print "HAAPP{Y"
return True
class FakeServerTracker:
@ -817,6 +933,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
ss = self.g.make_server(server_number, readonly)
log.msg("just created a server, number: %s => %s" % (server_number, ss,))
self.g.add_server(server_number, ss)
self.g.rebuild_serverlist()
def _add_server_with_share(self, server_number, share_number=None,
readonly=False):
@ -1614,7 +1731,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
d.addCallback(_then)
d.addCallback(lambda c:
self.shouldFail(UploadUnhappinessError, "test_query_counting",
"2 placed none (of which 2 placed none due to "
"4 placed none (of which 4 placed none due to "
"the server being full",
c.upload, upload.Data("data" * 10000,
convergence="")))
@ -1862,6 +1979,33 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
self.failUnless(self._has_happy_share_distribution()))
return d
def test_problem_layout_ticket_1118(self):
# #1118 includes a report from a user who hit an assertion in
# the upload code with this layout.
# Note that 'servers of happiness' lets this test work now
self.basedir = self.mktemp()
d = self._setup_and_upload(k=2, n=4)
# server 0: no shares
# server 1: shares 0, 3
# server 3: share 1
# server 2: share 2
# The order that they get queries is 0, 1, 3, 2
def _setup(ign):
self._add_server(server_number=0)
self._add_server_with_share(server_number=1, share_number=0)
self._add_server_with_share(server_number=2, share_number=2)
self._add_server_with_share(server_number=3, share_number=1)
# Copy shares
self._copy_share_to_server(3, 1)
self.delete_all_shares(self.get_serverdir(0))
client = self.g.clients[0]
client.encoding_params['happy'] = 4
return client
d.addCallback(_setup)
return d
def test_problem_layout_ticket_1128(self):
# #1118 includes a report from a user who hit an assertion in
# the upload code with this layout.

View File

@ -2,13 +2,42 @@
import time
from foolscap.api import eventually, fireEventually
from twisted.internet import defer, reactor
from twisted.internet import defer, reactor, error
from twisted.python.failure import Failure
from allmydata.util import log
from allmydata.util.assertutil import _assert
from allmydata.util.pollmixin import PollMixin
class TimeoutError(Exception):
pass
def timeout_call(reactor, d, timeout):
"""
This returns the result of 'd', unless 'timeout' expires before
'd' is completed in which case a TimeoutError is raised.
"""
timer_d = defer.Deferred()
def _timed_out():
timer_d.errback(Failure(TimeoutError()))
def _got_result(x):
try:
timer.cancel()
timer_d.callback(x)
except (error.AlreadyCalled, defer.AlreadyCalledError):
pass
return None
timer = reactor.callLater(timeout, _timed_out)
d.addBoth(_got_result)
return timer_d
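A small usage sketch for timeout_call (illustrative only, not part of this change), showing the same pattern the new selector code in upload.py uses, but driven by a test Clock so the timeout can be triggered deterministically:

from twisted.internet import defer, task
from allmydata.util.deferredutil import timeout_call, TimeoutError

clock = task.Clock()                 # stands in for the real reactor
never_fires = defer.Deferred()       # e.g. a storage query that hangs
d = timeout_call(clock, never_fires, 15)

failures = []
d.addErrback(failures.append)

clock.advance(15)                    # 15 simulated seconds pass
assert failures and failures[0].check(TimeoutError)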
# utility wrapper for DeferredList
def _check_deferred_list(results):
# if any of the component Deferreds failed, return the first failure such