mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-20 17:52:50 +00:00
offloaded uploader: don't use a huge amount of memory when skipping over previously-uploaded data
This commit is contained in:
parent
525bfeca25
commit
46fe024612
@ -332,7 +332,7 @@ class Encoder(object):
|
||||
if not num_chunks:
|
||||
return defer.succeed(previous_chunks)
|
||||
|
||||
d = self._uploadable.read_encrypted(input_chunk_size)
|
||||
d = self._uploadable.read_encrypted(input_chunk_size, False)
|
||||
def _got(data):
|
||||
if self._aborted:
|
||||
raise UploadAborted()
|
||||
|
@ -988,9 +988,12 @@ class IEncryptedUploadable(Interface):
|
||||
"""Return a Deferred that fires with a 16-byte storage index.
|
||||
"""
|
||||
|
||||
def read_encrypted(length, hash_only):
    """Behave like IUploadable.read(), except that crypttext is returned
    in place of plaintext.

    When hash_only is True the data is discarded and an empty list is
    returned instead. This makes resuming an interrupted upload cheaper:
    the plaintext hashes must still be computed over the region being
    skipped, but the encrypted data for that region is redundant and
    does not need to be kept."""
|
||||
|
||||
def get_plaintext_hashtree_leaves(first, last, num_segments):
|
||||
"""Get the leaf nodes of a merkle hash tree over the plaintext
|
||||
|
@ -232,7 +232,10 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
|
||||
fetch_size = min(needed, self.CHUNK_SIZE)
|
||||
if fetch_size == 0:
|
||||
return True # all done
|
||||
self.log("fetching %d-%d" % (self._have, self._have+fetch_size),
|
||||
self.log(format="fetching %(start)d-%(end)d of %(total)d",
|
||||
start=self._have,
|
||||
end=self._have+fetch_size,
|
||||
total=self._expected_size,
|
||||
level=log.NOISY)
|
||||
d = self.call("read_encrypted", self._have, fetch_size)
|
||||
def _got_data(ciphertext_v):
|
||||
@ -286,7 +289,8 @@ class LocalCiphertextReader(AskUntilSuccessMixin):
|
||||
def get_storage_index(self):
    """Wrap self._storage_index with defer.succeed() and return the
    result, so callers receive the stored index through a Deferred."""
    storage_index = self._storage_index
    return defer.succeed(storage_index)
|
||||
|
||||
def read_encrypted(self, length, hash_only):
    """Call self.f.read(length) and deliver the result, wrapped in a
    one-element list, through a Deferred.

    hash_only must be exactly False here; anything else trips the
    assertion below, because this reader has no hash-only mode.
    """
    assert hash_only is False

    def _as_single_item_list(data):
        # callers expect a list of strings rather than one bare string
        return [data]

    d = defer.maybeDeferred(self.f.read, length)
    d.addCallback(_as_single_item_list)
    return d
|
||||
|
@ -328,9 +328,10 @@ class EncryptAnUploadable:
|
||||
"""This is a wrapper that takes an IUploadable and provides
|
||||
IEncryptedUploadable."""
|
||||
implements(IEncryptedUploadable)
|
||||
CHUNKSIZE = 50*1000
|
||||
|
||||
def __init__(self, original, default_encoding_parameters):
|
||||
self.original = original
|
||||
self.original = IUploadable(original)
|
||||
assert isinstance(default_encoding_parameters, dict)
|
||||
self._default_encoding_parameters = default_encoding_parameters
|
||||
self._encryptor = None
|
||||
@ -451,32 +452,50 @@ class EncryptAnUploadable:
|
||||
|
||||
offset += this_segment
|
||||
|
||||
def read_encrypted(self, length, hash_only):
    """Read 'length' bytes from the wrapped uploadable in pieces of at
    most self.CHUNKSIZE, hash and encrypt each piece, and return a
    Deferred that fires with the accumulated list of crypttext strings.

    hash_only is passed through to _hash_and_encrypt_plaintext() for
    every piece; whatever that helper returns is appended to the result
    list.
    """
    # make sure our parameters have been set up first
    d = self.get_all_encoding_parameters()
    d.addCallback(lambda ignored: self._get_encryptor())

    remaining = length
    ciphertext = []
    while remaining:
        # tolerate large length= values without consuming a lot of RAM
        chunksize = min(remaining, self.CHUNKSIZE)
        remaining -= chunksize
        # then fetch the plaintext. The size is bound as a default
        # argument: these callbacks run after this loop has finished, so
        # a plain closure over 'chunksize' would make every read use the
        # size of the final chunk.
        d.addCallback(lambda ignored, size=chunksize:
                      self.original.read(size))
        # and encrypt it..
        # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
        d.addCallback(self._hash_and_encrypt_plaintext, hash_only)
        d.addCallback(ciphertext.extend)
    d.addCallback(lambda res: ciphertext)
    return d
|
||||
|
||||
def _hash_and_encrypt_plaintext(self, data, hash_only):
    """Hash and encrypt a sequence of plaintext chunks.

    Each chunk in 'data' (a tuple or list) is fed to
    self._plaintext_hasher and self._update_segment_hash(), then run
    through self._encryptor.process(). Returns the list of resulting
    ciphertext strings, or an empty list when hash_only is true (the
    ciphertext is still computed, see the TODO below, but not kept).
    """
    assert isinstance(data, (tuple, list)), type(data)
    data = list(data)
    cryptdata = []
    # we use data.pop(0) instead of 'for chunk in data' to save
    # memory: each chunk is destroyed as soon as we're done with it.
    while data:
        chunk = data.pop(0)
        log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk),
                level=log.NOISY)
        self._plaintext_hasher.update(chunk)
        self._update_segment_hash(chunk)
        # TODO: we have to encrypt the data (even if hash_only==True)
        # because pycryptopp's AES-CTR implementation doesn't offer a
        # way to change the counter value. Once pycryptopp acquires
        # this ability, change this to simply update the counter
        # before each call to (hash_only==False) _encryptor.process()
        ciphertext = self._encryptor.process(chunk)
        if hash_only:
            # only report a skip when the ciphertext really is discarded
            log.msg(" skipping encryption")
        else:
            cryptdata.append(ciphertext)
        del ciphertext
        del chunk
    return cryptdata
|
||||
|
||||
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
|
||||
if len(self._plaintext_segment_hashes) < num_segments:
|
||||
# close out the last one
|
||||
@ -650,6 +669,18 @@ class RemoteEncryptedUploadable(Referenceable):
|
||||
def remote_get_all_encoding_parameters(self):
    """Delegate to self._eu.get_all_encoding_parameters() and return its
    result unchanged."""
    wrapped = self._eu
    return wrapped.get_all_encoding_parameters()
|
||||
|
||||
def _read_encrypted(self, length, hash_only):
|
||||
d = self._eu.read_encrypted(length, hash_only)
|
||||
def _read(strings):
|
||||
if hash_only:
|
||||
self._offset += length
|
||||
else:
|
||||
size = sum([len(data) for data in strings])
|
||||
self._offset += size
|
||||
return strings
|
||||
d.addCallback(_read)
|
||||
return d
|
||||
|
||||
def remote_read_encrypted(self, offset, length):
|
||||
# we don't support seek backwards, but we allow skipping forwards
|
||||
precondition(offset >= 0, offset)
|
||||
@ -662,25 +693,25 @@ class RemoteEncryptedUploadable(Referenceable):
|
||||
skip = offset - self._offset
|
||||
log.msg("remote_read_encrypted skipping ahead to %d, skip=%d" %
|
||||
(self._offset, skip), level=log.UNUSUAL, parent=lp)
|
||||
d = self.remote_read_encrypted(self._offset, skip)
|
||||
def _ignore(strings):
|
||||
size = sum([len(data) for data in strings])
|
||||
self._bytes_sent -= size
|
||||
return self.remote_read_encrypted(offset, length)
|
||||
d.addCallback(_ignore)
|
||||
return d
|
||||
d = self._read_encrypted(skip, hash_only=True)
|
||||
else:
|
||||
d = defer.succeed(None)
|
||||
|
||||
def _at_correct_offset(res):
|
||||
assert offset == self._offset, "%d != %d" % (offset, self._offset)
|
||||
if self._cutoff is not None and offset+length > self._cutoff:
|
||||
self._cutoff_cb()
|
||||
|
||||
return self._read_encrypted(length, hash_only=False)
|
||||
d.addCallback(_at_correct_offset)
|
||||
|
||||
assert offset == self._offset, "%d != %d" % (offset, self._offset)
|
||||
if self._cutoff is not None and offset+length > self._cutoff:
|
||||
self._cutoff_cb()
|
||||
d = self._eu.read_encrypted(length)
|
||||
def _read(strings):
|
||||
size = sum([len(data) for data in strings])
|
||||
self._bytes_sent += size
|
||||
self._offset += size
|
||||
return strings
|
||||
d.addCallback(_read)
|
||||
return d
|
||||
|
||||
def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
|
||||
log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
|
||||
(first, last-1, num_segments),
|
||||
|
Loading…
x
Reference in New Issue
Block a user