Merge branch 'master' into 2916.grid-manager-proposal.5

This commit is contained in:
meejah 2021-01-25 15:42:25 -07:00
commit 3c646991ad
6 changed files with 263 additions and 31 deletions

0
newsfragments/3594.minor Normal file
View File

0
newsfragments/3595.minor Normal file
View File

View File

@ -13,19 +13,30 @@ if PY2:
from past.builtins import long, unicode from past.builtins import long, unicode
from six import ensure_str from six import ensure_str
try:
from typing import List
except ImportError:
pass
import os, time, weakref, itertools import os, time, weakref, itertools
import attr
from zope.interface import implementer from zope.interface import implementer
from twisted.python import failure from twisted.python import failure
from twisted.internet import defer from twisted.internet import defer
from twisted.application import service from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually from foolscap.api import Referenceable, Copyable, RemoteCopy
from allmydata.crypto import aes from allmydata.crypto import aes
from allmydata.util.hashutil import file_renewal_secret_hash, \ from allmydata.util.hashutil import file_renewal_secret_hash, \
file_cancel_secret_hash, bucket_renewal_secret_hash, \ file_cancel_secret_hash, bucket_renewal_secret_hash, \
bucket_cancel_secret_hash, plaintext_hasher, \ bucket_cancel_secret_hash, plaintext_hasher, \
storage_index_hash, plaintext_segment_hasher, convergence_hasher storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata.util.deferredutil import timeout_call from allmydata.util.deferredutil import (
timeout_call,
until,
)
from allmydata import hashtree, uri from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a from allmydata.storage.server import si_b2a
from allmydata.immutable import encode from allmydata.immutable import encode
@ -900,13 +911,45 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
raise UploadUnhappinessError(msg) raise UploadUnhappinessError(msg)
@attr.s
class _Accum(object):
"""
Accumulate up to some known amount of ciphertext.
:ivar remaining: The number of bytes still expected.
:ivar ciphertext: The bytes accumulated so far.
"""
remaining = attr.ib(validator=attr.validators.instance_of(int)) # type: int
ciphertext = attr.ib(default=attr.Factory(list)) # type: List[bytes]
def extend(self,
size, # type: int
ciphertext, # type: List[bytes]
):
"""
Accumulate some more ciphertext.
:param size: The amount of data the new ciphertext represents towards
the goal. This may be more than the actual size of the given
ciphertext if the source has run out of data.
:param ciphertext: The new ciphertext to accumulate.
"""
self.remaining -= size
self.ciphertext.extend(ciphertext)
@implementer(IEncryptedUploadable) @implementer(IEncryptedUploadable)
class EncryptAnUploadable(object): class EncryptAnUploadable(object):
"""This is a wrapper that takes an IUploadable and provides """This is a wrapper that takes an IUploadable and provides
IEncryptedUploadable.""" IEncryptedUploadable."""
CHUNKSIZE = 50*1024 CHUNKSIZE = 50*1024
def __init__(self, original, log_parent=None, progress=None): def __init__(self, original, log_parent=None, progress=None, chunk_size=None):
"""
:param chunk_size: The number of bytes to read from the uploadable at a
time, or None for some default.
"""
precondition(original.default_params_set, precondition(original.default_params_set,
"set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,)) "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
self.original = IUploadable(original) self.original = IUploadable(original)
@ -920,6 +963,8 @@ class EncryptAnUploadable(object):
self._ciphertext_bytes_read = 0 self._ciphertext_bytes_read = 0
self._status = None self._status = None
self._progress = progress self._progress = progress
if chunk_size is not None:
self.CHUNKSIZE = chunk_size
def set_upload_status(self, upload_status): def set_upload_status(self, upload_status):
self._status = IUploadStatus(upload_status) self._status = IUploadStatus(upload_status)
@ -1026,47 +1071,53 @@ class EncryptAnUploadable(object):
# and size # and size
d.addCallback(lambda ignored: self.get_size()) d.addCallback(lambda ignored: self.get_size())
d.addCallback(lambda ignored: self._get_encryptor()) d.addCallback(lambda ignored: self._get_encryptor())
# then fetch and encrypt the plaintext. The unusual structure here
# (passing a Deferred *into* a function) is needed to avoid accum = _Accum(length)
# overflowing the stack: Deferreds don't optimize out tail recursion.
# We also pass in a list, to which _read_encrypted will append def action():
# ciphertext. """
ciphertext = [] Read some bytes into the accumulator.
d2 = defer.Deferred() """
d.addCallback(lambda ignored: return self._read_encrypted(accum, hash_only)
self._read_encrypted(length, ciphertext, hash_only, d2))
d.addCallback(lambda ignored: d2) def condition():
"""
Check to see if the accumulator has all the data.
"""
return accum.remaining == 0
d.addCallback(lambda ignored: until(action, condition))
d.addCallback(lambda ignored: accum.ciphertext)
return d return d
def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done): def _read_encrypted(self,
if not remaining: ciphertext_accum, # type: _Accum
fire_when_done.callback(ciphertext) hash_only, # type: bool
return None ):
# type: (...) -> defer.Deferred
"""
Read the next chunk of plaintext, encrypt it, and extend the accumulator
with the resulting ciphertext.
"""
# tolerate large length= values without consuming a lot of RAM by # tolerate large length= values without consuming a lot of RAM by
# reading just a chunk (say 50kB) at a time. This only really matters # reading just a chunk (say 50kB) at a time. This only really matters
# when hash_only==True (i.e. resuming an interrupted upload), since # when hash_only==True (i.e. resuming an interrupted upload), since
# that's the case where we will be skipping over a lot of data. # that's the case where we will be skipping over a lot of data.
size = min(remaining, self.CHUNKSIZE) size = min(ciphertext_accum.remaining, self.CHUNKSIZE)
remaining = remaining - size
# read a chunk of plaintext.. # read a chunk of plaintext..
d = defer.maybeDeferred(self.original.read, size) d = defer.maybeDeferred(self.original.read, size)
# N.B.: if read() is synchronous, then since everything else is
# actually synchronous too, we'd blow the stack unless we stall for a
# tick. Once you accept a Deferred from IUploadable.read(), you must
# be prepared to have it fire immediately too.
d.addCallback(fireEventually)
def _good(plaintext): def _good(plaintext):
# and encrypt it.. # and encrypt it..
# o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/' # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
ct = self._hash_and_encrypt_plaintext(plaintext, hash_only) ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
ciphertext.extend(ct) # Intentionally tell the accumulator about the expected size, not
self._read_encrypted(remaining, ciphertext, hash_only, # the actual size. If we run out of data we still want remaining
fire_when_done) # to drop otherwise it will never reach 0 and the loop will never
def _err(why): # end.
fire_when_done.errback(why) ciphertext_accum.extend(size, ct)
d.addCallback(_good) d.addCallback(_good)
d.addErrback(_err) return d
return None
def _hash_and_encrypt_plaintext(self, data, hash_only): def _hash_and_encrypt_plaintext(self, data, hash_only):
assert isinstance(data, (tuple, list)), type(data) assert isinstance(data, (tuple, list)), type(data)

View File

@ -74,3 +74,58 @@ class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin
d = defer.succeed(None) d = defer.succeed(None)
d.addBoth(self.wait_for_delayed_calls) d.addBoth(self.wait_for_delayed_calls)
return d return d
class UntilTests(unittest.TestCase):
"""
Tests for ``deferredutil.until``.
"""
def test_exception(self):
"""
If the action raises an exception, the ``Deferred`` returned by ``until``
fires with a ``Failure``.
"""
self.assertFailure(
deferredutil.until(lambda: 1/0, lambda: True),
ZeroDivisionError,
)
def test_stops_on_condition(self):
"""
The action is called repeatedly until ``condition`` returns ``True``.
"""
calls = []
def action():
calls.append(None)
def condition():
return len(calls) == 3
self.assertIs(
self.successResultOf(
deferredutil.until(action, condition),
),
None,
)
self.assertEqual(3, len(calls))
def test_waits_for_deferred(self):
"""
If the action returns a ``Deferred`` then it is called again when the
``Deferred`` fires.
"""
counter = [0]
r1 = defer.Deferred()
r2 = defer.Deferred()
results = [r1, r2]
def action():
counter[0] += 1
return results.pop(0)
def condition():
return False
deferredutil.until(action, condition)
self.assertEqual([1], counter)
r1.callback(None)
self.assertEqual([2], counter)

View File

@ -14,6 +14,17 @@ if PY2:
import os, shutil import os, shutil
from io import BytesIO from io import BytesIO
from base64 import (
b64encode,
)
from hypothesis import (
given,
)
from hypothesis.strategies import (
just,
integers,
)
from twisted.trial import unittest from twisted.trial import unittest
from twisted.python.failure import Failure from twisted.python.failure import Failure
@ -2029,6 +2040,91 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
f.close() f.close()
return None return None
class EncryptAnUploadableTests(unittest.TestCase):
"""
Tests for ``EncryptAnUploadable``.
"""
def test_same_length(self):
"""
``EncryptAnUploadable.read_encrypted`` returns ciphertext of the same
length as the underlying plaintext.
"""
plaintext = b"hello world"
uploadable = upload.FileHandle(BytesIO(plaintext), None)
uploadable.set_default_encoding_parameters({
# These values shouldn't matter.
"k": 3,
"happy": 5,
"n": 10,
"max_segment_size": 128 * 1024,
})
encrypter = upload.EncryptAnUploadable(uploadable)
ciphertext = b"".join(self.successResultOf(encrypter.read_encrypted(1024, False)))
self.assertEqual(len(ciphertext), len(plaintext))
@given(just(b"hello world"), integers(min_value=0, max_value=len(b"hello world")))
def test_known_result(self, plaintext, split_at):
"""
``EncryptAnUploadable.read_encrypted`` returns a known-correct ciphertext
string for certain inputs. The ciphertext is independent of the read
sizes.
"""
convergence = b"\x42" * 16
uploadable = upload.FileHandle(BytesIO(plaintext), convergence)
uploadable.set_default_encoding_parameters({
# The convergence key is a function of k, n, and max_segment_size
# (among other things). The value for happy doesn't matter
# though.
"k": 3,
"happy": 5,
"n": 10,
"max_segment_size": 128 * 1024,
})
encrypter = upload.EncryptAnUploadable(uploadable)
def read(n):
return b"".join(self.successResultOf(encrypter.read_encrypted(n, False)))
# Read the string in one or two pieces to make sure underlying state
# is maintained properly.
first = read(split_at)
second = read(len(plaintext) - split_at)
third = read(1)
ciphertext = first + second + third
self.assertEqual(
b"Jd2LHCRXozwrEJc=",
b64encode(ciphertext),
)
def test_large_read(self):
"""
``EncryptAnUploadable.read_encrypted`` succeeds even when the requested
data length is much larger than the chunk size.
"""
convergence = b"\x42" * 16
# 4kB of plaintext
plaintext = b"\xde\xad\xbe\xef" * 1024
uploadable = upload.FileHandle(BytesIO(plaintext), convergence)
uploadable.set_default_encoding_parameters({
"k": 3,
"happy": 5,
"n": 10,
"max_segment_size": 128 * 1024,
})
# Make the chunk size very small so we don't have to operate on a huge
# amount of data to exercise the relevant codepath.
encrypter = upload.EncryptAnUploadable(uploadable, chunk_size=1)
d = encrypter.read_encrypted(len(plaintext), False)
ciphertext = self.successResultOf(d)
self.assertEqual(
list(map(len, ciphertext)),
# Chunk size was specified as 1 above so we will get the whole
# plaintext in one byte chunks.
[1] * len(plaintext),
)
# TODO: # TODO:
# upload with exactly 75 servers (shares_of_happiness) # upload with exactly 75 servers (shares_of_happiness)
# have a download fail # have a download fail

View File

@ -15,7 +15,18 @@ if PY2:
import time import time
try:
from typing import (
Callable,
Any,
)
except ImportError:
pass
from foolscap.api import eventually from foolscap.api import eventually
from eliot.twisted import (
inline_callbacks,
)
from twisted.internet import defer, reactor, error from twisted.internet import defer, reactor, error
from twisted.python.failure import Failure from twisted.python.failure import Failure
@ -201,3 +212,22 @@ class WaitForDelayedCallsMixin(PollMixin):
d.addErrback(log.err, "error while waiting for delayed calls") d.addErrback(log.err, "error while waiting for delayed calls")
d.addBoth(lambda ign: res) d.addBoth(lambda ign: res)
return d return d
@inline_callbacks
def until(
action, # type: Callable[[], defer.Deferred[Any]]
condition, # type: Callable[[], bool]
):
# type: (...) -> defer.Deferred[None]
"""
Run a Deferred-returning function until a condition is true.
:param action: The action to run.
:param condition: The predicate signaling stop.
:return: A Deferred that fires after the condition signals stop.
"""
while True:
yield action()
if condition():
break