add HelperUploadResults

This splits the pb.Copyable on-wire object (HelperUploadResults) out
from the local results object (UploadResults). To maintain compatibility
with older Helpers, we have to leave pb.Copyable classes alone and
unmodified, but we want to change UploadResults to use IServers instead
of serverids. So by using a different class on the wire, and translating
to/from it on either end, we can accomplish both.
This commit is contained in:
Brian Warner 2012-05-21 21:14:00 -07:00
parent b3af012b13
commit b71234c538
3 changed files with 103 additions and 59 deletions

View File

@ -137,14 +137,13 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
def __init__(self, storage_index,
helper, storage_broker, secret_holder,
incoming_file, encoding_file,
results, log_number):
log_number):
self._storage_index = storage_index
self._helper = helper
self._incoming_file = incoming_file
self._encoding_file = encoding_file
self._upload_id = si_b2a(storage_index)[:5]
self._log_number = log_number
self._results = results
self._upload_status = upload.UploadStatus()
self._upload_status.set_helper(False)
self._upload_status.set_storage_index(storage_index)
@ -201,19 +200,31 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
# and inform the client when the upload has finished
return self._finished_observers.when_fired()
def _finished(self, uploadresults):
precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
r = uploadresults
v = uri.from_string(r.verifycapstr)
r.uri_extension_hash = v.uri_extension_hash
def _finished(self, ur):
precondition(isinstance(ur.verifycapstr, str), ur.verifycapstr)
assert interfaces.IUploadResults.providedBy(ur), ur
v = uri.from_string(ur.verifycapstr)
f_times = self._fetcher.get_times()
r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
r.timings["total_fetch"] = f_times["total"]
hur = upload.HelperUploadResults()
hur.timings = {"cumulative_fetch": f_times["cumulative_fetch"],
"total_fetch": f_times["total"],
}
for k in ur.timings:
hur.timings[k] = ur.timings[k]
hur.uri_extension_hash = v.uri_extension_hash
hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
hur.preexisting_shares = ur.preexisting_shares
hur.sharemap = ur.sharemap
hur.servermap = ur.servermap
hur.pushed_shares = ur.pushed_shares
hur.file_size = ur.file_size
hur.uri_extension_data = ur.uri_extension_data
hur.verifycapstr = ur.verifycapstr
self._reader.close()
os.unlink(self._encoding_file)
self._finished_observers.fire(r)
self._finished_observers.fire(hur)
self._helper.upload_finished(self._storage_index, v.size)
del self._reader
@ -561,7 +572,6 @@ class Helper(Referenceable):
def remote_upload_chk(self, storage_index):
self.count("chk_upload_helper.upload_requests")
r = upload.UploadResults()
lp = self.log(format="helper: upload_chk query for SI %(si)s",
si=si_b2a(storage_index))
if storage_index in self._active_uploads:
@ -569,8 +579,8 @@ class Helper(Referenceable):
uh = self._active_uploads[storage_index]
return (None, uh)
d = self._check_chk(storage_index, r, lp)
d.addCallback(self._did_chk_check, storage_index, r, lp)
d = self._check_chk(storage_index, lp)
d.addCallback(self._did_chk_check, storage_index, lp)
def _err(f):
self.log("error while checking for chk-already-in-grid",
failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
@ -578,7 +588,7 @@ class Helper(Referenceable):
d.addErrback(_err)
return d
def _check_chk(self, storage_index, results, lp):
def _check_chk(self, storage_index, lp):
# see if this file is already in the grid
lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY)
@ -589,17 +599,18 @@ class Helper(Referenceable):
if res:
(sharemap, ueb_data, ueb_hash) = res
self.log("found file in grid", level=log.NOISY, parent=lp)
results.uri_extension_hash = ueb_hash
results.sharemap = sharemap
results.uri_extension_data = ueb_data
results.preexisting_shares = len(sharemap)
results.pushed_shares = 0
return results
hur = upload.HelperUploadResults()
hur.uri_extension_hash = ueb_hash
hur.sharemap = sharemap
hur.uri_extension_data = ueb_data
hur.preexisting_shares = len(sharemap)
hur.pushed_shares = 0
return hur
return None
d.addCallback(_checked)
return d
def _did_chk_check(self, already_present, storage_index, r, lp):
def _did_chk_check(self, already_present, storage_index, lp):
if already_present:
# the necessary results are placed in the UploadResults
self.count("chk_upload_helper.upload_already_present")
@ -618,12 +629,12 @@ class Helper(Referenceable):
uh = self._active_uploads[storage_index]
else:
self.log("creating new upload helper", parent=lp)
uh = self._make_chk_upload_helper(storage_index, r, lp)
uh = self._make_chk_upload_helper(storage_index, lp)
self._active_uploads[storage_index] = uh
self._add_upload(uh)
return (None, uh)
def _make_chk_upload_helper(self, storage_index, r, lp):
def _make_chk_upload_helper(self, storage_index, lp):
si_s = si_b2a(storage_index)
incoming_file = os.path.join(self._chk_incoming, si_s)
encoding_file = os.path.join(self._chk_encoding, si_s)
@ -631,7 +642,7 @@ class Helper(Referenceable):
self._storage_broker,
self._secret_holder,
incoming_file, encoding_file,
r, lp)
lp)
return uh
def _add_upload(self, uh):

View File

@ -32,8 +32,10 @@ from cStringIO import StringIO
class TooFullError(Exception):
pass
class UploadResults(Copyable, RemoteCopy):
implements(IUploadResults)
# HelperUploadResults are what we get from the Helper, and to retain
# backwards compatibility with old Helpers we can't change the format. We
# convert them into a local UploadResults upon receipt.
class HelperUploadResults(Copyable, RemoteCopy):
# note: don't change this string, it needs to match the value used on the
# helper, and it does *not* need to match the fully-qualified
# package/module/class name
@ -55,6 +57,19 @@ class UploadResults(Copyable, RemoteCopy):
self.preexisting_shares = None # count of shares already present
self.pushed_shares = None # count of shares we pushed
class UploadResults:
implements(IUploadResults)
def __init__(self):
self.timings = {} # dict of name to number of seconds
self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
self.file_size = None
self.ciphertext_fetched = None # how much the helper fetched
self.uri = None
self.preexisting_shares = None # count of shares already present
self.pushed_shares = None # count of shares we pushed
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
@ -1179,7 +1194,7 @@ class AssistedUploader:
d.addCallback(self._contacted_helper)
return d
def _contacted_helper(self, (upload_results, upload_helper)):
def _contacted_helper(self, (helper_upload_results, upload_helper)):
now = time.time()
elapsed = now - self._time_contacting_helper_start
self._elapsed_time_contacting_helper = elapsed
@ -1197,7 +1212,7 @@ class AssistedUploader:
return d
self.log("helper says file is already uploaded", level=log.OPERATIONAL)
self._upload_status.set_progress(1, 1.0)
return upload_results
return helper_upload_results
def _convert_old_upload_results(self, upload_results):
# pre-1.3.0 helpers return upload results which contain a mapping
@ -1216,30 +1231,41 @@ class AssistedUploader:
if str in [type(v) for v in sharemap.values()]:
upload_results.sharemap = None
def _build_verifycap(self, upload_results):
def _build_verifycap(self, helper_upload_results):
self.log("upload finished, building readcap", level=log.OPERATIONAL)
self._convert_old_upload_results(upload_results)
self._convert_old_upload_results(helper_upload_results)
self._upload_status.set_status("Building Readcap")
r = upload_results
assert r.uri_extension_data["needed_shares"] == self._needed_shares
assert r.uri_extension_data["total_shares"] == self._total_shares
assert r.uri_extension_data["segment_size"] == self._segment_size
assert r.uri_extension_data["size"] == self._size
r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
uri_extension_hash=r.uri_extension_hash,
needed_shares=self._needed_shares,
total_shares=self._total_shares, size=self._size
).to_string()
hur = helper_upload_results
assert hur.uri_extension_data["needed_shares"] == self._needed_shares
assert hur.uri_extension_data["total_shares"] == self._total_shares
assert hur.uri_extension_data["segment_size"] == self._segment_size
assert hur.uri_extension_data["size"] == self._size
ur = UploadResults()
# hur.verifycap doesn't exist if already found
v = uri.CHKFileVerifierURI(self._storage_index,
uri_extension_hash=hur.uri_extension_hash,
needed_shares=self._needed_shares,
total_shares=self._total_shares,
size=self._size)
ur.verifycapstr = v.to_string()
ur.timings = hur.timings
ur.uri_extension_data = hur.uri_extension_data
ur.uri_extension_hash = hur.uri_extension_hash
ur.preexisting_shares = hur.preexisting_shares
ur.pushed_shares = hur.pushed_shares
ur.sharemap = hur.sharemap
ur.servermap = hur.servermap # not if already found
ur.ciphertext_fetched = hur.ciphertext_fetched # not if already found
now = time.time()
r.file_size = self._size
r.timings["storage_index"] = self._storage_index_elapsed
r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
if "total" in r.timings:
r.timings["helper_total"] = r.timings["total"]
r.timings["total"] = now - self._started
ur.file_size = self._size
ur.timings["storage_index"] = self._storage_index_elapsed
ur.timings["contacting_helper"] = self._elapsed_time_contacting_helper
if "total" in ur.timings:
ur.timings["helper_total"] = ur.timings["total"]
ur.timings["total"] = now - self._started
self._upload_status.set_status("Finished")
self._upload_status.set_results(r)
return r
self._upload_status.set_results(ur)
return ur
def get_upload_status(self):
return self._upload_status

View File

@ -22,24 +22,31 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
def _got_size(size):
d2 = eu.get_all_encoding_parameters()
def _got_parms(parms):
# just pretend we did the upload
needed_shares, happy, total_shares, segsize = parms
ueb_data = {"needed_shares": needed_shares,
"total_shares": total_shares,
"segment_size": segsize,
"size": size,
}
self._results.uri_extension_data = ueb_data
self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
needed_shares, total_shares,
size).to_string()
return self._results
r = upload.UploadResults()
r.preexisting_shares = 0
r.pushed_shares = total_shares
r.file_size = size
r.uri_extension_data = ueb_data
v = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
needed_shares, total_shares,
size)
r.verifycapstr = v.to_string()
return r
d2.addCallback(_got_parms)
return d2
d.addCallback(_got_size)
return d
class Helper_fake_upload(offloaded.Helper):
def _make_chk_upload_helper(self, storage_index, r, lp):
def _make_chk_upload_helper(self, storage_index, lp):
si_s = si_b2a(storage_index)
incoming_file = os.path.join(self._chk_incoming, si_s)
encoding_file = os.path.join(self._chk_encoding, si_s)
@ -47,12 +54,12 @@ class Helper_fake_upload(offloaded.Helper):
self._storage_broker,
self._secret_holder,
incoming_file, encoding_file,
r, lp)
lp)
return uh
class Helper_already_uploaded(Helper_fake_upload):
def _check_chk(self, storage_index, results, lp):
res = upload.UploadResults()
def _check_chk(self, storage_index, lp):
res = upload.HelperUploadResults()
res.uri_extension_hash = hashutil.uri_extension_hash("")
# we're pretending that the file they're trying to upload was already