mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
Merge pull request #1211 from tahoe-lafs/3915-immutable-size

Match up allocated size of immutable and what actually gets written. Fixes ticket:3915
This commit is contained in:
commit 508dda1646

newsfragments/3915.minor — new normal file (empty)
@@ -694,3 +694,24 @@ class Encoder(object):
         return self.uri_extension_data
     def get_uri_extension_hash(self):
         return self.uri_extension_hash
+
+    def get_uri_extension_size(self):
+        """
+        Calculate the size of the URI extension that gets written at the end of
+        immutables.
+
+        This may be done earlier than actual encoding, so e.g. we might not
+        know the crypttext hashes, but that's fine for our purposes since we
+        only care about the length.
+        """
+        params = self.uri_extension_data.copy()
+        params["crypttext_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
+        params["crypttext_root_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
+        params["share_root_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
+        assert params.keys() == {
+            "codec_name", "codec_params", "size", "segment_size", "num_segments",
+            "needed_shares", "total_shares", "tail_codec_params",
+            "crypttext_hash", "crypttext_root_hash", "share_root_hash"
+        }, params.keys()
+        uri_extension = uri.pack_extension(params)
+        return len(uri_extension)
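The size prediction above works because the three hash fields are fixed-width (`hashutil.CRYPTO_VAL_SIZE` bytes each), so packing a copy of the extension dictionary with null placeholders of the same width yields the final byte count even before the real hashes exist. A rough sketch of that length argument, using a hypothetical `toy_pack` in place of the real `uri.pack_extension`:

```python
from hashlib import sha256

CRYPTO_VAL_SIZE = 32  # stand-in for hashutil.CRYPTO_VAL_SIZE

def toy_pack(params):
    # Hypothetical netstring-style packing, used only to make the length
    # argument; the real format is whatever uri.pack_extension produces.
    pieces = []
    for key in sorted(params):
        value = params[key]
        if not isinstance(value, bytes):
            value = b"%d" % value
        pieces.append(key.encode("ascii") + b":" + b"%d:%s," % (len(value), value))
    return b"".join(pieces)

real = {"size": 12345, "crypttext_hash": sha256(b"ciphertext").digest()}
predicted = dict(real, crypttext_hash=b"\x00" * CRYPTO_VAL_SIZE)

# Same keys, same value widths => same packed length.
assert len(toy_pack(predicted)) == len(toy_pack(real))
```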
@@ -19,6 +19,7 @@ from allmydata.util import mathutil, observer, pipeline, log
 from allmydata.util.assertutil import precondition
 from allmydata.storage.server import si_b2a
 
+
 class LayoutInvalid(Exception):
     """ There is something wrong with these bytes so they can't be
     interpreted as the kind of immutable file that I know how to download."""
@@ -90,7 +91,7 @@ FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares
 
 def make_write_bucket_proxy(rref, server,
                             data_size, block_size, num_segments,
-                            num_share_hashes, uri_extension_size_max):
+                            num_share_hashes, uri_extension_size):
     # Use layout v1 for small files, so they'll be readable by older versions
     # (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
     # by tahoe-1.3.0 or later.
@@ -99,11 +100,11 @@ def make_write_bucket_proxy(rref, server,
             raise FileTooLargeError
         wbp = WriteBucketProxy(rref, server,
                                data_size, block_size, num_segments,
-                               num_share_hashes, uri_extension_size_max)
+                               num_share_hashes, uri_extension_size)
     except FileTooLargeError:
         wbp = WriteBucketProxy_v2(rref, server,
                                   data_size, block_size, num_segments,
-                                  num_share_hashes, uri_extension_size_max)
+                                  num_share_hashes, uri_extension_size)
     return wbp
 
 @implementer(IStorageBucketWriter)
@@ -112,20 +113,20 @@ class WriteBucketProxy(object):
     fieldstruct = ">L"
 
     def __init__(self, rref, server, data_size, block_size, num_segments,
-                 num_share_hashes, uri_extension_size_max, pipeline_size=50000):
+                 num_share_hashes, uri_extension_size, pipeline_size=50000):
         self._rref = rref
         self._server = server
         self._data_size = data_size
         self._block_size = block_size
         self._num_segments = num_segments
+        self._written_bytes = 0
 
         effective_segments = mathutil.next_power_of_k(num_segments,2)
         self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
         # how many share hashes are included in each share? This will be
         # about ln2(num_shares).
         self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
-        # we commit to not sending a uri extension larger than this
-        self._uri_extension_size_max = uri_extension_size_max
+        self._uri_extension_size = uri_extension_size
 
         self._create_offsets(block_size, data_size)
 
@@ -137,7 +138,7 @@ class WriteBucketProxy(object):
 
     def get_allocated_size(self):
         return (self._offsets['uri_extension'] + self.fieldsize +
-                self._uri_extension_size_max)
+                self._uri_extension_size)
 
     def _create_offsets(self, block_size, data_size):
         if block_size >= 2**32 or data_size >= 2**32:
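With the exact extension size in hand, `get_allocated_size()` is now a precise total rather than an upper bound: everything up to the URI-extension offset, plus the length prefix (`self.fieldsize`, 4 bytes for the `">L"` field), plus the extension itself. A worked example with made-up numbers (the real offset comes from `_create_offsets()`):

```python
import struct

uri_extension_offset = 4096        # hypothetical offset of the UEB within the share
fieldsize = struct.calcsize(">L")  # 4-byte big-endian length prefix
uri_extension_size = 419           # exact packed size reported by the encoder

allocated = uri_extension_offset + fieldsize + uri_extension_size
print(allocated)  # 4519 bytes allocated -- and, after this change, 4519 bytes written
```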
@@ -195,6 +196,14 @@ class WriteBucketProxy(object):
         return self._write(offset, data)
 
     def put_crypttext_hashes(self, hashes):
+        # plaintext_hash_tree precedes crypttext_hash_tree. It is not used, and
+        # so is not explicitly written, but we need to write everything, so
+        # fill it in with nulls.
+        d = self._write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
+        d.addCallback(lambda _: self._really_put_crypttext_hashes(hashes))
+        return d
+
+    def _really_put_crypttext_hashes(self, hashes):
         offset = self._offsets['crypttext_hash_tree']
         assert isinstance(hashes, list)
         data = b"".join(hashes)
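Zero-filling the unused plaintext hash tree region matters for the new accounting: the region is still part of the share layout and therefore of the allocation, so leaving it unwritten would make the bytes-written total fall short of `get_allocated_size()`. A tiny illustration with invented region sizes:

```python
# Invented sizes; the real ones are derived from _segment_hash_size.
regions = {"plaintext_hash_tree": 224, "crypttext_hash_tree": 224}
allocated = sum(regions.values())

written_without_fill = regions["crypttext_hash_tree"]   # skipping the unused region
written_with_fill = sum(regions.values())                # null-filled, as above

assert written_with_fill == allocated
assert written_without_fill != allocated
```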
@@ -233,8 +242,7 @@ class WriteBucketProxy(object):
     def put_uri_extension(self, data):
         offset = self._offsets['uri_extension']
         assert isinstance(data, bytes)
-        precondition(len(data) <= self._uri_extension_size_max,
-                     len(data), self._uri_extension_size_max)
+        precondition(len(data) == self._uri_extension_size)
         length = struct.pack(self.fieldstruct, len(data))
         return self._write(offset, length+data)
 
@@ -244,11 +252,12 @@ class WriteBucketProxy(object):
         # would reduce the foolscap CPU overhead per share, but wouldn't
         # reduce the number of round trips, so it might not be worth the
         # effort.
+        self._written_bytes += len(data)
         return self._pipeline.add(len(data),
                                   self._rref.callRemote, "write", offset, data)
 
     def close(self):
+        assert self._written_bytes == self.get_allocated_size(), f"{self._written_bytes} != {self.get_allocated_size()}"
         d = self._pipeline.add(0, self._rref.callRemote, "close")
         d.addCallback(lambda ign: self._pipeline.flush())
         return d
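Every `put_*` call above funnels through `_write()`, so the `_written_bytes` tally is complete, and the assertion in `close()` pins down the point of the whole change: the bytes actually sent must equal the bytes that `get_allocated_size()` asked the server for. A stripped-down sketch of that bookkeeping (a hypothetical class, not the real `WriteBucketProxy`):

```python
class ToyWriteTracker:
    """Minimal sketch of the written-bytes invariant, assuming writes never overlap."""

    def __init__(self, allocated_size):
        self._allocated_size = allocated_size
        self._written_bytes = 0

    def write(self, offset, data):
        self._written_bytes += len(data)

    def close(self):
        assert self._written_bytes == self._allocated_size, (
            f"{self._written_bytes} != {self._allocated_size}")

t = ToyWriteTracker(allocated_size=10)
t.write(0, b"header")  # 6 bytes
t.write(6, b"tail")    # 4 bytes
t.close()              # 6 + 4 == 10, so the invariant holds
```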
@@ -303,8 +312,6 @@ class WriteBucketProxy_v2(WriteBucketProxy):
 @implementer(IStorageBucketReader)
 class ReadBucketProxy(object):
 
-    MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes
-
     def __init__(self, rref, server, storage_index):
         self._rref = rref
         self._server = server
@@ -332,11 +339,6 @@ class ReadBucketProxy(object):
         # TODO: for small shares, read the whole bucket in _start()
         d = self._fetch_header()
         d.addCallback(self._parse_offsets)
-        # XXX The following two callbacks implement a slightly faster/nicer
-        # way to get the ueb and sharehashtree, but it requires that the
-        # storage server be >= v1.3.0.
-        # d.addCallback(self._fetch_sharehashtree_and_ueb)
-        # d.addCallback(self._parse_sharehashtree_and_ueb)
         def _fail_waiters(f):
             self._ready.fire(f)
         def _notify_waiters(result):
@@ -381,29 +383,6 @@ class ReadBucketProxy(object):
             self._offsets[field] = offset
         return self._offsets
 
-    def _fetch_sharehashtree_and_ueb(self, offsets):
-        sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes']
-        return self._read(offsets['share_hashes'],
-                          self.MAX_UEB_SIZE+sharehashtree_size)
-
-    def _parse_sharehashtree_and_ueb(self, data):
-        sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes']
-        if len(data) < sharehashtree_size:
-            raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data)))
-        if sharehashtree_size % (2+HASH_SIZE) != 0:
-            raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size))
-        self._share_hashes = []
-        for i in range(0, sharehashtree_size, 2+HASH_SIZE):
-            hashnum = struct.unpack(">H", data[i:i+2])[0]
-            hashvalue = data[i+2:i+2+HASH_SIZE]
-            self._share_hashes.append( (hashnum, hashvalue) )
-
-        i = self._offsets['uri_extension']-self._offsets['share_hashes']
-        if len(data) < i+self._fieldsize:
-            raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),))
-        length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0]
-        self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length]
-
     def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
         offset = self._offsets['data'] + blocknum * blocksize
         return self._read(offset, thisblocksize)
@@ -446,20 +425,18 @@ class ReadBucketProxy(object):
         else:
             return defer.succeed([])
 
-    def _get_share_hashes(self, unused=None):
-        if hasattr(self, '_share_hashes'):
-            return self._share_hashes
-        return self._get_share_hashes_the_old_way()
-
     def get_share_hashes(self):
         d = self._start_if_needed()
         d.addCallback(self._get_share_hashes)
         return d
 
-    def _get_share_hashes_the_old_way(self):
+    def _get_share_hashes(self, _ignore):
         """ Tahoe storage servers < v1.3.0 would return an error if you tried
         to read past the end of the share, so we need to use the offset and
-        read just that much."""
+        read just that much.
+
+        HTTP-based storage protocol also doesn't like reading past the end.
+        """
         offset = self._offsets['share_hashes']
         size = self._offsets['uri_extension'] - offset
         if size % (2+HASH_SIZE) != 0:
@@ -477,32 +454,29 @@ class ReadBucketProxy(object):
         d.addCallback(_unpack_share_hashes)
         return d
 
-    def _get_uri_extension_the_old_way(self, unused=None):
+    def _get_uri_extension(self, unused=None):
         """ Tahoe storage servers < v1.3.0 would return an error if you tried
         to read past the end of the share, so we need to fetch the UEB size
-        and then read just that much."""
+        and then read just that much.
+
+        HTTP-based storage protocol also doesn't like reading past the end.
+        """
         offset = self._offsets['uri_extension']
         d = self._read(offset, self._fieldsize)
         def _got_length(data):
             if len(data) != self._fieldsize:
                 raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
             length = struct.unpack(self._fieldstruct, data)[0]
-            if length >= 2**31:
-                # URI extension blocks are around 419 bytes long, so this
-                # must be corrupted. Anyway, the foolscap interface schema
-                # for "read" will not allow >= 2**31 bytes length.
+            if length >= 2000:
+                # URI extension blocks are around 419 bytes long; in previous
+                # versions of the code 1000 was used as a default catchall. So
+                # 2000 or more must be corrupted.
                 raise RidiculouslyLargeURIExtensionBlock(length)
 
             return self._read(offset+self._fieldsize, length)
         d.addCallback(_got_length)
         return d
 
-    def _get_uri_extension(self, unused=None):
-        if hasattr(self, '_ueb_data'):
-            return self._ueb_data
-        else:
-            return self._get_uri_extension_the_old_way()
-
     def get_uri_extension(self):
         d = self._start_if_needed()
         d.addCallback(self._get_uri_extension)
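The reader side relies on the on-disk framing: the UEB is stored as a 4-byte big-endian length prefix (`fieldstruct = ">L"`) followed by the packed extension, and the new 2000-byte ceiling is only a sanity check on that prefix. A small round-trip sketch of the framing (toy data, not the real share layout):

```python
import struct

FIELDSTRUCT = ">L"
FIELDSIZE = struct.calcsize(FIELDSTRUCT)   # 4

ueb = b"x" * 419                           # a typically-sized packed extension
framed = struct.pack(FIELDSTRUCT, len(ueb)) + ueb

length = struct.unpack(FIELDSTRUCT, framed[:FIELDSIZE])[0]
assert length < 2000                       # the corruption ceiling used above
assert framed[FIELDSIZE:FIELDSIZE + length] == ueb
```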
@@ -242,31 +242,26 @@ class UploadResults(object):
     def get_verifycapstr(self):
         return self._verifycapstr
 
-# our current uri_extension is 846 bytes for small files, a few bytes
-# more for larger ones (since the filesize is encoded in decimal in a
-# few places). Ask for a little bit more just in case we need it. If
-# the extension changes size, we can change EXTENSION_SIZE to
-# allocate a more accurate amount of space.
-EXTENSION_SIZE = 1000
-# TODO: actual extensions are closer to 419 bytes, so we can probably lower
-# this.
 
 def pretty_print_shnum_to_servers(s):
     return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.items() ])
 
+
 class ServerTracker(object):
     def __init__(self, server,
                  sharesize, blocksize, num_segments, num_share_hashes,
                  storage_index,
-                 bucket_renewal_secret, bucket_cancel_secret):
+                 bucket_renewal_secret, bucket_cancel_secret,
+                 uri_extension_size):
         self._server = server
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
+        self.uri_extension_size = uri_extension_size
 
         wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                              blocksize, num_segments,
                                              num_share_hashes,
-                                             EXTENSION_SIZE)
+                                             uri_extension_size)
         self.wbp_class = wbp.__class__ # to create more of them
         self.allocated_size = wbp.get_allocated_size()
         self.blocksize = blocksize
|
|||||||
self.blocksize,
|
self.blocksize,
|
||||||
self.num_segments,
|
self.num_segments,
|
||||||
self.num_share_hashes,
|
self.num_share_hashes,
|
||||||
EXTENSION_SIZE)
|
self.uri_extension_size)
|
||||||
b[sharenum] = bp
|
b[sharenum] = bp
|
||||||
self.buckets.update(b)
|
self.buckets.update(b)
|
||||||
return (alreadygot, set(b.keys()))
|
return (alreadygot, set(b.keys()))
|
||||||
@@ -487,7 +482,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
     def get_shareholders(self, storage_broker, secret_holder,
                          storage_index, share_size, block_size,
                          num_segments, total_shares, needed_shares,
-                         min_happiness):
+                         min_happiness, uri_extension_size):
         """
         @return: (upload_trackers, already_serverids), where upload_trackers
         is a set of ServerTracker instances that have agreed to hold
|
|||||||
# figure out how much space to ask for
|
# figure out how much space to ask for
|
||||||
wbp = layout.make_write_bucket_proxy(None, None,
|
wbp = layout.make_write_bucket_proxy(None, None,
|
||||||
share_size, 0, num_segments,
|
share_size, 0, num_segments,
|
||||||
num_share_hashes, EXTENSION_SIZE)
|
num_share_hashes,
|
||||||
|
uri_extension_size)
|
||||||
allocated_size = wbp.get_allocated_size()
|
allocated_size = wbp.get_allocated_size()
|
||||||
|
|
||||||
# decide upon the renewal/cancel secrets, to include them in the
|
# decide upon the renewal/cancel secrets, to include them in the
|
||||||
@ -554,7 +550,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
|||||||
def _create_server_tracker(server, renew, cancel):
|
def _create_server_tracker(server, renew, cancel):
|
||||||
return ServerTracker(
|
return ServerTracker(
|
||||||
server, share_size, block_size, num_segments, num_share_hashes,
|
server, share_size, block_size, num_segments, num_share_hashes,
|
||||||
storage_index, renew, cancel,
|
storage_index, renew, cancel, uri_extension_size
|
||||||
)
|
)
|
||||||
|
|
||||||
readonly_trackers, write_trackers = self._create_trackers(
|
readonly_trackers, write_trackers = self._create_trackers(
|
||||||
@ -1326,7 +1322,8 @@ class CHKUploader(object):
|
|||||||
d = server_selector.get_shareholders(storage_broker, secret_holder,
|
d = server_selector.get_shareholders(storage_broker, secret_holder,
|
||||||
storage_index,
|
storage_index,
|
||||||
share_size, block_size,
|
share_size, block_size,
|
||||||
num_segments, n, k, desired)
|
num_segments, n, k, desired,
|
||||||
|
encoder.get_uri_extension_size())
|
||||||
def _done(res):
|
def _done(res):
|
||||||
self._server_selection_elapsed = time.time() - server_selection_started
|
self._server_selection_elapsed = time.time() - server_selection_started
|
||||||
return res
|
return res
|
||||||
|
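This is where the exact size enters the upload path: the encoder computes it once and it is threaded through server selection into each `ServerTracker` and on to the write proxies. A schematic of that threading with stand-in functions (not the real signatures, which carry many more parameters):

```python
def encoder_uri_extension_size():
    return 419  # stands in for Encoder.get_uri_extension_size()

def get_shareholders(uri_extension_size):
    # stands in for Tahoe2ServerSelector.get_shareholders()
    return make_tracker(uri_extension_size)

def make_tracker(uri_extension_size):
    # stands in for ServerTracker, which hands the value to WriteBucketProxy
    return {"uri_extension_size": uri_extension_size}

tracker = get_shareholders(encoder_uri_extension_size())
assert tracker["uri_extension_size"] == 419
```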
@@ -397,7 +397,9 @@ class BucketWriter(object):
         """
         Write data at given offset, return whether the upload is complete.
         """
-        # Delay the timeout, since we received data:
+        # Delay the timeout, since we received data; if we get an
+        # AlreadyCancelled error, that means there's a bug in the client and
+        # write() was called after close().
         self._timeout.reset(30 * 60)
         start = self._clock.seconds()
         precondition(not self.closed)
|
|||||||
self._already_written.set(True, offset, end)
|
self._already_written.set(True, offset, end)
|
||||||
self.ss.add_latency("write", self._clock.seconds() - start)
|
self.ss.add_latency("write", self._clock.seconds() - start)
|
||||||
self.ss.count("write")
|
self.ss.count("write")
|
||||||
|
return self._is_finished()
|
||||||
|
|
||||||
# Return whether the whole thing has been written. See
|
def _is_finished(self):
|
||||||
# https://github.com/mlenzen/collections-extended/issues/169 and
|
"""
|
||||||
# https://github.com/mlenzen/collections-extended/issues/172 for why
|
Return whether the whole thing has been written.
|
||||||
# it's done this way.
|
"""
|
||||||
return sum([mr.stop - mr.start for mr in self._already_written.ranges()]) == self._max_size
|
return sum([mr.stop - mr.start for mr in self._already_written.ranges()]) == self._max_size
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
# This can't actually be enabled, because it's not backwards compatible
|
||||||
|
# with old Foolscap clients.
|
||||||
|
# assert self._is_finished()
|
||||||
precondition(not self.closed)
|
precondition(not self.closed)
|
||||||
self._timeout.cancel()
|
self._timeout.cancel()
|
||||||
start = self._clock.seconds()
|
start = self._clock.seconds()
|
||||||
|
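`_is_finished()` sums the extents recorded for the already-written byte ranges and compares the total to the share's maximum size, and `write()` now surfaces that flag to its caller. A toy equivalent using plain objects instead of the collections-extended range map the server actually keeps:

```python
from dataclasses import dataclass

@dataclass
class Extent:
    start: int
    stop: int

def is_finished(extents, max_size):
    # Assumes the extents are non-overlapping, as the server enforces.
    return sum(e.stop - e.start for e in extents) == max_size

already_written = [Extent(0, 60), Extent(60, 100)]
assert is_finished(already_written, max_size=100)
```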
@@ -251,6 +251,12 @@ class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
                                       self.judge_invisible_corruption)
 
     def test_corrupt_ueb(self):
+        # Note that in some rare situations this might fail, specifically if
+        # the length of the UEB is corrupted to be a value that is bigger than
+        # the size but less than 2000, it might not get caught... But that's
+        # mostly because in that case it doesn't meaningfully corrupt it. See
+        # _get_uri_extension_the_old_way() in layout.py for where the 2000
+        # number comes from.
         self.basedir = "repairer/Verifier/corrupt_ueb"
         return self._help_test_verify(common._corrupt_uri_extension,
                                       self.judge_invisible_corruption)
@@ -463,7 +463,7 @@ class BucketProxy(unittest.TestCase):
                               block_size=10,
                               num_segments=5,
                               num_share_hashes=3,
-                              uri_extension_size_max=500)
+                              uri_extension_size=500)
         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
 
     def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
@@ -494,7 +494,7 @@ class BucketProxy(unittest.TestCase):
                               block_size=25,
                               num_segments=4,
                               num_share_hashes=3,
-                              uri_extension_size_max=len(uri_extension))
+                              uri_extension_size=len(uri_extension))
 
         d = bp.put_header()
         d.addCallback(lambda res: bp.put_block(0, b"a"*25))
@@ -983,7 +983,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
         num_segments = encoder.get_param("num_segments")
         d = selector.get_shareholders(broker, sh, storage_index,
                                       share_size, block_size, num_segments,
-                                      10, 3, 4)
+                                      10, 3, 4, encoder.get_uri_extension_size())
         def _have_shareholders(upload_trackers_and_already_servers):
             (upload_trackers, already_servers) = upload_trackers_and_already_servers
             assert servers_to_break <= len(upload_trackers)