Mirror of https://github.com/tahoe-lafs/tahoe-lafs.git, synced 2025-02-20 17:52:50 +00:00
Merge pull request #971 from LeastAuthority/3594.read_encrypted-test
Directly test read_encrypted behavior and remove a `fireEventually` call from its implementation. Fixes: ticket:3594. Fixes: ticket:3595.
This commit is contained in:
commit e5f0dcfbb4

newsfragments/3594.minor    Normal file (0)
newsfragments/3595.minor    Normal file (0)
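Before the diff itself, a rough sketch of the new control flow may help. The old read_encrypted recursed by passing a Deferred into _read_encrypted and leaned on foolscap's fireEventually to avoid blowing the stack; the patch instead accumulates ciphertext in a helper object and drives the reads with a small Deferred-aware loop. The sketch below is illustrative only, not part of the patch (it uses Twisted's inlineCallbacks where the patch uses eliot's inline_callbacks decorator):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def until(action, condition):
        # Repeat a Deferred-returning action until the condition holds,
        # without building an ever-deeper callback chain.
        while True:
            yield action()           # e.g. read and encrypt one chunk
            if condition():          # e.g. the accumulator has all the bytes
                break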
@@ -13,19 +13,30 @@ if PY2:
     from past.builtins import long, unicode
 from six import ensure_str
 
+try:
+    from typing import List
+except ImportError:
+    pass
+
 import os, time, weakref, itertools
+
+import attr
+
 from zope.interface import implementer
 from twisted.python import failure
 from twisted.internet import defer
 from twisted.application import service
-from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
+from foolscap.api import Referenceable, Copyable, RemoteCopy
 
 from allmydata.crypto import aes
 from allmydata.util.hashutil import file_renewal_secret_hash, \
      file_cancel_secret_hash, bucket_renewal_secret_hash, \
      bucket_cancel_secret_hash, plaintext_hasher, \
      storage_index_hash, plaintext_segment_hasher, convergence_hasher
-from allmydata.util.deferredutil import timeout_call
+from allmydata.util.deferredutil import (
+    timeout_call,
+    until,
+)
 from allmydata import hashtree, uri
 from allmydata.storage.server import si_b2a
 from allmydata.immutable import encode
@@ -900,13 +911,45 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         raise UploadUnhappinessError(msg)
 
 
+@attr.s
+class _Accum(object):
+    """
+    Accumulate up to some known amount of ciphertext.
+
+    :ivar remaining: The number of bytes still expected.
+    :ivar ciphertext: The bytes accumulated so far.
+    """
+    remaining = attr.ib(validator=attr.validators.instance_of(int))  # type: int
+    ciphertext = attr.ib(default=attr.Factory(list))                 # type: List[bytes]
+
+    def extend(self,
+               size,        # type: int
+               ciphertext,  # type: List[bytes]
+    ):
+        """
+        Accumulate some more ciphertext.
+
+        :param size: The amount of data the new ciphertext represents towards
+            the goal. This may be more than the actual size of the given
+            ciphertext if the source has run out of data.
+
+        :param ciphertext: The new ciphertext to accumulate.
+        """
+        self.remaining -= size
+        self.ciphertext.extend(ciphertext)
+
+
 @implementer(IEncryptedUploadable)
 class EncryptAnUploadable(object):
     """This is a wrapper that takes an IUploadable and provides
     IEncryptedUploadable."""
     CHUNKSIZE = 50*1024
 
-    def __init__(self, original, log_parent=None, progress=None):
+    def __init__(self, original, log_parent=None, progress=None, chunk_size=None):
+        """
+        :param chunk_size: The number of bytes to read from the uploadable at a
+            time, or None for some default.
+        """
         precondition(original.default_params_set,
                      "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
         self.original = IUploadable(original)
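To make the accumulator's bookkeeping concrete, here is a tiny hypothetical snippet (not part of the patch) using only the _Accum API shown above:

    acc = _Accum(remaining=10)
    acc.extend(6, [b"cipher"])   # six bytes counted toward the goal
    acc.extend(4, [b"text"])     # per the docstring, size may exceed the real
                                 # data when the source runs out early
    assert acc.remaining == 0    # this is the stop condition for the read loop
    assert b"".join(acc.ciphertext) == b"ciphertext"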
@@ -920,6 +963,8 @@ class EncryptAnUploadable(object):
         self._ciphertext_bytes_read = 0
         self._status = None
         self._progress = progress
+        if chunk_size is not None:
+            self.CHUNKSIZE = chunk_size
 
     def set_upload_status(self, upload_status):
         self._status = IUploadStatus(upload_status)
@@ -1026,47 +1071,53 @@ class EncryptAnUploadable(object):
         # and size
         d.addCallback(lambda ignored: self.get_size())
         d.addCallback(lambda ignored: self._get_encryptor())
-        # then fetch and encrypt the plaintext. The unusual structure here
-        # (passing a Deferred *into* a function) is needed to avoid
-        # overflowing the stack: Deferreds don't optimize out tail recursion.
-        # We also pass in a list, to which _read_encrypted will append
-        # ciphertext.
-        ciphertext = []
-        d2 = defer.Deferred()
-        d.addCallback(lambda ignored:
-                      self._read_encrypted(length, ciphertext, hash_only, d2))
-        d.addCallback(lambda ignored: d2)
+
+        accum = _Accum(length)
+
+        def action():
+            """
+            Read some bytes into the accumulator.
+            """
+            return self._read_encrypted(accum, hash_only)
+
+        def condition():
+            """
+            Check to see if the accumulator has all the data.
+            """
+            return accum.remaining == 0
+
+        d.addCallback(lambda ignored: until(action, condition))
+        d.addCallback(lambda ignored: accum.ciphertext)
         return d
 
-    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
-        if not remaining:
-            fire_when_done.callback(ciphertext)
-            return None
+    def _read_encrypted(self,
+                        ciphertext_accum,  # type: _Accum
+                        hash_only,         # type: bool
+    ):
+        # type: (...) -> defer.Deferred
+        """
+        Read the next chunk of plaintext, encrypt it, and extend the accumulator
+        with the resulting ciphertext.
+        """
         # tolerate large length= values without consuming a lot of RAM by
         # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
         # that's the case where we will be skipping over a lot of data.
-        size = min(remaining, self.CHUNKSIZE)
-        remaining = remaining - size
+        size = min(ciphertext_accum.remaining, self.CHUNKSIZE)
 
         # read a chunk of plaintext..
         d = defer.maybeDeferred(self.original.read, size)
-        # N.B.: if read() is synchronous, then since everything else is
-        # actually synchronous too, we'd blow the stack unless we stall for a
-        # tick. Once you accept a Deferred from IUploadable.read(), you must
-        # be prepared to have it fire immediately too.
-        d.addCallback(fireEventually)
         def _good(plaintext):
             # and encrypt it..
             # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
             ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
-            ciphertext.extend(ct)
-            self._read_encrypted(remaining, ciphertext, hash_only,
-                                 fire_when_done)
-        def _err(why):
-            fire_when_done.errback(why)
+            # Intentionally tell the accumulator about the expected size, not
+            # the actual size. If we run out of data we still want remaining
+            # to drop otherwise it will never reach 0 and the loop will never
+            # end.
+            ciphertext_accum.extend(size, ct)
         d.addCallback(_good)
-        d.addErrback(_err)
-        return None
+        return d
 
     def _hash_and_encrypt_plaintext(self, data, hash_only):
         assert isinstance(data, (tuple, list)), type(data)
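From the caller's side the rewritten method is simpler: read_encrypted returns a Deferred that fires with the accumulated list of ciphertext chunks. A hypothetical consumer (handle_ciphertext is a stand-in name, not from the patch) might look like:

    d = encrypter.read_encrypted(length, False)    # hash_only=False: really encrypt
    d.addCallback(lambda chunks: b"".join(chunks))
    d.addCallback(handle_ciphertext)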
@@ -74,3 +74,58 @@ class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin
         d = defer.succeed(None)
         d.addBoth(self.wait_for_delayed_calls)
         return d
+
+
+class UntilTests(unittest.TestCase):
+    """
+    Tests for ``deferredutil.until``.
+    """
+    def test_exception(self):
+        """
+        If the action raises an exception, the ``Deferred`` returned by ``until``
+        fires with a ``Failure``.
+        """
+        self.assertFailure(
+            deferredutil.until(lambda: 1/0, lambda: True),
+            ZeroDivisionError,
+        )
+
+    def test_stops_on_condition(self):
+        """
+        The action is called repeatedly until ``condition`` returns ``True``.
+        """
+        calls = []
+        def action():
+            calls.append(None)
+
+        def condition():
+            return len(calls) == 3
+
+        self.assertIs(
+            self.successResultOf(
+                deferredutil.until(action, condition),
+            ),
+            None,
+        )
+        self.assertEqual(3, len(calls))
+
+    def test_waits_for_deferred(self):
+        """
+        If the action returns a ``Deferred`` then it is called again when the
+        ``Deferred`` fires.
+        """
+        counter = [0]
+        r1 = defer.Deferred()
+        r2 = defer.Deferred()
+        results = [r1, r2]
+        def action():
+            counter[0] += 1
+            return results.pop(0)
+
+        def condition():
+            return False
+
+        deferredutil.until(action, condition)
+        self.assertEqual([1], counter)
+        r1.callback(None)
+        self.assertEqual([2], counter)
@@ -14,6 +14,17 @@ if PY2:
 
 import os, shutil
 from io import BytesIO
+from base64 import (
+    b64encode,
+)
+
+from hypothesis import (
+    given,
+)
+from hypothesis.strategies import (
+    just,
+    integers,
+)
 
 from twisted.trial import unittest
 from twisted.python.failure import Failure
@@ -2029,6 +2040,91 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
         f.close()
         return None
 
+
+class EncryptAnUploadableTests(unittest.TestCase):
+    """
+    Tests for ``EncryptAnUploadable``.
+    """
+    def test_same_length(self):
+        """
+        ``EncryptAnUploadable.read_encrypted`` returns ciphertext of the same
+        length as the underlying plaintext.
+        """
+        plaintext = b"hello world"
+        uploadable = upload.FileHandle(BytesIO(plaintext), None)
+        uploadable.set_default_encoding_parameters({
+            # These values shouldn't matter.
+            "k": 3,
+            "happy": 5,
+            "n": 10,
+            "max_segment_size": 128 * 1024,
+        })
+        encrypter = upload.EncryptAnUploadable(uploadable)
+        ciphertext = b"".join(self.successResultOf(encrypter.read_encrypted(1024, False)))
+        self.assertEqual(len(ciphertext), len(plaintext))
+
+    @given(just(b"hello world"), integers(min_value=0, max_value=len(b"hello world")))
+    def test_known_result(self, plaintext, split_at):
+        """
+        ``EncryptAnUploadable.read_encrypted`` returns a known-correct ciphertext
+        string for certain inputs. The ciphertext is independent of the read
+        sizes.
+        """
+        convergence = b"\x42" * 16
+        uploadable = upload.FileHandle(BytesIO(plaintext), convergence)
+        uploadable.set_default_encoding_parameters({
+            # The convergence key is a function of k, n, and max_segment_size
+            # (among other things). The value for happy doesn't matter
+            # though.
+            "k": 3,
+            "happy": 5,
+            "n": 10,
+            "max_segment_size": 128 * 1024,
+        })
+        encrypter = upload.EncryptAnUploadable(uploadable)
+        def read(n):
+            return b"".join(self.successResultOf(encrypter.read_encrypted(n, False)))
+
+        # Read the string in one or two pieces to make sure underlying state
+        # is maintained properly.
+        first = read(split_at)
+        second = read(len(plaintext) - split_at)
+        third = read(1)
+        ciphertext = first + second + third
+
+        self.assertEqual(
+            b"Jd2LHCRXozwrEJc=",
+            b64encode(ciphertext),
+        )
+
+    def test_large_read(self):
+        """
+        ``EncryptAnUploadable.read_encrypted`` succeeds even when the requested
+        data length is much larger than the chunk size.
+        """
+        convergence = b"\x42" * 16
+        # 4kB of plaintext
+        plaintext = b"\xde\xad\xbe\xef" * 1024
+        uploadable = upload.FileHandle(BytesIO(plaintext), convergence)
+        uploadable.set_default_encoding_parameters({
+            "k": 3,
+            "happy": 5,
+            "n": 10,
+            "max_segment_size": 128 * 1024,
+        })
+        # Make the chunk size very small so we don't have to operate on a huge
+        # amount of data to exercise the relevant codepath.
+        encrypter = upload.EncryptAnUploadable(uploadable, chunk_size=1)
+        d = encrypter.read_encrypted(len(plaintext), False)
+        ciphertext = self.successResultOf(d)
+        self.assertEqual(
+            list(map(len, ciphertext)),
+            # Chunk size was specified as 1 above so we will get the whole
+            # plaintext in one byte chunks.
+            [1] * len(plaintext),
+        )
+
+
 # TODO:
 # upload with exactly 75 servers (shares_of_happiness)
 # have a download fail
@@ -15,7 +15,18 @@ if PY2:
 
 import time
 
+try:
+    from typing import (
+        Callable,
+        Any,
+    )
+except ImportError:
+    pass
+
 from foolscap.api import eventually
+from eliot.twisted import (
+    inline_callbacks,
+)
 from twisted.internet import defer, reactor, error
 from twisted.python.failure import Failure
 
@@ -201,3 +212,22 @@ class WaitForDelayedCallsMixin(PollMixin):
         d.addErrback(log.err, "error while waiting for delayed calls")
         d.addBoth(lambda ign: res)
         return d
+
+@inline_callbacks
+def until(
+        action,     # type: Callable[[], defer.Deferred[Any]]
+        condition,  # type: Callable[[], bool]
+):
+    # type: (...) -> defer.Deferred[None]
+    """
+    Run a Deferred-returning function until a condition is true.
+
+    :param action: The action to run.
+    :param condition: The predicate signaling stop.
+
+    :return: A Deferred that fires after the condition signals stop.
+    """
+    while True:
+        yield action()
+        if condition():
+            break
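A short hypothetical usage sketch of the new ``until`` helper in allmydata.util.deferredutil (it assumes a running Twisted reactor; poll_once and have_enough are made-up names):

    from twisted.internet import reactor, task

    results = []

    def poll_once():
        # One unit of asynchronous work, returning a Deferred as real callers do.
        results.append(len(results))
        return task.deferLater(reactor, 0.1, lambda: None)

    def have_enough():
        return len(results) >= 3

    d = until(poll_once, have_enough)
    # d fires with None once have_enough() is true; results then has three entries.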