From 8be3678cb47f0902a94d2ed1b1d651842b738efd Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Wed, 20 Jan 2021 11:22:22 -0500 Subject: [PATCH 1/7] Directly test read_encrypted behavior --- src/allmydata/test/test_upload.py | 69 +++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 94d7575c3..7e41bfc24 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -14,6 +14,17 @@ if PY2: import os, shutil from io import BytesIO +from base64 import ( + b64encode, +) + +from hypothesis import ( + given, +) +from hypothesis.strategies import ( + just, + integers, +) from twisted.trial import unittest from twisted.python.failure import Failure @@ -2029,6 +2040,64 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, f.close() return None + +class EncryptAnUploadableTests(unittest.TestCase): + """ + Tests for ``EncryptAnUploadable``. + """ + def test_same_length(self): + """ + ``EncryptAnUploadable.read_encrypted`` returns ciphertext of the same + length as the underlying plaintext. + """ + plaintext = b"hello world" + uploadable = upload.FileHandle(BytesIO(plaintext), None) + uploadable.set_default_encoding_parameters({ + # These values shouldn't matter. + "k": 3, + "happy": 5, + "n": 10, + "max_segment_size": 128 * 1024, + }) + encrypter = upload.EncryptAnUploadable(uploadable) + ciphertext = b"".join(self.successResultOf(encrypter.read_encrypted(1024, False))) + self.assertEqual(len(ciphertext), len(plaintext)) + + @given(just(b"hello world"), integers(min_value=0, max_value=len(b"hello world"))) + def test_known_result(self, plaintext, split_at): + """ + ``EncryptAnUploadable.read_encrypted`` returns a known-correct ciphertext + string for certain inputs. The ciphertext is independent of the read + sizes. 
+ """ + convergence = b"\x42" * 16 + uploadable = upload.FileHandle(BytesIO(plaintext), convergence) + uploadable.set_default_encoding_parameters({ + # The convergence key is a function of k, n, and max_segment_size + # (among other things). The value for happy doesn't matter + # though. + "k": 3, + "happy": 5, + "n": 10, + "max_segment_size": 128 * 1024, + }) + encrypter = upload.EncryptAnUploadable(uploadable) + def read(n): + return b"".join(self.successResultOf(encrypter.read_encrypted(n, False))) + + # Read the string in one or two pieces to make sure underlying state + # is maintained properly. + first = read(split_at) + second = read(len(plaintext) - split_at) + third = read(1) + ciphertext = first + second + third + + self.assertEqual( + b"Jd2LHCRXozwrEJc=", + b64encode(ciphertext), + ) + + # TODO: # upload with exactly 75 servers (shares_of_happiness) # have a download fail From f75f71cba6e98ac508cf06108523a9d0c1a4842f Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Wed, 20 Jan 2021 11:23:35 -0500 Subject: [PATCH 2/7] news fragment --- newsfragments/3594.minor | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 newsfragments/3594.minor diff --git a/newsfragments/3594.minor b/newsfragments/3594.minor new file mode 100644 index 000000000..e69de29bb From 932481ad47c650cb00070394829a7cc268fdd00e Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Wed, 20 Jan 2021 12:58:03 -0500 Subject: [PATCH 3/7] A helper for doing something repeatedly for a while --- src/allmydata/test/test_deferredutil.py | 55 +++++++++++++++++++++++++ src/allmydata/util/deferredutil.py | 30 ++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/src/allmydata/test/test_deferredutil.py b/src/allmydata/test/test_deferredutil.py index 6ebc93556..2a155089f 100644 --- a/src/allmydata/test/test_deferredutil.py +++ b/src/allmydata/test/test_deferredutil.py @@ -74,3 +74,58 @@ class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin d 
= defer.succeed(None) d.addBoth(self.wait_for_delayed_calls) return d + + +class UntilTests(unittest.TestCase): + """ + Tests for ``deferredutil.until``. + """ + def test_exception(self): + """ + If the action raises an exception, the ``Deferred`` returned by ``until`` + fires with a ``Failure``. + """ + self.assertFailure( + deferredutil.until(lambda: 1/0, lambda: True), + ZeroDivisionError, + ) + + def test_stops_on_condition(self): + """ + The action is called repeatedly until ``condition`` returns ``True``. + """ + calls = [] + def action(): + calls.append(None) + + def condition(): + return len(calls) == 3 + + self.assertIs( + self.successResultOf( + deferredutil.until(action, condition), + ), + None, + ) + self.assertEqual(3, len(calls)) + + def test_waits_for_deferred(self): + """ + If the action returns a ``Deferred`` then it is called again when the + ``Deferred`` fires. + """ + counter = [0] + r1 = defer.Deferred() + r2 = defer.Deferred() + results = [r1, r2] + def action(): + counter[0] += 1 + return results.pop(0) + + def condition(): + return False + + deferredutil.until(action, condition) + self.assertEqual([1], counter) + r1.callback(None) + self.assertEqual([2], counter) diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py index 1d13f61e6..ed2a11ee4 100644 --- a/src/allmydata/util/deferredutil.py +++ b/src/allmydata/util/deferredutil.py @@ -15,7 +15,18 @@ if PY2: import time +try: + from typing import ( + Callable, + Any, + ) +except ImportError: + pass + from foolscap.api import eventually +from eliot.twisted import ( + inline_callbacks, +) from twisted.internet import defer, reactor, error from twisted.python.failure import Failure @@ -201,3 +212,22 @@ class WaitForDelayedCallsMixin(PollMixin): d.addErrback(log.err, "error while waiting for delayed calls") d.addBoth(lambda ign: res) return d + +@inline_callbacks +def until( + action, # type: Callable[[], defer.Deferred[Any]] + condition, # type: Callable[[], bool] 
+): + # type: (...) -> defer.Deferred[None] + """ + Run a Deferred-returning function until a condition is true. + + :param action: The action to run. + :param condition: The predicate signaling stop. + + :return: A Deferred that fires after the condition signals stop. + """ + while True: + yield action() + if condition(): + break From 12087738d682c051aeeb75a7c90dd78d7b8ebfb0 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Wed, 20 Jan 2021 13:54:37 -0500 Subject: [PATCH 4/7] Switch from fireEventually to `until` --- src/allmydata/immutable/upload.py | 102 +++++++++++++++++++++--------- src/allmydata/test/test_upload.py | 27 ++++++++ 2 files changed, 98 insertions(+), 31 deletions(-) diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index adcdaed10..fe173b46c 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -13,19 +13,33 @@ if PY2: from past.builtins import long, unicode from six import ensure_str +try: + from typing import List +except ImportError: + pass + import os, time, weakref, itertools +from functools import ( + partial, +) + +import attr + from zope.interface import implementer from twisted.python import failure from twisted.internet import defer from twisted.application import service -from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually +from foolscap.api import Referenceable, Copyable, RemoteCopy from allmydata.crypto import aes from allmydata.util.hashutil import file_renewal_secret_hash, \ file_cancel_secret_hash, bucket_renewal_secret_hash, \ bucket_cancel_secret_hash, plaintext_hasher, \ storage_index_hash, plaintext_segment_hasher, convergence_hasher -from allmydata.util.deferredutil import timeout_call +from allmydata.util.deferredutil import ( + timeout_call, + until, +) from allmydata import hashtree, uri from allmydata.storage.server import si_b2a from allmydata.immutable import encode @@ -900,13 +914,41 @@ class 
Tahoe2ServerSelector(log.PrefixingLogMixin): raise UploadUnhappinessError(msg) +@attr.s +class _Accum(object): + """ + Accumulate up to some known amount of ciphertext. + + :ivar remaining: The number of bytes still expected. + :ivar ciphertext: The bytes accumulated so far. + """ + remaining = attr.ib(validator=attr.validators.instance_of(int)) # type: int + ciphertext = attr.ib(default=attr.Factory(list)) # type: List[bytes] + + def extend(self, + size, # type: int + ciphertext, # type: List[bytes] + ): + """ + Accumulate some more ciphertext. + + :param size: The amount of data the new ciphertext represents towards + the goal. This may be more than the actual size of the given + ciphertext if the source has run out of data. + + :param ciphertext: The new ciphertext to accumulate. + """ + self.remaining -= size + self.ciphertext.extend(ciphertext) + + @implementer(IEncryptedUploadable) class EncryptAnUploadable(object): """This is a wrapper that takes an IUploadable and provides IEncryptedUploadable.""" CHUNKSIZE = 50*1024 - def __init__(self, original, log_parent=None, progress=None): + def __init__(self, original, log_parent=None, progress=None, chunk_size=None): precondition(original.default_params_set, "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,)) self.original = IUploadable(original) @@ -920,6 +962,8 @@ class EncryptAnUploadable(object): self._ciphertext_bytes_read = 0 self._status = None self._progress = progress + if chunk_size is not None: + self.CHUNKSIZE = chunk_size def set_upload_status(self, upload_status): self._status = IUploadStatus(upload_status) @@ -1026,47 +1070,43 @@ class EncryptAnUploadable(object): # and size d.addCallback(lambda ignored: self.get_size()) d.addCallback(lambda ignored: self._get_encryptor()) - # then fetch and encrypt the plaintext. 
The unusual structure here - # (passing a Deferred *into* a function) is needed to avoid - # overflowing the stack: Deferreds don't optimize out tail recursion. - # We also pass in a list, to which _read_encrypted will append - # ciphertext. - ciphertext = [] - d2 = defer.Deferred() - d.addCallback(lambda ignored: - self._read_encrypted(length, ciphertext, hash_only, d2)) - d.addCallback(lambda ignored: d2) + + accum = _Accum(length) + action = partial(self._read_encrypted, accum, hash_only) + condition = lambda: accum.remaining == 0 + + d.addCallback(lambda ignored: until(action, condition)) + d.addCallback(lambda ignored: accum.ciphertext) return d - def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done): - if not remaining: - fire_when_done.callback(ciphertext) - return None + def _read_encrypted(self, + ciphertext_accum, # type: _Accum + hash_only, # type: bool + ): + # type: (...) -> defer.Deferred + """ + Read the next chunk of plaintext, encrypt it, and extend the accumulator + with the resulting ciphertext. + """ # tolerate large length= values without consuming a lot of RAM by # reading just a chunk (say 50kB) at a time. This only really matters # when hash_only==True (i.e. resuming an interrupted upload), since # that's the case where we will be skipping over a lot of data. - size = min(remaining, self.CHUNKSIZE) - remaining = remaining - size + size = min(ciphertext_accum.remaining, self.CHUNKSIZE) + # read a chunk of plaintext.. d = defer.maybeDeferred(self.original.read, size) - # N.B.: if read() is synchronous, then since everything else is - # actually synchronous too, we'd blow the stack unless we stall for a - # tick. Once you accept a Deferred from IUploadable.read(), you must - # be prepared to have it fire immediately too. - d.addCallback(fireEventually) def _good(plaintext): # and encrypt it.. # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! 
o/' ct = self._hash_and_encrypt_plaintext(plaintext, hash_only) - ciphertext.extend(ct) - self._read_encrypted(remaining, ciphertext, hash_only, - fire_when_done) - def _err(why): - fire_when_done.errback(why) + # Intentionally tell the accumulator about the expected size, not + # the actual size. If we run out of data we still want remaining + # to drop otherwise it will never reach 0 and the loop will never + # end. + ciphertext_accum.extend(size, ct) d.addCallback(_good) - d.addErrback(_err) - return None + return d def _hash_and_encrypt_plaintext(self, data, hash_only): assert isinstance(data, (tuple, list)), type(data) diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 7e41bfc24..07ede2074 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -2097,6 +2097,33 @@ class EncryptAnUploadableTests(unittest.TestCase): b64encode(ciphertext), ) + def test_large_read(self): + """ + ``EncryptAnUploadable.read_encrypted`` succeeds even when the requested + data length is much larger than the chunk size. + """ + convergence = b"\x42" * 16 + # 4kB of plaintext + plaintext = b"\xde\xad\xbe\xef" * 1024 + uploadable = upload.FileHandle(BytesIO(plaintext), convergence) + uploadable.set_default_encoding_parameters({ + "k": 3, + "happy": 5, + "n": 10, + "max_segment_size": 128 * 1024, + }) + # Make the chunk size very small so we don't have to operate on a huge + # amount of data to exercise the relevant codepath. + encrypter = upload.EncryptAnUploadable(uploadable, chunk_size=1) + d = encrypter.read_encrypted(len(plaintext), False) + ciphertext = self.successResultOf(d) + self.assertEqual( + list(map(len, ciphertext)), + # Chunk size was specified as 1 above so we will get the whole + # plaintext in one byte chunks. 
+ [1] * len(plaintext), + ) + # TODO: # upload with exactly 75 servers (shares_of_happiness) From 9c91261fa6675e2f92890c9cd1229474e49883db Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Wed, 20 Jan 2021 13:57:01 -0500 Subject: [PATCH 5/7] news fragment --- newsfragments/3595.minor | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 newsfragments/3595.minor diff --git a/newsfragments/3595.minor b/newsfragments/3595.minor new file mode 100644 index 000000000..e69de29bb From 5a0c913f589de968e7543ba7175386454b509be3 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Mon, 25 Jan 2021 08:21:39 -0500 Subject: [PATCH 6/7] document the new parameter --- src/allmydata/immutable/upload.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index fe173b46c..27cc923fd 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -949,6 +949,10 @@ class EncryptAnUploadable(object): CHUNKSIZE = 50*1024 def __init__(self, original, log_parent=None, progress=None, chunk_size=None): + """ + :param chunk_size: The number of bytes to read from the uploadable at a + time, or None to use the class-level ``CHUNKSIZE`` default.
+ """ precondition(original.default_params_set, "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,)) self.original = IUploadable(original) From e0fa2286228a46cd81b82868b56cf096d18d95dc Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Mon, 25 Jan 2021 08:23:40 -0500 Subject: [PATCH 7/7] expand partial/lambda into full functions for clarity --- src/allmydata/immutable/upload.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 27cc923fd..46e01184f 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -19,9 +19,6 @@ except ImportError: pass import os, time, weakref, itertools -from functools import ( - partial, -) import attr @@ -1076,8 +1073,18 @@ class EncryptAnUploadable(object): d.addCallback(lambda ignored: self._get_encryptor()) accum = _Accum(length) - action = partial(self._read_encrypted, accum, hash_only) - condition = lambda: accum.remaining == 0 + + def action(): + """ + Read some bytes into the accumulator. + """ + return self._read_encrypted(accum, hash_only) + + def condition(): + """ + Check to see if the accumulator has all the data. + """ + return accum.remaining == 0 d.addCallback(lambda ignored: until(action, condition)) d.addCallback(lambda ignored: accum.ciphertext)