diff --git a/newsfragments/3468.minor b/newsfragments/3468.minor
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py
index 652e68b2f..d574b980d 100644
--- a/src/allmydata/immutable/offloaded.py
+++ b/src/allmydata/immutable/offloaded.py
@@ -36,10 +36,11 @@ class CHKCheckerAndUEBFetcher(object):
     less than 'N' shares present.
 
     If the file is completely healthy, I return a tuple of (sharemap,
-    UEB_data, UEB_hash).
+    UEB_data, UEB_hash). A sharemap is a dict with share numbers as keys and
+    sets of server ids (which hold that share) as values.
     """
 
-    def __init__(self, peer_getter, storage_index, logparent=None):
+    def __init__(self, peer_getter, storage_index, logparent):
         self._peer_getter = peer_getter
         self._found_shares = set()
         self._storage_index = storage_index
@@ -57,6 +58,12 @@ class CHKCheckerAndUEBFetcher(object):
         return log.msg(*args, **kwargs)
 
     def check(self):
+        """
+        :return Deferred[bool|(DictOfSets, dict, bytes)]: If no share can be found
+            with a usable UEB block or fewer than N shares can be found then the
+            Deferred fires with ``False``. Otherwise, it fires with a tuple of
+            the sharemap, the UEB data, and the UEB hash.
+        """
         d = self._get_all_shareholders(self._storage_index)
         d.addCallback(self._get_uri_extension)
         d.addCallback(self._done)
@@ -496,6 +503,19 @@ class LocalCiphertextReader(AskUntilSuccessMixin):
 
 @implementer(interfaces.RIHelper, interfaces.IStatsProducer)
 class Helper(Referenceable):
+    """
+    :ivar dict[bytes, CHKUploadHelper] _active_uploads: For any uploads which
+        have been started but not finished, a mapping from storage index to the
+        upload helper.
+
+    :ivar chk_checker: A callable which returns an object like a
+        CHKCheckerAndUEBFetcher instance which can check CHK shares. Primarily
+        for the convenience of tests to override.
+
+    :ivar chk_upload: A callable which returns an object like a
+        CHKUploadHelper instance which can upload CHK shares. Primarily for
+        the convenience of tests to override.
+    """
     # this is the non-distributed version. When we need to have multiple
     # helpers, this object will become the HelperCoordinator, and will query
     # the farm of Helpers to see if anyone has the storage_index of interest,
@@ -509,6 +529,9 @@ class Helper(Referenceable):
                 }
     MAX_UPLOAD_STATUSES = 10
 
+    chk_checker = CHKCheckerAndUEBFetcher
+    chk_upload = CHKUploadHelper
+
     def __init__(self, basedir, storage_broker, secret_holder,
                  stats_provider, history):
         self._basedir = basedir
@@ -580,6 +603,9 @@ class Helper(Referenceable):
         return self.VERSION
 
     def remote_upload_chk(self, storage_index):
+        """
+        See ``RIHelper.upload_chk``
+        """
         self.count("chk_upload_helper.upload_requests")
         lp = self.log(format="helper: upload_chk query for SI %(si)s",
                       si=si_b2a(storage_index))
@@ -602,7 +628,7 @@ class Helper(Referenceable):
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
         sb = self._storage_broker
-        c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2)
+        c = self.chk_checker(sb.get_servers_for_psi, storage_index, lp2)
         d = c.check()
         def _checked(res):
             if res:
@@ -644,14 +670,18 @@ class Helper(Referenceable):
         return (None, uh)
 
     def _make_chk_upload_helper(self, storage_index, lp):
-        si_s = si_b2a(storage_index)
+        si_s = si_b2a(storage_index).decode('ascii')
         incoming_file = os.path.join(self._chk_incoming, si_s)
         encoding_file = os.path.join(self._chk_encoding, si_s)
-        uh = CHKUploadHelper(storage_index, self,
-                             self._storage_broker,
-                             self._secret_holder,
-                             incoming_file, encoding_file,
-                             lp)
+        uh = self.chk_upload(
+            storage_index,
+            self,
+            self._storage_broker,
+            self._secret_holder,
+            incoming_file,
+            encoding_file,
+            lp,
+        )
         return uh
 
     def _add_upload(self, uh):
diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py
index 0e628c81b..65c07135a 100644
--- a/src/allmydata/test/test_helper.py
+++ b/src/allmydata/test/test_helper.py
@@ -11,21 +11,54 @@ if PY2:
     from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
 
 import os
+from struct import (
+    pack,
+)
+from functools import (
+    partial,
+)
+
+import attr
+
 from twisted.internet import defer
 from twisted.trial import unittest
 from twisted.application import service
 
 from foolscap.api import Tub, fireEventually, flushEventualQueue
 
+from eliot.twisted import (
+    inline_callbacks,
+)
+
 from allmydata.crypto import aes
-from allmydata.storage.server import si_b2a
+from allmydata.storage.server import (
+    si_b2a,
+    StorageServer,
+)
 from allmydata.storage_client import StorageFarmBroker
+from allmydata.immutable.layout import (
+    make_write_bucket_proxy,
+)
 from allmydata.immutable import offloaded, upload
 from allmydata import uri, client
-from allmydata.util import hashutil, fileutil, mathutil
+from allmydata.util import hashutil, fileutil, mathutil, dictutil
+
+from .no_network import (
+    NoNetworkServer,
+    LocalWrapper,
+    fireNow,
+)
 from .common import (
     EMPTY_CLIENT_CONFIG,
+    SyncTestCase,
+)
+
+from testtools.matchers import (
+    Equals,
+    MatchesListwise,
+    IsInstance,
+)
+from testtools.twistedsupport import (
+    succeeded,
 )
 
 MiB = 1024*1024
@@ -66,35 +99,30 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
         d.addCallback(_got_size)
         return d
 
-class Helper_fake_upload(offloaded.Helper):
-    def _make_chk_upload_helper(self, storage_index, lp):
-        si_s = str(si_b2a(storage_index), "utf-8")
-        incoming_file = os.path.join(self._chk_incoming, si_s)
-        encoding_file = os.path.join(self._chk_encoding, si_s)
-        uh = CHKUploadHelper_fake(storage_index, self,
-                                  self._storage_broker,
-                                  self._secret_holder,
-                                  incoming_file, encoding_file,
-                                  lp)
-        return uh
+@attr.s
+class FakeCHKCheckerAndUEBFetcher(object):
+    """
+    A fake of ``CHKCheckerAndUEBFetcher`` which hard-codes some check result.
+    """
+    peer_getter = attr.ib()
+    storage_index = attr.ib()
+    logparent = attr.ib()
 
-class Helper_already_uploaded(Helper_fake_upload):
-    def _check_chk(self, storage_index, lp):
-        res = upload.HelperUploadResults()
-        res.uri_extension_hash = hashutil.uri_extension_hash(b"")
+    _sharemap = attr.ib()
+    _ueb_data = attr.ib()
 
-        # we're pretending that the file they're trying to upload was already
-        # present in the grid. We return some information about the file, so
-        # the client can decide if they like the way it looks. The parameters
-        # used here are chosen to match the defaults.
-        PARAMS = FakeClient.DEFAULT_ENCODING_PARAMETERS
-        ueb_data = {"needed_shares": PARAMS["k"],
-                    "total_shares": PARAMS["n"],
-                    "segment_size": min(PARAMS["max_segment_size"], len(DATA)),
-                    "size": len(DATA),
-                    }
-        res.uri_extension_data = ueb_data
-        return defer.succeed(res)
+    @property
+    def _ueb_hash(self):
+        return hashutil.uri_extension_hash(
+            uri.pack_extension(self._ueb_data),
+        )
+
+    def check(self):
+        return defer.succeed((
+            self._sharemap,
+            self._ueb_data,
+            self._ueb_hash,
+        ))
 
 class FakeClient(service.MultiService):
     introducer_clients = []
@@ -129,6 +157,26 @@ def upload_data(uploader, data, convergence):
     u = upload.Data(data, convergence=convergence)
     return uploader.upload(u)
 
+
+def make_uploader(helper_furl, parent, override_name=None):
+    """
+    Make an ``upload.Uploader`` service pointed at the given helper and with
+    the given service parent.
+
+    :param bytes helper_furl: The Foolscap URL of the upload helper.
+
+    :param IServiceCollection parent: A parent to assign to the new uploader.
+
+    :param str override_name: If not ``None``, a new name for the uploader
+        service. Multiple services cannot coexist with the same name.
+    """
+    u = upload.Uploader(helper_furl)
+    if override_name is not None:
+        u.name = override_name
+    u.setServiceParent(parent)
+    return u
+
+
 class AssistedUpload(unittest.TestCase):
     def setUp(self):
         self.tub = t = Tub()
@@ -148,13 +196,20 @@ class AssistedUpload(unittest.TestCase):
         # bogus host/port
         t.setLocation(b"bogus:1234")
 
-    def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
+    def setUpHelper(self, basedir, chk_upload=CHKUploadHelper_fake, chk_checker=None):
         fileutil.make_dirs(basedir)
-        self.helper = h = helper_class(basedir,
-                                       self.s.storage_broker,
-                                       self.s.secret_holder,
-                                       None, None)
-        self.helper_furl = self.tub.registerReference(h)
+        self.helper = offloaded.Helper(
+            basedir,
+            self.s.storage_broker,
+            self.s.secret_holder,
+            None,
+            None,
+        )
+        if chk_upload is not None:
+            self.helper.chk_upload = chk_upload
+        if chk_checker is not None:
+            self.helper.chk_checker = chk_checker
+        self.helper_furl = self.tub.registerReference(self.helper)
 
     def tearDown(self):
         d = self.s.stopService()
@@ -162,34 +217,84 @@ class AssistedUpload(unittest.TestCase):
         d.addCallback(fireEventually)
         d.addBoth(flush_but_dont_ignore)
         return d
 
     def test_one(self):
+        """
+        Some data that has never been uploaded before can be uploaded in CHK
+        format using the ``RIHelper`` provider and ``Uploader.upload``.
+ """ self.basedir = "helper/AssistedUpload/test_one" self.setUpHelper(self.basedir) - u = upload.Uploader(self.helper_furl) - u.setServiceParent(self.s) + u = make_uploader(self.helper_furl, self.s) d = wait_a_few_turns() def _ready(res): - assert u._helper - + self.assertTrue( + u._helper, + "Expected uploader to have a helper reference, had {} instead.".format( + u._helper, + ), + ) return upload_data(u, DATA, convergence=b"some convergence string") d.addCallback(_ready) + def _uploaded(results): the_uri = results.get_uri() - assert b"CHK" in the_uri + self.assertIn(b"CHK", the_uri) + self.assertNotEqual( + results.get_pushed_shares(), + 0, + ) d.addCallback(_uploaded) def _check_empty(res): + # Make sure the intermediate artifacts aren't left lying around. files = os.listdir(os.path.join(self.basedir, "CHK_encoding")) - self.failUnlessEqual(files, []) + self.assertEqual(files, []) files = os.listdir(os.path.join(self.basedir, "CHK_incoming")) - self.failUnlessEqual(files, []) + self.assertEqual(files, []) d.addCallback(_check_empty) return d + @inline_callbacks + def test_concurrent(self): + """ + The same data can be uploaded by more than one ``Uploader`` at a time. + """ + self.basedir = "helper/AssistedUpload/test_concurrent" + self.setUpHelper(self.basedir) + u1 = make_uploader(self.helper_furl, self.s, "u1") + u2 = make_uploader(self.helper_furl, self.s, "u2") + + yield wait_a_few_turns() + + for u in [u1, u2]: + self.assertTrue( + u._helper, + "Expected uploader to have a helper reference, had {} instead.".format( + u._helper, + ), + ) + + uploads = list( + upload_data(u, DATA, convergence=b"some convergence string") + for u + in [u1, u2] + ) + + result1, result2 = yield defer.gatherResults(uploads) + + self.assertEqual( + result1.get_uri(), + result2.get_uri(), + ) + # It would be really cool to assert that result1.get_pushed_shares() + + # result2.get_pushed_shares() == total_shares here. However, we're + # faking too much for that to be meaningful here. Also it doesn't + # hold because we don't actually push _anything_, we just lie about + # having pushed stuff. + def test_previous_upload_failed(self): self.basedir = "helper/AssistedUpload/test_previous_upload_failed" self.setUpHelper(self.basedir) @@ -217,8 +322,7 @@ class AssistedUpload(unittest.TestCase): f.write(aes.encrypt_data(encryptor, DATA)) f.close() - u = upload.Uploader(self.helper_furl) - u.setServiceParent(self.s) + u = make_uploader(self.helper_furl, self.s) d = wait_a_few_turns() @@ -240,29 +344,247 @@ class AssistedUpload(unittest.TestCase): return d + @inline_callbacks def test_already_uploaded(self): + """ + If enough shares to satisfy the needed parameter already exist, the upload + succeeds without pushing any shares. 
+ """ + params = FakeClient.DEFAULT_ENCODING_PARAMETERS + chk_checker = partial( + FakeCHKCheckerAndUEBFetcher, + sharemap=dictutil.DictOfSets({ + 0: {b"server0"}, + 1: {b"server1"}, + }), + ueb_data={ + "size": len(DATA), + "segment_size": min(params["max_segment_size"], len(DATA)), + "needed_shares": params["k"], + "total_shares": params["n"], + }, + ) self.basedir = "helper/AssistedUpload/test_already_uploaded" - self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded) - u = upload.Uploader(self.helper_furl) - u.setServiceParent(self.s) + self.setUpHelper( + self.basedir, + chk_checker=chk_checker, + ) + u = make_uploader(self.helper_furl, self.s) - d = wait_a_few_turns() + yield wait_a_few_turns() - def _ready(res): - assert u._helper + assert u._helper - return upload_data(u, DATA, convergence=b"some convergence string") - d.addCallback(_ready) - def _uploaded(results): - the_uri = results.get_uri() - assert b"CHK" in the_uri - d.addCallback(_uploaded) + results = yield upload_data(u, DATA, convergence=b"some convergence string") + the_uri = results.get_uri() + assert b"CHK" in the_uri - def _check_empty(res): - files = os.listdir(os.path.join(self.basedir, "CHK_encoding")) - self.failUnlessEqual(files, []) - files = os.listdir(os.path.join(self.basedir, "CHK_incoming")) - self.failUnlessEqual(files, []) - d.addCallback(_check_empty) + files = os.listdir(os.path.join(self.basedir, "CHK_encoding")) + self.failUnlessEqual(files, []) + files = os.listdir(os.path.join(self.basedir, "CHK_incoming")) + self.failUnlessEqual(files, []) - return d + self.assertEqual( + results.get_pushed_shares(), + 0, + ) + + +class CHKCheckerAndUEBFetcherTests(SyncTestCase): + """ + Tests for ``CHKCheckerAndUEBFetcher``. + """ + def test_check_no_peers(self): + """ + If the supplied "peer getter" returns no peers then + ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires + with ``False``. + """ + storage_index = b"a" * 16 + peers = {storage_index: []} + caf = offloaded.CHKCheckerAndUEBFetcher( + peers.get, + storage_index, + None, + ) + self.assertThat( + caf.check(), + succeeded(Equals(False)), + ) + + @inline_callbacks + def test_check_ueb_unavailable(self): + """ + If the UEB cannot be read from any of the peers supplied by the "peer + getter" then ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` + that fires with ``False``. + """ + storage_index = b"a" * 16 + serverid = b"b" * 20 + storage = StorageServer(self.mktemp(), serverid) + rref_without_ueb = LocalWrapper(storage, fireNow) + yield write_bad_share(rref_without_ueb, storage_index) + server_without_ueb = NoNetworkServer(serverid, rref_without_ueb) + peers = {storage_index: [server_without_ueb]} + caf = offloaded.CHKCheckerAndUEBFetcher( + peers.get, + storage_index, + None, + ) + self.assertThat( + caf.check(), + succeeded(Equals(False)), + ) + + @inline_callbacks + def test_not_enough_shares(self): + """ + If fewer shares are found than are required to reassemble the data then + ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires + with ``False``. 
+ """ + storage_index = b"a" * 16 + serverid = b"b" * 20 + storage = StorageServer(self.mktemp(), serverid) + rref_with_ueb = LocalWrapper(storage, fireNow) + ueb = { + "needed_shares": 2, + "total_shares": 2, + "segment_size": 128 * 1024, + "size": 1024, + } + yield write_good_share(rref_with_ueb, storage_index, ueb, [0]) + + server_with_ueb = NoNetworkServer(serverid, rref_with_ueb) + peers = {storage_index: [server_with_ueb]} + caf = offloaded.CHKCheckerAndUEBFetcher( + peers.get, + storage_index, + None, + ) + self.assertThat( + caf.check(), + succeeded(Equals(False)), + ) + + @inline_callbacks + def test_enough_shares(self): + """ + If enough shares are found to reassemble the data then + ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires + with share and share placement information. + """ + storage_index = b"a" * 16 + serverids = list( + ch * 20 + for ch + in [b"b", b"c"] + ) + storages = list( + StorageServer(self.mktemp(), serverid) + for serverid + in serverids + ) + rrefs_with_ueb = list( + LocalWrapper(storage, fireNow) + for storage + in storages + ) + ueb = { + "needed_shares": len(serverids), + "total_shares": len(serverids), + "segment_size": 128 * 1024, + "size": 1024, + } + for n, rref_with_ueb in enumerate(rrefs_with_ueb): + yield write_good_share(rref_with_ueb, storage_index, ueb, [n]) + + servers_with_ueb = list( + NoNetworkServer(serverid, rref_with_ueb) + for (serverid, rref_with_ueb) + in zip(serverids, rrefs_with_ueb) + ) + peers = {storage_index: servers_with_ueb} + caf = offloaded.CHKCheckerAndUEBFetcher( + peers.get, + storage_index, + None, + ) + self.assertThat( + caf.check(), + succeeded(MatchesListwise([ + Equals({ + n: {serverid} + for (n, serverid) + in enumerate(serverids) + }), + Equals(ueb), + IsInstance(bytes), + ])), + ) + + +def write_bad_share(storage_rref, storage_index): + """ + Write a share with a corrupt URI extension block. + """ + # Write some trash to the right bucket on this storage server. It won't + # have a recoverable UEB block. + return write_share(storage_rref, storage_index, [0], b"\0" * 1024) + + +def write_good_share(storage_rref, storage_index, ueb, sharenums): + """ + Write a valid share with the given URI extension block. + """ + write_proxy = make_write_bucket_proxy( + storage_rref, + None, + 1024, + ueb["segment_size"], + 1, + 1, + ueb["size"], + ) + # See allmydata/immutable/layout.py + offset = write_proxy._offsets["uri_extension"] + filler = b"\0" * (offset - len(write_proxy._offset_data)) + ueb_data = uri.pack_extension(ueb) + data = ( + write_proxy._offset_data + + filler + + pack(write_proxy.fieldstruct, len(ueb_data)) + + ueb_data + ) + return write_share(storage_rref, storage_index, sharenums, data) + + +@inline_callbacks +def write_share(storage_rref, storage_index, sharenums, sharedata): + """ + Write the given share data to the given storage index using the given + IStorageServer remote reference. + + :param foolscap.ipb.IRemoteReference storage_rref: A remote reference to + an IStorageServer. + + :param bytes storage_index: The storage index to which to write the share + data. + + :param [int] sharenums: The share numbers to which to write this sharedata. + + :param bytes sharedata: The ciphertext to write as the share. 
+ """ + ignored, writers = yield storage_rref.callRemote( + "allocate_buckets", + storage_index, + b"x" * 16, + b"x" * 16, + sharenums, + len(sharedata), + LocalWrapper(None), + + ) + [writer] = writers.values() + yield writer.callRemote("write", 0, sharedata) + yield writer.callRemote("close")