Switch from fireEventually to until

This commit is contained in:
Jean-Paul Calderone 2021-01-20 13:54:37 -05:00
parent 932481ad47
commit 12087738d6
2 changed files with 98 additions and 31 deletions

View File

@ -13,19 +13,33 @@ if PY2:
from past.builtins import long, unicode
from six import ensure_str
try:
from typing import List
except ImportError:
pass
import os, time, weakref, itertools
from functools import (
partial,
)
import attr
from zope.interface import implementer
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
from foolscap.api import Referenceable, Copyable, RemoteCopy
from allmydata.crypto import aes
from allmydata.util.hashutil import file_renewal_secret_hash, \
file_cancel_secret_hash, bucket_renewal_secret_hash, \
bucket_cancel_secret_hash, plaintext_hasher, \
storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata.util.deferredutil import timeout_call
from allmydata.util.deferredutil import (
timeout_call,
until,
)
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
@ -900,13 +914,41 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
raise UploadUnhappinessError(msg)
@attr.s
class _Accum(object):
"""
Accumulate up to some known amount of ciphertext.
:ivar remaining: The number of bytes still expected.
:ivar ciphertext: The bytes accumulated so far.
"""
remaining = attr.ib(validator=attr.validators.instance_of(int)) # type: int
ciphertext = attr.ib(default=attr.Factory(list)) # type: List[bytes]
def extend(self,
size, # type: int
ciphertext, # type: List[bytes]
):
"""
Accumulate some more ciphertext.
:param size: The amount of data the new ciphertext represents towards
the goal. This may be more than the actual size of the given
ciphertext if the source has run out of data.
:param ciphertext: The new ciphertext to accumulate.
"""
self.remaining -= size
self.ciphertext.extend(ciphertext)
@implementer(IEncryptedUploadable)
class EncryptAnUploadable(object):
"""This is a wrapper that takes an IUploadable and provides
IEncryptedUploadable."""
CHUNKSIZE = 50*1024
def __init__(self, original, log_parent=None, progress=None):
def __init__(self, original, log_parent=None, progress=None, chunk_size=None):
precondition(original.default_params_set,
"set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
self.original = IUploadable(original)
@ -920,6 +962,8 @@ class EncryptAnUploadable(object):
self._ciphertext_bytes_read = 0
self._status = None
self._progress = progress
if chunk_size is not None:
self.CHUNKSIZE = chunk_size
def set_upload_status(self, upload_status):
self._status = IUploadStatus(upload_status)
@ -1026,47 +1070,43 @@ class EncryptAnUploadable(object):
# and size
d.addCallback(lambda ignored: self.get_size())
d.addCallback(lambda ignored: self._get_encryptor())
# then fetch and encrypt the plaintext. The unusual structure here
# (passing a Deferred *into* a function) is needed to avoid
# overflowing the stack: Deferreds don't optimize out tail recursion.
# We also pass in a list, to which _read_encrypted will append
# ciphertext.
ciphertext = []
d2 = defer.Deferred()
d.addCallback(lambda ignored:
self._read_encrypted(length, ciphertext, hash_only, d2))
d.addCallback(lambda ignored: d2)
accum = _Accum(length)
action = partial(self._read_encrypted, accum, hash_only)
condition = lambda: accum.remaining == 0
d.addCallback(lambda ignored: until(action, condition))
d.addCallback(lambda ignored: accum.ciphertext)
return d
def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
if not remaining:
fire_when_done.callback(ciphertext)
return None
def _read_encrypted(self,
ciphertext_accum, # type: _Accum
hash_only, # type: bool
):
# type: (...) -> defer.Deferred
"""
Read the next chunk of plaintext, encrypt it, and extend the accumulator
with the resulting ciphertext.
"""
# tolerate large length= values without consuming a lot of RAM by
# reading just a chunk (say 50kB) at a time. This only really matters
# when hash_only==True (i.e. resuming an interrupted upload), since
# that's the case where we will be skipping over a lot of data.
size = min(remaining, self.CHUNKSIZE)
remaining = remaining - size
size = min(ciphertext_accum.remaining, self.CHUNKSIZE)
# read a chunk of plaintext..
d = defer.maybeDeferred(self.original.read, size)
# N.B.: if read() is synchronous, then since everything else is
# actually synchronous too, we'd blow the stack unless we stall for a
# tick. Once you accept a Deferred from IUploadable.read(), you must
# be prepared to have it fire immediately too.
d.addCallback(fireEventually)
def _good(plaintext):
# and encrypt it..
# o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
ciphertext.extend(ct)
self._read_encrypted(remaining, ciphertext, hash_only,
fire_when_done)
def _err(why):
fire_when_done.errback(why)
# Intentionally tell the accumulator about the expected size, not
# the actual size. If we run out of data we still want remaining
# to drop otherwise it will never reach 0 and the loop will never
# end.
ciphertext_accum.extend(size, ct)
d.addCallback(_good)
d.addErrback(_err)
return None
return d
def _hash_and_encrypt_plaintext(self, data, hash_only):
assert isinstance(data, (tuple, list)), type(data)

View File

@ -2097,6 +2097,33 @@ class EncryptAnUploadableTests(unittest.TestCase):
b64encode(ciphertext),
)
def test_large_read(self):
"""
``EncryptAnUploadable.read_encrypted`` succeeds even when the requested
data length is much larger than the chunk size.
"""
convergence = b"\x42" * 16
# 4kB of plaintext
plaintext = b"\xde\xad\xbe\xef" * 1024
uploadable = upload.FileHandle(BytesIO(plaintext), convergence)
uploadable.set_default_encoding_parameters({
"k": 3,
"happy": 5,
"n": 10,
"max_segment_size": 128 * 1024,
})
# Make the chunk size very small so we don't have to operate on a huge
# amount of data to exercise the relevant codepath.
encrypter = upload.EncryptAnUploadable(uploadable, chunk_size=1)
d = encrypter.read_encrypted(len(plaintext), False)
ciphertext = self.successResultOf(d)
self.assertEqual(
list(map(len, ciphertext)),
# Chunk size was specified as 1 above so we will get the whole
# plaintext in one byte chunks.
[1] * len(plaintext),
)
# TODO:
# upload with exactly 75 servers (shares_of_happiness)