mirror of
synced 2025-03-21 03:15:16 +00:00
add upload-results timing info for helper uploads. This changes the Helper protocol, and introduces a compatibility break
This commit is contained in:
@ -669,10 +669,3 @@ class Encoder(object):
def get_times(self):
# return a dictionary of encode+push timings
return self._times
def get_rates(self):
# return a dictionary of encode+push speeds
rates = {
"encode": self.file_size / self._times["cumulative_encoding"],
"push": self.file_size / self._times["cumulative_sending"],
return rates
@ -1376,7 +1376,7 @@ class RIControlClient(RemoteInterface):
return DictOf(Nodeid, float)
UploadResults = DictOf(str, str)
UploadResults = Any() #DictOf(str, str)
class RIEncryptedUploadable(RemoteInterface):
__remote_name__ = "RIEncryptedUploadable.tahoe.allmydata.com"
@ -1,5 +1,5 @@
import os.path, stat
import os.path, stat, time
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
@ -131,13 +131,14 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
def __init__(self, storage_index, helper,
incoming_file, encoding_file,
results, log_number):
self._storage_index = storage_index
self._helper = helper
self._incoming_file = incoming_file
self._encoding_file = encoding_file
upload_id = idlib.b2a(storage_index)[:6]
self._log_number = log_number
self._results = results
self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
@ -159,6 +160,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
return upload.CHKUploader.log(self, *args, **kwargs)
def start(self):
self._started = time.time()
# determine if we need to upload the file. If so, return ({},self) .
# If not, return (UploadResults,None) .
self.log("deciding whether to upload the file or not", level=log.NOISY)
@ -166,15 +168,15 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
# we have the whole file, and we might be encoding it (or the
# encode/upload might have failed, and we need to restart it).
self.log("ciphertext already in place", level=log.UNUSUAL)
return ({}, self)
return (self._results, self)
if os.path.exists(self._incoming_file):
# we have some of the file, but not all of it (otherwise we'd be
# encoding). The caller might be useful.
self.log("partial ciphertext already present", level=log.UNUSUAL)
return ({}, self)
return (self._results, self)
# we don't remember uploading this file
self.log("no ciphertext yet", level=log.NOISY)
return ({}, self)
return (self._results, self)
def remote_upload(self, reader):
# reader is an RIEncryptedUploadable. I am specified to return an
@ -190,10 +192,14 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
def _finished(self, res):
(uri_extension_hash, needed_shares, total_shares, size) = res
upload_results = {'uri_extension_hash': uri_extension_hash}
r = self._results
r.uri_extension_hash = uri_extension_hash
f_times = self._fetcher.get_times()
r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
r.timings["total_fetch"] = f_times["total"]
del self._reader
@ -248,6 +254,10 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
self._readers = []
self._started = False
self._f = None
self._times = {
"cumulative_fetch": 0.0,
"total": 0.0,
def log(self, *args, **kwargs):
if "facility" not in kwargs:
@ -264,6 +274,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
if self._started:
self._started = True
started = time.time()
if os.path.exists(self._encoding_file):
self.log("ciphertext already present, bypassing fetch",
@ -276,7 +287,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
# else.
have = os.stat(self._encoding_file)[stat.ST_SIZE]
d = self.call("read_encrypted", have-1, 1)
d.addCallback(lambda ignored: self._done2())
d.addCallback(self._done2, started)
# first, find out how large the file is going to be
@ -284,6 +295,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
d.addCallback(self._done2, started)
def _got_size(self, size):
@ -327,8 +339,11 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
# transfer that involves more than a few hundred chunks.
# 'fire_when_done' lives a long time, but the Deferreds returned by
# the inner _fetch() call do not.
start = time.time()
d = defer.maybeDeferred(self._fetch)
def _done(finished):
elapsed = time.time() - start
self._times["cumulative_fetch"] += elapsed
if finished:
self.log("finished reading ciphertext", level=log.NOISY)
@ -366,10 +381,11 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
os.rename(self._incoming_file, self._encoding_file)
return self._done2()
def _done2(self):
def _done2(self, _ignored, started):
self.log("done2", level=log.NOISY)
elapsed = time.time() - started
self._times["total"] = elapsed
self._readers = []
@ -382,6 +398,8 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
def when_done(self):
return self._done_observers.when_fired()
def get_times(self):
return self._times
class LocalCiphertextReader(AskUntilSuccessMixin):
@ -449,6 +467,8 @@ class Helper(Referenceable, service.MultiService):
return self.parent.log(*args, **kwargs)
def remote_upload_chk(self, storage_index):
r = upload.UploadResults()
started = time.time()
si_s = idlib.b2a(storage_index)
lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
incoming_file = os.path.join(self._chk_incoming, si_s)
@ -458,11 +478,14 @@ class Helper(Referenceable, service.MultiService):
uh = self._active_uploads[storage_index]
return uh.start()
d = self._check_for_chk_already_in_grid(storage_index, lp)
def _checked(upload_results):
if upload_results:
d = self._check_for_chk_already_in_grid(storage_index, r, lp)
def _checked(already_present):
elapsed = time.time() - started
r.timings['existence_check'] = elapsed
if already_present:
# the necessary results are placed in the UploadResults
self.log("file already found in grid", parent=lp)
return (upload_results, None)
return (r, None)
# the file is not present in the grid, by which we mean there are
# less than 'N' shares available.
@ -477,7 +500,7 @@ class Helper(Referenceable, service.MultiService):
self.log("creating new upload helper", parent=lp)
uh = self.chk_upload_helper_class(storage_index, self,
incoming_file, encoding_file,
r, lp)
self._active_uploads[storage_index] = uh
return uh.start()
@ -488,7 +511,7 @@ class Helper(Referenceable, service.MultiService):
return d
def _check_for_chk_already_in_grid(self, storage_index, lp):
def _check_for_chk_already_in_grid(self, storage_index, results, lp):
# see if this file is already in the grid
lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY)
@ -499,8 +522,8 @@ class Helper(Referenceable, service.MultiService):
if res:
(sharemap, ueb_data, ueb_hash) = res
self.log("found file in grid", level=log.NOISY, parent=lp)
upload_results = {'uri_extension_hash': ueb_hash}
return upload_results
results.uri_extension_hash = ueb_hash
return True
return False
return d
@ -28,7 +28,8 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
def start(self):
res = {'uri_extension_hash': hashutil.uri_extension_hash("")}
res = upload.UploadResults()
res.uri_extension_hash = hashutil.uri_extension_hash("")
return (res, None)
class FakeClient(service.MultiService):
@ -4,7 +4,7 @@ from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable
from foolscap import Referenceable, Copyable, RemoteCopy
from foolscap import eventual
from foolscap.logging import log
@ -36,14 +36,17 @@ class HaveAllPeersError(Exception):
class TooFullError(Exception):
class UploadResults:
class UploadResults(Copyable, RemoteCopy):
typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
copytype = typeToCopy
file_size = None
uri = None
sharemap = None # dict of shnum to placement string
servermap = None # dict of peerid to set(shnums)
def __init__(self):
self.timings = {} # dict of name to number of seconds
self.rates = {} # dict of name to rates (in bytes per second)
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
@ -597,15 +600,19 @@ class CHKUploader:
def start_encrypted(self, encrypted):
eu = IEncryptedUploadable(encrypted)
started = time.time()
self._encoder = e = encode.Encoder(self._log_number)
d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders, started)
d.addCallback(self.set_shareholders, e)
d.addCallback(lambda res: e.start())
# this fires with the uri_extension_hash and other data
return d
def locate_all_shareholders(self, encoder):
def locate_all_shareholders(self, encoder, started):
peer_selection_started = now = time.time()
self._storage_index_elapsed = now - started
storage_index = encoder.get_param("storage_index")
upload_id = idlib.b2a(storage_index)[:6]
self.log("using storage index %s" % upload_id)
@ -621,7 +628,7 @@ class CHKUploader:
share_size, block_size,
num_segments, n, desired)
def _done(res):
self._peer_selection_finished = time.time()
self._peer_selection_elapsed = time.time() - peer_selection_started
return res
return d
@ -642,6 +649,26 @@ class CHKUploader:
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
def _encrypted_done(self, res):
r = self._results
r.sharemap = {}
r.servermap = {}
for shnum in self._encoder.get_shares_placed():
peer_tracker = self._sharemap[shnum]
peerid = peer_tracker.peerid
peerid_s = idlib.shortnodeid_b2a(peerid)
r.sharemap[shnum] = "Placed on [%s]" % peerid_s
if peerid not in r.servermap:
r.servermap[peerid] = set()
now = time.time()
r.file_size = self._encoder.file_size
r.timings["total"] = now - self._started
r.timings["storage_index"] = self._storage_index_elapsed
r.timings["peer_selection"] = self._peer_selection_elapsed
return res
def _compute_uri(self, (uri_extension_hash,
needed_shares, total_shares, size),
@ -653,24 +680,6 @@ class CHKUploader:
r = self._results
r.uri = u.to_string()
r.sharemap = {}
r.servermap = {}
for shnum in self._encoder.get_shares_placed():
peer_tracker = self._sharemap[shnum]
peerid = peer_tracker.peerid
peerid_s = idlib.shortnodeid_b2a(peerid)
r.sharemap[shnum] = "Placed on [%s]" % peerid_s
if peerid not in r.servermap:
r.servermap[peerid] = set()
peer_selection_time = (self._peer_selection_finished
- self._peer_selection_started)
now = time.time()
r.timings["total"] = now - self._started
r.rates["total"] = 1.0 * self._encoder.file_size / r.timings["total"]
r.timings["peer_selection"] = peer_selection_time
return r
@ -703,7 +712,10 @@ class LiteralUploader:
def start(self, uploadable):
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
d.addCallback(lambda size: read_this_many_bytes(uploadable, size))
def _got_size(size):
self._results.file_size = size
return read_this_many_bytes(uploadable, size)
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
d.addCallback(lambda u: u.to_string())
@ -794,7 +806,6 @@ class AssistedUploader:
assert isinstance(default_encoding_parameters, dict)
self._default_encoding_parameters = default_encoding_parameters
self._log_number = log.msg("AssistedUploader starting")
self._results = UploadResults()
def log(self, msg, parent=None, **kwargs):
if parent is None:
@ -838,16 +849,16 @@ class AssistedUploader:
self._storage_index = storage_index
def _contact_helper(self, res):
now = self._time_contacting_helper = time.time()
self._results.timings["local_hashing"] = now - self._started
now = self._time_contacting_helper_start = time.time()
self._storage_index_elapsed = now - self._started
self.log("contacting helper..")
d = self._helper.callRemote("upload_chk", self._storage_index)
return d
def _contacted_helper(self, (upload_results, upload_helper)):
now = time.time()
elapsed = now - self._time_contacting_helper
self._results.timings["contacting_helper"] = elapsed
elapsed = now - self._time_contacting_helper_start
self._elapsed_time_contacting_helper = elapsed
if upload_helper:
self.log("helper says we need to upload")
# we need to upload the file
@ -883,18 +894,21 @@ class AssistedUploader:
def _build_readcap(self, upload_results):
self.log("upload finished, building readcap")
ur = upload_results
r = upload_results
u = uri.CHKFileURI(key=self._key,
r = self._results
r.uri = u.to_string()
now = time.time()
r.file_size = self._size
r.timings["storage_index"] = self._storage_index_elapsed
r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
if "total" in r.timings:
r.timings["helper_total"] = r.timings["total"]
r.timings["total"] = now - self._started
r.rates["total"] = 1.0 * self._size / r.timings["total"]
return r
class NoParameterPreferencesMixin:
@ -18,9 +18,12 @@
<li>Servermap: <span n:render="servermap" /></li>
<li>File Size: <span n:render="string" n:data="file_size" /> bytes</li>
<li>Total: <span n:render="time" n:data="time_total" />
(<span n:render="rate" n:data="rate_total" />)</li>
<li>Storage Index: <span n:render="time" n:data="time_storage_index" />
(<span n:render="rate" n:data="rate_storage_index" />)</li>
<li>Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
<li>Encode And Push: <span n:render="time" n:data="time_total_encode_and_push" /></li>
@ -1303,6 +1303,11 @@ class UnlinkedPOSTCHKUploader(rend.Page):
return d
def data_file_size(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.file_size)
return d
def render_time(self, ctx, data):
# 1.23s, 790ms, 132us
if data is None:
@ -1327,50 +1332,57 @@ class UnlinkedPOSTCHKUploader(rend.Page):
return "%.1fkBps" % (r/1000)
return "%dBps" % r
def data_time_total(self, ctx, data):
def _get_time(self, name):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("total"))
d.addCallback(lambda res: res.timings.get(name))
return d
def data_time_total(self, ctx, data):
return self._get_time("total")
def data_time_storage_index(self, ctx, data):
return self._get_time("storage_index")
def data_time_peer_selection(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("peer_selection"))
return d
return self._get_time("peer_selection")
def data_time_total_encode_and_push(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("total_encode_and_push"))
return d
return self._get_time("total_encode_and_push")
def data_time_cumulative_encoding(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("cumulative_encoding"))
return d
return self._get_time("cumulative_encoding")
def data_time_cumulative_sending(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("cumulative_sending"))
return d
return self._get_time("cumulative_sending")
def data_time_hashes_and_close(self, ctx, data):
return self._get_time("hashes_and_close")
def _get_rate(self, name):
d = self.upload_results()
d.addCallback(lambda res: res.timings.get("hashes_and_close"))
def _convert(r):
file_size = r.file_size
time = r.timings.get(name)
if time is None:
return None
return 1.0 * file_size / time
except ZeroDivisionError:
return None
return d
def data_rate_total(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.rates.get("total"))
return d
return self._get_rate("total")
def data_rate_storage_index(self, ctx, data):
return self._get_rate("storage_index")
def data_rate_encode(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.rates.get("encode"))
return d
return self._get_rate("cumulative_encoding")
def data_rate_push(self, ctx, data):
d = self.upload_results()
d.addCallback(lambda res: res.rates.get("push"))
return d
return self._get_rate("cumulative_sending")
class UnlinkedPOSTSSKUploader(rend.Page):
Reference in New Issue
Block a user