mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-20 13:33:09 +00:00
offloaded: cleanup to handle multiple simultaneous uploaders gracefully
This commit is contained in:
parent
7bb9307871
commit
f0430ccc48
@ -4,9 +4,12 @@ from twisted.application import service
|
||||
from twisted.internet import defer
|
||||
from foolscap import Referenceable
|
||||
from allmydata import upload, interfaces
|
||||
from allmydata.util import idlib
|
||||
from allmydata.util import idlib, log, observer
|
||||
|
||||
|
||||
class NotEnoughWritersError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
"""I am the helper-server -side counterpart to AssistedUploader. I handle
|
||||
@ -16,7 +19,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
implements(interfaces.RICHKUploadHelper)
|
||||
|
||||
def __init__(self, storage_index, helper, log_number, options={}):
|
||||
self._finished = False
|
||||
self._started = False
|
||||
self._storage_index = storage_index
|
||||
self._helper = helper
|
||||
upload_id = idlib.b2a(storage_index)[:6]
|
||||
@ -26,7 +29,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
|
||||
self._client = helper.parent
|
||||
self._options = options
|
||||
self._readers = []
|
||||
self._reader = CiphertextReader(storage_index, self)
|
||||
self._finished_observers = observer.OneShotObserverList()
|
||||
|
||||
self.set_params( (3,7,10) ) # GACK
|
||||
|
||||
@ -45,59 +49,81 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
# reader is an RIEncryptedUploadable. I am specified to return an
|
||||
# UploadResults dictionary.
|
||||
|
||||
self._readers.append(reader)
|
||||
reader.notifyOnDisconnect(self._remove_reader, reader)
|
||||
eu = CiphertextReader(reader, self._storage_index)
|
||||
d = self.start_encrypted(eu)
|
||||
def _done(res):
|
||||
self.finished(self._storage_index)
|
||||
(uri_extension_hash, needed_shares, total_shares, size) = res
|
||||
return {'uri_extension_hash': uri_extension_hash}
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
self._reader.add_reader(reader)
|
||||
|
||||
def _remove_reader(self, reader):
|
||||
# NEEDS MORE
|
||||
self._readers.remove(reader)
|
||||
if not self._readers:
|
||||
if not self._finished:
|
||||
self.finished(None)
|
||||
# there is already an upload in progress, and a second uploader
|
||||
# has joined in. We will notify the second client when the upload
|
||||
# is complete, but we will not request any data from them unless
|
||||
# the first one breaks. TODO: fetch data from both clients to
|
||||
# speed the upload
|
||||
|
||||
def finished(self, res):
|
||||
self._finished = True
|
||||
if not self._started:
|
||||
self._started = True
|
||||
d = self.start_encrypted(self._reader)
|
||||
d.addCallbacks(self._finished, self._failed)
|
||||
return self._finished_observers.when_fired()
|
||||
|
||||
def _finished(self, res):
|
||||
(uri_extension_hash, needed_shares, total_shares, size) = res
|
||||
upload_results = {'uri_extension_hash': uri_extension_hash}
|
||||
self._finished_observers.fire(upload_results)
|
||||
self._helper.upload_finished(self._storage_index)
|
||||
|
||||
def _failed(self, f):
|
||||
self._finished_observers.fire(f)
|
||||
self._helper.upload_finished(self._storage_index)
|
||||
|
||||
class CiphertextReader:
|
||||
implements(interfaces.IEncryptedUploadable)
|
||||
|
||||
def __init__(self, remote_reader, storage_index):
|
||||
self.rr = remote_reader
|
||||
def __init__(self, storage_index, upload_helper):
|
||||
self._readers = []
|
||||
self.storage_index = storage_index
|
||||
self._offset = 0
|
||||
self._upload_helper = upload_helper
|
||||
|
||||
def add_reader(self, reader):
|
||||
# for now, we stick to the first uploader
|
||||
self._readers.append(reader)
|
||||
|
||||
def call(self, *args, **kwargs):
|
||||
if not self._readers:
|
||||
raise NotEnoughWritersError("ran out of assisted uploaders")
|
||||
rr = self._readers[0]
|
||||
d = rr.callRemote(*args, **kwargs)
|
||||
def _err(f):
|
||||
if rr in self._readers:
|
||||
self._readers.remove(rr)
|
||||
self._upload_helper.log("call to assisted uploader %s failed" % rr,
|
||||
failure=f, level=log.UNUSUAL)
|
||||
# we can try again with someone else who's left
|
||||
return self.call(*args, **kwargs)
|
||||
d.addErrback(_err)
|
||||
return d
|
||||
|
||||
def get_size(self):
|
||||
return self.rr.callRemote("get_size")
|
||||
return self.call("get_size")
|
||||
def get_storage_index(self):
|
||||
return defer.succeed(self.storage_index)
|
||||
def set_segment_size(self, segment_size):
|
||||
return self.rr.callRemote("set_segment_size", segment_size)
|
||||
return self.call("set_segment_size", segment_size)
|
||||
def set_serialized_encoding_parameters(self, params):
|
||||
pass # ??
|
||||
def read_encrypted(self, length):
|
||||
d = self.rr.callRemote("read_encrypted", self._offset, length)
|
||||
d = self.call("read_encrypted", self._offset, length)
|
||||
def _done(strings):
|
||||
self._offset += sum([len(data) for data in strings])
|
||||
return strings
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
|
||||
return self.rr.callRemote("get_plaintext_hashtree_leaves",
|
||||
first, last, num_segments)
|
||||
return self.call("get_plaintext_hashtree_leaves", first, last,
|
||||
num_segments)
|
||||
def get_plaintext_hash(self):
|
||||
return self.rr.callRemote("get_plaintext_hash")
|
||||
return self.call("get_plaintext_hash")
|
||||
def close(self):
|
||||
# ??
|
||||
return self.rr.callRemote("close")
|
||||
return self.call("close")
|
||||
|
||||
|
||||
class Helper(Referenceable, service.MultiService):
|
||||
|
Loading…
Reference in New Issue
Block a user