diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py
index 784b7e090..0802a12fb 100644
--- a/src/allmydata/immutable/offloaded.py
+++ b/src/allmydata/immutable/offloaded.py
@@ -25,10 +25,11 @@ class CHKCheckerAndUEBFetcher(object):
     less than 'N' shares present.
 
     If the file is completely healthy, I return a tuple of (sharemap,
-    UEB_data, UEB_hash).
+    UEB_data, UEB_hash). A sharemap is a dict with share numbers as keys and
+    sets of server ids (which hold that share) as values.
     """
 
-    def __init__(self, peer_getter, storage_index, logparent=None):
+    def __init__(self, peer_getter, storage_index, logparent):
         self._peer_getter = peer_getter
         self._found_shares = set()
         self._storage_index = storage_index
@@ -46,6 +47,12 @@ class CHKCheckerAndUEBFetcher(object):
         return log.msg(*args, **kwargs)
 
     def check(self):
+        """
+        :return Deferred[bool|(DictOfSets, dict, bytes)]: If no share can be found
+            with a usable UEB block or fewer than N shares can be found then the
+            Deferred fires with ``False``. Otherwise, it fires with a tuple of
+            the sharemap, the UEB data, and the UEB hash.
+        """
         d = self._get_all_shareholders(self._storage_index)
         d.addCallback(self._get_uri_extension)
         d.addCallback(self._done)
@@ -594,12 +601,14 @@ class Helper(Referenceable):
             d.addErrback(_err)
             return d
 
+    chk_checker = CHKCheckerAndUEBFetcher
+
     def _check_chk(self, storage_index, lp):
         # see if this file is already in the grid
         lp2 = self.log("doing a quick check+UEBfetch",
                       parent=lp, level=log.NOISY)
         sb = self._storage_broker
-        c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2)
+        c = self.chk_checker(sb.get_servers_for_psi, storage_index, lp2)
         d = c.check()
         def _checked(res):
             if res:
diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py
index 016044bbf..8d689664f 100644
--- a/src/allmydata/test/test_helper.py
+++ b/src/allmydata/test/test_helper.py
@@ -8,6 +8,11 @@ if PY2:
     from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
 
 import os
+from functools import (
+    partial,
+)
+import attr
+
 from twisted.internet import defer
 from twisted.trial import unittest
 from twisted.application import service
@@ -23,7 +28,7 @@ from allmydata.storage.server import si_b2a
 from allmydata.storage_client import StorageFarmBroker
 from allmydata.immutable import offloaded, upload
 from allmydata import uri, client
-from allmydata.util import hashutil, fileutil, mathutil
+from allmydata.util import hashutil, fileutil, mathutil, dictutil
 
 from .common import (
     EMPTY_CLIENT_CONFIG,
@@ -79,23 +84,30 @@ class Helper_fake_upload(offloaded.Helper):
                                   lp)
         return uh
 
-class Helper_already_uploaded(Helper_fake_upload):
-    def _check_chk(self, storage_index, lp):
-        res = upload.HelperUploadResults()
-        res.uri_extension_hash = hashutil.uri_extension_hash(b"")
+@attr.s
+class FakeCHKCheckerAndUEBFetcher(object):
+    """
+    A fake of ``CHKCheckerAndUEBFetcher`` which hard-codes some check result.
+    """
+    peer_getter = attr.ib()
+    storage_index = attr.ib()
+    logparent = attr.ib()
 
-        # we're pretending that the file they're trying to upload was already
-        # present in the grid. We return some information about the file, so
-        # the client can decide if they like the way it looks. The parameters
-        # used here are chosen to match the defaults.
-        PARAMS = FakeClient.DEFAULT_ENCODING_PARAMETERS
-        ueb_data = {"needed_shares": PARAMS["k"],
-                    "total_shares": PARAMS["n"],
-                    "segment_size": min(PARAMS["max_segment_size"], len(DATA)),
-                    "size": len(DATA),
-                    }
-        res.uri_extension_data = ueb_data
-        return defer.succeed(res)
+    _sharemap = attr.ib()
+    _ueb_data = attr.ib()
+
+    @property
+    def _ueb_hash(self):
+        return hashutil.uri_extension_hash(
+            uri.pack_extension(self._ueb_data),
+        )
+
+    def check(self):
+        return defer.succeed((
+            self._sharemap,
+            self._ueb_data,
+            self._ueb_hash,
+        ))
 
 class FakeClient(service.MultiService):
     introducer_clients = []
@@ -171,13 +183,15 @@ class AssistedUpload(unittest.TestCase):
         # bogus host/port
         t.setLocation(b"bogus:1234")
 
-    def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
+    def setUpHelper(self, basedir, helper_class=Helper_fake_upload, chk_checker=None):
         fileutil.make_dirs(basedir)
-        self.helper = h = helper_class(basedir,
+        self.helper = helper_class(basedir,
                                       self.s.storage_broker,
                                       self.s.secret_holder,
                                       None, None)
-        self.helper_furl = self.tub.registerReference(h)
+        if chk_checker is not None:
+            self.helper.chk_checker = chk_checker
+        self.helper_furl = self.tub.registerReference(self.helper)
 
     def tearDown(self):
         d = self.s.stopService()
@@ -209,6 +223,10 @@ class AssistedUpload(unittest.TestCase):
         def _uploaded(results):
             the_uri = results.get_uri()
             self.assertIn(b"CHK", the_uri)
+            self.assertNotEqual(
+                results.get_pushed_shares(),
+                0,
+            )
         d.addCallback(_uploaded)
 
         def _check_empty(res):
@@ -253,6 +271,11 @@ class AssistedUpload(unittest.TestCase):
             result1.get_uri(),
             result2.get_uri(),
         )
+        # It would be really cool to assert that result1.get_pushed_shares() +
+        # result2.get_pushed_shares() == total_shares here. However, we're
+        # faking too much for that to be meaningful. It also doesn't hold
+        # because we don't actually push _anything_, we just lie about having
+        # pushed stuff.
 
     def test_previous_upload_failed(self):
         self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
@@ -303,28 +326,46 @@ class AssistedUpload(unittest.TestCase):
 
         return d
 
+    @inline_callbacks
     def test_already_uploaded(self):
+        """
+        If enough shares to satisfy the needed parameter already exist, the upload succeeds without pushing any shares.
+        """
+        params = FakeClient.DEFAULT_ENCODING_PARAMETERS
+        chk_checker = partial(
+            FakeCHKCheckerAndUEBFetcher,
+            sharemap=dictutil.DictOfSets({
+                0: {b"server0"},
+                1: {b"server1"},
+            }),
+            ueb_data={
+                "size": len(DATA),
+                "segment_size": min(params["max_segment_size"], len(DATA)),
+                "needed_shares": params["k"],
+                "total_shares": params["n"],
+            },
+        )
         self.basedir = "helper/AssistedUpload/test_already_uploaded"
-        self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded)
+        self.setUpHelper(
+            self.basedir,
+            chk_checker=chk_checker,
+        )
         u = make_uploader(self.helper_furl, self.s)
 
-        d = wait_a_few_turns()
+        yield wait_a_few_turns()
 
-        def _ready(res):
-            assert u._helper
+        assert u._helper
 
-            return upload_data(u, DATA, convergence=b"some convergence string")
-        d.addCallback(_ready)
-        def _uploaded(results):
-            the_uri = results.get_uri()
-            assert b"CHK" in the_uri
-        d.addCallback(_uploaded)
+        results = yield upload_data(u, DATA, convergence=b"some convergence string")
+        the_uri = results.get_uri()
+        assert b"CHK" in the_uri
 
-        def _check_empty(res):
-            files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
-            self.failUnlessEqual(files, [])
-            files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
-            self.failUnlessEqual(files, [])
-        d.addCallback(_check_empty)
+        files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
+        self.failUnlessEqual(files, [])
+        files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
+        self.failUnlessEqual(files, [])
 
-        return d
+        self.assertEqual(
+            results.get_pushed_shares(),
+            0,
+        )
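
Note for reviewers (not part of the patch): a minimal sketch of how a caller might consume the ``check()`` contract documented in the new docstring. The ``checker`` argument is assumed to be built the way ``Helper._check_chk()`` builds one, and ``summarize_check`` is a hypothetical name.

    from twisted.internet import defer

    @defer.inlineCallbacks
    def summarize_check(checker):
        # check() fires with False, or with (sharemap, UEB data, UEB hash).
        res = yield checker.check()
        if res is False:
            # No usable UEB was found, or fewer than N shares exist.
            defer.returnValue(None)
        sharemap, ueb_data, ueb_hash = res
        # A sharemap maps share numbers to sets of server ids holding them.
        for shnum, servers in sorted(sharemap.items()):
            print("share %d held by %d server(s)" % (shnum, len(servers)))
        print("file size: %d bytes" % (ueb_data["size"],))
        defer.returnValue(ueb_hash)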
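
Similarly, a sketch (under the same caveats) of the seam the new ``chk_checker`` class attribute opens up: anything callable with the ``(peer_getter, storage_index, logparent)`` signature can stand in for ``CHKCheckerAndUEBFetcher``, which is exactly what ``setUpHelper(..., chk_checker=...)`` exploits. The ``helper`` instance below is hypothetical.

    from functools import partial

    from allmydata.util import dictutil

    fake = partial(
        FakeCHKCheckerAndUEBFetcher,
        sharemap=dictutil.DictOfSets({0: {b"server0"}}),
        ueb_data={"size": 0, "segment_size": 0,
                  "needed_shares": 1, "total_shares": 1},
    )
    # Helper._check_chk() now calls self.chk_checker(...), so the fake's
    # canned result is returned instead of querying real storage servers.
    helper.chk_checker = fake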