Simplify _scan_remote_* and remove Downloader._download_scan_batch attribute.

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
This commit is contained in:
Daira Hopwood 2015-11-03 02:31:34 +00:00
parent f14d24b140
commit 5938535b87

View File

@ -515,7 +515,6 @@ class Downloader(QueueMixin, WriteFileMixin):
self._upload_readonly_dircap = upload_readonly_dircap
self._turn_delay = self.REMOTE_SCAN_INTERVAL
self._download_scan_batch = {} # path -> [(filenode, metadata)]
def start_scanning(self):
self._log("start_scanning")
@ -589,14 +588,8 @@ class Downloader(QueueMixin, WriteFileMixin):
collective_dirmap_d.addCallback(highest_version)
return collective_dirmap_d
def _append_to_batch(self, name, file_node, metadata):
if self._download_scan_batch.has_key(name):
self._download_scan_batch[name] += [(file_node, metadata)]
else:
self._download_scan_batch[name] = [(file_node, metadata)]
def _scan_remote(self, nickname, dirnode):
self._log("_scan_remote nickname %r" % (nickname,))
def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
self._log("_scan_remote_dmd nickname %r" % (nickname,))
d = dirnode.list()
def scan_listing(listing_map):
for encoded_relpath_u in listing_map.keys():
@ -607,16 +600,21 @@ class Downloader(QueueMixin, WriteFileMixin):
local_version = self._get_local_latest(relpath_u)
remote_version = metadata.get('version', None)
self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
if local_version is None or remote_version is None or local_version < remote_version:
self._log("%r added to download queue" % (relpath_u,))
self._append_to_batch(relpath_u, file_node, metadata)
if scan_batch.has_key(relpath_u):
scan_batch[relpath_u] += [(file_node, metadata)]
else:
scan_batch[relpath_u] = [(file_node, metadata)]
d.addCallback(scan_listing)
d.addBoth(self._logcb, "end of _scan_remote")
d.addBoth(self._logcb, "end of _scan_remote_dmd")
return d
def _scan_remote_collective(self):
self._log("_scan_remote_collective")
self._download_scan_batch = {} # XXX
scan_batch = {} # path -> [(filenode, metadata)]
d = self._collective_dirnode.list()
def scan_collective(dirmap):
@ -624,39 +622,32 @@ class Downloader(QueueMixin, WriteFileMixin):
for dir_name in dirmap:
(dirnode, metadata) = dirmap[dir_name]
if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
d2.addCallback(lambda ign, dir_name=dir_name: self._scan_remote(dir_name, dirnode))
d2.addCallback(lambda ign, dir_name=dir_name:
self._scan_remote_dmd(dir_name, dirnode, scan_batch))
def _err(f):
self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
# XXX what should we do to make this failure more visible to users?
d2.addErrback(_err)
return d2
d.addCallback(scan_collective)
d.addCallback(self._filter_scan_batch)
d.addCallback(self._add_batch_to_download_queue)
def _filter_batch_to_deque(ign):
self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
for relpath_u in scan_batch.keys():
file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
if self._should_download(relpath_u, metadata['version']):
self._deque.append( (relpath_u, file_node, metadata) )
else:
self._log("Excluding %r" % (relpath_u,))
self._count('objects_excluded')
self._call_hook(None, 'processed')
self._log("deque after = %r" % (self._deque,))
d.addCallback(_filter_batch_to_deque)
return d
def _add_batch_to_download_queue(self, result):
self._log("result = %r" % (result,))
self._log("deque = %r" % (self._deque,))
self._deque.extend(result)
self._log("deque after = %r" % (self._deque,))
self._count('objects_queued', len(result))
def _filter_scan_batch(self, result):
self._log("_filter_scan_batch")
extension = [] # consider whether this should be a dict
for relpath_u in self._download_scan_batch.keys():
if relpath_u in self._pending:
continue
file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
if self._should_download(relpath_u, metadata['version']):
extension += [(relpath_u, file_node, metadata)]
else:
self._log("Excluding %r" % (relpath_u,))
self._count('objects_excluded')
self._call_hook(None, 'processed')
return extension
def _when_queue_is_empty(self):
d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
d.addBoth(self._logcb, "after _scan_remote_collective 1")