mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-01 16:58:10 +00:00
Calculate URI extension size upfront, instead of hand-waving with a larger value.
This commit is contained in:
parent
556606271d
commit
d50c98a1e9
@ -624,6 +624,7 @@ class Encoder(object):
|
||||
for k in ('crypttext_root_hash', 'crypttext_hash',
|
||||
):
|
||||
assert k in self.uri_extension_data
|
||||
self.uri_extension_data
|
||||
uri_extension = uri.pack_extension(self.uri_extension_data)
|
||||
ed = {}
|
||||
for k,v in self.uri_extension_data.items():
|
||||
@ -694,3 +695,20 @@ class Encoder(object):
|
||||
return self.uri_extension_data
|
||||
def get_uri_extension_hash(self):
|
||||
return self.uri_extension_hash
|
||||
|
||||
def get_uri_extension_size(self):
|
||||
"""
|
||||
Calculate the size of the URI extension that gets written at the end of
|
||||
immutables.
|
||||
|
||||
This may be done earlier than actual encoding, so e.g. we might not
|
||||
know the crypttext hashes, but that's fine for our purposes since we
|
||||
only care about the length.
|
||||
"""
|
||||
params = self.uri_extension_data.copy()
|
||||
assert params
|
||||
params["crypttext_hash"] = b"\x00" * 32
|
||||
params["crypttext_root_hash"] = b"\x00" * 32
|
||||
params["share_root_hash"] = b"\x00" * 32
|
||||
uri_extension = uri.pack_extension(params)
|
||||
return len(uri_extension)
|
||||
|
@ -90,7 +90,7 @@ FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares
|
||||
|
||||
def make_write_bucket_proxy(rref, server,
|
||||
data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max):
|
||||
num_share_hashes, uri_extension_size):
|
||||
# Use layout v1 for small files, so they'll be readable by older versions
|
||||
# (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
|
||||
# by tahoe-1.3.0 or later.
|
||||
@ -99,11 +99,11 @@ def make_write_bucket_proxy(rref, server,
|
||||
raise FileTooLargeError
|
||||
wbp = WriteBucketProxy(rref, server,
|
||||
data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max)
|
||||
num_share_hashes, uri_extension_size)
|
||||
except FileTooLargeError:
|
||||
wbp = WriteBucketProxy_v2(rref, server,
|
||||
data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max)
|
||||
num_share_hashes, uri_extension_size)
|
||||
return wbp
|
||||
|
||||
@implementer(IStorageBucketWriter)
|
||||
@ -112,7 +112,7 @@ class WriteBucketProxy(object):
|
||||
fieldstruct = ">L"
|
||||
|
||||
def __init__(self, rref, server, data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max, pipeline_size=50000):
|
||||
num_share_hashes, uri_extension_size, pipeline_size=50000):
|
||||
self._rref = rref
|
||||
self._server = server
|
||||
self._data_size = data_size
|
||||
@ -124,8 +124,7 @@ class WriteBucketProxy(object):
|
||||
# how many share hashes are included in each share? This will be
|
||||
# about ln2(num_shares).
|
||||
self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
|
||||
# we commit to not sending a uri extension larger than this
|
||||
self._uri_extension_size_max = uri_extension_size_max
|
||||
self._uri_extension_size = uri_extension_size
|
||||
|
||||
self._create_offsets(block_size, data_size)
|
||||
|
||||
@ -137,7 +136,7 @@ class WriteBucketProxy(object):
|
||||
|
||||
def get_allocated_size(self):
|
||||
return (self._offsets['uri_extension'] + self.fieldsize +
|
||||
self._uri_extension_size_max)
|
||||
self._uri_extension_size)
|
||||
|
||||
def _create_offsets(self, block_size, data_size):
|
||||
if block_size >= 2**32 or data_size >= 2**32:
|
||||
@ -233,8 +232,7 @@ class WriteBucketProxy(object):
|
||||
def put_uri_extension(self, data):
|
||||
offset = self._offsets['uri_extension']
|
||||
assert isinstance(data, bytes)
|
||||
precondition(len(data) <= self._uri_extension_size_max,
|
||||
len(data), self._uri_extension_size_max)
|
||||
precondition(len(data) == self._uri_extension_size)
|
||||
length = struct.pack(self.fieldstruct, len(data))
|
||||
return self._write(offset, length+data)
|
||||
|
||||
|
@ -242,31 +242,26 @@ class UploadResults(object):
|
||||
def get_verifycapstr(self):
|
||||
return self._verifycapstr
|
||||
|
||||
# our current uri_extension is 846 bytes for small files, a few bytes
|
||||
# more for larger ones (since the filesize is encoded in decimal in a
|
||||
# few places). Ask for a little bit more just in case we need it. If
|
||||
# the extension changes size, we can change EXTENSION_SIZE to
|
||||
# allocate a more accurate amount of space.
|
||||
EXTENSION_SIZE = 1000
|
||||
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
|
||||
# this.
|
||||
|
||||
def pretty_print_shnum_to_servers(s):
|
||||
return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.items() ])
|
||||
|
||||
|
||||
class ServerTracker(object):
|
||||
def __init__(self, server,
|
||||
sharesize, blocksize, num_segments, num_share_hashes,
|
||||
storage_index,
|
||||
bucket_renewal_secret, bucket_cancel_secret):
|
||||
bucket_renewal_secret, bucket_cancel_secret,
|
||||
uri_extension_size):
|
||||
self._server = server
|
||||
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
|
||||
self.sharesize = sharesize
|
||||
self.uri_extension_size = uri_extension_size
|
||||
|
||||
wbp = layout.make_write_bucket_proxy(None, None, sharesize,
|
||||
blocksize, num_segments,
|
||||
num_share_hashes,
|
||||
EXTENSION_SIZE)
|
||||
uri_extension_size)
|
||||
self.wbp_class = wbp.__class__ # to create more of them
|
||||
self.allocated_size = wbp.get_allocated_size()
|
||||
self.blocksize = blocksize
|
||||
@ -314,7 +309,7 @@ class ServerTracker(object):
|
||||
self.blocksize,
|
||||
self.num_segments,
|
||||
self.num_share_hashes,
|
||||
EXTENSION_SIZE)
|
||||
self.uri_extension_size)
|
||||
b[sharenum] = bp
|
||||
self.buckets.update(b)
|
||||
return (alreadygot, set(b.keys()))
|
||||
@ -487,7 +482,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
def get_shareholders(self, storage_broker, secret_holder,
|
||||
storage_index, share_size, block_size,
|
||||
num_segments, total_shares, needed_shares,
|
||||
min_happiness):
|
||||
min_happiness, uri_extension_size):
|
||||
"""
|
||||
@return: (upload_trackers, already_serverids), where upload_trackers
|
||||
is a set of ServerTracker instances that have agreed to hold
|
||||
@ -529,7 +524,8 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
# figure out how much space to ask for
|
||||
wbp = layout.make_write_bucket_proxy(None, None,
|
||||
share_size, 0, num_segments,
|
||||
num_share_hashes, EXTENSION_SIZE)
|
||||
num_share_hashes,
|
||||
uri_extension_size)
|
||||
allocated_size = wbp.get_allocated_size()
|
||||
|
||||
# decide upon the renewal/cancel secrets, to include them in the
|
||||
@ -554,7 +550,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
def _create_server_tracker(server, renew, cancel):
|
||||
return ServerTracker(
|
||||
server, share_size, block_size, num_segments, num_share_hashes,
|
||||
storage_index, renew, cancel,
|
||||
storage_index, renew, cancel, uri_extension_size
|
||||
)
|
||||
|
||||
readonly_trackers, write_trackers = self._create_trackers(
|
||||
@ -1326,7 +1322,8 @@ class CHKUploader(object):
|
||||
d = server_selector.get_shareholders(storage_broker, secret_holder,
|
||||
storage_index,
|
||||
share_size, block_size,
|
||||
num_segments, n, k, desired)
|
||||
num_segments, n, k, desired,
|
||||
encoder.get_uri_extension_size())
|
||||
def _done(res):
|
||||
self._server_selection_elapsed = time.time() - server_selection_started
|
||||
return res
|
||||
|
Loading…
x
Reference in New Issue
Block a user