mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-03-10 22:43:52 +00:00
clean up Helper to make later changes easier
Fix up control flow inside the Helper, to make it more friendly for later refactoring.
This commit is contained in:
parent
e60982c851
commit
0df833eac9
@ -160,6 +160,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
|
||||
self._finished_observers = observer.OneShotObserverList()
|
||||
|
||||
self._started = time.time()
|
||||
d = self._fetcher.when_done()
|
||||
d.addCallback(lambda res: self._reader.start())
|
||||
d.addCallback(lambda res: self.start_encrypted(self._reader))
|
||||
@ -171,25 +172,6 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
kwargs['facility'] = "tahoe.helper.chk"
|
||||
return upload.CHKUploader.log(self, *args, **kwargs)
|
||||
|
||||
def start(self):
|
||||
self._started = time.time()
|
||||
# determine if we need to upload the file. If so, return ({},self) .
|
||||
# If not, return (UploadResults,None) .
|
||||
self.log("deciding whether to upload the file or not", level=log.NOISY)
|
||||
if os.path.exists(self._encoding_file):
|
||||
# we have the whole file, and we might be encoding it (or the
|
||||
# encode/upload might have failed, and we need to restart it).
|
||||
self.log("ciphertext already in place", level=log.UNUSUAL)
|
||||
return (self._results, self)
|
||||
if os.path.exists(self._incoming_file):
|
||||
# we have some of the file, but not all of it (otherwise we'd be
|
||||
# encoding). The caller might be useful.
|
||||
self.log("partial ciphertext already present", level=log.UNUSUAL)
|
||||
return (self._results, self)
|
||||
# we don't remember uploading this file
|
||||
self.log("no ciphertext yet", level=log.NOISY)
|
||||
return (self._results, self)
|
||||
|
||||
def remote_get_version(self):
|
||||
return self.VERSION
|
||||
|
||||
@ -197,6 +179,20 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
# reader is an RIEncryptedUploadable. I am specified to return an
|
||||
# UploadResults dictionary.
|
||||
|
||||
# Log how much ciphertext we need to get.
|
||||
self.log("deciding whether to upload the file or not", level=log.NOISY)
|
||||
if os.path.exists(self._encoding_file):
|
||||
# we have the whole file, and we might be encoding it (or the
|
||||
# encode/upload might have failed, and we need to restart it).
|
||||
self.log("ciphertext already in place", level=log.UNUSUAL)
|
||||
elif os.path.exists(self._incoming_file):
|
||||
# we have some of the file, but not all of it (otherwise we'd be
|
||||
# encoding). The caller might be useful.
|
||||
self.log("partial ciphertext already present", level=log.UNUSUAL)
|
||||
else:
|
||||
# we don't remember uploading this file
|
||||
self.log("no ciphertext yet", level=log.NOISY)
|
||||
|
||||
# let our fetcher pull ciphertext from the reader.
|
||||
self._fetcher.add_reader(reader)
|
||||
# and also hashes
|
||||
@ -491,7 +487,6 @@ class Helper(Referenceable):
|
||||
{ },
|
||||
"application-version": str(allmydata.__full_version__),
|
||||
}
|
||||
chk_upload_helper_class = CHKUploadHelper
|
||||
MAX_UPLOAD_STATUSES = 10
|
||||
|
||||
def __init__(self, basedir, storage_broker, secret_holder,
|
||||
@ -567,44 +562,15 @@ class Helper(Referenceable):
|
||||
def remote_upload_chk(self, storage_index):
|
||||
self.count("chk_upload_helper.upload_requests")
|
||||
r = upload.UploadResults()
|
||||
si_s = si_b2a(storage_index)
|
||||
lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
|
||||
incoming_file = os.path.join(self._chk_incoming, si_s)
|
||||
encoding_file = os.path.join(self._chk_encoding, si_s)
|
||||
lp = self.log(format="helper: upload_chk query for SI %(si)s",
|
||||
si=si_b2a(storage_index))
|
||||
if storage_index in self._active_uploads:
|
||||
self.log("upload is currently active", parent=lp)
|
||||
uh = self._active_uploads[storage_index]
|
||||
return uh.start()
|
||||
return (None, uh)
|
||||
|
||||
d = self._check_for_chk_already_in_grid(storage_index, r, lp)
|
||||
def _checked(already_present):
|
||||
if already_present:
|
||||
# the necessary results are placed in the UploadResults
|
||||
self.count("chk_upload_helper.upload_already_present")
|
||||
self.log("file already found in grid", parent=lp)
|
||||
return (r, None)
|
||||
|
||||
self.count("chk_upload_helper.upload_need_upload")
|
||||
# the file is not present in the grid, by which we mean there are
|
||||
# less than 'N' shares available.
|
||||
self.log("unable to find file in the grid", parent=lp,
|
||||
level=log.NOISY)
|
||||
# We need an upload helper. Check our active uploads again in
|
||||
# case there was a race.
|
||||
if storage_index in self._active_uploads:
|
||||
self.log("upload is currently active", parent=lp)
|
||||
uh = self._active_uploads[storage_index]
|
||||
else:
|
||||
self.log("creating new upload helper", parent=lp)
|
||||
uh = self.chk_upload_helper_class(storage_index, self,
|
||||
self._storage_broker,
|
||||
self._secret_holder,
|
||||
incoming_file, encoding_file,
|
||||
r, lp)
|
||||
self._active_uploads[storage_index] = uh
|
||||
self._add_upload(uh)
|
||||
return uh.start()
|
||||
d.addCallback(_checked)
|
||||
d = self._check_chk(storage_index, r, lp)
|
||||
d.addCallback(self._did_chk_check, storage_index, r, lp)
|
||||
def _err(f):
|
||||
self.log("error while checking for chk-already-in-grid",
|
||||
failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
|
||||
@ -612,7 +578,7 @@ class Helper(Referenceable):
|
||||
d.addErrback(_err)
|
||||
return d
|
||||
|
||||
def _check_for_chk_already_in_grid(self, storage_index, results, lp):
|
||||
def _check_chk(self, storage_index, results, lp):
|
||||
# see if this file is already in the grid
|
||||
lp2 = self.log("doing a quick check+UEBfetch",
|
||||
parent=lp, level=log.NOISY)
|
||||
@ -628,11 +594,46 @@ class Helper(Referenceable):
|
||||
results.uri_extension_data = ueb_data
|
||||
results.preexisting_shares = len(sharemap)
|
||||
results.pushed_shares = 0
|
||||
return True
|
||||
return False
|
||||
return results
|
||||
return None
|
||||
d.addCallback(_checked)
|
||||
return d
|
||||
|
||||
def _did_chk_check(self, already_present, storage_index, r, lp):
|
||||
if already_present:
|
||||
# the necessary results are placed in the UploadResults
|
||||
self.count("chk_upload_helper.upload_already_present")
|
||||
self.log("file already found in grid", parent=lp)
|
||||
return (already_present, None)
|
||||
|
||||
self.count("chk_upload_helper.upload_need_upload")
|
||||
# the file is not present in the grid, by which we mean there are
|
||||
# less than 'N' shares available.
|
||||
self.log("unable to find file in the grid", parent=lp,
|
||||
level=log.NOISY)
|
||||
# We need an upload helper. Check our active uploads again in
|
||||
# case there was a race.
|
||||
if storage_index in self._active_uploads:
|
||||
self.log("upload is currently active", parent=lp)
|
||||
uh = self._active_uploads[storage_index]
|
||||
else:
|
||||
self.log("creating new upload helper", parent=lp)
|
||||
uh = self._make_chk_upload_helper(storage_index, r, lp)
|
||||
self._active_uploads[storage_index] = uh
|
||||
self._add_upload(uh)
|
||||
return (None, uh)
|
||||
|
||||
def _make_chk_upload_helper(self, storage_index, r, lp):
|
||||
si_s = si_b2a(storage_index)
|
||||
incoming_file = os.path.join(self._chk_incoming, si_s)
|
||||
encoding_file = os.path.join(self._chk_encoding, si_s)
|
||||
uh = CHKUploadHelper(storage_index, self,
|
||||
self._storage_broker,
|
||||
self._secret_holder,
|
||||
incoming_file, encoding_file,
|
||||
r, lp)
|
||||
return uh
|
||||
|
||||
def _add_upload(self, uh):
|
||||
self._all_uploads[uh] = None
|
||||
if self._history:
|
||||
|
@ -1,4 +1,5 @@
|
||||
import os
|
||||
from twisted.internet import defer
|
||||
from twisted.trial import unittest
|
||||
from twisted.application import service
|
||||
|
||||
@ -37,8 +38,20 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
|
||||
d.addCallback(_got_size)
|
||||
return d
|
||||
|
||||
class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
|
||||
def start(self):
|
||||
class Helper_fake_upload(offloaded.Helper):
|
||||
def _make_chk_upload_helper(self, storage_index, r, lp):
|
||||
si_s = si_b2a(storage_index)
|
||||
incoming_file = os.path.join(self._chk_incoming, si_s)
|
||||
encoding_file = os.path.join(self._chk_encoding, si_s)
|
||||
uh = CHKUploadHelper_fake(storage_index, self,
|
||||
self._storage_broker,
|
||||
self._secret_holder,
|
||||
incoming_file, encoding_file,
|
||||
r, lp)
|
||||
return uh
|
||||
|
||||
class Helper_already_uploaded(Helper_fake_upload):
|
||||
def _check_chk(self, storage_index, results, lp):
|
||||
res = upload.UploadResults()
|
||||
res.uri_extension_hash = hashutil.uri_extension_hash("")
|
||||
|
||||
@ -53,7 +66,7 @@ class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
|
||||
"size": len(DATA),
|
||||
}
|
||||
res.uri_extension_data = ueb_data
|
||||
return (res, None)
|
||||
return defer.succeed(res)
|
||||
|
||||
class FakeClient(service.MultiService):
|
||||
DEFAULT_ENCODING_PARAMETERS = {"k":25,
|
||||
@ -101,13 +114,12 @@ class AssistedUpload(unittest.TestCase):
|
||||
# bogus host/port
|
||||
t.setLocation("bogus:1234")
|
||||
|
||||
def setUpHelper(self, basedir):
|
||||
def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
|
||||
fileutil.make_dirs(basedir)
|
||||
self.helper = h = offloaded.Helper(basedir,
|
||||
self.storage_broker,
|
||||
self.secret_holder,
|
||||
None, None)
|
||||
h.chk_upload_helper_class = CHKUploadHelper_fake
|
||||
self.helper = h = helper_class(basedir,
|
||||
self.storage_broker,
|
||||
self.secret_holder,
|
||||
None, None)
|
||||
self.helper_furl = self.tub.registerReference(h)
|
||||
|
||||
def tearDown(self):
|
||||
@ -196,8 +208,7 @@ class AssistedUpload(unittest.TestCase):
|
||||
|
||||
def test_already_uploaded(self):
|
||||
self.basedir = "helper/AssistedUpload/test_already_uploaded"
|
||||
self.setUpHelper(self.basedir)
|
||||
self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
|
||||
self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded)
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
u.setServiceParent(self.s)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user