immutable: refactor uploader to do just encoding-and-uploading, not encryption

This makes Uploader take an EncryptedUploadable object instead of an Uploadable object.  I also changed it to return a verify cap instead of a tuple containing the individual fields of a verify cap.
This will facilitate hooking together an Uploader and a Downloader to make a Repairer.
Also move offloaded.py into src/allmydata/immutable/.
This commit is contained in:
Zooko O'Whielacronx 2009-01-06 21:48:22 -07:00
parent 81add135dc
commit c85f75bb08
8 changed files with 90 additions and 91 deletions

@@ -15,7 +15,7 @@ from allmydata.storage import StorageServer
from allmydata.immutable.upload import Uploader
from allmydata.immutable.download import Downloader
from allmydata.immutable.filenode import FileNode, LiteralFileNode
from allmydata.offloaded import Helper
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, cachedir

@@ -197,6 +197,8 @@ class Encoder(object):
self.landlords = landlords.copy()
def start(self):
""" Returns a Deferred that will fire with the verify cap (an instance of
uri.CHKFileVerifierURI)."""
self.log("%s starting" % (self,))
#paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
assert self._codec
@@ -637,8 +639,8 @@ class Encoder(object):
# update our sharemap
self._shares_placed = set(self.landlords.keys())
return (self.uri_extension_hash, self.required_shares,
self.num_shares, self.file_size)
return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
self.required_shares, self.num_shares, self.file_size)
def err(self, f):
self.log("upload failed", failure=f, level=log.UNUSUAL)

@@ -9,6 +9,7 @@ import allmydata
from allmydata import interfaces, storage, uri
from allmydata.immutable import upload
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util.assertutil import precondition
from allmydata.util import idlib, log, observer, fileutil, hashutil
@@ -205,10 +206,12 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
# and inform the client when the upload has finished
return self._finished_observers.when_fired()
def _finished(self, res):
(uri_extension_hash, needed_shares, total_shares, size) = res
r = self._results
r.uri_extension_hash = uri_extension_hash
def _finished(self, uploadresults):
precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
r = uploadresults
v = uri.from_string(r.verifycapstr)
r.uri_extension_hash = v.uri_extension_hash
f_times = self._fetcher.get_times()
r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
@@ -216,7 +219,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
self._reader.close()
os.unlink(self._encoding_file)
self._finished_observers.fire(r)
self._helper.upload_finished(self._storage_index, size)
self._helper.upload_finished(self._storage_index, v.size)
del self._reader
def _failed(self, f):

@@ -13,7 +13,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \
storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import storage, hashtree, uri
from allmydata.immutable import encode
from allmydata.util import base32, idlib, mathutil
from allmydata.util import base32, idlib, log, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import get_versioned_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
@@ -662,27 +662,21 @@ class CHKUploader:
kwargs["facility"] = "tahoe.upload"
return self._client.log(*args, **kwargs)
def start(self, uploadable):
def start(self, encrypted_uploadable):
"""Start uploading the file.
This method returns a Deferred that will fire with the URI (a
string)."""
Returns a Deferred that will fire with the UploadResults instance.
"""
self._started = time.time()
uploadable = IUploadable(uploadable)
self.log("starting upload of %s" % uploadable)
eu = IEncryptedUploadable(encrypted_uploadable)
self.log("starting upload of %s" % eu)
eu = EncryptAnUploadable(uploadable, self._log_number)
eu.set_upload_status(self._upload_status)
d = self.start_encrypted(eu)
def _uploaded(res):
d1 = uploadable.get_encryption_key()
d1.addCallback(lambda key: self._compute_uri(res, key))
return d1
d.addCallback(_uploaded)
def _done(res):
def _done(uploadresults):
self._upload_status.set_active(False)
return res
return uploadresults
d.addBoth(_done)
return d
@@ -696,6 +690,7 @@ class CHKUploader:
return self._encoder.abort()
def start_encrypted(self, encrypted):
""" Returns a Deferred that will fire with the UploadResults instance. """
eu = IEncryptedUploadable(encrypted)
started = time.time()
@@ -706,7 +701,6 @@ class CHKUploader:
d.addCallback(self.set_shareholders, e)
d.addCallback(lambda res: e.start())
d.addCallback(self._encrypted_done)
# this fires with the uri_extension_hash and other data
return d
def locate_all_shareholders(self, encoder, started):
@@ -761,7 +755,8 @@ class CHKUploader:
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
encoder.set_shareholders(buckets)
def _encrypted_done(self, res):
def _encrypted_done(self, verifycap):
""" Returns a Deferred that will fire with the UploadResults instance. """
r = self._results
for shnum in self._encoder.get_shares_placed():
peer_tracker = self._sharemap[shnum]
@@ -779,19 +774,7 @@ class CHKUploader:
r.timings["peer_selection"] = self._peer_selection_elapsed
r.timings.update(self._encoder.get_times())
r.uri_extension_data = self._encoder.get_uri_extension_data()
return res
def _compute_uri(self, (uri_extension_hash,
needed_shares, total_shares, size),
key):
u = uri.CHKFileURI(key=key,
uri_extension_hash=uri_extension_hash,
needed_shares=needed_shares,
total_shares=total_shares,
size=size,
)
r = self._results
r.uri = u.to_string()
r.verifycapstr = verifycap.to_string()
return r
def get_upload_status(self):
@@ -948,26 +931,23 @@ class AssistedUploader:
kwargs["parent"] = self._log_number
return log.msg(*args, **kwargs)
def start(self, uploadable):
def start(self, encrypted_uploadable, storage_index):
"""Start uploading the file.
Returns a Deferred that will fire with the UploadResults instance.
"""
precondition(isinstance(storage_index, str), storage_index)
self._started = time.time()
u = IUploadable(uploadable)
eu = EncryptAnUploadable(u, self._log_number)
eu = IEncryptedUploadable(encrypted_uploadable)
eu.set_upload_status(self._upload_status)
self._encuploadable = eu
self._storage_index = storage_index
d = eu.get_size()
d.addCallback(self._got_size)
d.addCallback(lambda res: eu.get_all_encoding_parameters())
d.addCallback(self._got_all_encoding_parameters)
# when we get the encryption key, that will also compute the storage
# index, so this only takes one pass.
# TODO: I'm not sure it's cool to switch back and forth between
# the Uploadable and the IEncryptedUploadable that wraps it.
d.addCallback(lambda res: u.get_encryption_key())
d.addCallback(self._got_encryption_key)
d.addCallback(lambda res: eu.get_storage_index())
d.addCallback(self._got_storage_index)
d.addCallback(self._contact_helper)
d.addCallback(self._build_readcap)
d.addCallback(self._build_verifycap)
def _done(res):
self._upload_status.set_active(False)
return res
@@ -985,13 +965,6 @@ class AssistedUploader:
self._total_shares = n
self._segment_size = segment_size
def _got_encryption_key(self, key):
self._key = key
def _got_storage_index(self, storage_index):
self._storage_index = storage_index
def _contact_helper(self, res):
now = self._time_contacting_helper_start = time.time()
self._storage_index_elapsed = now - self._started
@@ -1023,7 +996,7 @@ class AssistedUploader:
self._upload_status.set_results(upload_results)
return upload_results
def _build_readcap(self, upload_results):
def _build_verifycap(self, upload_results):
self.log("upload finished, building readcap")
self._upload_status.set_status("Building Readcap")
r = upload_results
@@ -1031,13 +1004,11 @@ class AssistedUploader:
assert r.uri_extension_data["total_shares"] == self._total_shares
assert r.uri_extension_data["segment_size"] == self._segment_size
assert r.uri_extension_data["size"] == self._size
u = uri.CHKFileURI(key=self._key,
uri_extension_hash=r.uri_extension_hash,
needed_shares=self._needed_shares,
total_shares=self._total_shares,
size=self._size,
)
r.uri = u.to_string()
r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
uri_extension_hash=r.uri_extension_hash,
needed_shares=self._needed_shares,
total_shares=self._total_shares, size=self._size
).to_string()
now = time.time()
r.file_size = self._size
r.timings["storage_index"] = self._storage_index_elapsed
@@ -1207,13 +1178,12 @@ class Data(FileHandle):
assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
FileHandle.__init__(self, StringIO(data), convergence=convergence)
class Uploader(service.MultiService):
class Uploader(service.MultiService, log.PrefixingLogMixin):
"""I am a service that allows file uploading. I am a service-child of the
Client.
"""
implements(IUploader)
name = "uploader"
uploader_class = CHKUploader
URI_LIT_SIZE_THRESHOLD = 55
MAX_UPLOAD_STATUSES = 10
@@ -1224,6 +1194,7 @@ class Uploader(service.MultiService):
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
self._all_upload_statuses = weakref.WeakKeyDictionary()
self._recent_upload_statuses = []
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
service.MultiService.__init__(self)
def startService(self):
@@ -1233,7 +1204,7 @@ class Uploader(service.MultiService):
self._got_helper)
def _got_helper(self, helper):
log.msg("got helper connection, getting versions")
self.log("got helper connection, getting versions")
default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
{ },
"application-version": "unknown: no get_version()",
@@ -1257,7 +1228,9 @@ class Uploader(service.MultiService):
def upload(self, uploadable):
# this returns the URI
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
assert self.parent
assert self.running
@@ -1275,12 +1248,31 @@ class Uploader(service.MultiService):
if size <= self.URI_LIT_SIZE_THRESHOLD:
uploader = LiteralUploader(self.parent)
elif self._helper:
uploader = AssistedUploader(self._helper)
return uploader.start(uploadable)
else:
uploader = self.uploader_class(self.parent)
self._add_upload(uploader)
return uploader.start(uploadable)
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
d2 = defer.succeed(None)
if self._helper:
uploader = AssistedUploader(self._helper)
d2.addCallback(lambda x: eu.get_storage_index())
d2.addCallback(lambda si: uploader.start(eu, si))
else:
uploader = CHKUploader(self.parent)
d2.addCallback(lambda x: uploader.start(eu))
self._add_upload(uploader)
def turn_verifycap_into_read_cap(uploadresults):
# Generate the uri from the verifycap plus the key.
d3 = uploadable.get_encryption_key()
def put_readcap_into_results(key):
v = uri.from_string(uploadresults.verifycapstr)
r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
uploadresults.uri = r.to_string()
return uploadresults
d3.addCallback(put_readcap_into_results)
return d3
d2.addCallback(turn_verifycap_into_read_cap)
return d2
d.addCallback(_got_size)
def _done(res):
uploadable.close()

@@ -1212,10 +1212,9 @@ class IEncoder(Interface):
set_encrypted_uploadable() and set_shareholders() must be called
before this can be invoked.
This returns a Deferred that fires with a tuple of
(uri_extension_hash, needed_shares, total_shares, size) when the
upload process is complete. This information, plus the encryption
key, is sufficient to construct the URI.
This returns a Deferred that fires with a verify cap when the upload process is
complete. The verifycap, plus the encryption key, is sufficient to construct the read
cap.
"""
class IDecoder(Interface):

@@ -304,9 +304,9 @@ class Encode(unittest.TestCase):
d.addCallback(_ready)
def _check(res):
(uri_extension_hash, required_shares, num_shares, file_size) = res
self.failUnless(isinstance(uri_extension_hash, str))
self.failUnlessEqual(len(uri_extension_hash), 32)
verifycap = res
self.failUnless(isinstance(verifycap.uri_extension_hash, str))
self.failUnlessEqual(len(verifycap.uri_extension_hash), 32)
for i,peer in enumerate(all_shareholders):
self.failUnless(peer.closed)
self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
@@ -475,7 +475,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
recover_mode, target=None):
(uri_extension_hash, required_shares, num_shares, file_size) = res
verifycap = res
if "corrupt_key" in recover_mode:
# we corrupt the key, so that the decrypted data is corrupted and
@@ -485,10 +485,10 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
key = flip_bit(key)
u = uri.CHKFileURI(key=key,
uri_extension_hash=uri_extension_hash,
needed_shares=required_shares,
total_shares=num_shares,
size=file_size)
uri_extension_hash=verifycap.uri_extension_hash,
needed_shares=verifycap.needed_shares,
total_shares=verifycap.total_shares,
size=verifycap.size)
client = FakeClient()
if not target:

@@ -5,8 +5,9 @@ from twisted.application import service
from foolscap import Tub, eventual
from foolscap.logging import log
from allmydata import offloaded, storage
from allmydata.immutable import upload
from allmydata import storage
from allmydata.immutable import offloaded, upload
from allmydata import uri
from allmydata.util import hashutil, fileutil, mathutil
from pycryptopp.cipher.aes import AES
@@ -27,8 +28,10 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
"size": size,
}
self._results.uri_extension_data = ueb_data
return (hashutil.uri_extension_hash(""),
needed_shares, total_shares, size)
self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
needed_shares, total_shares,
size).to_string()
return self._results
d2.addCallback(_got_parms)
return d2
d.addCallback(_got_size)

@@ -8,8 +8,8 @@ from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
from twisted.internet.interfaces import IConsumer, IPushProducer
import allmydata
from allmydata import uri, storage, offloaded
from allmydata.immutable import download, upload, filenode
from allmydata import uri, storage
from allmydata.immutable import download, filenode, offloaded, upload
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.scripts import runner