mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-06-19 15:53:48 +00:00
helper status: include percentage fetched+pushed, add helper-uploads to the upload/download list
This commit is contained in:
@ -369,3 +369,11 @@ class Client(node.Node, testutil.PollMixin):
|
|||||||
def list_recent_retrieve(self):
|
def list_recent_retrieve(self):
|
||||||
watcher = self.getServiceNamed("mutable-watcher")
|
watcher = self.getServiceNamed("mutable-watcher")
|
||||||
return watcher.list_recent_retrieve()
|
return watcher.list_recent_retrieve()
|
||||||
|
|
||||||
|
def list_active_helper_statuses(self):
|
||||||
|
helper = self.getServiceNamed("helper")
|
||||||
|
return helper.get_active_upload_statuses()
|
||||||
|
def list_recent_helper_statuses(self):
|
||||||
|
helper = self.getServiceNamed("helper")
|
||||||
|
return helper.get_recent_upload_statuses()
|
||||||
|
|
||||||
|
@ -143,6 +143,9 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
|||||||
self._results = results
|
self._results = results
|
||||||
self._upload_status = upload.UploadStatus()
|
self._upload_status = upload.UploadStatus()
|
||||||
self._upload_status.set_helper(False)
|
self._upload_status.set_helper(False)
|
||||||
|
self._upload_status.set_storage_index(storage_index)
|
||||||
|
self._upload_status.set_status("fetching ciphertext")
|
||||||
|
self._upload_status.set_progress(0, 1.0)
|
||||||
self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
|
self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
|
||||||
parent=log_number)
|
parent=log_number)
|
||||||
|
|
||||||
@ -209,6 +212,10 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
|||||||
del self._reader
|
del self._reader
|
||||||
|
|
||||||
def _failed(self, f):
|
def _failed(self, f):
|
||||||
|
self.log(format="CHKUploadHelper(%(si)s) failed",
|
||||||
|
si=storage.si_b2a(self._storage_index)[:5],
|
||||||
|
failure=f,
|
||||||
|
level=log.UNUSUAL)
|
||||||
self._finished_observers.fire(f)
|
self._finished_observers.fire(f)
|
||||||
self._helper.upload_finished(self._storage_index, 0)
|
self._helper.upload_finished(self._storage_index, 0)
|
||||||
del self._reader
|
del self._reader
|
||||||
@ -306,6 +313,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
|
|||||||
|
|
||||||
def _got_size(self, size):
|
def _got_size(self, size):
|
||||||
self.log("total size is %d bytes" % size, level=log.NOISY)
|
self.log("total size is %d bytes" % size, level=log.NOISY)
|
||||||
|
self._upload_helper._upload_status.set_size(size)
|
||||||
self._expected_size = size
|
self._expected_size = size
|
||||||
|
|
||||||
def _start_reading(self, res):
|
def _start_reading(self, res):
|
||||||
@ -365,16 +373,16 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
|
|||||||
needed = self._expected_size - self._have
|
needed = self._expected_size - self._have
|
||||||
fetch_size = min(needed, self.CHUNK_SIZE)
|
fetch_size = min(needed, self.CHUNK_SIZE)
|
||||||
if fetch_size == 0:
|
if fetch_size == 0:
|
||||||
|
self._upload_helper._upload_status.set_progress(1, 1.0)
|
||||||
return True # all done
|
return True # all done
|
||||||
percent = 0
|
percent = 0.0
|
||||||
if self._expected_size:
|
if self._expected_size:
|
||||||
percent = 1.0 * (self._have+fetch_size) / self._expected_size
|
percent = 1.0 * (self._have+fetch_size) / self._expected_size
|
||||||
percent = int(100*percent)
|
|
||||||
self.log(format="fetching %(start)d-%(end)d of %(total)d (%(percent)d%%)",
|
self.log(format="fetching %(start)d-%(end)d of %(total)d (%(percent)d%%)",
|
||||||
start=self._have,
|
start=self._have,
|
||||||
end=self._have+fetch_size,
|
end=self._have+fetch_size,
|
||||||
total=self._expected_size,
|
total=self._expected_size,
|
||||||
percent=percent,
|
percent=int(100.0*percent),
|
||||||
level=log.NOISY)
|
level=log.NOISY)
|
||||||
d = self.call("read_encrypted", self._have, fetch_size)
|
d = self.call("read_encrypted", self._have, fetch_size)
|
||||||
def _got_data(ciphertext_v):
|
def _got_data(ciphertext_v):
|
||||||
@ -383,6 +391,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
|
|||||||
self._have += len(data)
|
self._have += len(data)
|
||||||
self._ciphertext_fetched += len(data)
|
self._ciphertext_fetched += len(data)
|
||||||
self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
|
self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
|
||||||
|
self._upload_helper._upload_status.set_progress(1, percent)
|
||||||
return False # not done
|
return False # not done
|
||||||
d.addCallback(_got_data)
|
d.addCallback(_got_data)
|
||||||
return d
|
return d
|
||||||
@ -432,6 +441,7 @@ class LocalCiphertextReader(AskUntilSuccessMixin):
|
|||||||
self._status = interfaces.IUploadStatus(upload_status)
|
self._status = interfaces.IUploadStatus(upload_status)
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
|
self._upload_helper._upload_status.set_status("pushing")
|
||||||
self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
|
self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
|
||||||
self.f = open(self._encoding_file, "rb")
|
self.f = open(self._encoding_file, "rb")
|
||||||
|
|
||||||
@ -471,6 +481,7 @@ class Helper(Referenceable, service.MultiService):
|
|||||||
|
|
||||||
name = "helper"
|
name = "helper"
|
||||||
chk_upload_helper_class = CHKUploadHelper
|
chk_upload_helper_class = CHKUploadHelper
|
||||||
|
MAX_UPLOAD_STATUSES = 10
|
||||||
|
|
||||||
def __init__(self, basedir, stats_provider=None):
|
def __init__(self, basedir, stats_provider=None):
|
||||||
self._basedir = basedir
|
self._basedir = basedir
|
||||||
@ -479,6 +490,7 @@ class Helper(Referenceable, service.MultiService):
|
|||||||
fileutil.make_dirs(self._chk_incoming)
|
fileutil.make_dirs(self._chk_incoming)
|
||||||
fileutil.make_dirs(self._chk_encoding)
|
fileutil.make_dirs(self._chk_encoding)
|
||||||
self._active_uploads = {}
|
self._active_uploads = {}
|
||||||
|
self._recent_upload_statuses = []
|
||||||
self.stats_provider = stats_provider
|
self.stats_provider = stats_provider
|
||||||
if stats_provider:
|
if stats_provider:
|
||||||
stats_provider.register_producer(self)
|
stats_provider.register_producer(self)
|
||||||
@ -612,4 +624,16 @@ class Helper(Referenceable, service.MultiService):
|
|||||||
|
|
||||||
def upload_finished(self, storage_index, size):
|
def upload_finished(self, storage_index, size):
|
||||||
self.count("chk_upload_helper.encoded_bytes", size)
|
self.count("chk_upload_helper.encoded_bytes", size)
|
||||||
|
uh = self._active_uploads[storage_index]
|
||||||
del self._active_uploads[storage_index]
|
del self._active_uploads[storage_index]
|
||||||
|
s = uh.get_upload_status()
|
||||||
|
s.set_active(False)
|
||||||
|
self._recent_upload_statuses.append(s)
|
||||||
|
while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
|
||||||
|
self._recent_upload_statuses.pop(0)
|
||||||
|
|
||||||
|
def get_active_upload_statuses(self):
|
||||||
|
return [u.get_upload_status() for u in self._active_uploads.values()]
|
||||||
|
|
||||||
|
def get_recent_upload_statuses(self):
|
||||||
|
return self._recent_upload_statuses
|
||||||
|
@ -94,7 +94,10 @@ class FakeClient(service.MultiService):
|
|||||||
return []
|
return []
|
||||||
def list_recent_retrieve(self):
|
def list_recent_retrieve(self):
|
||||||
return []
|
return []
|
||||||
|
def list_active_helper_statuses(self):
|
||||||
|
return []
|
||||||
|
def list_recent_helper_statuses(self):
|
||||||
|
return []
|
||||||
|
|
||||||
class WebMixin(object):
|
class WebMixin(object):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
@ -659,17 +659,21 @@ class Status(rend.Page):
|
|||||||
addSlash = True
|
addSlash = True
|
||||||
|
|
||||||
def data_active_operations(self, ctx, data):
|
def data_active_operations(self, ctx, data):
|
||||||
active = (IClient(ctx).list_active_uploads() +
|
client = IClient(ctx)
|
||||||
IClient(ctx).list_active_downloads() +
|
active = (client.list_active_uploads() +
|
||||||
IClient(ctx).list_active_publish() +
|
client.list_active_downloads() +
|
||||||
IClient(ctx).list_active_retrieve())
|
client.list_active_publish() +
|
||||||
|
client.list_active_retrieve() +
|
||||||
|
client.list_active_helper_statuses())
|
||||||
return active
|
return active
|
||||||
|
|
||||||
def data_recent_operations(self, ctx, data):
|
def data_recent_operations(self, ctx, data):
|
||||||
recent = [o for o in (IClient(ctx).list_recent_uploads() +
|
client = IClient(ctx)
|
||||||
IClient(ctx).list_recent_downloads() +
|
recent = [o for o in (client.list_recent_uploads() +
|
||||||
IClient(ctx).list_recent_publish() +
|
client.list_recent_downloads() +
|
||||||
IClient(ctx).list_recent_retrieve())
|
client.list_recent_publish() +
|
||||||
|
client.list_recent_retrieve() +
|
||||||
|
client.list_recent_helper_statuses())
|
||||||
if not o.get_active()]
|
if not o.get_active()]
|
||||||
recent.sort(lambda a,b: cmp(a.get_started(), b.get_started()))
|
recent.sort(lambda a,b: cmp(a.get_started(), b.get_started()))
|
||||||
recent.reverse()
|
recent.reverse()
|
||||||
@ -735,6 +739,12 @@ class Status(rend.Page):
|
|||||||
s = u.get_upload_status()
|
s = u.get_upload_status()
|
||||||
if s.get_counter() == count:
|
if s.get_counter() == count:
|
||||||
return UploadStatusPage(s)
|
return UploadStatusPage(s)
|
||||||
|
for s in (client.list_active_helper_statuses() +
|
||||||
|
client.list_recent_helper_statuses()):
|
||||||
|
if s.get_counter() == count:
|
||||||
|
# immutable-upload helpers use the same status object as
|
||||||
|
# a regular immutable-upload
|
||||||
|
return UploadStatusPage(s)
|
||||||
if stype == "down":
|
if stype == "down":
|
||||||
for s in client.list_recent_downloads():
|
for s in client.list_recent_downloads():
|
||||||
if s.get_counter() == count:
|
if s.get_counter() == count:
|
||||||
|
Reference in New Issue
Block a user