mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-03-11 06:43:54 +00:00
try to reduce the size of the diff relative to master
This commit is contained in:
parent
7a9f52d2e5
commit
a2b4455229
@ -1116,31 +1116,31 @@ class Uploader(QueueMixin):
|
||||
SYMLINK.log(path=fp)
|
||||
return False
|
||||
elif pathinfo.isdir:
|
||||
if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
|
||||
self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
|
||||
|
||||
db_entry = self._db.get_db_entry(relpath_u)
|
||||
self._log("isdir dbentry %r" % (db_entry,))
|
||||
if not is_new_file(pathinfo, db_entry):
|
||||
NOT_NEW_DIRECTORY.log()
|
||||
return False
|
||||
|
||||
uploadable = Data("", self._client.convergence)
|
||||
encoded_path_u += magicpath.path2magic(u"/")
|
||||
with PROCESS_DIRECTORY().context() as action:
|
||||
if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
|
||||
self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
|
||||
|
||||
db_entry = self._db.get_db_entry(relpath_u)
|
||||
self._log("isdir dbentry %r" % (db_entry,))
|
||||
if not is_new_file(pathinfo, db_entry):
|
||||
NOT_NEW_DIRECTORY.log()
|
||||
return False
|
||||
|
||||
uploadable = Data("", self._client.convergence)
|
||||
encoded_path_u += magicpath.path2magic(u"/")
|
||||
upload_d = DeferredContext(self._upload_dirnode.add_file(
|
||||
encoded_path_u, uploadable,
|
||||
metadata={"version": 0},
|
||||
overwrite=True,
|
||||
progress=item.progress,
|
||||
))
|
||||
def _dir_succeeded(ign):
|
||||
action.add_success_fields(created_directory=relpath_u)
|
||||
self._count('directories_created')
|
||||
upload_d.addCallback(_dir_succeeded)
|
||||
upload_d.addCallback(lambda ign: self._scan(relpath_u))
|
||||
upload_d.addCallback(lambda ign: True)
|
||||
return upload_d.addActionFinish()
|
||||
def _dir_succeeded(ign):
|
||||
action.add_success_fields(created_directory=relpath_u)
|
||||
self._count('directories_created')
|
||||
upload_d.addCallback(_dir_succeeded)
|
||||
upload_d.addCallback(lambda ign: self._scan(relpath_u))
|
||||
upload_d.addCallback(lambda ign: True)
|
||||
return upload_d.addActionFinish()
|
||||
elif pathinfo.isfile:
|
||||
db_entry = self._db.get_db_entry(relpath_u)
|
||||
|
||||
@ -1191,7 +1191,7 @@ class Uploader(QueueMixin):
|
||||
self._count('files_uploaded')
|
||||
return True
|
||||
d2.addCallback(_add_db_entry)
|
||||
return d2.addActionFinish()
|
||||
return d2.result
|
||||
else:
|
||||
SPECIAL_FILE.log()
|
||||
return False
|
||||
@ -1436,86 +1436,84 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
|
||||
with SCAN_REMOTE_DMD(nickname=nickname).context():
|
||||
d = DeferredContext(dirnode.list())
|
||||
def scan_listing(listing_map):
|
||||
for encoded_relpath_u in listing_map.keys():
|
||||
relpath_u = magicpath.magic2path(encoded_relpath_u)
|
||||
self._log("found %r" % (relpath_u,))
|
||||
def scan_listing(listing_map):
|
||||
for encoded_relpath_u in listing_map.keys():
|
||||
relpath_u = magicpath.magic2path(encoded_relpath_u)
|
||||
self._log("found %r" % (relpath_u,))
|
||||
|
||||
file_node, metadata = listing_map[encoded_relpath_u]
|
||||
local_dbentry = self._get_local_latest(relpath_u)
|
||||
file_node, metadata = listing_map[encoded_relpath_u]
|
||||
local_dbentry = self._get_local_latest(relpath_u)
|
||||
|
||||
# XXX FIXME this is *awefully* similar to
|
||||
# _should_download code in function etc -- can we
|
||||
# share?
|
||||
remote_version = metadata.get('version', None)
|
||||
remote_uri = file_node.get_readonly_uri()
|
||||
REMOTE_DMD_ENTRY.log(
|
||||
relpath=relpath_u,
|
||||
pathentry=local_dbentry,
|
||||
remote_version=remote_version,
|
||||
remote_uri=remote_uri,
|
||||
)
|
||||
|
||||
if (local_dbentry is None or remote_version is None or
|
||||
local_dbentry.version < remote_version or
|
||||
(local_dbentry.version == remote_version and local_dbentry.last_downloaded_uri != remote_uri)):
|
||||
ADD_TO_DOWNLOAD_QUEUE.log(relpath=relpath_u)
|
||||
if scan_batch.has_key(relpath_u):
|
||||
scan_batch[relpath_u] += [(file_node, metadata)]
|
||||
else:
|
||||
scan_batch[relpath_u] = [(file_node, metadata)]
|
||||
self._status_reporter(
|
||||
True, 'Magic folder is working',
|
||||
'Last scan: %s' % self.nice_current_time(),
|
||||
# XXX FIXME this is *awefully* similar to
|
||||
# _should_download code in function etc -- can we
|
||||
# share?
|
||||
remote_version = metadata.get('version', None)
|
||||
remote_uri = file_node.get_readonly_uri()
|
||||
REMOTE_DMD_ENTRY.log(
|
||||
relpath=relpath_u,
|
||||
pathentry=local_dbentry,
|
||||
remote_version=remote_version,
|
||||
remote_uri=remote_uri,
|
||||
)
|
||||
|
||||
d.addCallback(scan_listing)
|
||||
d.addBoth(self._logcb, "end of _scan_remote_dmd")
|
||||
return d.addActionFinish()
|
||||
if (local_dbentry is None or remote_version is None or
|
||||
local_dbentry.version < remote_version or
|
||||
(local_dbentry.version == remote_version and local_dbentry.last_downloaded_uri != remote_uri)):
|
||||
ADD_TO_DOWNLOAD_QUEUE.log(relpath=relpath_u)
|
||||
if scan_batch.has_key(relpath_u):
|
||||
scan_batch[relpath_u] += [(file_node, metadata)]
|
||||
else:
|
||||
scan_batch[relpath_u] = [(file_node, metadata)]
|
||||
self._status_reporter(
|
||||
True, 'Magic folder is working',
|
||||
'Last scan: %s' % self.nice_current_time(),
|
||||
)
|
||||
|
||||
d.addCallback(scan_listing)
|
||||
return d.addActionFinish()
|
||||
|
||||
def _scan_remote_collective(self, scan_self=False):
|
||||
scan_batch = {} # path -> [(filenode, metadata)]
|
||||
with SCAN_REMOTE_COLLECTIVE().context():
|
||||
scan_batch = {} # path -> [(filenode, metadata)]
|
||||
|
||||
d = DeferredContext(self._collective_dirnode.list())
|
||||
def scan_collective(dirmap):
|
||||
d2 = DeferredContext(defer.succeed(None))
|
||||
for dir_name in dirmap:
|
||||
(dirnode, metadata) = dirmap[dir_name]
|
||||
if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
|
||||
d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
|
||||
self._scan_remote_dmd(dir_name, dirnode, scan_batch))
|
||||
def _err(f, dir_name=dir_name):
|
||||
self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
|
||||
# XXX what should we do to make this failure more visible to users?
|
||||
d2.addErrback(_err)
|
||||
def scan_collective(dirmap):
|
||||
d2 = DeferredContext(defer.succeed(None))
|
||||
for dir_name in dirmap:
|
||||
(dirnode, metadata) = dirmap[dir_name]
|
||||
if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
|
||||
d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
|
||||
self._scan_remote_dmd(dir_name, dirnode, scan_batch))
|
||||
def _err(f, dir_name=dir_name):
|
||||
self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
|
||||
# XXX what should we do to make this failure more visible to users?
|
||||
d2.addErrback(_err)
|
||||
|
||||
return d2.result
|
||||
d.addCallback(scan_collective)
|
||||
return d2.result
|
||||
d.addCallback(scan_collective)
|
||||
|
||||
def _filter_batch_to_deque(ign):
|
||||
self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
|
||||
for relpath_u in scan_batch.keys():
|
||||
file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
|
||||
def _filter_batch_to_deque(ign):
|
||||
self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
|
||||
for relpath_u in scan_batch.keys():
|
||||
file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
|
||||
|
||||
if self._should_download(relpath_u, metadata['version'], file_node.get_readonly_uri()):
|
||||
to_dl = DownloadItem(
|
||||
relpath_u,
|
||||
PercentProgress(file_node.get_size()),
|
||||
file_node,
|
||||
metadata,
|
||||
file_node.get_size(),
|
||||
)
|
||||
to_dl.set_status('queued', self._clock.seconds())
|
||||
self._deque.append(to_dl)
|
||||
self._count("objects_queued")
|
||||
else:
|
||||
self._log("Excluding %r" % (relpath_u,))
|
||||
self._call_hook(None, 'processed', async=True) # await this maybe-Deferred??
|
||||
if self._should_download(relpath_u, metadata['version'], file_node.get_readonly_uri()):
|
||||
to_dl = DownloadItem(
|
||||
relpath_u,
|
||||
PercentProgress(file_node.get_size()),
|
||||
file_node,
|
||||
metadata,
|
||||
file_node.get_size(),
|
||||
)
|
||||
to_dl.set_status('queued', self._clock.seconds())
|
||||
self._deque.append(to_dl)
|
||||
self._count("objects_queued")
|
||||
else:
|
||||
self._log("Excluding %r" % (relpath_u,))
|
||||
self._call_hook(None, 'processed', async=True) # await this maybe-Deferred??
|
||||
|
||||
self._log("deque after = %r" % (self._deque,))
|
||||
d.addCallback(_filter_batch_to_deque)
|
||||
return d.addActionFinish()
|
||||
self._log("deque after = %r" % (self._deque,))
|
||||
d.addCallback(_filter_batch_to_deque)
|
||||
return d.addActionFinish()
|
||||
|
||||
|
||||
def _scan_delay(self):
|
||||
@ -1524,22 +1522,22 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
def _perform_scan(self):
|
||||
with PERFORM_SCAN().context():
|
||||
d = DeferredContext(defer.maybeDeferred(self._scan_remote_collective))
|
||||
def scanned(result):
|
||||
self._status_reporter(
|
||||
True, 'Magic folder is working',
|
||||
'Last scan: %s' % self.nice_current_time(),
|
||||
)
|
||||
return result
|
||||
def scan_failed(reason):
|
||||
twlog.msg("Remote scan failed: %s" % (reason.value,))
|
||||
self._log("_scan failed: %s" % (repr(reason.value),))
|
||||
self._status_reporter(
|
||||
False, 'Remote scan has failed: %s' % str(reason.value),
|
||||
'Last attempted at %s' % self.nice_current_time(),
|
||||
)
|
||||
return None
|
||||
d.addCallbacks(scanned, scan_failed)
|
||||
return d.addActionFinish()
|
||||
def scanned(result):
|
||||
self._status_reporter(
|
||||
True, 'Magic folder is working',
|
||||
'Last scan: %s' % self.nice_current_time(),
|
||||
)
|
||||
return result
|
||||
def scan_failed(reason):
|
||||
twlog.msg("Remote scan failed: %s" % (reason.value,))
|
||||
self._log("_scan failed: %s" % (repr(reason.value),))
|
||||
self._status_reporter(
|
||||
False, 'Remote scan has failed: %s' % str(reason.value),
|
||||
'Last attempted at %s' % self.nice_current_time(),
|
||||
)
|
||||
return None
|
||||
d.addCallbacks(scanned, scan_failed)
|
||||
return d.addActionFinish()
|
||||
|
||||
def _process(self, item):
|
||||
# Downloader
|
||||
|
Loading…
x
Reference in New Issue
Block a user