mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-25 07:31:07 +00:00
offloaded: create a Helper if 'run_helper' is non-empty
This commit is contained in:
parent
0e2ddb00be
commit
2ad84eeed8
@ -14,6 +14,7 @@ from allmydata.storage import StorageServer
|
|||||||
from allmydata.upload import Uploader
|
from allmydata.upload import Uploader
|
||||||
from allmydata.download import Downloader
|
from allmydata.download import Downloader
|
||||||
from allmydata.checker import Checker
|
from allmydata.checker import Checker
|
||||||
|
from allmydata.offloaded import Helper
|
||||||
from allmydata.control import ControlServer
|
from allmydata.control import ControlServer
|
||||||
from allmydata.introducer import IntroducerClient
|
from allmydata.introducer import IntroducerClient
|
||||||
from allmydata.util import hashutil, idlib, testutil
|
from allmydata.util import hashutil, idlib, testutil
|
||||||
@ -45,6 +46,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
|
|||||||
self.add_service(Uploader(helper_furl))
|
self.add_service(Uploader(helper_furl))
|
||||||
self.add_service(Downloader())
|
self.add_service(Downloader())
|
||||||
self.add_service(Checker())
|
self.add_service(Checker())
|
||||||
|
# ControlServer and Helper are attached after Tub startup
|
||||||
|
|
||||||
self.introducer_furl = self.get_config("introducer.furl", required=True)
|
self.introducer_furl = self.get_config("introducer.furl", required=True)
|
||||||
|
|
||||||
@ -135,6 +137,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
|
|||||||
ic.setServiceParent(self)
|
ic.setServiceParent(self)
|
||||||
|
|
||||||
self.register_control()
|
self.register_control()
|
||||||
|
self.register_helper()
|
||||||
|
|
||||||
def register_control(self):
|
def register_control(self):
|
||||||
c = ControlServer()
|
c = ControlServer()
|
||||||
@ -142,6 +145,20 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
|
|||||||
control_url = self.tub.registerReference(c)
|
control_url = self.tub.registerReference(c)
|
||||||
self.write_private_config("control.furl", control_url + "\n")
|
self.write_private_config("control.furl", control_url + "\n")
|
||||||
|
|
||||||
|
def register_helper(self):
|
||||||
|
run_helper = self.get_config("run_helper")
|
||||||
|
if not run_helper:
|
||||||
|
return
|
||||||
|
h = Helper(os.path.join(self.basedir, "helper"))
|
||||||
|
h.setServiceParent(self)
|
||||||
|
helper_furl = self.tub.registerReference(h)
|
||||||
|
# TODO: this is confusing. BASEDIR/private/helper.furl is created by
|
||||||
|
# the helper. BASEDIR/helper.furl is consumed by the client who wants
|
||||||
|
# to use the helper. I like having the filename be the same, since
|
||||||
|
# that makes 'cp' work smoothly, but the difference between config
|
||||||
|
# inputs and generated outputs is hard to see.
|
||||||
|
self.write_private_config("helper.furl", helper_furl + "\n")
|
||||||
|
|
||||||
def remote_get_versions(self):
|
def remote_get_versions(self):
|
||||||
return str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION)
|
return str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION)
|
||||||
|
|
||||||
|
@ -1,7 +1,10 @@
|
|||||||
|
|
||||||
from foolscap import RemoteInterface
|
from zope.interface import implements
|
||||||
|
from twisted.application import service
|
||||||
|
from foolscap import RemoteInterface, Referenceable
|
||||||
from foolscap.schema import DictOf, ChoiceOf, ListOf
|
from foolscap.schema import DictOf, ChoiceOf, ListOf
|
||||||
from allmydata.interfaces import StorageIndex, Hash
|
from allmydata.interfaces import StorageIndex, Hash
|
||||||
|
from allmydata.util import hashutil
|
||||||
|
|
||||||
UploadResults = DictOf(str, str)
|
UploadResults = DictOf(str, str)
|
||||||
|
|
||||||
@ -51,3 +54,66 @@ class RIHelper(RemoteInterface):
|
|||||||
"""
|
"""
|
||||||
return (UploadResults, ChoiceOf(RIUploadHelper, None))
|
return (UploadResults, ChoiceOf(RIUploadHelper, None))
|
||||||
|
|
||||||
|
|
||||||
|
class CHKUploadHelper(Referenceable):
|
||||||
|
"""I am the helper-server -side counterpart to AssistedUploader. I handle
|
||||||
|
peer selection, encoding, and share pushing. I read ciphertext from the
|
||||||
|
remote AssistedUploader.
|
||||||
|
"""
|
||||||
|
implements(RIUploadHelper)
|
||||||
|
|
||||||
|
def __init__(self, storage_index, helper):
|
||||||
|
self._finished = False
|
||||||
|
self._storage_index = storage_index
|
||||||
|
self._helper = helper
|
||||||
|
self._log_number = self._helper.log("CHKUploadHelper starting")
|
||||||
|
|
||||||
|
def log(self, msg, parent=None):
|
||||||
|
if parent is None:
|
||||||
|
parent = self._log_number
|
||||||
|
return self._client.log(msg, parent=parent)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
# determine if we need to upload the file. If so, return ({},self) .
|
||||||
|
# If not, return (UploadResults,None) .
|
||||||
|
return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self)
|
||||||
|
|
||||||
|
def remote_upload(self, reader):
|
||||||
|
# reader is an RIEncryptedUploadable. I am specified to return an
|
||||||
|
# UploadResults dictionary.
|
||||||
|
return {'uri_extension_hash': hashutil.uri_extension_hash("")}
|
||||||
|
|
||||||
|
def finished(self, res):
|
||||||
|
self._finished = True
|
||||||
|
self._helper.upload_finished(self._storage_index)
|
||||||
|
|
||||||
|
|
||||||
|
class Helper(Referenceable, service.MultiService):
|
||||||
|
implements(RIHelper)
|
||||||
|
# this is the non-distributed version. When we need to have multiple
|
||||||
|
# helpers, this object will query the farm to see if anyone has the
|
||||||
|
# storage_index of interest, and send the request off to them.
|
||||||
|
|
||||||
|
chk_upload_helper_class = CHKUploadHelper
|
||||||
|
|
||||||
|
def __init__(self, basedir):
|
||||||
|
self._basedir = basedir
|
||||||
|
self._active_uploads = {}
|
||||||
|
service.MultiService.__init__(self)
|
||||||
|
|
||||||
|
def log(self, msg, **kwargs):
|
||||||
|
if 'facility' not in kwargs:
|
||||||
|
kwargs['facility'] = "helper"
|
||||||
|
return self.parent.log(msg, **kwargs)
|
||||||
|
|
||||||
|
def remote_upload(self, storage_index):
|
||||||
|
# TODO: look on disk
|
||||||
|
if storage_index in self._active_uploads:
|
||||||
|
uh = self._active_uploads[storage_index]
|
||||||
|
else:
|
||||||
|
uh = CHKUploadHelper(storage_index, self)
|
||||||
|
self._active_uploads[storage_index] = uh
|
||||||
|
return uh.start()
|
||||||
|
|
||||||
|
def upload_finished(self, storage_index):
|
||||||
|
del self._active_uploads[storage_index]
|
||||||
|
Loading…
Reference in New Issue
Block a user