mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-20 17:52:50 +00:00
upload: rework passing of default encoding parameters: move more responsibility into BaseUploadable
This commit is contained in:
parent
75e4332462
commit
81c5ceae16
@ -1085,6 +1085,9 @@ class IEncryptedUploadable(Interface):
|
||||
segment_size is larger than filesize, the difference must be stored
|
||||
as padding).
|
||||
|
||||
This usually passes through to the IUploadable method of the same
|
||||
name.
|
||||
|
||||
The encoder strictly obeys the values returned by this method. To
|
||||
make an upload use non-default encoding parameters, you must arrange
|
||||
to control the values that this method returns.
|
||||
@ -1132,34 +1135,46 @@ class IEncryptedUploadable(Interface):
|
||||
"""Just like IUploadable.close()."""
|
||||
|
||||
class IUploadable(Interface):
|
||||
def set_default_encoding_parameters(params):
|
||||
"""Set the default encoding parameters, which must be a dict mapping
|
||||
strings to ints. The meaningful keys are 'k', 'happy', 'n', and
|
||||
'max_segment_size'. These might have an influence on the final
|
||||
encoding parameters returned by get_all_encoding_parameters(), if the
|
||||
Uploadable doesn't have more specific preferences.
|
||||
|
||||
This call is optional: if it is not used, the Uploadable will use
|
||||
some built-in defaults. If used, this method must be called before
|
||||
any other IUploadable methods to have any effect.
|
||||
"""
|
||||
|
||||
def get_size():
|
||||
"""Return a Deferred that will fire with the length of the data to be
|
||||
uploaded, in bytes. This will be called before the data is actually
|
||||
used, to compute encoding parameters.
|
||||
"""
|
||||
|
||||
def get_maximum_segment_size():
|
||||
"""Return a Deferred that fires with None or an integer. None
|
||||
indicates that the Uploadable doesn't care about segment size, and
|
||||
the IEncryptedUploadable wrapper will use a default of probably 1MB.
|
||||
If provided, the integer will be used as the maximum segment size.
|
||||
Larger values reduce hash overhead, smaller values reduce memory
|
||||
footprint and cause data to be delivered in smaller pieces (which may
|
||||
provide a smoother and more predictable download experience).
|
||||
def get_all_encoding_parameters():
|
||||
"""Return a Deferred that fires with a tuple of
|
||||
(k,happy,n,segment_size). The segment_size will be used as-is, and
|
||||
must match the following constraints: it must be a multiple of k, and
|
||||
it shouldn't be unreasonably larger than the file size (if
|
||||
segment_size is larger than filesize, the difference must be stored
|
||||
as padding).
|
||||
|
||||
There are other constraints on the segment size (see
|
||||
IEncryptedUploadable.get_encoding_parameters), so the final segment
|
||||
size may be smaller than the one returned by this method.
|
||||
"""
|
||||
The relative values of k and n allow some IUploadables to request
|
||||
better redundancy than others (in exchange for consuming more space
|
||||
in the grid).
|
||||
|
||||
def get_encoding_parameters():
|
||||
"""Return a Deferred that either fires with None or with a tuple of
|
||||
(k,happy,n). None indicates that the Uploadable doesn't care how it
|
||||
is encoded, causing the Uploader to use default k/happy/n (either
|
||||
hard-coded or provided by the Introducer).
|
||||
Larger values of segment_size reduce hash overhead, while smaller
|
||||
values reduce memory footprint and cause data to be delivered in
|
||||
smaller pieces (which may provide a smoother and more predictable
|
||||
download experience).
|
||||
|
||||
This allows some IUploadables to request better redundancy than
|
||||
others.
|
||||
The encoder strictly obeys the values returned by this method. To
|
||||
make an upload use non-default encoding parameters, you must arrange
|
||||
to control the values that this method returns. One way to influence
|
||||
them may be to call set_encoding_parameters() before calling
|
||||
get_all_encoding_parameters().
|
||||
"""
|
||||
|
||||
def get_encryption_key():
|
||||
|
@ -166,9 +166,11 @@ class Encode(unittest.TestCase):
|
||||
# force use of multiple segments
|
||||
e = encode.Encoder()
|
||||
u = upload.Data(data)
|
||||
params = {"k": 25, "happy": 75, "n": 100,
|
||||
"max_segment_size": max_segment_size}
|
||||
eu = upload.EncryptAnUploadable(u, params)
|
||||
u.max_segment_size = max_segment_size
|
||||
u.encoding_param_k = 25
|
||||
u.encoding_param_happy = 75
|
||||
u.encoding_param_n = 100
|
||||
eu = upload.EncryptAnUploadable(u)
|
||||
d = e.set_encrypted_uploadable(eu)
|
||||
|
||||
all_shareholders = []
|
||||
@ -300,9 +302,11 @@ class Roundtrip(unittest.TestCase):
|
||||
e = encode.Encoder()
|
||||
u = upload.Data(data)
|
||||
# force use of multiple segments by using a low max_segment_size
|
||||
params = {"k": k, "happy": happy, "n": n,
|
||||
"max_segment_size": max_segment_size}
|
||||
eu = upload.EncryptAnUploadable(u, params)
|
||||
u.max_segment_size = max_segment_size
|
||||
u.encoding_param_k = k
|
||||
u.encoding_param_happy = happy
|
||||
u.encoding_param_n = n
|
||||
eu = upload.EncryptAnUploadable(u)
|
||||
d = e.set_encrypted_uploadable(eu)
|
||||
|
||||
shareholders = {}
|
||||
|
@ -331,10 +331,8 @@ class EncryptAnUploadable:
|
||||
implements(IEncryptedUploadable)
|
||||
CHUNKSIZE = 50*1000
|
||||
|
||||
def __init__(self, original, default_encoding_parameters):
|
||||
def __init__(self, original):
|
||||
self.original = IUploadable(original)
|
||||
assert isinstance(default_encoding_parameters, dict)
|
||||
self._default_encoding_parameters = default_encoding_parameters
|
||||
self._encryptor = None
|
||||
self._plaintext_hasher = plaintext_hasher()
|
||||
self._plaintext_segment_hasher = None
|
||||
@ -360,38 +358,15 @@ class EncryptAnUploadable:
|
||||
def get_all_encoding_parameters(self):
|
||||
if self._encoding_parameters is not None:
|
||||
return defer.succeed(self._encoding_parameters)
|
||||
d1 = self.get_size()
|
||||
d2 = self.original.get_maximum_segment_size()
|
||||
d3 = self.original.get_encoding_parameters()
|
||||
d = defer.DeferredList([d1, d2, d3],
|
||||
fireOnOneErrback=True, consumeErrors=True)
|
||||
def _got_pieces(res):
|
||||
file_size = res[0][1]
|
||||
max_segsize = res[1][1]
|
||||
params = res[2][1]
|
||||
|
||||
defaults = self._default_encoding_parameters
|
||||
if max_segsize is None:
|
||||
max_segsize = defaults["max_segment_size"]
|
||||
|
||||
if params is None:
|
||||
k = defaults["k"]
|
||||
happy = defaults["happy"]
|
||||
n = defaults["n"]
|
||||
else:
|
||||
precondition(isinstance(params, tuple), params)
|
||||
(k, happy, n) = params
|
||||
|
||||
# for small files, shrink the segment size to avoid wasting space
|
||||
segsize = min(max_segsize, file_size)
|
||||
# this must be a multiple of 'required_shares'==k
|
||||
segsize = mathutil.next_multiple(segsize, k)
|
||||
d = self.original.get_all_encoding_parameters()
|
||||
def _got(encoding_parameters):
|
||||
(k, happy, n, segsize) = encoding_parameters
|
||||
self._segment_size = segsize # used by segment hashers
|
||||
self._encoding_parameters = (k, happy, n, segsize)
|
||||
self.log("my encoding parameters: %s" %
|
||||
(self._encoding_parameters,), level=log.NOISY)
|
||||
return self._encoding_parameters
|
||||
d.addCallback(_got_pieces)
|
||||
self._encoding_parameters = encoding_parameters
|
||||
self.log("my encoding parameters: %s" % (encoding_parameters,),
|
||||
level=log.NOISY)
|
||||
return encoding_parameters
|
||||
d.addCallback(_got)
|
||||
return d
|
||||
|
||||
def _get_encryptor(self):
|
||||
@ -555,10 +530,8 @@ class EncryptAnUploadable:
|
||||
class CHKUploader:
|
||||
peer_selector_class = Tahoe2PeerSelector
|
||||
|
||||
def __init__(self, client, default_encoding_parameters):
|
||||
def __init__(self, client):
|
||||
self._client = client
|
||||
assert isinstance(default_encoding_parameters, dict)
|
||||
self._default_encoding_parameters = default_encoding_parameters
|
||||
self._log_number = self._client.log("CHKUploader starting")
|
||||
self._encoder = None
|
||||
self._results = UploadResults()
|
||||
@ -580,7 +553,7 @@ class CHKUploader:
|
||||
uploadable = IUploadable(uploadable)
|
||||
self.log("starting upload of %s" % uploadable)
|
||||
|
||||
eu = EncryptAnUploadable(uploadable, self._default_encoding_parameters)
|
||||
eu = EncryptAnUploadable(uploadable)
|
||||
d = self.start_encrypted(eu)
|
||||
def _uploaded(res):
|
||||
d1 = uploadable.get_encryption_key()
|
||||
@ -708,9 +681,6 @@ class LiteralUploader:
|
||||
self._client = client
|
||||
self._results = UploadResults()
|
||||
|
||||
def set_params(self, encoding_parameters):
|
||||
pass
|
||||
|
||||
def start(self, uploadable):
|
||||
uploadable = IUploadable(uploadable)
|
||||
d = uploadable.get_size()
|
||||
@ -803,10 +773,8 @@ class RemoteEncryptedUploadable(Referenceable):
|
||||
|
||||
class AssistedUploader:
|
||||
|
||||
def __init__(self, helper, default_encoding_parameters):
|
||||
def __init__(self, helper):
|
||||
self._helper = helper
|
||||
assert isinstance(default_encoding_parameters, dict)
|
||||
self._default_encoding_parameters = default_encoding_parameters
|
||||
self._log_number = log.msg("AssistedUploader starting")
|
||||
|
||||
def log(self, msg, parent=None, **kwargs):
|
||||
@ -817,7 +785,7 @@ class AssistedUploader:
|
||||
def start(self, uploadable):
|
||||
self._started = time.time()
|
||||
u = IUploadable(uploadable)
|
||||
eu = EncryptAnUploadable(u, self._default_encoding_parameters)
|
||||
eu = EncryptAnUploadable(u)
|
||||
self._encuploadable = eu
|
||||
d = eu.get_size()
|
||||
d.addCallback(self._got_size)
|
||||
@ -851,6 +819,7 @@ class AssistedUploader:
|
||||
def _got_storage_index(self, storage_index):
|
||||
self._storage_index = storage_index
|
||||
|
||||
|
||||
def _contact_helper(self, res):
|
||||
now = self._time_contacting_helper_start = time.time()
|
||||
self._storage_index_elapsed = now - self._started
|
||||
@ -858,6 +827,7 @@ class AssistedUploader:
|
||||
d = self._helper.callRemote("upload_chk", self._storage_index)
|
||||
d.addCallback(self._contacted_helper)
|
||||
return d
|
||||
|
||||
def _contacted_helper(self, (upload_results, upload_helper)):
|
||||
now = time.time()
|
||||
elapsed = now - self._time_contacting_helper_start
|
||||
@ -918,15 +888,55 @@ class AssistedUploader:
|
||||
r.timings["total"] = now - self._started
|
||||
return r
|
||||
|
||||
class NoParameterPreferencesMixin:
|
||||
max_segment_size = None
|
||||
encoding_parameters = None
|
||||
def get_maximum_segment_size(self):
|
||||
return defer.succeed(self.max_segment_size)
|
||||
def get_encoding_parameters(self):
|
||||
return defer.succeed(self.encoding_parameters)
|
||||
class BaseUploadable:
|
||||
default_max_segment_size = 1*MiB # overridden by max_segment_size
|
||||
default_encoding_param_k = 3 # overridden by encoding_parameters
|
||||
default_encoding_param_happy = 7
|
||||
default_encoding_param_n = 10
|
||||
|
||||
class FileHandle(NoParameterPreferencesMixin):
|
||||
max_segment_size = None
|
||||
encoding_param_k = None
|
||||
encoding_param_happy = None
|
||||
encoding_param_n = None
|
||||
|
||||
_all_encoding_parameters = None
|
||||
|
||||
def set_default_encoding_parameters(self, default_params):
|
||||
assert isinstance(default_params, dict)
|
||||
for k,v in default_params.items():
|
||||
precondition(isinstance(k, str), k, v)
|
||||
precondition(isinstance(v, int), k, v)
|
||||
if "k" in default_params:
|
||||
self.default_encoding_param_k = default_params["k"]
|
||||
if "happy" in default_params:
|
||||
self.default_encoding_param_happy = default_params["happy"]
|
||||
if "n" in default_params:
|
||||
self.default_encoding_param_n = default_params["n"]
|
||||
if "max_segment_size" in default_params:
|
||||
self.default_max_segment_size = default_params["max_segment_size"]
|
||||
|
||||
def get_all_encoding_parameters(self):
|
||||
if self._all_encoding_parameters:
|
||||
return defer.succeed(self._all_encoding_parameters)
|
||||
|
||||
max_segsize = self.max_segment_size or self.default_max_segment_size
|
||||
k = self.encoding_param_k or self.default_encoding_param_k
|
||||
happy = self.encoding_param_happy or self.default_encoding_param_happy
|
||||
n = self.encoding_param_n or self.default_encoding_param_n
|
||||
|
||||
d = self.get_size()
|
||||
def _got_size(file_size):
|
||||
# for small files, shrink the segment size to avoid wasting space
|
||||
segsize = min(max_segsize, file_size)
|
||||
# this must be a multiple of 'required_shares'==k
|
||||
segsize = mathutil.next_multiple(segsize, k)
|
||||
encoding_parameters = (k, happy, n, segsize)
|
||||
self._all_encoding_parameters = encoding_parameters
|
||||
return encoding_parameters
|
||||
d.addCallback(_got_size)
|
||||
return d
|
||||
|
||||
class FileHandle(BaseUploadable):
|
||||
implements(IUploadable)
|
||||
|
||||
def __init__(self, filehandle, contenthashkey=True):
|
||||
@ -1023,12 +1033,13 @@ class Uploader(service.MultiService):
|
||||
default_params = self.parent.get_encoding_parameters()
|
||||
precondition(isinstance(default_params, dict), default_params)
|
||||
precondition("max_segment_size" in default_params, default_params)
|
||||
uploadable.set_default_encoding_parameters(default_params)
|
||||
if size <= self.URI_LIT_SIZE_THRESHOLD:
|
||||
uploader = LiteralUploader(self.parent)
|
||||
elif self._helper:
|
||||
uploader = AssistedUploader(self._helper, default_params)
|
||||
uploader = AssistedUploader(self._helper)
|
||||
else:
|
||||
uploader = self.uploader_class(self.parent, default_params)
|
||||
uploader = self.uploader_class(self.parent)
|
||||
return uploader.start(uploadable)
|
||||
d.addCallback(_got_size)
|
||||
def _done(res):
|
||||
|
Loading…
x
Reference in New Issue
Block a user