offloaded: early code: most of client-side, defined the RemoteInterfaces

This commit is contained in:
Brian Warner 2008-01-08 21:18:54 -07:00
parent d4e9e3b9c4
commit db71bdae9c
3 changed files with 135 additions and 4 deletions

@ -41,7 +41,8 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
self.init_lease_secret()
self.init_storage()
self.init_options()
self.add_service(Uploader())
helper_furl = self.get_config("helper.furl")
self.add_service(Uploader(helper_furl))
self.add_service(Downloader())
self.add_service(Checker())

@ -0,0 +1,53 @@
from foolscap import RemoteInterface
from foolscap.schema import DictOf, ChoiceOf
from allmydata.interfaces import StorageIndex, Hash
UploadResults = DictOf(str, str)
class RIEncryptedUploadable(RemoteInterface):
__remote_name__ = "RIEncryptedUploadable.tahoe.allmydata.com"
def get_size():
return int
def set_segment_size(segment_size=long):
return None
def read_encrypted(offset=long, length=long):
return str
def get_plaintext_hashtree_leaves(first=int, last=int):
return ListOf(Hash)
def get_plaintext_hash():
return Hash
class RIUploadHelper(RemoteInterface):
__remote_name__ = "RIUploadHelper.tahoe.allmydata.com"
def upload(reader=RIEncryptedUploadable):
return UploadResults
class RIHelper(RemoteInterface):
__remote_name__ = "RIHelper.tahoe.allmydata.com"
def upload(si=StorageIndex):
"""See if a file with a given storage index needs uploading. The
helper will ask the appropriate storage servers to see if the file
has already been uploaded. If so, the helper will return a set of
'upload results' that includes whatever hashes are needed to build
the read-cap, and perhaps a truncated sharemap.
If the file has not yet been uploaded (or if it was only partially
uploaded), the helper will return an empty upload-results dictionary
and also an RIUploadHelper object that will take care of the upload
process. The client should call upload() on this object and pass it a
reference to an RIEncryptedUploadable object that will provide
ciphertext. When the upload is finished, the upload() method will
finish and return the upload results.
"""
return (UploadResults, ChoiceOf(RIUploadHelper, None))

@ -551,6 +551,66 @@ class LiteralUploader:
def close(self):
pass
class AssistedUploader(FileUploader):
def __init__(self, helper, options={}):
self._helper = helper
self._options = options
self._log_number = self._client.log("AssistedUploader starting")
def set_params(self, encoding_parameters):
pass
def start(self, uploadable):
uploadable = IUploadable(uploadable)
eu = IEncryptedUploadable(EncryptAnUploadable(uploadable))
self._encuploadable = eu
d = eu.get_size()
d.addCallback(self._got_size)
# when we get the encryption key, that will also compute the storage
# index, so this only takes one pass.
# TODO: I'm not sure it's cool to switch back and forth between
# the Uploadable and the IEncryptedUploadable that wraps it.
d.addCallback(lambda res: u.get_encryption_key())
d.addCallback(self._got_encryption_key)
d.addCallback(lambda res: eu.get_storage_index())
d.addCallback(self._got_storage_index)
d.addCallback(self._contact_helper)
d.addCallback(self._build_readcap)
return d
def _got_size(self, size):
self._size = size
def _got_encryption_key(self, key):
self._key = key
def _got_storage_index(self, storage_index):
self._storage_index = storage_index
def _contact_helper(self, res):
d = self._helper.callRemote("upload", self._storage_index)
d.addCallback(self._contacted_helper)
return d
def _contacted_helper(self, upload_results, upload_helper):
if upload_helper:
# we need to upload the file
reu = RemoteEncryptedUploabable(self._encuploadable)
d = upload_helper.callRemote("upload", reu)
# this Deferred will fire with the upload results
return d
return upload_results
def _build_readcap(self, upload_results):
ur = upload_results
u = uri.CHKFileURI(key=self._key,
uri_extension_hash=ur['uri_extension_hash'],
needed_shares=self._needed_shares,
total_shares=self._total_shares,
size=self._size,
)
return u.to_string()
class ConvergentUploadMixin:
# to use this, the class it is mixed in to must have a seekable
@ -636,6 +696,19 @@ class Uploader(service.MultiService):
# 'total' is the total number of shares created by encoding. If everybody
# has room then this is is how many we will upload.
def __init__(self, helper_furl=None):
self._helper_furl = helper_furl
self._helper = None
def startService(self):
service.MultiService.startService(self)
if self._helper_furl:
self.parent.tub.connectTo(self._helper_furl,
self._got_helper)
def _got_helper(self, helper):
self._helper = helper
def upload(self, uploadable, options={}, wait_for_numpeers=None):
assert wait_for_numpeers is None or isinstance(wait_for_numpeers, int), wait_for_numpeers
# this returns the URI
@ -648,10 +721,14 @@ class Uploader(service.MultiService):
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
def _got_size(size):
uploader_class = self.uploader_class
if size <= self.URI_LIT_SIZE_THRESHOLD:
uploader_class = LiteralUploader
uploader = uploader_class(self.parent, options, wait_for_numpeers)
uploader = LiteralUploader(self.parent, options,
wait_for_numpeers)
elif self._helper:
uploader = AssistedUploader(self.parent, options)
else:
uploader = self.uploader_class(self.parent, options,
wait_for_numpeers)
uploader.set_params(self.parent.get_encoding_parameters()
or self.DEFAULT_ENCODING_PARAMETERS)
return uploader.start(uploadable)