Convert Downloader._process

This commit is contained in:
Jean-Paul Calderone 2019-02-26 14:26:37 -05:00
parent 248449fefa
commit 7d8d74425c

View File

@ -812,6 +812,50 @@ PERFORM_SCAN = ActionType(
u"Remote storage is being scanned for changes which need to be synchronized.",
)
_STATUS = Field.for_types(
u"status",
# Should just be unicode...
[unicode, bytes],
u"The status of an item in a processing queue.",
)
QUEUED_ITEM_STATUS_CHANGE = MessageType(
u"magic-folder:item:status-change",
[eliotutil.RELPATH, _STATUS],
u"A queued item changed status.",
)
_CONFLICT_REASON = Field.for_types(
u"conflict_reason",
[unicode, type(None)],
u"A human-readable explanation of why a file was in conflict.",
eliotutil.validateSetMembership({
u"dbentry mismatch metadata",
u"dbentry newer version",
u"last_downloaded_uri mismatch",
u"file appeared",
}),
)
CHECKING_CONFLICTS = ActionType(
u"magic-folder:item:checking-conflicts",
[],
[_IS_CONFLICT, _CONFLICT_REASON],
u"A potential download item is being checked to determine if it is in a conflicted state.",
)
REMOTE_DIRECTORY_CREATED = MessageType(
u"magic-folder:remote-directory-created",
[],
u"The downloader found a new directory in the DMD.",
)
REMOTE_DIRECTORY_DELETED = MessageType(
u"magic-folder:remote-directory-deleted",
[],
u"The downloader found a directory has been deleted from the DMD.",
)
class QueueMixin(HookMixin):
"""
A parent class for Uploader and Downloader that handles putting
@ -1024,6 +1068,10 @@ class QueuedItem(object):
if current_time is None:
current_time = time.time()
self._status_history[status] = current_time
QUEUED_ITEM_STATUS_CHANGE.log(
relpath=self.relpath_u,
status=status,
)
def status_time(self, state):
"""
@ -1750,10 +1798,8 @@ class Downloader(QueueMixin, WriteFileMixin):
self._deque.append(to_dl)
self._count("objects_queued")
else:
self._log("Excluding %r" % (relpath_u,))
self._call_hook(None, 'processed', async=True) # await this maybe-Deferred??
self._log("deque after = %r" % (self._deque,))
d.addCallback(_filter_batch_to_deque)
return d.addActionFinish()
@ -1851,47 +1897,53 @@ class Downloader(QueueMixin, WriteFileMixin):
# for this file, which is the URI under which the file was last
# uploaded.
if db_entry:
# * 2c. If any of the following are true, then classify as a conflict:
# * i. there are pending notifications of changes to ``foo``;
# * ii. the last-seen statinfo is either absent (i.e. there is
# no entry in the database for this path), or different from the
# current statinfo;
with CHECKING_CONFLICTS() as action:
if db_entry:
# * 2c. If any of the following are true, then classify as a conflict:
# * i. there are pending notifications of changes to ``foo``;
# * ii. the last-seen statinfo is either absent (i.e. there is
# no entry in the database for this path), or different from the
# current statinfo;
if current_statinfo.exists:
self._log("checking conflicts {}".format(item.relpath_u))
if (db_entry.mtime_ns != current_statinfo.mtime_ns or \
db_entry.ctime_ns != current_statinfo.ctime_ns or \
db_entry.size != current_statinfo.size):
is_conflict = True
self._log("conflict because local change0")
if db_entry.last_downloaded_uri is None \
or db_entry.last_uploaded_uri is None \
or dmd_last_downloaded_uri is None:
# we've never downloaded anything before for this
# file, but the other side might have created a new
# file "at the same time"
if db_entry.version >= item.metadata['version']:
self._log("conflict because my version >= remote version")
if current_statinfo.exists:
conflict_reason = None
if (db_entry.mtime_ns != current_statinfo.mtime_ns or \
db_entry.ctime_ns != current_statinfo.ctime_ns or \
db_entry.size != current_statinfo.size):
is_conflict = True
elif dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
is_conflict = True
self._log("conflict because dmd_last_downloaded_uri != db_entry.last_downloaded_uri")
conflict_reason = u"dbentry mismatch metadata"
else: # no local db_entry .. but has the file appeared locally meantime?
if current_statinfo.exists:
is_conflict = True
self._log("conflict because local change1")
if db_entry.last_downloaded_uri is None \
or db_entry.last_uploaded_uri is None \
or dmd_last_downloaded_uri is None:
# we've never downloaded anything before for this
# file, but the other side might have created a new
# file "at the same time"
if db_entry.version >= item.metadata['version']:
is_conflict = True
conflict_reason = u"dbentry newer version"
elif dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
is_conflict = True
conflict_reason = u"last_downloaded_uri mismatch"
else: # no local db_entry .. but has the file appeared locally meantime?
if current_statinfo.exists:
is_conflict = True
conflict_reason = u"file appeared"
action.add_success_fields(
is_conflict=is_conflict,
conflict_reason=conflict_reason,
)
if is_conflict:
self._count('objects_conflicted')
if item.relpath_u.endswith(u"/"):
if item.metadata.get('deleted', False):
self._log("rmdir(%r) ignored" % (abspath_u,))
REMOTE_DIRECTORY_DELETED.log()
else:
self._log("mkdir(%r)" % (abspath_u,))
REMOTE_DIRECTORY_CREATED.log()
d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
d.addCallback(lambda ign: abspath_u)
else:
@ -1912,7 +1964,6 @@ class Downloader(QueueMixin, WriteFileMixin):
def trap_conflicts(f):
f.trap(ConflictError)
self._log("IGNORE CONFLICT ERROR %r" % f)
return False
d.addErrback(trap_conflicts)
return d.addActionFinish()