megapatch: overhaul encoding_parameters handling: they now come from the Uploadable or the Client. Removed options= too. Also move the helper towards resumability.

Brian Warner 2008-01-16 03:03:35 -07:00
parent 547375a488
commit 51321944f0
9 changed files with 518 additions and 232 deletions
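At a high level, the patch replaces the old options= plumbing with a single resolution step inside EncryptAnUploadable. The sketch below is not code from the patch (the free-standing function and its name are invented for illustration), but it mirrors the logic of get_all_encoding_parameters() in upload.py further down: per-Uploadable preferences win, client defaults fill the gaps, and the segment size is shrunk for small files and rounded up to a multiple of k.

def next_multiple(n, k):
    # round n up to the nearest multiple of k (same as mathutil.next_multiple)
    return ((n + k - 1) // k) * k

def resolve_encoding_parameters(file_size, uploadable_params,
                                uploadable_max_segsize, client_defaults):
    # uploadable_params: None or a (k, happy, n) tuple from the Uploadable;
    # None means "use the client's defaults" (hard-coded or from the
    # Introducer). Likewise for uploadable_max_segsize.
    if uploadable_params is None:
        k, happy, n = (client_defaults["k"], client_defaults["happy"],
                       client_defaults["n"])
    else:
        k, happy, n = uploadable_params
    max_segsize = uploadable_max_segsize
    if max_segsize is None:
        max_segsize = client_defaults["max_segment_size"]
    # shrink the segment for small files, then round up to a multiple of k
    segsize = next_multiple(min(max_segsize, file_size), k)
    return (k, happy, n, segsize)

defaults = {"k": 25, "happy": 75, "n": 100, "max_segment_size": 1024*1024}
assert resolve_encoding_parameters(1000, None, None, defaults) == (25, 75, 100, 1000)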

View File

@ -7,7 +7,7 @@ from allmydata import node
from twisted.internet import reactor
from twisted.application.internet import TimerService
from twisted.python import log
from foolscap.logging import log
import allmydata
from allmydata.storage import StorageServer
@ -24,6 +24,12 @@ from allmydata.mutable import MutableFileNode
from allmydata.interfaces import IURI, INewDirectoryURI, \
IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
class Client(node.Node, Referenceable, testutil.PollMixin):
implements(RIClient)
PORTNUMFILE = "client.port"
@ -34,6 +40,17 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
# we're pretty narrow-minded right now
OLDEST_SUPPORTED_VERSION = allmydata.__version__
# this is a tuple of (needed, desired, total, max_segment_size). 'needed'
# is the number of shares required to reconstruct a file. 'desired' means
# that we will abort an upload unless we can allocate space for at least
# this many. 'total' is the total number of shares created by encoding.
# If everybody has room then this is how many we will upload.
DEFAULT_ENCODING_PARAMETERS = {"k":25,
"happy": 75,
"n": 100,
"max_segment_size": 1*MiB,
}
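Since these defaults live on a class attribute, a test (or subclass) can swap in different parameters before uploading; the test changes later in this patch use exactly this pattern. A minimal sketch, assuming client is a Client instance:

client.DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                      "happy": 5,
                                      "n": 10,
                                      "max_segment_size": 128*KiB,
                                      }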
def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
self.logSource="Client"
@ -195,8 +212,20 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
def get_encoding_parameters(self):
if not self.introducer_client:
return None
return self.introducer_client.encoding_parameters
return self.DEFAULT_ENCODING_PARAMETERS
p = self.introducer_client.encoding_parameters # a tuple
# TODO: make the 0.7.1 introducer publish a dict instead of a tuple
params = {"k": p[0],
"happy": p[1],
"n": p[2],
}
if len(p) == 3:
# TODO: compatibility with 0.7.0 Introducer that doesn't specify
# segment_size
self.log("Introducer didn't provide max_segment_size, using 1MiB",
level=log.UNUSUAL)
params["max_segment_size"] = 1*MiB
else:
params["max_segment_size"] = p[3]
return params
def connected_to_introducer(self):
if self.introducer_client:
@ -253,7 +282,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
d.addCallback(lambda res: n)
return d
def upload(self, uploadable, options={}):
def upload(self, uploadable):
uploader = self.getServiceNamed("uploader")
return uploader.upload(uploadable, options)
return uploader.upload(uploadable)
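With options= gone, per-upload preferences ride on the Uploadable itself, so callers just hand over an Uploadable. A minimal sketch of the new call style (client is assumed to be a running Client; handle_uri is hypothetical):

from allmydata import upload

d = client.upload(upload.Data("some bytes to store"))
d.addCallback(handle_uri)  # the Deferred fires with the file's URI string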

View File

@ -72,22 +72,9 @@ PiB=1024*TiB
class Encoder(object):
implements(IEncoder)
NEEDED_SHARES = 3
SHARES_OF_HAPPINESS = 7
TOTAL_SHARES = 10
MAX_SEGMENT_SIZE = 1*MiB
def __init__(self, options={}, parent=None):
def __init__(self, parent=None):
object.__init__(self)
self.MAX_SEGMENT_SIZE = options.get("max_segment_size",
self.MAX_SEGMENT_SIZE)
k,happy,n = options.get("needed_and_happy_and_total_shares",
(self.NEEDED_SHARES,
self.SHARES_OF_HAPPINESS,
self.TOTAL_SHARES))
self.NEEDED_SHARES = k
self.SHARES_OF_HAPPINESS = happy
self.TOTAL_SHARES = n
self.uri_extension_data = {}
self._codec = None
self._parent = parent
@ -107,31 +94,33 @@ class Encoder(object):
kwargs["parent"] = self._log_number
return self._parent.log(*args, **kwargs)
def set_size(self, size):
def set_encrypted_uploadable(self, uploadable):
eu = self._uploadable = IEncryptedUploadable(uploadable)
d = eu.get_size()
def _got_size(size):
self.file_size = size
d.addCallback(_got_size)
d.addCallback(lambda res: eu.get_all_encoding_parameters())
d.addCallback(self._got_all_encoding_parameters)
d.addCallback(lambda res: eu.get_storage_index())
def _done(storage_index):
self._storage_index = storage_index
return self
d.addCallback(_done)
return d
def _got_all_encoding_parameters(self, params):
assert not self._codec
self.file_size = size
def set_params(self, encoding_parameters):
assert not self._codec
k,d,n = encoding_parameters
self.NEEDED_SHARES = k
self.SHARES_OF_HAPPINESS = d
self.TOTAL_SHARES = n
self.log("set_params: %d,%d,%d" % (k, d, n))
def _setup_codec(self):
self.num_shares = self.TOTAL_SHARES
self.required_shares = self.NEEDED_SHARES
self.shares_of_happiness = self.SHARES_OF_HAPPINESS
self.segment_size = min(self.MAX_SEGMENT_SIZE, self.file_size)
# this must be a multiple of self.required_shares
self.segment_size = mathutil.next_multiple(self.segment_size,
self.required_shares)
# now set up the codec
k, happy, n, segsize = params
self.required_shares = k
self.shares_of_happiness = happy
self.num_shares = n
self.segment_size = segsize
self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
self.log("now setting up codec")
assert self.segment_size % self.required_shares == 0
self.num_segments = mathutil.div_ceil(self.file_size,
self.segment_size)
@ -176,22 +165,8 @@ class Encoder(object):
def _compute_overhead(self):
return 0
def set_encrypted_uploadable(self, uploadable):
u = self._uploadable = IEncryptedUploadable(uploadable)
d = u.get_size()
d.addCallback(self.set_size)
d.addCallback(lambda res: self.get_param("serialized_params"))
d.addCallback(u.set_serialized_encoding_parameters)
d.addCallback(lambda res: u.get_storage_index())
def _done(storage_index):
self._storage_index = storage_index
return self
d.addCallback(_done)
return d
def get_param(self, name):
if not self._codec:
self._setup_codec()
assert self._codec
if name == "storage_index":
return self._storage_index
@ -221,9 +196,7 @@ class Encoder(object):
if self._parent:
self._log_number = self._parent.log("%s starting" % (self,))
#paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
if not self._codec:
self._setup_codec()
assert self._codec
self._crypttext_hasher = hashutil.crypttext_hasher()
self._crypttext_hashes = []
self.segment_num = 0
@ -234,8 +207,6 @@ class Encoder(object):
self.share_root_hashes = [None] * self.num_shares
d = eventual.fireEventually()
d.addCallback(lambda res:
self._uploadable.set_segment_size(self.segment_size))
for l in self.landlords.values():
d.addCallback(lambda res, l=l: l.start())
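For reference, the new Encoder is driven like this (a sketch based on the test changes below, not code in this file; u is assumed to be an IUploadable, and shareholder setup is elided):

e = encode.Encoder()
params = {"k": 25, "happy": 75, "n": 100, "max_segment_size": 1*MiB}
eu = upload.EncryptAnUploadable(u, params)
d = e.set_encrypted_uploadable(eu)  # gathers size, parameters, storage index
# ... then set shareholders on 'e' and call e.start()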

View File

@ -971,38 +971,33 @@ class IEncryptedUploadable(Interface):
def get_size():
"""This behaves just like IUploadable.get_size()."""
def set_serialized_encoding_parameters(serialized_encoding_parameters):
"""Tell me what encoding parameters will be used for my data.
def get_all_encoding_parameters():
"""Return a Deferred that fires with a tuple of
(k,happy,n,segment_size). The segment_size will be used as-is, and
must satisfy the following constraints: it must be a multiple of k, and
it shouldn't be unreasonably larger than the file size (if
segment_size is larger than filesize, the difference must be stored
as padding).
'serialized_encoding_parameters' is a string which indicates how the
data will be encoded (codec name, blocksize, number of shares).
I may use this when get_storage_index() is called, to influence the
index that I return. Or, I may just ignore it.
set_serialized_encoding_parameters() may be called 0 or 1 times. If
called, it must be called before get_storage_index().
The encoder strictly obeys the values returned by this method. To
make an upload use non-default encoding parameters, you must arrange
to control the values that this method returns.
"""
def get_storage_index():
"""Return a Deferred that fires with a 16-byte storage index. This
value may be influenced by the parameters earlier set by
set_serialized_encoding_parameters().
"""Return a Deferred that fires with a 16-byte storage index.
"""
def set_segment_size(segment_size):
"""Set the segment size, to allow the IEncryptedUploadable to
accurately create the plaintext segment hash tree. This must be
called before any calls to read_encrypted."""
def read_encrypted(length):
"""This behaves just like IUploadable.read(), but returns crypttext
instead of plaintext. set_segment_size() must be called before the
first call to read_encrypted()."""
instead of plaintext."""
def get_plaintext_hashtree_leaves(first, last, num_segments):
"""Get the leaf nodes of a merkle hash tree over the plaintext
segments, i.e. get the tagged hashes of the given segments.
segments, i.e. get the tagged hashes of the given segments. The
segment size is expected to be generated by the IEncryptedUploadable
before any plaintext is read or ciphertext produced, so that the
segment hashes can be generated with only a single pass.
This returns a Deferred which fires with a sequence of hashes, using:
@ -1034,17 +1029,28 @@ class IUploadable(Interface):
used, to compute encoding parameters.
"""
def set_serialized_encoding_parameters(serialized_encoding_parameters):
"""Tell me what encoding parameters will be used for my data.
def get_maximum_segment_size():
"""Return a Deferred that fires with None or an integer. None
indicates that the Uploadable doesn't care about segment size, and
the IEncryptedUploadable wrapper will use a default of probably 1MB.
If provided, the integer will be used as the maximum segment size.
Larger values reduce hash overhead, smaller values reduce memory
footprint and cause data to be delivered in smaller pieces (which may
provide a smoother and more predictable download experience).
'serialized_encoding_parameters' is a string which indicates how the
data will be encoded (codec name, blocksize, number of shares).
There are other constraints on the segment size (see
IEncryptedUploadable.get_all_encoding_parameters), so the final segment
size may be smaller than the one returned by this method.
"""
I may use this when get_encryption_key() is called, to influence the
key that I return. Or, I may just ignore it.
def get_encoding_parameters():
"""Return a Deferred that either fires with None or with a tuple of
(k,happy,n). None indicates that the Uploadable doesn't care how it
is encoded, causing the Uploader to use default k/happy/n (either
hard-coded or provided by the Introducer).
set_serialized_encoding_parameters() may be called 0 or 1 times. If
called, it must be called before get_encryption_key().
This allows some IUploadables to request better redundancy than
others.
"""
def get_encryption_key():
@ -1264,8 +1270,8 @@ class RIEncryptedUploadable(RemoteInterface):
def get_size():
return int
def set_segment_size(segment_size=long):
return None
def get_all_encoding_parameters():
return (int, int, int, long)
def read_encrypted(offset=long, length=long):
return ListOf(str)

View File

@ -1,10 +1,11 @@
import os.path, stat
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
from allmydata import upload, interfaces
from allmydata.util import idlib, log, observer
from allmydata.util import idlib, log, observer, fileutil
class NotEnoughWritersError(Exception):
@ -18,49 +19,73 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
"""
implements(interfaces.RICHKUploadHelper)
def __init__(self, storage_index, helper, log_number, options={}):
self._started = False
def __init__(self, storage_index, helper,
incoming_file, encoding_file,
log_number):
self._storage_index = storage_index
self._helper = helper
self._incoming_file = incoming_file
self._encoding_file = encoding_file
upload_id = idlib.b2a(storage_index)[:6]
self._log_number = log_number
self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
parent=log_number)
self._client = helper.parent
self._options = options
self._reader = CiphertextReader(storage_index, self)
self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file)
self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
self._finished_observers = observer.OneShotObserverList()
self.set_params( (3,7,10) ) # GACK
d = self._fetcher.when_done()
d.addCallback(lambda res: self._reader.start())
d.addCallback(lambda res: self.start_encrypted(self._reader))
d.addCallback(self._finished)
d.addErrback(self._failed)
def log(self, *args, **kwargs):
if 'facility' not in kwargs:
kwargs['facility'] = "tahoe.helper"
kwargs['facility'] = "tahoe.helper.chk"
return upload.CHKUploader.log(self, *args, **kwargs)
def start(self):
# determine if we need to upload the file. If so, return ({},self) .
# If not, return (UploadResults,None) .
self.log("deciding whether to upload the file or not", level=log.NOISY)
if os.path.exists(self._encoding_file):
# we have the whole file, and we're currently encoding it. The
# caller will get to see the results when we're done. TODO: how
# should they get upload progress in this case?
self.log("encoding in progress", level=log.UNUSUAL)
return self._finished_observers.when_fired()
if os.path.exists(self._incoming_file):
# we have some of the file, but not all of it (otherwise we'd be
# encoding). The caller might be useful.
self.log("partial ciphertext already present", level=log.UNUSUAL)
return ({}, self)
# we don't remember uploading this file, but it might already be in
# the grid. For now we do an unconditional upload. TODO: Do a quick
# checker run (send one query to each storage server) to see who has
the file. Then accommodate a lazy uploader by retrieving the UEB
# from one of the shares and hash it.
#return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self)
self.log("no record of having uploaded the file", level=log.NOISY)
return ({}, self)
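In short, start() keys its answer off which on-disk file exists:

# CHK_encoding/SI present -> full ciphertext fetched, encoding under way:
#                            wait on _finished_observers
# CHK_incoming/SI present -> partial ciphertext: return ({}, self) and
#                            resume fetching from the caller's reader
# neither                 -> fresh upload (the TODO above would first
#                            query the grid)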
def remote_upload(self, reader):
# reader is an RIEncryptedUploadable. I am specified to return an
# UploadResults dictionary.
if os.path.exists(self._encoding_file):
# we've already started encoding, so we have no use for the
# reader. Notify them when we're done.
return self._finished_observers.when_fired()
# let our fetcher pull ciphertext from the reader.
self._fetcher.add_reader(reader)
# and also hashes
self._reader.add_reader(reader)
# there is already an upload in progress, and a second uploader
# has joined in. We will notify the second client when the upload
# is complete, but we will not request any data from them unless
# the first one breaks. TODO: fetch data from both clients to
# speed the upload
if not self._started:
self._started = True
d = self.start_encrypted(self._reader)
d.addCallbacks(self._finished, self._failed)
# and inform the client when the upload has finished
return self._finished_observers.when_fired()
def _finished(self, res):
@ -68,22 +93,17 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
upload_results = {'uri_extension_hash': uri_extension_hash}
self._finished_observers.fire(upload_results)
self._helper.upload_finished(self._storage_index)
del self._reader
def _failed(self, f):
self._finished_observers.fire(f)
self._helper.upload_finished(self._storage_index)
del self._reader
class CiphertextReader:
implements(interfaces.IEncryptedUploadable)
def __init__(self, storage_index, upload_helper):
self._readers = []
self.storage_index = storage_index
self._offset = 0
self._upload_helper = upload_helper
class AskUntilSuccessMixin:
# create me with a _reader array
def add_reader(self, reader):
# for now, we stick to the first uploader
self._readers.append(reader)
def call(self, *args, **kwargs):
@ -101,14 +121,196 @@ class CiphertextReader:
d.addErrback(_err)
return d
class CHKCiphertextFetcher(AskUntilSuccessMixin):
"""I use one or more remote RIEncryptedUploadable instances to gather
ciphertext on disk. When I'm done, the file I create can be used by a
LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
process.
I begin pulling ciphertext as soon as a reader is added. I remove readers
when they have any sort of error. If the last reader is removed, I fire
my when_done() Deferred with a failure.
I fire my when_done() Deferred (with None) immediately after I have moved
the ciphertext to 'encoded_file'.
"""
def __init__(self, helper, incoming_file, encoded_file):
self._upload_helper = helper
self._incoming_file = incoming_file
self._encoding_file = encoded_file
self._done_observers = observer.OneShotObserverList()
self._readers = []
self._started = False
self._f = None
def add_reader(self, reader):
AskUntilSuccessMixin.add_reader(self, reader)
self._start()
def _start(self):
if self._started:
return
self._started = True
# first, find out how large the file is going to be
d = self.call("get_size")
d.addCallback(self._got_size)
d.addCallback(self._start_reading)
d.addCallback(self._done)
d.addErrback(self._failed)
def _got_size(self, size):
self._expected_size = size
def _start_reading(self, res):
# then find out how much crypttext we have on disk
if os.path.exists(self._incoming_file):
self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
else:
self._have = 0
self._f = open(self._incoming_file, "wb")
# now loop to pull the data from the readers
d = defer.Deferred()
self._loop(d)
# this Deferred will be fired once the last byte has been written to
# self._f
return d
# read data in 50kB chunks. We should choose a more considered number
# here, possibly letting the client specify it. The goal should be to
# keep RTT*bandwidth below 10% of the chunk size, to reduce the upload
# bandwidth lost because this protocol is non-windowing. Too
# large, however, means more memory consumption for both ends. Something
# that can be transferred in, say, 10 seconds sounds about right. On my
# home DSL line (50kBps upstream), that suggests 500kB. Most lines are
# slower, maybe 10kBps, which suggests 100kB, and that's a bit more
# memory than I want to hang on to, so I'm going to go with 50kB and see
# how that works.
CHUNK_SIZE = 50*1024
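Checking that rule with illustrative numbers:

# per chunk, time ~= RTT + CHUNK_SIZE/bandwidth, so the fraction lost to
# the round trip is RTT / (RTT + CHUNK_SIZE/bandwidth).
# e.g. RTT=100ms on a 50kBps line: RTT*bandwidth = 5kB, exactly 10% of
# CHUNK_SIZE, so about 9% of the potential upload rate is lost; a 10kBps
# line with the same RTT has only 1kB in flight and loses about 2%.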
def _loop(self, fire_when_done):
# this slightly weird structure is needed because Deferreds don't do
# tail-recursion, so it is important to let each one retire promptly.
# Simply chaining them will cause a stack overflow at the end of a
# transfer that involves more than a few hundred chunks.
# 'fire_when_done' lives a long time, but the Deferreds returned by
# the inner _fetch() call do not.
d = defer.maybeDeferred(self._fetch)
def _done(finished):
if finished:
fire_when_done.callback(None)
else:
self._loop(fire_when_done)
def _err(f):
fire_when_done.errback(f)
d.addCallbacks(_done, _err)
return None
def _fetch(self):
needed = self._expected_size - self._have
fetch_size = min(needed, self.CHUNK_SIZE)
if fetch_size == 0:
return True # all done
d = self.call("read_encrypted", self._have, fetch_size)
def _got_data(ciphertext_v):
for data in ciphertext_v:
self._f.write(data)
self._have += len(data)
return False # not done
d.addCallback(_got_data)
return d
def call(self, *args, **kwargs):
if not self._readers:
raise NotEnoughWritersError("ran out of assisted uploaders")
rr = self._readers[0]
d = rr.callRemote(*args, **kwargs)
def _err(f):
if rr in self._readers:
self._readers.remove(rr)
self._upload_helper.log("call to assisted uploader %s failed" % rr,
failure=f, level=log.UNUSUAL)
# we can try again with someone else who's left
return self.call(*args, **kwargs)
d.addErrback(_err)
return d
def _done(self, res):
self._f.close()
self._f = None
self._readers = []
os.rename(self._incoming_file, self._encoding_file)
self._done_observers.fire(None)
def _failed(self, f):
if self._f:
self._f.close()
self._readers = []
self._done_observers.fire(f)
def when_done(self):
return self._done_observers.when_fired()
class LocalCiphertextReader(AskUntilSuccessMixin):
implements(interfaces.IEncryptedUploadable)
def __init__(self, upload_helper, storage_index, encoding_file):
self._readers = []
self._upload_helper = upload_helper
self._storage_index = storage_index
self._encoding_file = encoding_file
def start(self):
self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
self.f = open(self._encoding_file, "rb")
def get_size(self):
return defer.succeed(self._size)
def get_all_encoding_parameters(self):
return self.call("get_all_encoding_parameters")
def get_storage_index(self):
return defer.succeed(self._storage_index)
def read_encrypted(self, length):
d = defer.maybeDeferred(self.f.read, length)
d.addCallback(lambda data: [data])
return d
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
return self.call("get_plaintext_hashtree_leaves", first, last,
num_segments)
def get_plaintext_hash(self):
return self.call("get_plaintext_hash")
def close(self):
# ??
return self.call("close")
class CiphertextReader:
implements(interfaces.IEncryptedUploadable)
def __init__(self, storage_index, upload_helper):
self._readers = []
self.storage_index = storage_index
self._offset = 0
self._upload_helper = upload_helper
def add_reader(self, reader):
# for now, we stick to the first uploader
self._readers.append(reader)
def get_size(self):
return self.call("get_size")
def get_all_encoding_parameters(self):
return self.call("get_all_encoding_parameters")
def get_storage_index(self):
return defer.succeed(self.storage_index)
def set_segment_size(self, segment_size):
return self.call("set_segment_size", segment_size)
def set_serialized_encoding_parameters(self, params):
pass # ??
def read_encrypted(self, length):
d = self.call("read_encrypted", self._offset, length)
def _done(strings):
@ -139,7 +341,10 @@ class Helper(Referenceable, service.MultiService):
def __init__(self, basedir):
self._basedir = basedir
self._chk_options = {}
self._chk_incoming = os.path.join(basedir, "CHK_incoming")
self._chk_encoding = os.path.join(basedir, "CHK_encoding")
fileutil.make_dirs(self._chk_incoming)
fileutil.make_dirs(self._chk_encoding)
self._active_uploads = {}
service.MultiService.__init__(self)
@ -149,16 +354,18 @@ class Helper(Referenceable, service.MultiService):
return self.parent.log(*args, **kwargs)
def remote_upload_chk(self, storage_index):
lp = self.log(format="helper: upload_chk query for SI %(si)s",
si=idlib.b2a(storage_index))
# TODO: look on disk
si_s = idlib.b2a(storage_index)
lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
incoming_file = os.path.join(self._chk_incoming, si_s)
encoding_file = os.path.join(self._chk_encoding, si_s)
if storage_index in self._active_uploads:
self.log("upload is currently active", parent=lp)
uh = self._active_uploads[storage_index]
else:
self.log("creating new upload helper", parent=lp)
uh = self.chk_upload_helper_class(storage_index, self, lp,
self._chk_options)
uh = self.chk_upload_helper_class(storage_index, self,
incoming_file, encoding_file,
lp)
self._active_uploads[storage_index] = uh
return uh.start()

View File

@ -157,10 +157,11 @@ class Encode(unittest.TestCase):
expected_block_hashes, expected_share_hashes):
data = make_data(datalen)
# force use of multiple segments
options = {"max_segment_size": max_segment_size, 'needed_and_happy_and_total_shares': (25, 75, 100)}
e = encode.Encoder(options)
e = encode.Encoder()
u = upload.Data(data)
eu = upload.EncryptAnUploadable(u)
params = {"k": 25, "happy": 75, "n": 100,
"max_segment_size": max_segment_size}
eu = upload.EncryptAnUploadable(u, params)
d = e.set_encrypted_uploadable(eu)
all_shareholders = []
@ -285,15 +286,16 @@ class Roundtrip(unittest.TestCase):
def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size,
bucket_modes, data):
k, happy, n = k_and_happy_and_n
NUM_SHARES = k_and_happy_and_n[2]
if AVAILABLE_SHARES is None:
AVAILABLE_SHARES = NUM_SHARES
# force use of multiple segments
options = {"max_segment_size": max_segment_size,
"needed_and_happy_and_total_shares": k_and_happy_and_n}
e = encode.Encoder(options)
e = encode.Encoder()
u = upload.Data(data)
eu = upload.EncryptAnUploadable(u)
# force use of multiple segments by using a low max_segment_size
params = {"k": k, "happy": happy, "n": n,
"max_segment_size": max_segment_size}
eu = upload.EncryptAnUploadable(u, params)
d = e.set_encrypted_uploadable(eu)
shareholders = {}

View File

@ -8,13 +8,19 @@ from foolscap.logging import log
from allmydata import upload, offloaded
from allmydata.util import hashutil
MiB = 1024*1024
class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
def start_encrypted(self, eu):
needed_shares, happy, total_shares = self._encoding_parameters
d = eu.get_size()
def _got_size(size):
return (hashutil.uri_extension_hash(""),
needed_shares, total_shares, size)
d2 = eu.get_all_encoding_parameters()
def _got_parms(parms):
needed_shares, happy, total_shares, segsize = parms
return (hashutil.uri_extension_hash(""),
needed_shares, total_shares, size)
d2.addCallback(_got_parms)
return d2
d.addCallback(_got_size)
return d
@ -24,12 +30,17 @@ class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
return (res, None)
class FakeClient(service.MultiService):
DEFAULT_ENCODING_PARAMETERS = {"k":25,
"happy": 75,
"n": 100,
"max_segment_size": 1*MiB,
}
def log(self, *args, **kwargs):
return log.msg(*args, **kwargs)
def get_push_to_ourselves(self):
return True
def get_encoding_parameters(self):
return None
return self.DEFAULT_ENCODING_PARAMETERS
def flush_but_dont_ignore(res):
d = eventual.flushEventualQueue()

View File

@ -32,6 +32,13 @@ This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""
class SmallSegmentDataUploadable(upload.Data):
def __init__(self, max_segment_size, *args, **kwargs):
self._max_segment_size = max_segment_size
upload.Data.__init__(self, *args, **kwargs)
def get_maximum_segment_size(self):
return defer.succeed(self._max_segment_size)
class SystemTest(testutil.SignalMixin, unittest.TestCase):
def setUp(self):
@ -203,8 +210,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
# tail segment is not the same length as the others. This actually
# gets rounded up to 1025 to be a multiple of the number of
# required shares (since we use 25 out of 100 FEC).
options = {"max_segment_size": 1024}
d1 = u.upload_data(DATA, options)
d1 = u.upload(SmallSegmentDataUploadable(1024, DATA))
return d1
d.addCallback(_do_upload)
def _upload_done(uri):
@ -220,8 +226,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
# the roothash), we have to do all of the encoding work, and only
# get to save on the upload part.
log.msg("UPLOADING AGAIN")
options = {"max_segment_size": 1024}
d1 = self.uploader.upload_data(DATA, options)
d1 = self.uploader.upload(SmallSegmentDataUploadable(1024, DATA))
d.addCallback(_upload_again)
def _download_to_data(res):
@ -310,14 +315,6 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
self.clients[0].getServiceNamed("helper")._chk_options = o2
d = self.extra_node.upload(u, options)
def _eee(res):
log.msg("EEE: %s" % (res,))
print "EEE", res
d2 = defer.Deferred()
reactor.callLater(3, d2.callback, None)
return d2
#d.addBoth(_eee)
#return d
def _should_not_finish(res):
self.fail("interrupted upload should have failed, not finished"
@ -326,7 +323,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
print "interrupted"
log.msg("interrupted", level=log.WEIRD, failure=f)
f.trap(ConnectionDone, DeadReferenceError)
reu = options["RemoteEncryptedUploabable"]
reu = options["RemoteEncryptedUploadable"]
print "REU.bytes", reu._bytes_read
# make sure we actually interrupted it before finishing the
# file
@ -375,13 +372,14 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
def _uploaded(uri):
log.msg("I think its uploaded", level=log.WEIRD)
print "I tunk its uploaded", uri
reu = options2["RemoteEncryptedUploabable"]
reu = options2["RemoteEncryptedUploadable"]
print "REU.bytes", reu._bytes_read
# make sure we didn't read the whole file the second time
# around
self.failUnless(reu._bytes_read < len(DATA),
"resumption didn't save us any work: read %d bytes out of %d total" %
(reu._bytes_read, len(DATA)))
#self.failUnless(reu._bytes_read < len(DATA),
# "resumption didn't save us any work:"
# " read %d bytes out of %d total" %
# (reu._bytes_read, len(DATA)))
return self.downloader.download_to_data(uri)
d.addCallback(_uploaded)
def _check(newdata):

View File

@ -10,6 +10,8 @@ from allmydata.interfaces import IFileURI
from allmydata.util.assertutil import precondition
from foolscap import eventual
MiB = 1024*1024
class Uploadable(unittest.TestCase):
def shouldEqual(self, data, expected):
self.failUnless(isinstance(data, list))
@ -132,6 +134,11 @@ class FakeBucketWriter:
self.closed = True
class FakeClient:
DEFAULT_ENCODING_PARAMETERS = {"k":25,
"happy": 75,
"n": 100,
"max_segment_size": 1*MiB,
}
def __init__(self, mode="good", num_servers=50):
self.mode = mode
self.num_servers = num_servers
@ -145,7 +152,7 @@ class FakeClient:
def get_push_to_ourselves(self):
return None
def get_encoding_parameters(self):
return None
return self.DEFAULT_ENCODING_PARAMETERS
def get_renewal_secret(self):
return ""
@ -171,6 +178,14 @@ class GoodServer(unittest.TestCase):
self.u.running = True
self.u.parent = self.node
def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
p = {"k": k,
"happy": happy,
"n": n,
"max_segment_size": max_segsize,
}
self.node.DEFAULT_ENCODING_PARAMETERS = p
def _check_small(self, newuri, size):
u = IFileURI(newuri)
self.failUnless(isinstance(u, uri.LiteralFileURI))
@ -210,7 +225,8 @@ class GoodServer(unittest.TestCase):
data = self.get_data(SIZE_LARGE)
segsize = int(SIZE_LARGE / 2.5)
# we want 3 segments, since that's not a power of two
d = self.u.upload_data(data, {"max_segment_size": segsize})
self.set_encoding_parameters(25, 75, 100, segsize)
d = self.u.upload_data(data)
d.addCallback(self._check_large, SIZE_LARGE)
return d
@ -298,13 +314,21 @@ class PeerSelection(unittest.TestCase):
self.failUnlessEqual(len(u.key), 16)
self.failUnlessEqual(u.size, size)
def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
p = {"k": k,
"happy": happy,
"n": n,
"max_segment_size": max_segsize,
}
self.node.DEFAULT_ENCODING_PARAMETERS = p
def test_one_each(self):
# if we have 50 shares, and there are 50 peers, and they all accept a
# share, we should get exactly one share per peer
self.make_client()
data = self.get_data(SIZE_LARGE)
self.u.DEFAULT_ENCODING_PARAMETERS = (25, 30, 50)
self.set_encoding_parameters(25, 30, 50)
d = self.u.upload_data(data)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
@ -321,7 +345,7 @@ class PeerSelection(unittest.TestCase):
self.make_client()
data = self.get_data(SIZE_LARGE)
self.u.DEFAULT_ENCODING_PARAMETERS = (50, 75, 100)
self.set_encoding_parameters(50, 75, 100)
d = self.u.upload_data(data)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
@ -338,7 +362,7 @@ class PeerSelection(unittest.TestCase):
self.make_client()
data = self.get_data(SIZE_LARGE)
self.u.DEFAULT_ENCODING_PARAMETERS = (24, 41, 51)
self.set_encoding_parameters(24, 41, 51)
d = self.u.upload_data(data)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
@ -365,7 +389,7 @@ class PeerSelection(unittest.TestCase):
self.make_client()
data = self.get_data(SIZE_LARGE)
self.u.DEFAULT_ENCODING_PARAMETERS = (100, 150, 200)
self.set_encoding_parameters(100, 150, 200)
d = self.u.upload_data(data)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
@ -382,7 +406,7 @@ class PeerSelection(unittest.TestCase):
self.make_client(3)
data = self.get_data(SIZE_LARGE)
self.u.DEFAULT_ENCODING_PARAMETERS = (3, 5, 10)
self.set_encoding_parameters(3, 5, 10)
d = self.u.upload_data(data)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):

View File

@ -21,6 +21,12 @@ from pycryptopp.cipher.aes import AES
from cStringIO import StringIO
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
class HaveAllPeersError(Exception):
# we use this to jump out of the loop
pass
@ -323,28 +329,66 @@ class EncryptAnUploadable:
IEncryptedUploadable."""
implements(IEncryptedUploadable)
def __init__(self, original, options={}):
def __init__(self, original, default_encoding_parameters):
self.original = original
self._options = options
assert isinstance(default_encoding_parameters, dict)
self._default_encoding_parameters = default_encoding_parameters
self._encryptor = None
self._plaintext_hasher = plaintext_hasher()
self._plaintext_segment_hasher = None
self._plaintext_segment_hashes = []
self._params = None
self._encoding_parameters = None
self._file_size = None
def get_size(self):
return self.original.get_size()
if self._file_size is not None:
return defer.succeed(self._file_size)
d = self.original.get_size()
def _got_size(size):
self._file_size = size
return size
d.addCallback(_got_size)
return d
def set_serialized_encoding_parameters(self, params):
self._params = params
def get_all_encoding_parameters(self):
if self._encoding_parameters is not None:
return defer.succeed(self._encoding_parameters)
d1 = self.get_size()
d2 = self.original.get_maximum_segment_size()
d3 = self.original.get_encoding_parameters()
d = defer.DeferredList([d1, d2, d3],
fireOnOneErrback=True, consumeErrors=True)
def _got_pieces(res):
file_size = res[0][1]
max_segsize = res[1][1]
params = res[2][1]
defaults = self._default_encoding_parameters
if max_segsize is None:
max_segsize = defaults["max_segment_size"]
if params is None:
k = defaults["k"]
happy = defaults["happy"]
n = defaults["n"]
else:
precondition(isinstance(params, tuple), params)
(k, happy, n) = params
# for small files, shrink the segment size to avoid wasting space
segsize = min(max_segsize, file_size)
# this must be a multiple of 'required_shares'==k
segsize = mathutil.next_multiple(segsize, k)
self._segment_size = segsize # used by segment hashers
self._encoding_parameters = (k, happy, n, segsize)
return self._encoding_parameters
d.addCallback(_got_pieces)
return d
def _get_encryptor(self):
if self._encryptor:
return defer.succeed(self._encryptor)
if self._params is not None:
self.original.set_serialized_encoding_parameters(self._params)
d = self.original.get_encryption_key()
def _got(key):
e = AES(key)
@ -366,9 +410,6 @@ class EncryptAnUploadable:
d.addCallback(lambda res: self._storage_index)
return d
def set_segment_size(self, segsize):
self._segment_size = segsize
def _get_segment_hasher(self):
p = self._plaintext_segment_hasher
if p:
@ -396,8 +437,13 @@ class EncryptAnUploadable:
offset += this_segment
def read_encrypted(self, length):
d = self._get_encryptor()
d.addCallback(lambda res: self.original.read(length))
# make sure our parameters have been set up first
d = self.get_all_encoding_parameters()
d.addCallback(lambda ignored: self._get_encryptor())
# then fetch the plaintext
d.addCallback(lambda ignored: self.original.read(length))
# and encrypt it..
# through the fields we go, hashing all the way, sHA! sHA! sHA!
def _got(data):
assert isinstance(data, (tuple, list)), type(data)
data = list(data)
@ -432,15 +478,13 @@ class EncryptAnUploadable:
class CHKUploader:
peer_selector_class = Tahoe2PeerSelector
def __init__(self, client, options={}):
def __init__(self, client, default_encoding_parameters):
self._client = client
self._options = options
assert isinstance(default_encoding_parameters, dict)
self._default_encoding_parameters = default_encoding_parameters
self._log_number = self._client.log("CHKUploader starting")
self._encoder = None
def set_params(self, encoding_parameters):
self._encoding_parameters = encoding_parameters
def log(self, *args, **kwargs):
if "parent" not in kwargs:
kwargs["parent"] = self._log_number
@ -457,7 +501,7 @@ class CHKUploader:
uploadable = IUploadable(uploadable)
self.log("starting upload of %s" % uploadable)
eu = EncryptAnUploadable(uploadable)
eu = EncryptAnUploadable(uploadable, self._default_encoding_parameters)
d = self.start_encrypted(eu)
def _uploaded(res):
d1 = uploadable.get_encryption_key()
@ -478,8 +522,7 @@ class CHKUploader:
def start_encrypted(self, encrypted):
eu = IEncryptedUploadable(encrypted)
self._encoder = e = encode.Encoder(self._options, self)
e.set_params(self._encoding_parameters)
self._encoder = e = encode.Encoder(self)
d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders)
d.addCallback(self.set_shareholders, e)
@ -497,7 +540,7 @@ class CHKUploader:
block_size = encoder.get_param("block_size")
num_segments = encoder.get_param("num_segments")
k,desired,n = encoder.get_param("share_counts")
push_to_ourselves = self._options.get("push_to_ourselves", False)
push_to_ourselves = self._client.get_push_to_ourselves()
gs = peer_selector.get_shareholders
d = gs(self._client, storage_index, share_size, block_size,
@ -548,9 +591,8 @@ def read_this_many_bytes(uploadable, size, prepend_data=[]):
class LiteralUploader:
def __init__(self, client, options={}):
def __init__(self, client):
self._client = client
self._options = options
def set_params(self, encoding_parameters):
pass
@ -566,7 +608,7 @@ class LiteralUploader:
def close(self):
pass
class RemoteEncryptedUploabable(Referenceable):
class RemoteEncryptedUploadable(Referenceable):
implements(RIEncryptedUploadable)
def __init__(self, encrypted_uploadable):
@ -578,8 +620,9 @@ class RemoteEncryptedUploabable(Referenceable):
def remote_get_size(self):
return self._eu.get_size()
def remote_set_segment_size(self, segment_size):
self._eu.set_segment_size(segment_size)
def remote_get_all_encoding_parameters(self):
return self._eu.get_all_encoding_parameters()
def remote_read_encrypted(self, offset, length):
# we don't yet implement seek
assert offset == self._offset, "%d != %d" % (offset, self._offset)
@ -603,9 +646,10 @@ class RemoteEncryptedUploabable(Referenceable):
class AssistedUploader:
def __init__(self, helper, options={}):
def __init__(self, helper, default_encoding_parameters):
self._helper = helper
self._options = options
assert isinstance(default_encoding_parameters, dict)
self._default_encoding_parameters = default_encoding_parameters
self._log_number = log.msg("AssistedUploader starting")
def log(self, msg, parent=None, **kwargs):
@ -613,15 +657,14 @@ class AssistedUploader:
parent = self._log_number
return log.msg(msg, parent=parent, **kwargs)
def set_params(self, encoding_parameters):
self._needed_shares, happy, self._total_shares = encoding_parameters
def start(self, uploadable):
u = IUploadable(uploadable)
eu = IEncryptedUploadable(EncryptAnUploadable(u, self._options))
eu = EncryptAnUploadable(u, self._default_encoding_parameters)
self._encuploadable = eu
d = eu.get_size()
d.addCallback(self._got_size)
d.addCallback(lambda res: eu.get_all_encoding_parameters())
d.addCallback(self._got_all_encoding_parameters)
# when we get the encryption key, that will also compute the storage
# index, so this only takes one pass.
# TODO: I'm not sure it's cool to switch back and forth between
@ -637,6 +680,12 @@ class AssistedUploader:
def _got_size(self, size):
self._size = size
def _got_all_encoding_parameters(self, params):
k, happy, n, segment_size = params
# stash these for URI generation later
self._needed_shares = k
self._total_shares = n
def _got_encryption_key(self, key):
self._key = key
@ -652,10 +701,10 @@ class AssistedUploader:
if upload_helper:
self.log("helper says we need to upload")
# we need to upload the file
reu = RemoteEncryptedUploabable(self._encuploadable)
if "debug_stash_RemoteEncryptedUploadable" in self._options:
self._options["RemoteEncryptedUploabable"] = reu
if "debug_interrupt" in self._options:
reu = RemoteEncryptedUploadable(self._encuploadable)
if False: #"debug_stash_RemoteEncryptedUploadable" in self._options:
self._options["RemoteEncryptedUploadable"] = reu
if False: #"debug_interrupt" in self._options:
reu._cutoff = self._options["debug_interrupt"]
def _cutoff():
# simulate the loss of the connection to the helper
@ -680,6 +729,11 @@ class AssistedUploader:
)
return u.to_string()
class NoParameterPreferencesMixin:
def get_maximum_segment_size(self):
return defer.succeed(None)
def get_encoding_parameters(self):
return defer.succeed(None)
class ConvergentUploadMixin:
# to use this, the class it is mixed in to must have a seekable
@ -687,10 +741,6 @@ class ConvergentUploadMixin:
_params = None
_key = None
def set_serialized_encoding_parameters(self, params):
self._params = params
# ignored for now
def get_encryption_key(self):
if self._key is None:
f = self._filehandle
@ -711,16 +761,13 @@ class ConvergentUploadMixin:
class NonConvergentUploadMixin:
_key = None
def set_serialized_encoding_parameters(self, params):
pass
def get_encryption_key(self):
if self._key is None:
self._key = os.urandom(16)
return defer.succeed(self._key)
class FileHandle(ConvergentUploadMixin):
class FileHandle(ConvergentUploadMixin, NoParameterPreferencesMixin):
implements(IUploadable)
def __init__(self, filehandle):
@ -758,13 +805,6 @@ class Uploader(service.MultiService):
uploader_class = CHKUploader
URI_LIT_SIZE_THRESHOLD = 55
DEFAULT_ENCODING_PARAMETERS = (25, 75, 100)
# this is a tuple of (needed, desired, total). 'needed' is the number of
# shares required to reconstruct a file. 'desired' means that we will
# abort an upload unless we can allocate space for at least this many.
# 'total' is the total number of shares created by encoding. If everybody
# has room then this is how many we will upload.
def __init__(self, helper_furl=None):
self._helper_furl = helper_furl
self._helper = None
@ -779,25 +819,23 @@ class Uploader(service.MultiService):
def _got_helper(self, helper):
self._helper = helper
def upload(self, uploadable, options={}):
def upload(self, uploadable):
# this returns the URI
assert self.parent
assert self.running
push_to_ourselves = self.parent.get_push_to_ourselves()
if push_to_ourselves is not None:
options["push_to_ourselves"] = push_to_ourselves
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
def _got_size(size):
default_params = self.parent.get_encoding_parameters()
precondition(isinstance(default_params, dict), default_params)
precondition("max_segment_size" in default_params, default_params)
if size <= self.URI_LIT_SIZE_THRESHOLD:
uploader = LiteralUploader(self.parent, options)
uploader = LiteralUploader(self.parent)
elif self._helper:
uploader = AssistedUploader(self._helper, options)
uploader = AssistedUploader(self._helper, default_params)
else:
uploader = self.uploader_class(self.parent, options)
uploader.set_params(self.parent.get_encoding_parameters()
or self.DEFAULT_ENCODING_PARAMETERS)
uploader = self.uploader_class(self.parent, default_params)
return uploader.start(uploadable)
d.addCallback(_got_size)
def _done(res):
@ -807,9 +845,9 @@ class Uploader(service.MultiService):
return d
# utility functions
def upload_data(self, data, options={}):
return self.upload(Data(data), options)
def upload_filename(self, filename, options={}):
return self.upload(FileName(filename), options)
def upload_filehandle(self, filehandle, options={}):
return self.upload(FileHandle(filehandle), options)
def upload_data(self, data):
return self.upload(Data(data))
def upload_filename(self, filename):
return self.upload(FileName(filename))
def upload_filehandle(self, filehandle):
return self.upload(FileHandle(filehandle))