Basic remote conflict detection based on ancestor uri

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
This commit is contained in:
Daira Hopwood 2015-10-27 18:44:45 +00:00
parent 7a2e021c75
commit 63ad778b7d

View File

@ -13,7 +13,7 @@ from twisted.application import service
from allmydata.util import fileutil from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode from allmydata.interfaces import IDirectoryNode
from allmydata.util import log from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \ from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
@ -428,6 +428,9 @@ class Uploader(QueueMixin):
class WriteFileMixin(object): class WriteFileMixin(object):
FUDGE_SECONDS = 10.0 FUDGE_SECONDS = 10.0
def _get_conflicted_filename(self, abspath_u):
return abspath_u + u".conflict"
def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None): def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)" self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
% (abspath_u, len(file_contents), is_conflict, now)) % (abspath_u, len(file_contents), is_conflict, now))
@ -456,6 +459,7 @@ class WriteFileMixin(object):
fileutil.write(replacement_path_u, file_contents) fileutil.write(replacement_path_u, file_contents)
os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS)) os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
if is_conflict: if is_conflict:
print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
return self._rename_conflicted_file(abspath_u, replacement_path_u) return self._rename_conflicted_file(abspath_u, replacement_path_u)
else: else:
try: try:
@ -467,7 +471,13 @@ class WriteFileMixin(object):
def _rename_conflicted_file(self, abspath_u, replacement_path_u): def _rename_conflicted_file(self, abspath_u, replacement_path_u):
self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u)) self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
conflict_path_u = abspath_u + u".conflict" conflict_path_u = self._get_conflicted_filename(abspath_u)
print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
if os.path.isfile(replacement_path_u):
print "%r exists" % (replacement_path_u,)
if os.path.isfile(conflict_path_u):
print "%r exists" % (conflict_path_u,)
fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u) fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
return conflict_path_u return conflict_path_u
@ -646,18 +656,8 @@ class Downloader(QueueMixin, WriteFileMixin):
(relpath_u, file_node, metadata) = item (relpath_u, file_node, metadata) = item
fp = self._get_filepath(relpath_u) fp = self._get_filepath(relpath_u)
abspath_u = unicode_from_filepath(fp) abspath_u = unicode_from_filepath(fp)
conflict_path_u = self._get_conflicted_filename(abspath_u)
d = defer.succeed(None) d = defer.succeed(None)
if relpath_u.endswith(u"/"):
self._log("mkdir(%r)" % (abspath_u,))
d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
d.addCallback(lambda ign: abspath_u)
else:
d.addCallback(lambda ign: file_node.download_best_version())
if metadata.get('deleted', False):
d.addCallback(lambda result: self._unlink_deleted_file(abspath_u, result))
else:
d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents, is_conflict=False))
def do_update_db(written_abspath_u): def do_update_db(written_abspath_u):
filecap = file_node.get_uri() filecap = file_node.get_uri()
@ -665,6 +665,7 @@ class Downloader(QueueMixin, WriteFileMixin):
last_downloaded_uri = filecap last_downloaded_uri = filecap
last_downloaded_timestamp = now last_downloaded_timestamp = now
written_pathinfo = get_pathinfo(written_abspath_u) written_pathinfo = get_pathinfo(written_abspath_u)
if not written_pathinfo.exists and not metadata.get('deleted', False): if not written_pathinfo.exists and not metadata.get('deleted', False):
raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u)) raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
@ -675,11 +676,45 @@ class Downloader(QueueMixin, WriteFileMixin):
self._log("download failed: %s" % (str(f),)) self._log("download failed: %s" % (str(f),))
self._count('objects_failed') self._count('objects_failed')
return f return f
if os.path.isfile(conflict_path_u):
def fail(res):
raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
d.addCallback(fail)
else:
is_conflict = False
if self._db.check_file_db_exists(relpath_u):
dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
print "metadata %r" % (metadata,)
print "<<<<--- if %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri)
if dmd_last_downloaded_uri is not None and dmd_last_downloaded_uri != local_last_downloaded_uri:
is_conflict = True
#dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
#local_last_uploaded_uri = ...
if relpath_u.endswith(u"/"):
self._log("mkdir(%r)" % (abspath_u,))
d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
d.addCallback(lambda ign: abspath_u)
else:
d.addCallback(lambda ign: file_node.download_best_version())
if metadata.get('deleted', False):
d.addCallback(lambda result: self._unlink_deleted_file(abspath_u, result))
else:
d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
is_conflict=is_conflict))
d.addCallbacks(do_update_db, failed) d.addCallbacks(do_update_db, failed)
def remove_from_pending(res): def remove_from_pending(res):
self._pending.remove(relpath_u) self._pending.remove(relpath_u)
return res return res
d.addBoth(remove_from_pending) d.addBoth(remove_from_pending)
def trap_conflicts(f):
f.trap(ConflictError)
return None
d.addErrback(trap_conflicts)
return d return d
def _unlink_deleted_file(self, abspath_u, result): def _unlink_deleted_file(self, abspath_u, result):