tahoe-lafs/src/allmydata/offloaded.py

116 lines
4.0 KiB
Python

from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
from allmydata import upload, interfaces
class CHKUploadHelper(Referenceable, upload.CHKUploader):
"""I am the helper-server -side counterpart to AssistedUploader. I handle
peer selection, encoding, and share pushing. I read ciphertext from the
remote AssistedUploader.
"""
implements(interfaces.RICHKUploadHelper)
def __init__(self, storage_index, helper):
self._finished = False
self._storage_index = storage_index
self._helper = helper
self._log_number = self._helper.log("CHKUploadHelper starting")
self._client = helper.parent
self._wait_for_numpeers = None
self._options = {}
self.set_params( (3,7,10) ) # GACK
def start(self):
# determine if we need to upload the file. If so, return ({},self) .
# If not, return (UploadResults,None) .
#return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self)
return ({}, self)
def remote_upload(self, reader):
# reader is an RIEncryptedUploadable. I am specified to return an
# UploadResults dictionary.
eu = CiphertextReader(reader, self._storage_index)
d = self.start_encrypted(eu)
def _done(res):
self.finished(self._storage_index)
(uri_extension_hash, needed_shares, total_shares, size) = res
return {'uri_extension_hash': uri_extension_hash}
d.addCallback(_done)
return d
def finished(self, res):
self._finished = True
self._helper.upload_finished(self._storage_index)
class CiphertextReader:
implements(interfaces.IEncryptedUploadable)
def __init__(self, remote_reader, storage_index):
self.rr = remote_reader
self.storage_index = storage_index
self._offset = 0
def get_size(self):
return self.rr.callRemote("get_size")
def get_storage_index(self):
return defer.succeed(self.storage_index)
def set_segment_size(self, segment_size):
return self.rr.callRemote("set_segment_size", segment_size)
def set_serialized_encoding_parameters(self, params):
pass # ??
def read_encrypted(self, length):
d = self.rr.callRemote("read_encrypted", self._offset, length)
def _done(strings):
self._offset += sum([len(data) for data in strings])
return strings
d.addCallback(_done)
return d
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
return self.rr.callRemote("get_plaintext_hashtree_leaves",
first, last, num_segments)
def get_plaintext_hash(self):
return self.rr.callRemote("get_plaintext_hash")
def close(self):
# ??
return self.rr.callRemote("close")
class Helper(Referenceable, service.MultiService):
implements(interfaces.RIHelper)
# this is the non-distributed version. When we need to have multiple
# helpers, this object will become the HelperCoordinator, and will query
# the farm of Helpers to see if anyone has the storage_index of interest,
# and send the request off to them. If nobody has it, we'll choose a
# helper at random.
chk_upload_helper_class = CHKUploadHelper
def __init__(self, basedir):
self._basedir = basedir
self._active_uploads = {}
service.MultiService.__init__(self)
def log(self, msg, **kwargs):
if 'facility' not in kwargs:
kwargs['facility'] = "helper"
return self.parent.log(msg, **kwargs)
def remote_upload_chk(self, storage_index):
# TODO: look on disk
if storage_index in self._active_uploads:
uh = self._active_uploads[storage_index]
else:
uh = self.chk_upload_helper_class(storage_index, self)
self._active_uploads[storage_index] = uh
return uh.start()
def upload_finished(self, storage_index):
del self._active_uploads[storage_index]