diff --git a/docs/specifications/servers-of-happiness.rst b/docs/specifications/servers-of-happiness.rst index a44cc4979..a9d7041d4 100644 --- a/docs/specifications/servers-of-happiness.rst +++ b/docs/specifications/servers-of-happiness.rst @@ -120,8 +120,6 @@ We calculate share placement like so: shares, where an edge exists between an arbitrary readonly server S and an arbitrary share T if and only if S currently holds T. -^--- all passed in to the Happiness_Upload ctor - 3. Calculate a maximum matching graph of G1 (a set of S->T edges that has or is-tied-for the highest "happiness score"). There is a clever efficient algorithm for this, named "Ford-Fulkerson". There may be more than one @@ -130,8 +128,6 @@ We calculate share placement like so: maps shares to servers, where each share appears at most once, and each server appears at most once. -^-- is this the "readonly_mappings" - 4. Construct a bipartite graph G2 of readwrite servers to pre-existing shares. Then remove any edge (from G2) that uses a server or a share found in M1. Let an edge exist between server S and share T if and only if S diff --git a/integration/test_hypothesis_happiness.py b/integration/test_hypothesis_happiness.py deleted file mode 100644 index 5f0f2ffab..000000000 --- a/integration/test_hypothesis_happiness.py +++ /dev/null @@ -1,57 +0,0 @@ -# -*- coding: utf-8 -*- - -from twisted.trial import unittest -from hypothesis import given -from hypothesis.strategies import text, sets -from allmydata.immutable import happiness_upload - - -@given( - sets(elements=text(min_size=1), min_size=4, max_size=4), - sets(elements=text(min_size=1), min_size=4), -) -def test_hypothesis_unhappy(peers, shares): - """ - similar to test_unhappy we test that the resulting happiness is - always 4 since the size of peers is 4. - """ - # https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets - # hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source] - readonly_peers = set() - peers_to_shares = {} - places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares) - happiness = happiness_upload.calculate_happiness(places) - assert set(places.keys()) == shares - assert happiness == 4 - - -@given( - sets(elements=text(min_size=1), min_size=1, max_size=10), - # can we make a readonly_peers that's a subset of ^ - sets(elements=text(min_size=1), min_size=1, max_size=20), -) -def test_more_hypothesis(peers, shares): - """ - similar to test_unhappy we test that the resulting happiness is - always either the number of peers or the number of shares - whichever is smaller. - """ - # https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets - # hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source] - # XXX would be nice to paramaterize these by hypothesis too - readonly_peers = set() - peers_to_shares = {} - - places = happiness_upload.share_placement(peers, readonly_peers, set(list(shares)), peers_to_shares) - happiness = happiness_upload.calculate_happiness(places) - - # every share should get placed - assert set(places.keys()) == shares - - # we should only use peers that exist - assert set(places.values()).issubset(peers) - - # if we have more shares than peers, happiness is at most # of - # peers; if we have fewer shares than peers happiness is capped at - # # of peers. 
-    assert happiness == min(len(peers), len(shares))
diff --git a/integration/test_servers_of_happiness.py b/integration/test_servers_of_happiness.py
new file mode 100644
index 000000000..34c4c8b28
--- /dev/null
+++ b/integration/test_servers_of_happiness.py
@@ -0,0 +1,53 @@
+import sys
+import time
+import shutil
+from os import mkdir, unlink, listdir
+from os.path import join, exists
+
+from twisted.internet import defer, reactor, task
+from twisted.internet.error import ProcessTerminated
+
+import util
+
+import pytest
+
+
+@pytest.inlineCallbacks
+def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, request):
+
+    # hmm, for some reason this still gets storage enabled ...
+    process = yield util._create_node(
+        reactor, request, temp_dir, introducer_furl, flog_gatherer, "edna",
+        web_port="tcp:9983:interface=localhost",
+        storage=False,
+        needed=3,
+        happy=10,
+        total=10,
+    )
+
+
+    node_dir = join(temp_dir, 'edna')
+
+    print("waiting 5 seconds until we're maybe ready")
+    yield task.deferLater(reactor, 5, lambda: None)
+
+    # upload a file, which should fail because we don't have 10
+    # storage servers (but happiness is set to 10)
+    proto = util._CollectOutputProtocol()
+    transport = reactor.spawnProcess(
+        proto,
+        sys.executable,
+        [
+            sys.executable, '-m', 'allmydata.scripts.runner',
+            '-d', node_dir,
+            'put', __file__,
+        ]
+    )
+    try:
+        yield proto.done
+        assert False, "should raise exception"
+    except Exception as e:
+        assert isinstance(e, ProcessTerminated)
+
+    output = proto.output.getvalue()
+    assert "shares could be placed on only" in output
diff --git a/integration/util.py b/integration/util.py
index 9a5452b57..7f5843b1f 100644
--- a/integration/util.py
+++ b/integration/util.py
@@ -132,7 +132,12 @@ def _run_node(reactor, node_dir, request, magic_text):
     return protocol.magic_seen
 
-def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, name, web_port, storage=True, magic_text=None):
+def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, name, web_port,
+                 storage=True,
+                 magic_text=None,
+                 needed=2,
+                 happy=3,
+                 total=4):
     """
     Helper to create a single node, run it and return the instance
     spawnProcess returned (ITransport)
@@ -161,10 +166,11 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
         sys.executable,
         args,
     )
-    pytest.blockon(done_proto.done)
+    created_d = done_proto.done
-    with open(join(node_dir, 'tahoe.cfg'), 'w') as f:
-        f.write('''
+    def created(_):
+        with open(join(node_dir, 'tahoe.cfg'), 'w') as f:
+            f.write('''
 [node]
 nickname = %(name)s
 web.port = %(web_port)s
@@ -174,18 +180,28 @@ log_gatherer.furl = %(log_furl)s
 [client]
 # Which services should this client connect to?
introducer.furl = %(furl)s -shares.needed = 2 -shares.happy = 3 -shares.total = 4 +shares.needed = %(needed)d +shares.happy = %(happy)d +shares.total = %(total)d ''' % { 'name': name, 'furl': introducer_furl, 'web_port': web_port, 'log_furl': flog_gatherer, + 'needed': needed, + 'happy': happy, + 'total': total, }) + created_d.addCallback(created) + else: + created_d = defer.succeed(None) - return _run_node(reactor, node_dir, request, magic_text) + d = Deferred() + d.callback(None) + d.addCallback(lambda _: created_d) + d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text)) + return d def await_file_contents(path, contents, timeout=15): diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 0e7456bab..8af61c7d9 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -655,6 +655,6 @@ class Client(node.Node, pollmixin.PollMixin): return self.nodemaker.create_mutable_file(contents, keysize, version=version) - def upload(self, uploadable): + def upload(self, uploadable, reactor=None): uploader = self.getServiceNamed("uploader") - return uploader.upload(uploadable) + return uploader.upload(uploadable, reactor=reactor) diff --git a/src/allmydata/control.py b/src/allmydata/control.py index 69ac9a62e..568ebeaf5 100644 --- a/src/allmydata/control.py +++ b/src/allmydata/control.py @@ -72,6 +72,7 @@ class ControlServer(Referenceable, service.Service): f.close() uploader = self.parent.getServiceNamed("uploader") u = upload.FileName(filename, convergence=convergence) + # XXX should pass reactor arg d = uploader.upload(u) d.addCallback(lambda results: results.get_uri()) def _done(uri): diff --git a/src/allmydata/dirnode.py b/src/allmydata/dirnode.py index a215eea34..63b0ad760 100644 --- a/src/allmydata/dirnode.py +++ b/src/allmydata/dirnode.py @@ -599,6 +599,7 @@ class DirectoryNode(object): name = normalize(namex) if self.is_readonly(): return defer.fail(NotWriteableError()) + # XXX should pass reactor arg d = self._uploader.upload(uploadable, progress=progress) d.addCallback(lambda results: self._create_and_validate_node(results.get_uri(), None, diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py index 317d4af90..4e1c5d012 100644 --- a/src/allmydata/immutable/checker.py +++ b/src/allmydata/immutable/checker.py @@ -724,12 +724,16 @@ class Checker(log.PrefixingLogMixin): def _check_server_shares(self, s): """Return a deferred which eventually fires with a tuple of - (set(sharenum), server, set(), set(), responded) showing all the - shares claimed to be served by this server. In case the server is - disconnected then it fires with (set(), server, set(), set(), False) - (a server disconnecting when we ask it for buckets is the same, for - our purposes, as a server that says it has none, except that we want - to track and report whether or not each server responded.)""" + (set(sharenum), server, set(corrupt), set(incompatible), + responded) showing all the shares claimed to be served by this + server. In case the server is disconnected then it fires with + (set(), server, set(), set(), False) (a server disconnecting + when we ask it for buckets is the same, for our purposes, as a + server that says it has none, except that we want to track and + report whether or not each server responded.) 
+ + see also _verify_server_shares() + """ def _curry_empty_corrupted(res): buckets, responded = res return (set(buckets), s, set(), set(), responded) diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py index 2aa4f857d..8bcdca76f 100644 --- a/src/allmydata/immutable/downloader/finder.py +++ b/src/allmydata/immutable/downloader/finder.py @@ -63,7 +63,6 @@ class ShareFinder: if not self._started: si = self.verifycap.storage_index servers = self._storage_broker.get_servers_for_psi(si) - servers.sort(key=lambda s: s.get_serverid()) self._servers = iter(servers) self._started = True diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py index ae94af95e..30cddb798 100644 --- a/src/allmydata/immutable/downloader/share.py +++ b/src/allmydata/immutable/downloader/share.py @@ -18,9 +18,12 @@ from common import COMPLETE, CORRUPT, DEAD, BADSEGNUM class LayoutInvalid(Exception): pass + + class DataUnavailable(Exception): pass + class Share: """I represent a single instance of a single share (e.g. I reference the shnum2 for share SI=abcde on server xy12t, not the one on server ab45q). diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py index efc7ac3f6..5aec415da 100644 --- a/src/allmydata/immutable/encode.py +++ b/src/allmydata/immutable/encode.py @@ -122,7 +122,7 @@ class Encoder(object): assert not self._codec k, happy, n, segsize = params self.required_shares = k - self.servers_of_happiness = happy + self.min_happiness = happy self.num_shares = n self.segment_size = segsize self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize)) @@ -180,7 +180,7 @@ class Encoder(object): if name == "storage_index": return self._storage_index elif name == "share_counts": - return (self.required_shares, self.servers_of_happiness, + return (self.required_shares, self.min_happiness, self.num_shares) elif name == "num_segments": return self.num_segments @@ -503,17 +503,17 @@ class Encoder(object): self.log("they weren't in our list of landlords", parent=ln, level=log.WEIRD, umid="TQGFRw") happiness = happinessutil.servers_of_happiness(self.servermap) - if happiness < self.servers_of_happiness: + if happiness < self.min_happiness: peerids = set(happinessutil.shares_by_server(self.servermap).keys()) msg = happinessutil.failure_message(len(peerids), self.required_shares, - self.servers_of_happiness, + self.min_happiness, happiness) msg = "%s: %s" % (msg, why) raise UploadUnhappinessError(msg) self.log("but we can still continue with %s shares, we'll be happy " "with at least %s" % (happiness, - self.servers_of_happiness), + self.min_happiness), parent=ln) def _gather_responses(self, dl): diff --git a/src/allmydata/immutable/happiness_upload.py b/src/allmydata/immutable/happiness_upload.py index cf4e7ec29..49f701a5a 100644 --- a/src/allmydata/immutable/happiness_upload.py +++ b/src/allmydata/immutable/happiness_upload.py @@ -318,10 +318,10 @@ def _flow_network(peerIndices, shareIndices): def share_placement(peers, readonly_peers, shares, peers_to_shares): """ Generates the allocations the upload should based on the given - information. We construct a dictionary of 'share_num' -> set(server_ids) - and return it to the caller. Each share should be placed on each server - in the corresponding set. Existing allocations appear as placements - because attempting to place an existing allocation will renew the share. + information. 
We construct a dictionary of 'share_num' -> + 'server_id' and return it to the caller. Existing allocations + appear as placements because attempting to place an existing + allocation will renew the share. For more information on the algorithm this class implements, refer to docs/specifications/servers-of-happiness.rst diff --git a/src/allmydata/immutable/repairer.py b/src/allmydata/immutable/repairer.py index 97fc9df1b..1d3782d10 100644 --- a/src/allmydata/immutable/repairer.py +++ b/src/allmydata/immutable/repairer.py @@ -61,6 +61,7 @@ class Repairer(log.PrefixingLogMixin): # (http://tahoe-lafs.org/trac/tahoe-lafs/ticket/1212) happy = 0 self._encodingparams = (k, happy, N, segsize) + # XXX should pass a reactor to this ul = upload.CHKUploader(self._storage_broker, self._secret_holder) return ul.start(self) # I am the IEncryptedUploadable d.addCallback(_got_segsize) diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index f554401e1..cef226a8e 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -9,6 +9,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \ file_cancel_secret_hash, bucket_renewal_secret_hash, \ bucket_cancel_secret_hash, plaintext_hasher, \ storage_index_hash, plaintext_segment_hasher, convergence_hasher +from allmydata.util.deferredutil import timeout_call from allmydata import hashtree, uri from allmydata.storage.server import si_b2a from allmydata.immutable import encode @@ -117,7 +118,7 @@ EXTENSION_SIZE = 1000 def pretty_print_shnum_to_servers(s): return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ]) -class ServerTracker: +class ServerTracker(object): def __init__(self, server, sharesize, blocksize, num_segments, num_share_hashes, storage_index, @@ -202,46 +203,39 @@ def str_shareloc(shnum, bucketwriter): return "%s: %s" % (shnum, bucketwriter.get_servername(),) -class PeerSelector(): - implements(IPeerSelector) +@implementer(IPeerSelector) +class PeerSelector(object): - def __init__(self, num_segments, total_shares, needed_shares, servers_of_happiness): + def __init__(self, num_segments, total_shares, needed_shares, min_happiness): self.num_segments = num_segments self.total_shares = total_shares self.needed_shares = needed_shares - self.min_happiness = servers_of_happiness + self.min_happiness = min_happiness self.existing_shares = {} - self.confirmed_allocations = {} self.peers = set() - self.full_peers = set() + self.readonly_peers = set() self.bad_peers = set() def add_peer_with_share(self, peerid, shnum): - if peerid in self.existing_shares.keys(): + try: self.existing_shares[peerid].add(shnum) - else: + except KeyError: self.existing_shares[peerid] = set([shnum]) - def confirm_share_allocation(self, shnum, peer): - self.confirmed_allocations.setdefault(shnum, set()).add(peer) - - def get_allocations(self): - return self.confirmed_allocations - def add_peer(self, peerid): self.peers.add(peerid) - def mark_full_peer(self, peerid): - self.full_peers.add(peerid) + def mark_readonly_peer(self, peerid): + self.readonly_peers.add(peerid) self.peers.remove(peerid) def mark_bad_peer(self, peerid): if peerid in self.peers: self.peers.remove(peerid) self.bad_peers.add(peerid) - elif peerid in self.full_peers: - self.full_peers.remove(peerid) + elif peerid in self.readonly_peers: + self.readonly_peers.remove(peerid) self.bad_peers.add(peerid) def get_sharemap_of_preexisting_shares(self): @@ -251,40 +245,100 @@ class PeerSelector(): 
preexisting.add(share, server) return preexisting - def get_tasks(self): + def get_share_placements(self): shares = set(range(self.total_shares)) - self.happiness_mappings = share_placement(self.peers, self.full_peers, shares, self.existing_shares) + self.happiness_mappings = share_placement(self.peers, self.readonly_peers, shares, self.existing_shares) self.happiness = calculate_happiness(self.happiness_mappings) return self.happiness_mappings - def is_healthy(self): - return self.min_happiness <= self.happiness + +class _QueryStatistics(object): + + def __init__(self): + self.total = 0 + self.good = 0 + self.bad = 0 + self.full = 0 + self.error = 0 + self.contacted = 0 + + def __str__(self): + return "QueryStatistics(total={} good={} bad={} full={} " \ + "error={} contacted={})".format( + self.total, + self.good, + self.bad, + self.full, + self.error, + self.contacted, + ) class Tahoe2ServerSelector(log.PrefixingLogMixin): - peer_selector_class = PeerSelector - - def __init__(self, upload_id, logparent=None, upload_status=None): + def __init__(self, upload_id, logparent=None, upload_status=None, reactor=None): self.upload_id = upload_id - self.query_count, self.good_query_count, self.bad_query_count = 0,0,0 - # Servers that are working normally, but full. - self.full_count = 0 - self.error_count = 0 - self.num_servers_contacted = 0 + self._query_stats = _QueryStatistics() self.last_failure_msg = None self._status = IUploadStatus(upload_status) log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id) self.log("starting", level=log.OPERATIONAL) - + if reactor is None: + from twisted.internet import reactor + self._reactor = reactor def __repr__(self): return "" % self.upload_id + def _create_trackers(self, candidate_servers, allocated_size, + file_renewal_secret, file_cancel_secret, create_server_tracker): + + # filter the list of servers according to which ones can accomodate + # this request. This excludes older servers (which used a 4-byte size + # field) from getting large shares (for files larger than about + # 12GiB). See #439 for details. + def _get_maxsize(server): + v0 = server.get_rref().version + v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"] + return v1["maximum-immutable-share-size"] + + for server in candidate_servers: + self.peer_selector.add_peer(server.get_serverid()) + writeable_servers = [ + server for server in candidate_servers + if _get_maxsize(server) >= allocated_size + ] + readonly_servers = set(candidate_servers) - set(writeable_servers) + + for server in readonly_servers: + self.peer_selector.mark_readonly_peer(server.get_serverid()) + + def _make_trackers(servers): + trackers = [] + for s in servers: + seed = s.get_lease_seed() + renew = bucket_renewal_secret_hash(file_renewal_secret, seed) + cancel = bucket_cancel_secret_hash(file_cancel_secret, seed) + st = create_server_tracker(s, renew, cancel) + trackers.append(st) + return trackers + + write_trackers = _make_trackers(writeable_servers) + + # We don't try to allocate shares to these servers, since they've + # said that they're incapable of storing shares of the size that we'd + # want to store. We ask them about existing shares for this storage + # index, which we want to know about for accurate + # servers_of_happiness accounting, then we forget about them. 
+ readonly_trackers = _make_trackers(readonly_servers) + + return readonly_trackers, write_trackers + + @defer.inlineCallbacks def get_shareholders(self, storage_broker, secret_holder, storage_index, share_size, block_size, num_segments, total_shares, needed_shares, - servers_of_happiness): + min_happiness): """ @return: (upload_trackers, already_serverids), where upload_trackers is a set of ServerTracker instances that have agreed to hold @@ -294,14 +348,17 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): already have the share. """ + # re-initialize statistics + self._query_status = _QueryStatistics() + if self._status: self._status.set_status("Contacting Servers..") - self.peer_selector = self.peer_selector_class(num_segments, total_shares, - needed_shares, servers_of_happiness) + self.peer_selector = PeerSelector(num_segments, total_shares, + needed_shares, min_happiness) self.total_shares = total_shares - self.servers_of_happiness = servers_of_happiness + self.min_happiness = min_happiness self.needed_shares = needed_shares self.homeless_shares = set(range(total_shares)) @@ -326,6 +383,17 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): num_share_hashes, EXTENSION_SIZE) allocated_size = wbp.get_allocated_size() + # decide upon the renewal/cancel secrets, to include them in the + # allocate_buckets query. + file_renewal_secret = file_renewal_secret_hash( + secret_holder.get_renewal_secret(), + storage_index, + ) + file_cancel_secret = file_cancel_secret_hash( + secret_holder.get_cancel_secret(), + storage_index, + ) + # see docs/specifications/servers-of-happiness.rst # 0. Start with an ordered list of servers. Maybe *2N* of them. # @@ -334,108 +402,186 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): if not all_servers: raise NoServersError("client gave us zero servers") - # filter the list of servers according to which ones can accomodate - # this request. This excludes older servers (which used a 4-byte size - # field) from getting large shares (for files larger than about - # 12GiB). See #439 for details. - def _get_maxsize(server): - v0 = server.get_rref().version - v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"] - return v1["maximum-immutable-share-size"] + def _create_server_tracker(server, renew, cancel): + return ServerTracker( + server, share_size, block_size, num_segments, num_share_hashes, + storage_index, renew, cancel, + ) - candidate_servers = all_servers[:2*total_shares] - for server in candidate_servers: - self.peer_selector.add_peer(server.get_serverid()) - writeable_servers = [server for server in candidate_servers - if _get_maxsize(server) >= allocated_size] - readonly_servers = set(candidate_servers) - set(writeable_servers) - for server in readonly_servers: - self.peer_selector.mark_full_peer(server.get_serverid()) - - # decide upon the renewal/cancel secrets, to include them in the - # allocate_buckets query. 
- client_renewal_secret = secret_holder.get_renewal_secret() - client_cancel_secret = secret_holder.get_cancel_secret() - - file_renewal_secret = file_renewal_secret_hash(client_renewal_secret, - storage_index) - file_cancel_secret = file_cancel_secret_hash(client_cancel_secret, - storage_index) - def _make_trackers(servers): - trackers = [] - for s in servers: - seed = s.get_lease_seed() - renew = bucket_renewal_secret_hash(file_renewal_secret, seed) - cancel = bucket_cancel_secret_hash(file_cancel_secret, seed) - st = ServerTracker(s, - share_size, block_size, - num_segments, num_share_hashes, - storage_index, - renew, cancel) - trackers.append(st) - return trackers - - # We assign each servers/trackers into one three lists. They all - # start in the "first pass" list. During the first pass, as we ask - # each one to hold a share, we move their tracker to the "second - # pass" list, until the first-pass list is empty. Then during the - # second pass, as we ask each to hold more shares, we move their - # tracker to the "next pass" list, until the second-pass list is - # empty. Then we move everybody from the next-pass list back to the - # second-pass list and repeat the "second" pass (really the third, - # fourth, etc pass), until all shares are assigned, or we've run out - # of potential servers. - write_trackers = _make_trackers(writeable_servers) - - # We don't try to allocate shares to these servers, since they've - # said that they're incapable of storing shares of the size that we'd - # want to store. We ask them about existing shares for this storage - # index, which we want to know about for accurate - # servers_of_happiness accounting, then we forget about them. - readonly_trackers = _make_trackers(readonly_servers) + readonly_trackers, write_trackers = self._create_trackers( + all_servers[:(2 * total_shares)], + allocated_size, + file_renewal_secret, + file_cancel_secret, + _create_server_tracker, + ) # see docs/specifications/servers-of-happiness.rst # 1. Query all servers for existing shares. # + # The spec doesn't say what to do for timeouts/errors. This + # adds a timeout to each request, and rejects any that reply + # with error (i.e. just removed from the list) - # We now ask servers that can't hold any new shares about existing - # shares that they might have for our SI. Once this is done, we - # start placing the shares that we haven't already accounted - # for. ds = [] if self._status and readonly_trackers: - self._status.set_status("Contacting readonly servers to find " - "any existing shares") + self._status.set_status( + "Contacting readonly servers to find any existing shares" + ) + + # in the "pre servers-of-happiness" code, it was a little + # ambigious whether "merely asking" counted as a "query" or + # not, because "allocate_buckets" with nothing to allocate was + # used to "ask" a write-able server what it held. Now we count + # "actual allocation queries" only, because those are the only + # things that actually affect what the server does. 
+ for tracker in readonly_trackers: assert isinstance(tracker, ServerTracker) - d = tracker.ask_about_existing_shares() + d = timeout_call(self._reactor, tracker.ask_about_existing_shares(), 15) d.addBoth(self._handle_existing_response, tracker) ds.append(d) - self.num_servers_contacted += 1 - self.query_count += 1 self.log("asking server %s for any existing shares" % (tracker.get_name(),), level=log.NOISY) for tracker in write_trackers: assert isinstance(tracker, ServerTracker) - d = tracker.query(set()) + d = timeout_call(self._reactor, tracker.ask_about_existing_shares(), 15) + + def timed_out(f, tracker): + # print("TIMEOUT {}: {}".format(tracker, f)) + write_trackers.remove(tracker) + readonly_trackers.append(tracker) + return f + d.addErrback(timed_out, tracker) d.addBoth(self._handle_existing_write_response, tracker, set()) ds.append(d) - self.num_servers_contacted += 1 - self.query_count += 1 self.log("asking server %s for any existing shares" % (tracker.get_name(),), level=log.NOISY) - self.trackers = write_trackers + readonly_trackers + trackers = set(write_trackers) | set(readonly_trackers) - dl = defer.DeferredList(ds) - dl.addCallback(lambda ign: self._calculate_tasks()) - dl.addCallback(lambda ign: self._request_another_allocation()) - return dl + # these will always be (True, None) because errors are handled + # in the _handle_existing_write_response etc callbacks + yield defer.DeferredList(ds) + # okay, we've queried the 2N servers, time to get the share + # placements and attempt to actually place the shares (or + # renew them on read-only servers). We want to run the loop + # below *at least once* because even read-only servers won't + # renew their shares until "allocate_buckets" is called (via + # tracker.query()) - def _calculate_tasks(self): - self.tasks = self.peer_selector.get_tasks() + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/778#comment:48 + # min_happiness will be 0 for the repairer, so we set current + # effective_happiness to less than zero so this loop runs at + # least once for the repairer... + + def _bad_server(fail, tracker): + self.last_failure_msg = fail + return False # will mark it readonly + + def _make_readonly(tracker): + # print("making {} read-only".format(tracker.get_serverid())) + try: + write_trackers.remove(tracker) + except ValueError: + pass + # XXX can we just use a set() or does order matter? + if tracker not in readonly_trackers: + readonly_trackers.append(tracker) + return None + + # so we *always* want to run this loop at least once, even if + # we only have read-only servers -- because asking them to + # allocate buckets renews those shares they already have. For + # subsequent loops, we give up if we've achieved happiness OR + # if we have zero writable servers left + + last_happiness = None + effective_happiness = -1 + while effective_happiness < min_happiness and \ + (last_happiness is None or len(write_trackers)): + errors_before = self._query_stats.bad + self._share_placements = self.peer_selector.get_share_placements() + + placements = [] + for tracker in trackers: + shares_to_ask = self._allocation_for(tracker) + + # if we already tried to upload share X to this very + # same server in a previous iteration, we should *not* + # ask again. If we *do* ask, there's no real harm, but + # the server will respond with an empty dict and that + # confuses our statistics. However, if the server is a + # readonly sever, we *do* want to ask so it refreshes + # the share. 
+ if shares_to_ask != set(tracker.buckets.keys()) or tracker in readonly_trackers: + self._query_stats.total += 1 + self._query_stats.contacted += 1 + d = timeout_call(self._reactor, tracker.query(shares_to_ask), 15) + d.addBoth(self._buckets_allocated, tracker, shares_to_ask) + d.addErrback(lambda f, tr: _bad_server(f, tr), tracker) + d.addCallback(lambda x, tr: _make_readonly(tr) if not x else x, tracker) + placements.append(d) + + yield defer.DeferredList(placements) + merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers) + effective_happiness = servers_of_happiness(merged) + if effective_happiness == last_happiness: + # print("effective happiness still {}".format(last_happiness)) + # we haven't improved over the last iteration; give up + break; + if errors_before == self._query_stats.bad: + if False: print("no more errors; break") + break; + last_happiness = effective_happiness + # print("write trackers left: {}".format(len(write_trackers))) + + # note: peer_selector.get_allocations() only maps "things we + # uploaded in the above loop" and specificaly does *not* + # include any pre-existing shares on read-only servers .. but + # we *do* want to count those shares towards total happiness. + + # no more servers. If we haven't placed enough shares, we fail. + # XXX note sometimes we're not running the loop at least once, + # and so 'merged' must be (re-)computed here. + merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers) + effective_happiness = servers_of_happiness(merged) + + # print("placements completed {} vs {}".format(effective_happiness, min_happiness)) + # for k, v in merged.items(): + # print(" {} -> {}".format(k, v)) + + if effective_happiness < min_happiness: + msg = failure_message( + peer_count=len(self.serverids_with_shares), + k=self.needed_shares, + happy=min_happiness, + effective_happy=effective_happiness, + ) + msg = ("server selection failed for %s: %s (%s), merged=%s" % + (self, msg, self._get_progress_message(), + pretty_print_shnum_to_servers(merged))) + if self.last_failure_msg: + msg += " (%s)" % (self.last_failure_msg,) + self.log(msg, level=log.UNUSUAL) + self._failed(msg) # raises UploadUnhappinessError + return + + # we placed (or already had) enough to be happy, so we're done + if self._status: + self._status.set_status("Placed all shares") + msg = ("server selection successful for %s: %s: pretty_print_merged: %s, " + "self.use_trackers: %s, self.preexisting_shares: %s") \ + % (self, self._get_progress_message(), + pretty_print_shnum_to_servers(merged), + [', '.join([str_shareloc(k,v) + for k,v in st.buckets.iteritems()]) + for st in self.use_trackers], + pretty_print_shnum_to_servers(self.preexisting_shares)) + self.log(msg, level=log.OPERATIONAL) + defer.returnValue((self.use_trackers, self.peer_selector.get_sharemap_of_preexisting_shares())) def _handle_existing_response(self, res, tracker): """ @@ -447,8 +593,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.log("%s got error during existing shares check: %s" % (tracker.get_name(), res), level=log.UNUSUAL) self.peer_selector.mark_bad_peer(serverid) - self.error_count += 1 - self.bad_query_count += 1 else: buckets = res if buckets: @@ -471,15 +615,12 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.log("%s got error during server selection: %s" % (tracker, res), level=log.UNUSUAL) self.homeless_shares |= shares_to_ask - msg = ("last failure (from %s) was: %s" % (tracker, res)) 
self.last_failure_msg = msg else: - (alreadygot, allocated) = res - for share in alreadygot: + for share in res.keys(): self.peer_selector.add_peer_with_share(tracker.get_serverid(), share) - def _get_progress_message(self): if not self.homeless_shares: msg = "placed all %d shares, " % (self.total_shares) @@ -488,36 +629,34 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): (self.total_shares - len(self.homeless_shares), self.total_shares, len(self.homeless_shares))) - return (msg + "want to place shares on at least %d servers such that " - "any %d of them have enough shares to recover the file, " - "sent %d queries to %d servers, " - "%d queries placed some shares, %d placed none " - "(of which %d placed none due to the server being" - " full and %d placed none due to an error)" % - (self.servers_of_happiness, self.needed_shares, - self.query_count, self.num_servers_contacted, - self.good_query_count, self.bad_query_count, - self.full_count, self.error_count)) + assert self._query_stats.bad == (self._query_stats.full + self._query_stats.error) + return ( + msg + "want to place shares on at least {happy} servers such that " + "any {needed} of them have enough shares to recover the file, " + "sent {queries} queries to {servers} servers, " + "{good} queries placed some shares, {bad} placed none " + "(of which {full} placed none due to the server being" + " full and {error} placed none due to an error)".format( + happy=self.min_happiness, + needed=self.needed_shares, + queries=self._query_stats.total, + servers=self._query_stats.contacted, + good=self._query_stats.good, + bad=self._query_stats.bad, + full=self._query_stats.full, + error=self._query_stats.error + ) + ) - def _get_next_allocation(self): + def _allocation_for(self, tracker): """ - Return the next share allocation that we need to make. - - Specifically, I return a tuple (tracker, shares_to_ask), where - tracker is a ServerTracker instance and shares_to_ask is a set of - shares that we should store on that server. If there are no more - allocations to make, I return None. + Given a ServerTracker, return a list of shares that we should + store on that server. """ - - if len(self.trackers) == 0: - return None - - tracker = self.trackers.pop(0) - # TODO: don't pre-convert all serverids to ServerTrackers assert isinstance(tracker, ServerTracker) shares_to_ask = set() - servermap = self.tasks + servermap = self._share_placements for shnum, tracker_id in servermap.items(): if tracker_id == None: continue @@ -531,81 +670,27 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): " %d shares left.." % (tracker.get_name(), len(self.homeless_shares))) - return (tracker, shares_to_ask) + return shares_to_ask - - def _request_another_allocation(self): + def _buckets_allocated(self, res, tracker, shares_to_ask): """ - see docs/specifications/servers-of-happiness.rst - 10. If any placements from step 9 fail, mark the server as read-only. Go back - to step 2 (since we may discover a server is/has-become read-only, or has - failed, during step 9). + Internal helper. If this returns an error or False, the server + will be considered read-only for any future iterations. """ - allocation = self._get_next_allocation() - if allocation is not None: - tracker, shares_to_ask = allocation - - # see docs/specifications/servers-of-happiness.rst - # 8. Renew the shares on their respective servers from M1 and M2. - d = tracker.query(shares_to_ask) - - d.addBoth(self._got_response, tracker, shares_to_ask) - return d - - else: - # no more servers. 
If we haven't placed enough shares, we fail. - merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers) - effective_happiness = servers_of_happiness(self.peer_selector.get_allocations()) - if effective_happiness < self.servers_of_happiness: - msg = failure_message( - peer_count=len(self.serverids_with_shares), - k=self.needed_shares, - happy=self.servers_of_happiness, - effective_happy=effective_happiness, - ) - msg = ("server selection failed for %s: %s (%s), merged=%s" % - (self, msg, self._get_progress_message(), - pretty_print_shnum_to_servers(merged))) - if self.last_failure_msg: - msg += " (%s)" % (self.last_failure_msg,) - self.log(msg, level=log.UNUSUAL) - return self._failed(msg) - else: - # we placed enough to be happy, so we're done - if self._status: - self._status.set_status("Placed all shares") - msg = ("server selection successful for %s: %s: pretty_print_merged: %s, " - "self.use_trackers: %s, self.preexisting_shares: %s") \ - % (self, self._get_progress_message(), - pretty_print_shnum_to_servers(merged), - [', '.join([str_shareloc(k,v) - for k,v in st.buckets.iteritems()]) - for st in self.use_trackers], - pretty_print_shnum_to_servers(self.preexisting_shares)) - self.log(msg, level=log.OPERATIONAL) - return (self.use_trackers, self.peer_selector.get_sharemap_of_preexisting_shares()) - - - def _got_response(self, res, tracker, shares_to_ask): if isinstance(res, failure.Failure): # This is unusual, and probably indicates a bug or a network # problem. self.log("%s got error during server selection: %s" % (tracker, res), level=log.UNUSUAL) - self.error_count += 1 - self.bad_query_count += 1 + self._query_stats.error += 1 + self._query_stats.bad += 1 self.homeless_shares |= shares_to_ask - if (self.trackers): - # there is still hope, so just loop + try: + self.peer_selector.mark_readonly_peer(tracker.get_serverid()) + except KeyError: pass - else: - # No more servers, so this upload might fail (it depends upon - # whether we've hit servers_of_happiness or not). 
Log the last - # failure we got: if a coding error causes all servers to fail - # in the same way, this allows the common failure to be seen - # by the uploader and should help with debugging - msg = ("last failure (from %s) was: %s" % (tracker, res)) - self.last_failure_msg = msg + return res + else: (alreadygot, allocated) = res self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s" @@ -614,7 +699,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): level=log.NOISY) progress = False for s in alreadygot: - self.peer_selector.confirm_share_allocation(s, tracker.get_serverid()) self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid()) if s in self.homeless_shares: self.homeless_shares.remove(s) @@ -627,8 +711,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): if allocated: self.use_trackers.add(tracker) progress = True - for s in allocated: - self.peer_selector.confirm_share_allocation(s, tracker.get_serverid()) if allocated or alreadygot: self.serverids_with_shares.add(tracker.get_serverid()) @@ -636,16 +718,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): not_yet_present = set(shares_to_ask) - set(alreadygot) still_homeless = not_yet_present - set(allocated) - if progress: - # They accepted at least one of the shares that we asked - # them to accept, or they had a share that we didn't ask - # them to accept but that we hadn't placed yet, so this - # was a productive query - self.good_query_count += 1 - else: - self.bad_query_count += 1 - self.full_count += 1 - if still_homeless: # In networks with lots of space, this is very unusual and # probably indicates an error. In networks with servers that @@ -660,10 +732,19 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): # Since they were unable to accept all of our requests, so it # is safe to assume that asking them again won't help. - - # now loop - return self._request_another_allocation() - + if progress: + # They accepted at least one of the shares that we asked + # them to accept, or they had a share that we didn't ask + # them to accept but that we hadn't placed yet, so this + # was a productive query + self._query_stats.good += 1 + else: + # if we asked for some allocations, but the server + # didn't return any at all (i.e. 
empty dict) it must + # be full + self._query_stats.full += 1 + self._query_stats.bad += 1 + return progress def _failed(self, msg): """ @@ -955,10 +1036,9 @@ class UploadStatus(object): def set_results(self, value): self.results = value -class CHKUploader: - server_selector_class = Tahoe2ServerSelector +class CHKUploader(object): - def __init__(self, storage_broker, secret_holder, progress=None): + def __init__(self, storage_broker, secret_holder, progress=None, reactor=None): # server_selector needs storage_broker and secret_holder self._storage_broker = storage_broker self._secret_holder = secret_holder @@ -969,6 +1049,7 @@ class CHKUploader: self._upload_status.set_helper(False) self._upload_status.set_active(True) self._progress = progress + self._reactor = reactor # locate_all_shareholders() will create the following attribute: # self._server_trackers = {} # k: shnum, v: instance of ServerTracker @@ -1039,14 +1120,17 @@ class CHKUploader: self._storage_index = storage_index upload_id = si_b2a(storage_index)[:5] self.log("using storage index %s" % upload_id) - server_selector = self.server_selector_class(upload_id, - self._log_number, - self._upload_status) + server_selector = Tahoe2ServerSelector( + upload_id, + self._log_number, + self._upload_status, + reactor=self._reactor, + ) share_size = encoder.get_param("share_size") block_size = encoder.get_param("block_size") num_segments = encoder.get_param("num_segments") - k,desired,n = encoder.get_param("share_counts") + k, desired, n = encoder.get_param("share_counts") self._server_selection_started = time.time() d = server_selector.get_shareholders(storage_broker, secret_holder, @@ -1625,7 +1709,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin): return (self._helper_furl, bool(self._helper)) - def upload(self, uploadable, progress=None): + def upload(self, uploadable, progress=None, reactor=None): """ Returns a Deferred that will fire with the UploadResults instance. """ @@ -1661,7 +1745,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin): else: storage_broker = self.parent.get_storage_broker() secret_holder = self.parent._secret_holder - uploader = CHKUploader(storage_broker, secret_holder, progress=progress) + uploader = CHKUploader(storage_broker, secret_holder, progress=progress, reactor=reactor) d2.addCallback(lambda x: uploader.start(eu)) self._all_uploads[uploader] = None diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 7ba5323cd..36c3622d8 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -762,7 +762,7 @@ class IPeerSelector(Interface): potential candidates for storing a file. """ - def mark_full_peer(peerid): + def mark_readonly_peer(peerid): """ Mark the peer peerid as full. This means that any peer-with-share relationships I know about for peerid remain @@ -777,31 +777,10 @@ class IPeerSelector(Interface): with peerid, and will not attempt to assign it any more shares. """ - def get_tasks(): + def get_share_placements(): """ - Return a tuple of tasks to our caller. - - Specifically, return (queries, placements), where queries and - allocations are both lists of things to do. Each query is a - request for our caller to ask a server about the shares it holds - for this upload; the results will be fed back into the - allocator. Each allocation is a request for some share or shares - to be placed on a server. Result may be None, in which case the - selector thinks that the share placement is as reliably or - correctly placed as it can be. 
- """ - - def is_healthy(): - """ - I return whether the share assignments I'm currently using - reflect a healthy file, based on my internal definitions. - """ - - def needs_recomputation(): - """ - I return True if the share assignments I last returned may have - become stale. This is a hint to the caller that they should call - get_share_assignments again. + Return the share-placement map (a dict) which maps shares to + server-ids """ diff --git a/src/allmydata/nodemaker.py b/src/allmydata/nodemaker.py index 89fa34a6c..72426ae8b 100644 --- a/src/allmydata/nodemaker.py +++ b/src/allmydata/nodemaker.py @@ -142,6 +142,7 @@ class NodeMaker(object): convergence = self.secret_holder.get_convergence_secret() packed = pack_children(children, None, deep_immutable=True) uploadable = Data(packed, convergence) + # XXX should pass reactor arg d = self.uploader.upload(uploadable) d.addCallback(lambda results: self.create_from_cap(None, results.get_uri())) diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index 831b7564f..fccab4fb5 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -360,7 +360,7 @@ class BalancingAct(GridTestMixin, unittest.TestCase): shares_chart.setdefault(shnum, []).append(names[serverid]) return shares_chart - def _test_good_share_hosts(self): + def test_good_share_hosts(self): self.basedir = "checker/BalancingAct/1115" self.set_up_grid(num_servers=1) c0 = self.g.clients[0] @@ -388,11 +388,9 @@ class BalancingAct(GridTestMixin, unittest.TestCase): d.addCallback(add_three, i) def _check_and_repair(_): - print("check_and_repair") return self.imm.check_and_repair(Monitor()) def _check_counts(crr, shares_good, good_share_hosts): prr = crr.get_post_repair_results() - print self._pretty_shares_chart(self.uri) self.failUnlessEqual(prr.get_share_counter_good(), shares_good) self.failUnlessEqual(prr.get_host_counter_good_shares(), good_share_hosts) @@ -410,11 +408,16 @@ class BalancingAct(GridTestMixin, unittest.TestCase): d.addCallback(_check_counts, 4, 5) d.addCallback(lambda _: self.delete_shares_numbered(self.uri, [3])) d.addCallback(_check_and_repair) - # XXX this isn't always true, "sometimes" the repairer happens - # to do better and place things so there are 5 happy - # servers. for example PYTHONHASHSEED=3 gets 5 happy whereas - # PYTHONHASHSEED=4 gets 4 happy - d.addCallback(_check_counts, 4, 4) + + # it can happen that our uploader will choose, e.g., to upload + # to servers B, C, D, E .. which will mean that all 5 serves + # now contain our shares (and thus "respond"). 
+ + def _check_happy(crr): + prr = crr.get_post_repair_results() + self.assertTrue(prr.get_host_counter_good_shares() >= 4) + return crr + d.addCallback(_check_happy) d.addCallback(lambda _: all([self.g.break_server(sid) for sid in self.g.get_all_serverids()])) d.addCallback(_check_and_repair) diff --git a/src/allmydata/test/test_happiness.py b/src/allmydata/test/test_happiness.py index 9fd592539..e9ee02b7a 100644 --- a/src/allmydata/test/test_happiness.py +++ b/src/allmydata/test/test_happiness.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- from twisted.trial import unittest +from hypothesis import given +from hypothesis.strategies import text, sets from allmydata.immutable import happiness_upload @@ -25,6 +27,18 @@ class HappinessUtils(unittest.TestCase): self.assertEqual(residual, [[1], [2], [3], []]) self.assertEqual(capacity, [[0, 1, 0, 0], [-1, 0, 1, 0], [0, -1, 0, 1], [0, 0, -1, 0]]) + def test_trivial_maximum_graph(self): + self.assertEqual( + {}, + happiness_upload._compute_maximum_graph([], {}) + ) + + def test_trivial_flow_graph(self): + self.assertEqual( + [], + happiness_upload._servermap_flow_graph(set(), set(), {}) + ) + class Happiness(unittest.TestCase): @@ -40,10 +54,6 @@ class Happiness(unittest.TestCase): places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares) - if False: - for k, v in places.items(): - print(" {} -> {}".format(k, v)) - self.assertEqual( places, { @@ -124,18 +134,16 @@ class Happiness(unittest.TestCase): self.assertEqual(2, happiness) - # process just gets killed with anything like 200 (see - # test_upload.py) - def no_test_50(self): - peers = set(['peer{}'.format(x) for x in range(50)]) - shares = set(['share{}'.format(x) for x in range(50)]) + def test_100(self): + peers = set(['peer{}'.format(x) for x in range(100)]) + shares = set(['share{}'.format(x) for x in range(100)]) readonly_peers = set() peers_to_shares = dict() places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares) happiness = happiness_upload.calculate_happiness(places) - self.assertEqual(50, happiness) + self.assertEqual(100, happiness) def test_redistribute(self): """ @@ -209,3 +217,55 @@ class Happiness(unittest.TestCase): places = happiness_upload.share_placement(peers, set(), shares, {}) self.assertEqual(places, dict()) + + +class PlacementTests(unittest.TestCase): + + @given( + sets(elements=text(min_size=1), min_size=4, max_size=4), + sets(elements=text(min_size=1), min_size=4), + ) + def test_hypothesis_unhappy(self, peers, shares): + """ + similar to test_unhappy we test that the resulting happiness is + always 4 since the size of peers is 4. + """ + # https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets + # hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source] + readonly_peers = set() + peers_to_shares = {} + places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares) + happiness = happiness_upload.calculate_happiness(places) + assert set(places.keys()) == shares + assert happiness == 4 + + @given( + sets(elements=text(min_size=1), min_size=1, max_size=10), + # can we make a readonly_peers that's a subset of ^ + sets(elements=text(min_size=1), min_size=1, max_size=20), + ) + def test_more_hypothesis(self, peers, shares): + """ + similar to test_unhappy we test that the resulting happiness is + always either the number of peers or the number of shares + whichever is smaller. 
+ """ + # https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets + # hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source] + # XXX would be nice to paramaterize these by hypothesis too + readonly_peers = set() + peers_to_shares = {} + + places = happiness_upload.share_placement(peers, readonly_peers, set(list(shares)), peers_to_shares) + happiness = happiness_upload.calculate_happiness(places) + + # every share should get placed + assert set(places.keys()) == shares + + # we should only use peers that exist + assert set(places.values()).issubset(peers) + + # if we have more shares than peers, happiness is at most # of + # peers; if we have fewer shares than peers happiness is capped at + # # of peers. + assert happiness == min(len(peers), len(shares)) diff --git a/src/allmydata/test/test_hung_server.py b/src/allmydata/test/test_hung_server.py index c2f056e8d..8a27b67f5 100644 --- a/src/allmydata/test/test_hung_server.py +++ b/src/allmydata/test/test_hung_server.py @@ -233,16 +233,12 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, done = [] d = self._set_up(False, "test_5_overdue_immutable") def _reduce_max_outstanding_requests_and_download(ign): - # find all servers (it's a 2-tuple because of what - # self._hang() wants, but it only looks at the first one, - # which is the ID) - servers = [ - (srv, None) for shn, srv, sharef in self.shares - ] - # we sort the servers (by id) because that's what the - # download-finder is going to do, and we want to hang the - # first 5 servers which it will make requests to. - self._hang(sorted(servers)[:5]) + # we need to hang the first 5 servers, so we have to + # figure out where the shares were placed. + si = uri.from_string(self.uri).get_storage_index() + placed = self.c0.storage_broker.get_servers_for_psi(si) + self._hang([(s.get_serverid(), s) for s in placed[:5]]) + n = self.c0.create_node_from_uri(self.uri) n._cnode._maybe_create_download_node() self._sf = n._cnode._node._sharefinder diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index f9fd5c4ae..b118357bc 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -4,7 +4,7 @@ import os, shutil from cStringIO import StringIO from twisted.trial import unittest from twisted.python.failure import Failure -from twisted.internet import defer +from twisted.internet import defer, task from foolscap.api import fireEventually import allmydata # for __full_version__ @@ -101,19 +101,26 @@ class SetDEPMixin: self.node.encoding_params = p class FakeStorageServer: - def __init__(self, mode): + def __init__(self, mode, reactor=None): self.mode = mode self.allocated = [] - self.queries = 0 - self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" : - { "maximum-immutable-share-size": 2**32 - 1 }, - "application-version": str(allmydata.__full_version__), - } + self._alloc_queries = 0 + self._get_queries = 0 + self.version = { + "http://allmydata.org/tahoe/protocols/storage/v1" : + { + "maximum-immutable-share-size": 2**32 - 1, + }, + "application-version": str(allmydata.__full_version__), + } if mode == "small": - self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" : - { "maximum-immutable-share-size": 10 }, - "application-version": str(allmydata.__full_version__), - } + self.version = { + "http://allmydata.org/tahoe/protocols/storage/v1" : + { + "maximum-immutable-share-size": 10, + }, + "application-version": 
             str(allmydata.__full_version__),
+            }
 
     def callRemote(self, methname, *args, **kwargs):
@@ -126,14 +133,16 @@ class FakeStorageServer:
 
     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                          sharenums, share_size, canary):
-        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
+        # print "FakeStorageServer.allocate_buckets(num=%d, size=%d, mode=%s, queries=%d)" % (len(sharenums), share_size, self.mode, self._alloc_queries)
+        if self.mode == "timeout":
+            return defer.Deferred()
         if self.mode == "first-fail":
-            if self.queries == 0:
+            if self._alloc_queries == 0:
                 raise ServerError
         if self.mode == "second-fail":
-            if self.queries == 1:
+            if self._alloc_queries == 1:
                 raise ServerError
-        self.queries += 1
+        self._alloc_queries += 1
         if self.mode == "full":
             return (set(), {},)
         elif self.mode == "already got them":
@@ -146,6 +155,18 @@ class FakeStorageServer:
                            for shnum in sharenums]),
                 )
 
+    def get_buckets(self, storage_index, **kw):
+        # this should map shnum to a BucketReader but there isn't a
+        # handy FakeBucketReader and we don't actually read the shares
+        # back anyway (just the keys)
+        return {
+            shnum: None
+            for (si, shnum) in self.allocated
+            if si == storage_index
+        }
+
+
+
 class FakeBucketWriter:
     # a diagnostic version of storageserver.BucketWriter
     def __init__(self, size):
@@ -184,20 +205,23 @@ class FakeBucketWriter:
     def remote_abort(self):
         pass
 
-class FakeClient:
-    DEFAULT_ENCODING_PARAMETERS = {"k":25,
-                                   "happy": 25,
-                                   "n": 100,
-                                   "max_segment_size": 1*MiB,
-                                   }
+class FakeClient(object):
+    DEFAULT_ENCODING_PARAMETERS = {
+        "k":25,
+        "happy": 25,
+        "n": 100,
+        "max_segment_size": 1 * MiB,
+    }
 
-    def __init__(self, mode="good", num_servers=50):
+    def __init__(self, mode="good", num_servers=50, reactor=None):
         self.num_servers = num_servers
         self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
         if type(mode) is str:
             mode = dict([i,mode] for i in range(num_servers))
-        servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
-                    for fakeid in range(self.num_servers) ]
+        servers = [
+            ("%20d" % fakeid, FakeStorageServer(mode[fakeid], reactor=reactor))
+            for fakeid in range(self.num_servers)
+        ]
         self.storage_broker = StorageFarmBroker(permute_peers=True, tub_maker=None)
         for (serverid, rref) in servers:
             ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
@@ -248,15 +272,21 @@ SIZE_ZERO = 0
 SIZE_SMALL = 16
 SIZE_LARGE = len(DATA)
 
-def upload_data(uploader, data):
+
+def upload_data(uploader, data, reactor=None):
     u = upload.Data(data, convergence=None)
-    return uploader.upload(u)
-def upload_filename(uploader, filename):
+    return uploader.upload(u, reactor=reactor)
+
+
+def upload_filename(uploader, filename, reactor=None):
     u = upload.FileName(filename, convergence=None)
-    return uploader.upload(u)
-def upload_filehandle(uploader, fh):
+    return uploader.upload(u, reactor=reactor)
+
+
+def upload_filehandle(uploader, fh, reactor=None):
     u = upload.FileHandle(fh, convergence=None)
-    return uploader.upload(u)
+    return uploader.upload(u, reactor=reactor)
+
 
 class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
     def setUp(self):
@@ -431,12 +461,103 @@ class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
                             "server selection failed",
                             upload_data, self.u, DATA)
         def _check((f,)):
-            self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
-            # there should also be a 'last failure was' message
-            self.failUnlessIn("ServerError", str(f.value))
+            self.failUnlessIn("shares could be placed or found on only 10 server(s)", str(f.value))
         d.addCallback(_check)
         return d
 
+    def test_allocation_error_some(self):
+        self.make_node({
+            0: "good",
+            1: "good",
+            2: "good",
+            3: "good",
+            4: "good",
+            5: "first-fail",
+            6: "first-fail",
+            7: "first-fail",
+            8: "first-fail",
+            9: "first-fail",
+        })
+        self.set_encoding_parameters(3, 7, 10)
+        d = self.shouldFail(UploadUnhappinessError, "second_error_some",
+                            "server selection failed",
+                            upload_data, self.u, DATA)
+        def _check((f,)):
+            self.failUnlessIn("shares could be placed on only 5 server(s)", str(f.value))
+        d.addCallback(_check)
+        return d
+
+    def test_allocation_error_recovery(self):
+        self.make_node({
+            0: "good",
+            1: "good",
+            2: "good",
+            3: "good",
+            4: "second-fail",
+            5: "second-fail",
+            6: "first-fail",
+            7: "first-fail",
+            8: "first-fail",
+            9: "first-fail",
+        })
+        self.set_encoding_parameters(3, 7, 10)
+        # we placed shares on 0 through 5, which wasn't enough. so
+        # then we looped and only placed on 0-3 (because now 4-9 have
+        # all failed) ... so the error message should say we only
+        # placed on 6 servers (not 4) because those two shares *did*
+        # at some point succeed.
+        d = self.shouldFail(UploadUnhappinessError, "second_error_some",
+                            "server selection failed",
+                            upload_data, self.u, DATA)
+        def _check((f,)):
+            self.failUnlessIn("shares could be placed on only 6 server(s)", str(f.value))
+        d.addCallback(_check)
+        return d
+
+    def test_good_servers_stay_writable(self):
+        self.make_node({
+            0: "good",
+            1: "good",
+            2: "second-fail",
+            3: "second-fail",
+            4: "second-fail",
+            5: "first-fail",
+            6: "first-fail",
+            7: "first-fail",
+            8: "first-fail",
+            9: "first-fail",
+        })
+        self.set_encoding_parameters(3, 7, 10)
+        # we placed shares on 0 through 5, which wasn't enough. so
+        # then we looped and only placed on 0-3 (because now 4-9 have
+        # all failed) ... so the error message should say we only
+        # placed on 6 servers (not 4) because those two shares *did*
+        # at some point succeed.
+        d = self.shouldFail(UploadUnhappinessError, "good_servers_stay_writable",
+                            "server selection failed",
+                            upload_data, self.u, DATA)
+        def _check((f,)):
+            self.failUnlessIn("shares could be placed on only 5 server(s)", str(f.value))
+        d.addCallback(_check)
+        return d
+
+    def test_timeout(self):
+        clock = task.Clock()
+        self.make_node("timeout")
+        self.set_encoding_parameters(k=25, happy=1, n=50)
+        d = self.shouldFail(
+            UploadUnhappinessError, __name__,
+            "server selection failed",
+            upload_data, self.u, DATA, reactor=clock,
+        )
+        # XXX double-check; it's doing 3 iterations?
+        # XXX should only do 1!
+        clock.advance(15)
+        clock.advance(15)
+        return d
+
+
+
 class FullServer(unittest.TestCase):
     def setUp(self):
         self.node = FakeClient(mode="full")
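
The test_timeout case above drives the uploader with a fake reactor: the "timeout" mode added to FakeStorageServer returns a Deferred that never fires, and the clock.advance(15) calls fire the timer registered by the new timeout_call helper (added to deferredutil.py later in this patch). A minimal sketch of that interaction, not part of the patch; the demo function name is illustrative, the import path assumes this patch is applied, and the 15-second value simply mirrors the advances in the test:

    from twisted.internet import defer, task
    from allmydata.util.deferredutil import timeout_call, TimeoutError

    def demo_timeout():
        clock = task.Clock()
        hung = defer.Deferred()            # stands in for an allocate_buckets() call that never answers
        d = timeout_call(clock, hung, 15)  # timeout_call(reactor, d, timeout), as defined in this patch
        clock.advance(15)                  # the fake reactor runs the callLater(15, ...) timer
        failures = []
        d.addErrback(failures.append)      # d has already errbacked with TimeoutError
        assert failures[0].check(TimeoutError)
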
@@ -495,7 +616,7 @@ class ServerSelection(unittest.TestCase):
             for s in self.node.last_servers:
                 allocated = s.allocated
                 self.failUnlessEqual(len(allocated), 1)
-                self.failUnlessEqual(s.queries, 2)
+                self.failUnlessEqual(s._alloc_queries, 1)
         d.addCallback(_check)
         return d
 
@@ -514,7 +635,7 @@ class ServerSelection(unittest.TestCase):
             for s in self.node.last_servers:
                 allocated = s.allocated
                 self.failUnlessEqual(len(allocated), 2)
-                self.failUnlessEqual(s.queries, 2)
+                self.failUnlessEqual(s._alloc_queries, 1)
         d.addCallback(_check)
         return d
 
@@ -535,10 +656,10 @@ class ServerSelection(unittest.TestCase):
                 allocated = s.allocated
                 self.failUnless(len(allocated) in (1,2), len(allocated))
                 if len(allocated) == 1:
-                    self.failUnlessEqual(s.queries, 2)
+                    self.failUnlessEqual(s._alloc_queries, 1)
                     got_one.append(s)
                 else:
-                    self.failUnlessEqual(s.queries, 2)
+                    self.failUnlessEqual(s._alloc_queries, 1)
                     got_two.append(s)
             self.failUnlessEqual(len(got_one), 49)
             self.failUnlessEqual(len(got_two), 1)
@@ -562,7 +683,7 @@ class ServerSelection(unittest.TestCase):
             for s in self.node.last_servers:
                 allocated = s.allocated
                 self.failUnlessEqual(len(allocated), 4)
-                self.failUnlessEqual(s.queries, 2)
+                self.failUnlessEqual(s._alloc_queries, 1)
         d.addCallback(_check)
         return d
 
@@ -624,7 +745,7 @@ class ServerSelection(unittest.TestCase):
         def _check(res):
             servers_contacted = []
             for s in self.node.last_servers:
-                if(s.queries != 0):
+                if(s._alloc_queries != 0):
                     servers_contacted.append(s)
             self.failUnless(len(servers_contacted), 20)
         d.addCallback(_check)
@@ -723,16 +844,11 @@ def is_happy_enough(servertoshnums, h, k):
     """ I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
     if len(servertoshnums) < h:
         return False
-    # print "servertoshnums: ", servertoshnums, h, k
     for happysetcombo in combinations(servertoshnums.iterkeys(), h):
-        # print "happysetcombo: ", happysetcombo
         for subsetcombo in combinations(happysetcombo, k):
             shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
-            # print "subsetcombo: ", subsetcombo, ", shnums: ", shnums
             if len(shnums) < k:
-                # print "NOT HAAPP{Y", shnums, k
                 return False
-    # print "HAAPP{Y"
     return True
 
 class FakeServerTracker:
@@ -817,6 +933,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
         ss = self.g.make_server(server_number, readonly)
         log.msg("just created a server, number: %s => %s" % (server_number, ss,))
         self.g.add_server(server_number, ss)
+        self.g.rebuild_serverlist()
 
     def _add_server_with_share(self, server_number, share_number=None,
                                readonly=False):
@@ -1614,7 +1731,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
         d.addCallback(_then)
         d.addCallback(lambda c:
             self.shouldFail(UploadUnhappinessError, "test_query_counting",
-                            "2 placed none (of which 2 placed none due to "
+                            "4 placed none (of which 4 placed none due to "
                             "the server being full",
                             c.upload, upload.Data("data" * 10000,
                                                   convergence="")))
@@ -1862,6 +1979,33 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
                       self.failUnless(self._has_happy_share_distribution()))
         return d
 
+    def test_problem_layout_ticket_1118(self):
+        # #1118 includes a report from a user who hit an assertion in
+        # the upload code with this layout.
+        # Note that 'servers of happiness' lets this test work now
+        self.basedir = self.mktemp()
+        d = self._setup_and_upload(k=2, n=4)
+
+        # server 0: no shares
+        # server 1: shares 0, 3
+        # server 3: share 1
+        # server 2: share 2
+        # The order that they get queries is 0, 1, 3, 2
+        def _setup(ign):
+            self._add_server(server_number=0)
+            self._add_server_with_share(server_number=1, share_number=0)
+            self._add_server_with_share(server_number=2, share_number=2)
+            self._add_server_with_share(server_number=3, share_number=1)
+            # Copy shares
+            self._copy_share_to_server(3, 1)
+            self.delete_all_shares(self.get_serverdir(0))
+            client = self.g.clients[0]
+            client.encoding_params['happy'] = 4
+            return client
+
+        d.addCallback(_setup)
+        return d
+
     def test_problem_layout_ticket_1128(self):
         # #1118 includes a report from a user who hit an assertion in
         # the upload code with this layout.
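
The is_happy_enough() helper above spells out the brute-force happiness check: given the dict of servers, every k of any h of them must together hold at least k distinct shares. A standalone sketch of that same check, applied to the share layout from test_problem_layout_ticket_1118; this is not part of the patch, and the helper name and the choice of which share the uploader ends up placing on server 0 are assumptions for illustration only:

    from itertools import combinations
    from functools import reduce

    def brute_force_happy(servertoshnums, h, k):
        # happy only if every k-subset of every h-subset of servers covers >= k distinct shares
        if len(servertoshnums) < h:
            return False
        for happyset in combinations(servertoshnums, h):
            for subset in combinations(happyset, k):
                shnums = reduce(set.union, [servertoshnums[s] for s in subset])
                if len(shnums) < k:
                    return False
        return True

    # the ticket #1118 layout before upload (k=2, n=4, happy=4): server 0 holds nothing
    layout = {0: set(), 1: {0, 3}, 2: {2}, 3: {1}}
    print(brute_force_happy(layout, h=4, k=2))   # False: servers 0 and 2 together hold only share 2

    layout[0].add(3)                             # suppose the uploader places one share on server 0
    print(brute_force_happy(layout, h=4, k=2))   # True: every pair of servers now covers 2 distinct shares
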
diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py
index b05263041..8cc5bd5c4 100644
--- a/src/allmydata/util/deferredutil.py
+++ b/src/allmydata/util/deferredutil.py
@@ -2,13 +2,42 @@
 import time
 
 from foolscap.api import eventually, fireEventually
-from twisted.internet import defer, reactor
+from twisted.internet import defer, reactor, error
+from twisted.python.failure import Failure
 
 from allmydata.util import log
 from allmydata.util.assertutil import _assert
 from allmydata.util.pollmixin import PollMixin
 
 
+class TimeoutError(Exception):
+    pass
+
+
+def timeout_call(reactor, d, timeout):
+    """
+    Return a Deferred that fires with the result of 'd', unless 'timeout'
+    seconds pass before 'd' fires, in which case it errbacks with TimeoutError.
+    """
+    timer_d = defer.Deferred()
+
+    def _timed_out():
+        timer_d.errback(Failure(TimeoutError()))
+
+    def _got_result(x):
+        try:
+            timer.cancel()
+            timer_d.callback(x)
+        except (error.AlreadyCalled, defer.AlreadyCalledError):
+            pass
+        return None
+
+    timer = reactor.callLater(timeout, _timed_out)
+    d.addBoth(_got_result)
+    return timer_d
+
+
+
 # utility wrapper for DeferredList
 def _check_deferred_list(results):
     # if any of the component Deferreds failed, return the first failure such