mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-19 21:17:54 +00:00
Merge pull request #868 from tahoe-lafs/3468.offloaded-test-coverage
More test coverage for the upload helper
This commit is contained in:
commit
a10c2606f4
0
newsfragments/3468.minor
Normal file
0
newsfragments/3468.minor
Normal file
@ -36,10 +36,11 @@ class CHKCheckerAndUEBFetcher(object):
|
||||
less than 'N' shares present.
|
||||
|
||||
If the file is completely healthy, I return a tuple of (sharemap,
|
||||
UEB_data, UEB_hash).
|
||||
UEB_data, UEB_hash). A sharemap is a dict with share numbers as keys and
|
||||
sets of server ids (which hold that share) as values.
|
||||
"""
|
||||
|
||||
def __init__(self, peer_getter, storage_index, logparent=None):
|
||||
def __init__(self, peer_getter, storage_index, logparent):
|
||||
self._peer_getter = peer_getter
|
||||
self._found_shares = set()
|
||||
self._storage_index = storage_index
|
||||
@ -57,6 +58,12 @@ class CHKCheckerAndUEBFetcher(object):
|
||||
return log.msg(*args, **kwargs)
|
||||
|
||||
def check(self):
|
||||
"""
|
||||
:return Deferred[bool|(DictOfSets, dict, bytes)]: If no share can be found
|
||||
with a usable UEB block or fewer than N shares can be found then the
|
||||
Deferred fires with ``False``. Otherwise, it fires with a tuple of
|
||||
the sharemap, the UEB data, and the UEB hash.
|
||||
"""
|
||||
d = self._get_all_shareholders(self._storage_index)
|
||||
d.addCallback(self._get_uri_extension)
|
||||
d.addCallback(self._done)
|
||||
@ -496,6 +503,19 @@ class LocalCiphertextReader(AskUntilSuccessMixin):
|
||||
|
||||
@implementer(interfaces.RIHelper, interfaces.IStatsProducer)
|
||||
class Helper(Referenceable):
|
||||
"""
|
||||
:ivar dict[bytes, CHKUploadHelper] _active_uploads: For any uploads which
|
||||
have been started but not finished, a mapping from storage index to the
|
||||
upload helper.
|
||||
|
||||
:ivar chk_checker: A callable which returns an object like a
|
||||
CHKCheckerAndUEBFetcher instance which can check CHK shares.
|
||||
Primarily for the convenience of tests to override.
|
||||
|
||||
:ivar chk_upload: A callable which returns an object like a
|
||||
CHKUploadHelper instance which can upload CHK shares. Primarily for
|
||||
the convenience of tests to override.
|
||||
"""
|
||||
# this is the non-distributed version. When we need to have multiple
|
||||
# helpers, this object will become the HelperCoordinator, and will query
|
||||
# the farm of Helpers to see if anyone has the storage_index of interest,
|
||||
@ -509,6 +529,9 @@ class Helper(Referenceable):
|
||||
}
|
||||
MAX_UPLOAD_STATUSES = 10
|
||||
|
||||
chk_checker = CHKCheckerAndUEBFetcher
|
||||
chk_upload = CHKUploadHelper
|
||||
|
||||
def __init__(self, basedir, storage_broker, secret_holder,
|
||||
stats_provider, history):
|
||||
self._basedir = basedir
|
||||
@ -580,6 +603,9 @@ class Helper(Referenceable):
|
||||
return self.VERSION
|
||||
|
||||
def remote_upload_chk(self, storage_index):
|
||||
"""
|
||||
See ``RIHelper.upload_chk``
|
||||
"""
|
||||
self.count("chk_upload_helper.upload_requests")
|
||||
lp = self.log(format="helper: upload_chk query for SI %(si)s",
|
||||
si=si_b2a(storage_index))
|
||||
@ -602,7 +628,7 @@ class Helper(Referenceable):
|
||||
lp2 = self.log("doing a quick check+UEBfetch",
|
||||
parent=lp, level=log.NOISY)
|
||||
sb = self._storage_broker
|
||||
c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2)
|
||||
c = self.chk_checker(sb.get_servers_for_psi, storage_index, lp2)
|
||||
d = c.check()
|
||||
def _checked(res):
|
||||
if res:
|
||||
@ -644,14 +670,18 @@ class Helper(Referenceable):
|
||||
return (None, uh)
|
||||
|
||||
def _make_chk_upload_helper(self, storage_index, lp):
|
||||
si_s = si_b2a(storage_index)
|
||||
si_s = si_b2a(storage_index).decode('ascii')
|
||||
incoming_file = os.path.join(self._chk_incoming, si_s)
|
||||
encoding_file = os.path.join(self._chk_encoding, si_s)
|
||||
uh = CHKUploadHelper(storage_index, self,
|
||||
self._storage_broker,
|
||||
self._secret_holder,
|
||||
incoming_file, encoding_file,
|
||||
lp)
|
||||
uh = self.chk_upload(
|
||||
storage_index,
|
||||
self,
|
||||
self._storage_broker,
|
||||
self._secret_holder,
|
||||
incoming_file,
|
||||
encoding_file,
|
||||
lp,
|
||||
)
|
||||
return uh
|
||||
|
||||
def _add_upload(self, uh):
|
||||
|
@ -11,21 +11,54 @@ if PY2:
|
||||
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
|
||||
|
||||
import os
|
||||
from struct import (
|
||||
pack,
|
||||
)
|
||||
from functools import (
|
||||
partial,
|
||||
)
|
||||
import attr
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.trial import unittest
|
||||
from twisted.application import service
|
||||
|
||||
from foolscap.api import Tub, fireEventually, flushEventualQueue
|
||||
|
||||
from eliot.twisted import (
|
||||
inline_callbacks,
|
||||
)
|
||||
|
||||
from allmydata.crypto import aes
|
||||
from allmydata.storage.server import si_b2a
|
||||
from allmydata.storage.server import (
|
||||
si_b2a,
|
||||
StorageServer,
|
||||
)
|
||||
from allmydata.storage_client import StorageFarmBroker
|
||||
from allmydata.immutable.layout import (
|
||||
make_write_bucket_proxy,
|
||||
)
|
||||
from allmydata.immutable import offloaded, upload
|
||||
from allmydata import uri, client
|
||||
from allmydata.util import hashutil, fileutil, mathutil
|
||||
from allmydata.util import hashutil, fileutil, mathutil, dictutil
|
||||
|
||||
from .no_network import (
|
||||
NoNetworkServer,
|
||||
LocalWrapper,
|
||||
fireNow,
|
||||
)
|
||||
from .common import (
|
||||
EMPTY_CLIENT_CONFIG,
|
||||
SyncTestCase,
|
||||
)
|
||||
|
||||
from testtools.matchers import (
|
||||
Equals,
|
||||
MatchesListwise,
|
||||
IsInstance,
|
||||
)
|
||||
from testtools.twistedsupport import (
|
||||
succeeded,
|
||||
)
|
||||
|
||||
MiB = 1024*1024
|
||||
@ -66,35 +99,30 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
|
||||
d.addCallback(_got_size)
|
||||
return d
|
||||
|
||||
class Helper_fake_upload(offloaded.Helper):
|
||||
def _make_chk_upload_helper(self, storage_index, lp):
|
||||
si_s = str(si_b2a(storage_index), "utf-8")
|
||||
incoming_file = os.path.join(self._chk_incoming, si_s)
|
||||
encoding_file = os.path.join(self._chk_encoding, si_s)
|
||||
uh = CHKUploadHelper_fake(storage_index, self,
|
||||
self._storage_broker,
|
||||
self._secret_holder,
|
||||
incoming_file, encoding_file,
|
||||
lp)
|
||||
return uh
|
||||
@attr.s
|
||||
class FakeCHKCheckerAndUEBFetcher(object):
|
||||
"""
|
||||
A fake of ``CHKCheckerAndUEBFetcher`` which hard-codes some check result.
|
||||
"""
|
||||
peer_getter = attr.ib()
|
||||
storage_index = attr.ib()
|
||||
logparent = attr.ib()
|
||||
|
||||
class Helper_already_uploaded(Helper_fake_upload):
|
||||
def _check_chk(self, storage_index, lp):
|
||||
res = upload.HelperUploadResults()
|
||||
res.uri_extension_hash = hashutil.uri_extension_hash(b"")
|
||||
_sharemap = attr.ib()
|
||||
_ueb_data = attr.ib()
|
||||
|
||||
# we're pretending that the file they're trying to upload was already
|
||||
# present in the grid. We return some information about the file, so
|
||||
# the client can decide if they like the way it looks. The parameters
|
||||
# used here are chosen to match the defaults.
|
||||
PARAMS = FakeClient.DEFAULT_ENCODING_PARAMETERS
|
||||
ueb_data = {"needed_shares": PARAMS["k"],
|
||||
"total_shares": PARAMS["n"],
|
||||
"segment_size": min(PARAMS["max_segment_size"], len(DATA)),
|
||||
"size": len(DATA),
|
||||
}
|
||||
res.uri_extension_data = ueb_data
|
||||
return defer.succeed(res)
|
||||
@property
|
||||
def _ueb_hash(self):
|
||||
return hashutil.uri_extension_hash(
|
||||
uri.pack_extension(self._ueb_data),
|
||||
)
|
||||
|
||||
def check(self):
|
||||
return defer.succeed((
|
||||
self._sharemap,
|
||||
self._ueb_data,
|
||||
self._ueb_hash,
|
||||
))
|
||||
|
||||
class FakeClient(service.MultiService):
|
||||
introducer_clients = []
|
||||
@ -129,6 +157,26 @@ def upload_data(uploader, data, convergence):
|
||||
u = upload.Data(data, convergence=convergence)
|
||||
return uploader.upload(u)
|
||||
|
||||
|
||||
def make_uploader(helper_furl, parent, override_name=None):
|
||||
"""
|
||||
Make an ``upload.Uploader`` service pointed at the given helper and with
|
||||
the given service parent.
|
||||
|
||||
:param bytes helper_furl: The Foolscap URL of the upload helper.
|
||||
|
||||
:param IServiceCollection parent: A parent to assign to the new uploader.
|
||||
|
||||
:param str override_name: If not ``None``, a new name for the uploader
|
||||
service. Multiple services cannot coexist with the same name.
|
||||
"""
|
||||
u = upload.Uploader(helper_furl)
|
||||
if override_name is not None:
|
||||
u.name = override_name
|
||||
u.setServiceParent(parent)
|
||||
return u
|
||||
|
||||
|
||||
class AssistedUpload(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tub = t = Tub()
|
||||
@ -148,13 +196,20 @@ class AssistedUpload(unittest.TestCase):
|
||||
# bogus host/port
|
||||
t.setLocation(b"bogus:1234")
|
||||
|
||||
def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
|
||||
def setUpHelper(self, basedir, chk_upload=CHKUploadHelper_fake, chk_checker=None):
|
||||
fileutil.make_dirs(basedir)
|
||||
self.helper = h = helper_class(basedir,
|
||||
self.s.storage_broker,
|
||||
self.s.secret_holder,
|
||||
None, None)
|
||||
self.helper_furl = self.tub.registerReference(h)
|
||||
self.helper = offloaded.Helper(
|
||||
basedir,
|
||||
self.s.storage_broker,
|
||||
self.s.secret_holder,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
if chk_upload is not None:
|
||||
self.helper.chk_upload = chk_upload
|
||||
if chk_checker is not None:
|
||||
self.helper.chk_checker = chk_checker
|
||||
self.helper_furl = self.tub.registerReference(self.helper)
|
||||
|
||||
def tearDown(self):
|
||||
d = self.s.stopService()
|
||||
@ -162,34 +217,84 @@ class AssistedUpload(unittest.TestCase):
|
||||
d.addBoth(flush_but_dont_ignore)
|
||||
return d
|
||||
|
||||
|
||||
def test_one(self):
|
||||
"""
|
||||
Some data that has never been uploaded before can be uploaded in CHK
|
||||
format using the ``RIHelper`` provider and ``Uploader.upload``.
|
||||
"""
|
||||
self.basedir = "helper/AssistedUpload/test_one"
|
||||
self.setUpHelper(self.basedir)
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
u.setServiceParent(self.s)
|
||||
u = make_uploader(self.helper_furl, self.s)
|
||||
|
||||
d = wait_a_few_turns()
|
||||
|
||||
def _ready(res):
|
||||
assert u._helper
|
||||
|
||||
self.assertTrue(
|
||||
u._helper,
|
||||
"Expected uploader to have a helper reference, had {} instead.".format(
|
||||
u._helper,
|
||||
),
|
||||
)
|
||||
return upload_data(u, DATA, convergence=b"some convergence string")
|
||||
d.addCallback(_ready)
|
||||
|
||||
def _uploaded(results):
|
||||
the_uri = results.get_uri()
|
||||
assert b"CHK" in the_uri
|
||||
self.assertIn(b"CHK", the_uri)
|
||||
self.assertNotEqual(
|
||||
results.get_pushed_shares(),
|
||||
0,
|
||||
)
|
||||
d.addCallback(_uploaded)
|
||||
|
||||
def _check_empty(res):
|
||||
# Make sure the intermediate artifacts aren't left lying around.
|
||||
files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
|
||||
self.failUnlessEqual(files, [])
|
||||
self.assertEqual(files, [])
|
||||
files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
|
||||
self.failUnlessEqual(files, [])
|
||||
self.assertEqual(files, [])
|
||||
d.addCallback(_check_empty)
|
||||
|
||||
return d
|
||||
|
||||
@inline_callbacks
|
||||
def test_concurrent(self):
|
||||
"""
|
||||
The same data can be uploaded by more than one ``Uploader`` at a time.
|
||||
"""
|
||||
self.basedir = "helper/AssistedUpload/test_concurrent"
|
||||
self.setUpHelper(self.basedir)
|
||||
u1 = make_uploader(self.helper_furl, self.s, "u1")
|
||||
u2 = make_uploader(self.helper_furl, self.s, "u2")
|
||||
|
||||
yield wait_a_few_turns()
|
||||
|
||||
for u in [u1, u2]:
|
||||
self.assertTrue(
|
||||
u._helper,
|
||||
"Expected uploader to have a helper reference, had {} instead.".format(
|
||||
u._helper,
|
||||
),
|
||||
)
|
||||
|
||||
uploads = list(
|
||||
upload_data(u, DATA, convergence=b"some convergence string")
|
||||
for u
|
||||
in [u1, u2]
|
||||
)
|
||||
|
||||
result1, result2 = yield defer.gatherResults(uploads)
|
||||
|
||||
self.assertEqual(
|
||||
result1.get_uri(),
|
||||
result2.get_uri(),
|
||||
)
|
||||
# It would be really cool to assert that result1.get_pushed_shares() +
|
||||
# result2.get_pushed_shares() == total_shares here. However, we're
|
||||
# faking too much for that to be meaningful here. Also it doesn't
|
||||
# hold because we don't actually push _anything_, we just lie about
|
||||
# having pushed stuff.
|
||||
|
||||
def test_previous_upload_failed(self):
|
||||
self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
|
||||
self.setUpHelper(self.basedir)
|
||||
@ -217,8 +322,7 @@ class AssistedUpload(unittest.TestCase):
|
||||
f.write(aes.encrypt_data(encryptor, DATA))
|
||||
f.close()
|
||||
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
u.setServiceParent(self.s)
|
||||
u = make_uploader(self.helper_furl, self.s)
|
||||
|
||||
d = wait_a_few_turns()
|
||||
|
||||
@ -240,29 +344,247 @@ class AssistedUpload(unittest.TestCase):
|
||||
|
||||
return d
|
||||
|
||||
@inline_callbacks
|
||||
def test_already_uploaded(self):
|
||||
"""
|
||||
If enough shares to satisfy the needed parameter already exist, the upload
|
||||
succeeds without pushing any shares.
|
||||
"""
|
||||
params = FakeClient.DEFAULT_ENCODING_PARAMETERS
|
||||
chk_checker = partial(
|
||||
FakeCHKCheckerAndUEBFetcher,
|
||||
sharemap=dictutil.DictOfSets({
|
||||
0: {b"server0"},
|
||||
1: {b"server1"},
|
||||
}),
|
||||
ueb_data={
|
||||
"size": len(DATA),
|
||||
"segment_size": min(params["max_segment_size"], len(DATA)),
|
||||
"needed_shares": params["k"],
|
||||
"total_shares": params["n"],
|
||||
},
|
||||
)
|
||||
self.basedir = "helper/AssistedUpload/test_already_uploaded"
|
||||
self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded)
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
u.setServiceParent(self.s)
|
||||
self.setUpHelper(
|
||||
self.basedir,
|
||||
chk_checker=chk_checker,
|
||||
)
|
||||
u = make_uploader(self.helper_furl, self.s)
|
||||
|
||||
d = wait_a_few_turns()
|
||||
yield wait_a_few_turns()
|
||||
|
||||
def _ready(res):
|
||||
assert u._helper
|
||||
assert u._helper
|
||||
|
||||
return upload_data(u, DATA, convergence=b"some convergence string")
|
||||
d.addCallback(_ready)
|
||||
def _uploaded(results):
|
||||
the_uri = results.get_uri()
|
||||
assert b"CHK" in the_uri
|
||||
d.addCallback(_uploaded)
|
||||
results = yield upload_data(u, DATA, convergence=b"some convergence string")
|
||||
the_uri = results.get_uri()
|
||||
assert b"CHK" in the_uri
|
||||
|
||||
def _check_empty(res):
|
||||
files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
|
||||
self.failUnlessEqual(files, [])
|
||||
files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
|
||||
self.failUnlessEqual(files, [])
|
||||
d.addCallback(_check_empty)
|
||||
files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
|
||||
self.failUnlessEqual(files, [])
|
||||
files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
|
||||
self.failUnlessEqual(files, [])
|
||||
|
||||
return d
|
||||
self.assertEqual(
|
||||
results.get_pushed_shares(),
|
||||
0,
|
||||
)
|
||||
|
||||
|
||||
class CHKCheckerAndUEBFetcherTests(SyncTestCase):
|
||||
"""
|
||||
Tests for ``CHKCheckerAndUEBFetcher``.
|
||||
"""
|
||||
def test_check_no_peers(self):
|
||||
"""
|
||||
If the supplied "peer getter" returns no peers then
|
||||
``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
|
||||
with ``False``.
|
||||
"""
|
||||
storage_index = b"a" * 16
|
||||
peers = {storage_index: []}
|
||||
caf = offloaded.CHKCheckerAndUEBFetcher(
|
||||
peers.get,
|
||||
storage_index,
|
||||
None,
|
||||
)
|
||||
self.assertThat(
|
||||
caf.check(),
|
||||
succeeded(Equals(False)),
|
||||
)
|
||||
|
||||
@inline_callbacks
|
||||
def test_check_ueb_unavailable(self):
|
||||
"""
|
||||
If the UEB cannot be read from any of the peers supplied by the "peer
|
||||
getter" then ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred``
|
||||
that fires with ``False``.
|
||||
"""
|
||||
storage_index = b"a" * 16
|
||||
serverid = b"b" * 20
|
||||
storage = StorageServer(self.mktemp(), serverid)
|
||||
rref_without_ueb = LocalWrapper(storage, fireNow)
|
||||
yield write_bad_share(rref_without_ueb, storage_index)
|
||||
server_without_ueb = NoNetworkServer(serverid, rref_without_ueb)
|
||||
peers = {storage_index: [server_without_ueb]}
|
||||
caf = offloaded.CHKCheckerAndUEBFetcher(
|
||||
peers.get,
|
||||
storage_index,
|
||||
None,
|
||||
)
|
||||
self.assertThat(
|
||||
caf.check(),
|
||||
succeeded(Equals(False)),
|
||||
)
|
||||
|
||||
@inline_callbacks
|
||||
def test_not_enough_shares(self):
|
||||
"""
|
||||
If fewer shares are found than are required to reassemble the data then
|
||||
``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
|
||||
with ``False``.
|
||||
"""
|
||||
storage_index = b"a" * 16
|
||||
serverid = b"b" * 20
|
||||
storage = StorageServer(self.mktemp(), serverid)
|
||||
rref_with_ueb = LocalWrapper(storage, fireNow)
|
||||
ueb = {
|
||||
"needed_shares": 2,
|
||||
"total_shares": 2,
|
||||
"segment_size": 128 * 1024,
|
||||
"size": 1024,
|
||||
}
|
||||
yield write_good_share(rref_with_ueb, storage_index, ueb, [0])
|
||||
|
||||
server_with_ueb = NoNetworkServer(serverid, rref_with_ueb)
|
||||
peers = {storage_index: [server_with_ueb]}
|
||||
caf = offloaded.CHKCheckerAndUEBFetcher(
|
||||
peers.get,
|
||||
storage_index,
|
||||
None,
|
||||
)
|
||||
self.assertThat(
|
||||
caf.check(),
|
||||
succeeded(Equals(False)),
|
||||
)
|
||||
|
||||
@inline_callbacks
|
||||
def test_enough_shares(self):
|
||||
"""
|
||||
If enough shares are found to reassemble the data then
|
||||
``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
|
||||
with share and share placement information.
|
||||
"""
|
||||
storage_index = b"a" * 16
|
||||
serverids = list(
|
||||
ch * 20
|
||||
for ch
|
||||
in [b"b", b"c"]
|
||||
)
|
||||
storages = list(
|
||||
StorageServer(self.mktemp(), serverid)
|
||||
for serverid
|
||||
in serverids
|
||||
)
|
||||
rrefs_with_ueb = list(
|
||||
LocalWrapper(storage, fireNow)
|
||||
for storage
|
||||
in storages
|
||||
)
|
||||
ueb = {
|
||||
"needed_shares": len(serverids),
|
||||
"total_shares": len(serverids),
|
||||
"segment_size": 128 * 1024,
|
||||
"size": 1024,
|
||||
}
|
||||
for n, rref_with_ueb in enumerate(rrefs_with_ueb):
|
||||
yield write_good_share(rref_with_ueb, storage_index, ueb, [n])
|
||||
|
||||
servers_with_ueb = list(
|
||||
NoNetworkServer(serverid, rref_with_ueb)
|
||||
for (serverid, rref_with_ueb)
|
||||
in zip(serverids, rrefs_with_ueb)
|
||||
)
|
||||
peers = {storage_index: servers_with_ueb}
|
||||
caf = offloaded.CHKCheckerAndUEBFetcher(
|
||||
peers.get,
|
||||
storage_index,
|
||||
None,
|
||||
)
|
||||
self.assertThat(
|
||||
caf.check(),
|
||||
succeeded(MatchesListwise([
|
||||
Equals({
|
||||
n: {serverid}
|
||||
for (n, serverid)
|
||||
in enumerate(serverids)
|
||||
}),
|
||||
Equals(ueb),
|
||||
IsInstance(bytes),
|
||||
])),
|
||||
)
|
||||
|
||||
|
||||
def write_bad_share(storage_rref, storage_index):
|
||||
"""
|
||||
Write a share with a corrupt URI extension block.
|
||||
"""
|
||||
# Write some trash to the right bucket on this storage server. It won't
|
||||
# have a recoverable UEB block.
|
||||
return write_share(storage_rref, storage_index, [0], b"\0" * 1024)
|
||||
|
||||
|
||||
def write_good_share(storage_rref, storage_index, ueb, sharenums):
|
||||
"""
|
||||
Write a valid share with the given URI extension block.
|
||||
"""
|
||||
write_proxy = make_write_bucket_proxy(
|
||||
storage_rref,
|
||||
None,
|
||||
1024,
|
||||
ueb["segment_size"],
|
||||
1,
|
||||
1,
|
||||
ueb["size"],
|
||||
)
|
||||
# See allmydata/immutable/layout.py
|
||||
offset = write_proxy._offsets["uri_extension"]
|
||||
filler = b"\0" * (offset - len(write_proxy._offset_data))
|
||||
ueb_data = uri.pack_extension(ueb)
|
||||
data = (
|
||||
write_proxy._offset_data +
|
||||
filler +
|
||||
pack(write_proxy.fieldstruct, len(ueb_data)) +
|
||||
ueb_data
|
||||
)
|
||||
return write_share(storage_rref, storage_index, sharenums, data)
|
||||
|
||||
|
||||
@inline_callbacks
|
||||
def write_share(storage_rref, storage_index, sharenums, sharedata):
|
||||
"""
|
||||
Write the given share data to the given storage index using the given
|
||||
IStorageServer remote reference.
|
||||
|
||||
:param foolscap.ipb.IRemoteReference storage_rref: A remote reference to
|
||||
an IStorageServer.
|
||||
|
||||
:param bytes storage_index: The storage index to which to write the share
|
||||
data.
|
||||
|
||||
:param [int] sharenums: The share numbers to which to write this sharedata.
|
||||
|
||||
:param bytes sharedata: The ciphertext to write as the share.
|
||||
"""
|
||||
ignored, writers = yield storage_rref.callRemote(
|
||||
"allocate_buckets",
|
||||
storage_index,
|
||||
b"x" * 16,
|
||||
b"x" * 16,
|
||||
sharenums,
|
||||
len(sharedata),
|
||||
LocalWrapper(None),
|
||||
|
||||
)
|
||||
[writer] = writers.values()
|
||||
yield writer.callRemote("write", 0, sharedata)
|
||||
yield writer.callRemote("close")
|
||||
|
Loading…
Reference in New Issue
Block a user