mirror of https://github.com/tahoe-lafs/tahoe-lafs.git (synced 2025-01-01 18:56:41 +00:00)
encode/download: reduce memory footprint by deleting large intermediate buffers as soon as possible, improve hash tree usage
commit b2caf7fb9a (parent c81f2b01ff)
@@ -9,6 +9,7 @@ from allmydata.util import idlib, mathutil, bencode
 from allmydata.util.assertutil import _assert
 from allmydata import codec, hashtree
 from allmydata.Crypto.Cipher import AES
+from allmydata.Crypto.Hash import SHA256
 from allmydata.uri import unpack_uri
 from allmydata.interfaces import IDownloadTarget, IDownloader
 
@@ -34,6 +35,7 @@ class Output:
         self._verifierid_hasher = sha.new(netstring("allmydata_verifierid_v1"))
         self._fileid_hasher = sha.new(netstring("allmydata_fileid_v1"))
         self.length = 0
+        self._segment_number = 0
         self._plaintext_hash_tree = None
         self._crypttext_hash_tree = None
 
@@ -44,11 +46,34 @@ class Output:
     def open(self):
         self.downloadable.open()
 
-    def write(self, crypttext):
+    def write_segment(self, crypttext):
         self.length += len(crypttext)
+
+        # memory footprint: 'crypttext' is the only segment_size usage
+        # outstanding. While we decrypt it into 'plaintext', we hit
+        # 2*segment_size.
         self._verifierid_hasher.update(crypttext)
+        if self._crypttext_hash_tree:
+            ch = SHA256.new(netstring("allmydata_crypttext_segment_v1"))
+            ch.update(crypttext)
+            crypttext_leaves = {self._segment_number: ch.digest()}
+            self._crypttext_hash_tree.set_hashes(leaves=crypttext_leaves)
+
         plaintext = self._decryptor.decrypt(crypttext)
+        del crypttext
+
+        # now we're back down to 1*segment_size.
+
         self._fileid_hasher.update(plaintext)
+        if self._plaintext_hash_tree:
+            ph = SHA256.new(netstring("allmydata_plaintext_segment_v1"))
+            ph.update(plaintext)
+            plaintext_leaves = {self._segment_number: ph.digest()}
+            self._plaintext_hash_tree.set_hashes(leaves=plaintext_leaves)
+
+        self._segment_number += 1
+        # We're still at 1*segment_size. The Downloadable is responsible for
+        # any memory usage beyond this.
         self.downloadable.write(plaintext)
 
     def close(self):
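The comments added to write_segment spell out its memory profile: one segment of crypttext arrives, decrypting it briefly holds two segment-sized buffers, and deleting the crypttext immediately drops usage back to one segment while the per-segment leaf hashes are handed to the hash trees. Below is a minimal, self-contained Python 3 sketch of that pattern; SimpleSegmentWriter, the xor "decryptor", and the plain dicts of leaf digests are illustrative stand-ins, not the project's Output, AES, or hashtree classes.

import hashlib

def netstring(s):
    # same framing the tahoe code uses for hash-domain separation: "<len>:<s>,"
    return "%d:%s," % (len(s), s)

class SimpleSegmentWriter:
    """Illustrative stand-in for Output: hash each segment as a leaf,
    decrypt it, and drop the crypttext as early as possible."""
    def __init__(self, decrypt):
        self._decrypt = decrypt                 # callable: crypttext -> plaintext
        self._segment_number = 0
        self.crypttext_leaves = {}              # segnum -> leaf digest
        self.plaintext_leaves = {}
        self.plaintext_pieces = []

    def write_segment(self, crypttext):
        # 1*segment_size held here (just 'crypttext')
        ch = hashlib.sha256(netstring("allmydata_crypttext_segment_v1").encode())
        ch.update(crypttext)
        self.crypttext_leaves[self._segment_number] = ch.digest()

        plaintext = self._decrypt(crypttext)    # 2*segment_size during this call
        del crypttext                           # back down to 1*segment_size

        ph = hashlib.sha256(netstring("allmydata_plaintext_segment_v1").encode())
        ph.update(plaintext)
        self.plaintext_leaves[self._segment_number] = ph.digest()

        self._segment_number += 1
        self.plaintext_pieces.append(plaintext)

# usage: a toy xor "cipher" stands in for the real decryptor
w = SimpleSegmentWriter(decrypt=lambda c: bytes(b ^ 0x5A for b in c))
w.write_segment(bytes(16))
print(w.crypttext_leaves[0].hex()[:16], w.plaintext_leaves[0].hex()[:16])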
@@ -458,13 +483,28 @@ class FileDownloader:
         return d
 
     def _download_segment(self, res, segnum):
+        # memory footprint: when the SegmentDownloader finishes pulling down
+        # all shares, we have 1*segment_size of usage.
         segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares)
         d = segmentdler.start()
+        # while the codec does its job, we hit 2*segment_size
         d.addCallback(lambda (shares, shareids):
                       self._codec.decode(shares, shareids))
-        def _done(res):
-            for buf in res:
-                self._output.write(buf)
+        # once the codec is done, we drop back to 1*segment_size, because
+        # 'shares' goes out of scope. The memory usage is all in the
+        # plaintext now, spread out into a bunch of tiny buffers.
+        def _done(buffers):
+            # we start by joining all these buffers together into a single
+            # string. This makes Output.write easier, since it wants to hash
+            # data one segment at a time anyways, and doesn't impact our
+            # memory footprint since we're already peaking at 2*segment_size
+            # inside the codec a moment ago.
+            segment = "".join(buffers)
+            del buffers
+            # we're down to 1*segment_size right now, but write_segment()
+            # will decrypt a copy of the segment internally, which will push
+            # us up to 2*segment_size while it runs.
+            self._output.write_segment(segment)
        d.addCallback(_done)
        return d
 
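_download_segment's new comments track the same budget through the Deferred chain: the downloaded shares amount to one segment, the codec briefly doubles that, and once the shares go out of scope only the decoded buffers remain, so joining them into a single string does not raise the peak. A small sketch of that callback shape, assuming Twisted is installed; decode_stub and download_segment_sketch are hypothetical stand-ins for the codec and the real method.

from twisted.internet import defer

def decode_stub(shares, shareids):
    # stand-in for the codec's decode(): returns a list of small buffers
    return defer.succeed(shares)

def download_segment_sketch(shares, shareids, write_segment):
    # memory timeline (per the comments in the diff):
    #   shares downloaded                  -> 1 * segment_size
    #   codec decoding                     -> 2 * segment_size (shares + output)
    #   after decode, shares out of scope  -> 1 * segment_size in small buffers
    d = decode_stub(shares, shareids)
    def _done(buffers):
        segment = b"".join(buffers)   # same total size; the peak was already 2x inside the codec
        del buffers
        write_segment(segment)        # decrypting inside pushes back to 2x briefly
        return len(segment)
    d.addCallback(_done)
    return d

# usage
d = download_segment_sketch([b"ab", b"cd"], [0, 1], write_segment=lambda s: None)
d.addCallback(print)   # -> 4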
@@ -473,14 +513,16 @@ class FileDownloader:
         d = segmentdler.start()
         d.addCallback(lambda (shares, shareids):
                       self._tail_codec.decode(shares, shareids))
-        def _done(res):
+        def _done(buffers):
             # trim off any padding added by the upload side
-            data = ''.join(res)
+            segment = "".join(buffers)
+            del buffers
             # we never send empty segments. If the data was an exact multiple
             # of the segment size, the last segment will be full.
             pad_size = mathutil.pad_size(self._size, self._segment_size)
             tail_size = self._segment_size - pad_size
-            self._output.write(data[:tail_size])
+            segment = segment[:tail_size]
+            self._output.write_segment(segment)
         d.addCallback(_done)
         return d
 
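The tail-segment path trims the zero padding the uploader added: pad_size is how much padding was needed to round the file up to a whole segment, so the real data in the last segment is segment_size - pad_size, and a file that is an exact multiple of the segment size gets a full, unpadded tail, since empty segments are never sent. A tiny worked example, with a local pad_size helper standing in for allmydata.util.mathutil.pad_size and made-up sizes:

def pad_size(n, k):
    # local stand-in for allmydata.util.mathutil.pad_size: bytes needed to
    # round n up to a multiple of k (0 when n is already a multiple)
    return 0 if n % k == 0 else k - (n % k)

segment_size = 1024 * 1024          # 1 MiB segments (illustrative)
size = 2 * segment_size + 300       # hypothetical file size: 2 MiB + 300 bytes

pad = pad_size(size, segment_size)  # zero padding the uploader appended
tail_size = segment_size - pad      # real data in the final segment
assert tail_size == 300
print(pad, tail_size)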
@@ -226,18 +226,29 @@ class Encoder(object):
         plaintext_hasher = SHA256.new(netstring("allmydata_plaintext_segment_v1"))
         crypttext_hasher = SHA256.new(netstring("allmydata_crypttext_segment_v1"))
 
+        # memory footprint: we only hold a tiny piece of the plaintext at any
+        # given time. We build up a segment's worth of crypttext, then hand
+        # it to the encoder. Assuming 25-of-100 encoding (4x expansion) and
+        # 2MiB max_segment_size, we get a peak memory footprint of 5*2MiB =
+        # 10MiB. Lowering max_segment_size to, say, 100KiB would drop the
+        # footprint to 500KiB at the expense of more hash-tree overhead.
+
         for i in range(self.required_shares):
             input_piece = self.infile.read(input_piece_size)
             # non-tail segments should be the full segment size
             assert len(input_piece) == input_piece_size
             plaintext_hasher.update(input_piece)
             encrypted_piece = self.cryptor.encrypt(input_piece)
+            assert len(encrypted_piece) == len(input_piece)
             crypttext_hasher.update(encrypted_piece)
+
             chunks.append(encrypted_piece)
 
         self._plaintext_hashes.append(plaintext_hasher.digest())
         self._crypttext_hashes.append(crypttext_hasher.digest())
-        d = codec.encode(chunks)
+
+        d = codec.encode(chunks) # during this call, we hit 5*segsize memory
+        del chunks
         d.addCallback(self._encoded_segment, segnum)
         return d
 
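The new comment's arithmetic, made explicit: with 25-of-100 encoding the codec's output is 4x the segment, and it is produced while the one-segment 'chunks' list is still alive, so the peak is five segments' worth; del chunks then releases the crypttext as soon as the codec has returned. The parameters below are simply the ones named in the comment:

required_shares = 25                  # k
total_shares = 100                    # N
max_segment_size = 2 * 1024 * 1024    # 2 MiB

expansion = total_shares / required_shares        # 4x
crypttext = max_segment_size                      # the 'chunks' list: 1 * segsize
encoded_shares = max_segment_size * expansion     # codec output: 4 * segsize
peak = crypttext + encoded_shares                 # 5 * segsize
print(peak / (1024 * 1024), "MiB")                # -> 10.0 MiB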
@@ -252,17 +263,22 @@ class Encoder(object):
         for i in range(self.required_shares):
             input_piece = self.infile.read(input_piece_size)
             plaintext_hasher.update(input_piece)
-            if len(input_piece) < input_piece_size:
-                # padding
-                input_piece += ('\x00' * (input_piece_size - len(input_piece)))
             encrypted_piece = self.cryptor.encrypt(input_piece)
+            assert len(encrypted_piece) == len(input_piece)
             crypttext_hasher.update(encrypted_piece)
+
+            if len(encrypted_piece) < input_piece_size:
+                # padding
+                pad_size = (input_piece_size - len(encrypted_piece))
+                encrypted_piece += ('\x00' * pad_size)
+
             chunks.append(encrypted_piece)
 
         self._plaintext_hashes.append(plaintext_hasher.digest())
         self._crypttext_hashes.append(crypttext_hasher.digest())
 
         d = codec.encode(chunks)
+        del chunks
         d.addCallback(self._encoded_segment, segnum)
         return d
 
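In the tail-segment encoder the padding now happens after encryption and after hashing: the crypttext hash covers only the real bytes, and zeros are appended to the encrypted piece just to bring it up to input_piece_size for the codec, whereas the old code padded the plaintext before encrypting it. A minimal illustration; pad_encrypted_piece is a hypothetical helper, not code from the repository:

def pad_encrypted_piece(encrypted_piece, input_piece_size):
    # pad the *encrypted* piece with zero bytes up to input_piece_size,
    # after the crypttext hash has already covered the unpadded bytes
    if len(encrypted_piece) < input_piece_size:
        encrypted_piece += b'\x00' * (input_piece_size - len(encrypted_piece))
    return encrypted_piece

assert pad_encrypted_piece(b'abc', 8) == b'abc\x00\x00\x00\x00\x00'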