mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-19 03:06:33 +00:00
MutableFileNode.modify: pass first_time= and servermap= to the modifier callback
This commit is contained in:
parent
6ec352fe92
commit
fb9af2c7a0
@ -24,7 +24,7 @@ class Deleter:
|
||||
self.node = node
|
||||
self.name = name
|
||||
self.must_exist = True
|
||||
def modify(self, old_contents):
|
||||
def modify(self, old_contents, servermap, first_time):
|
||||
children = self.node._unpack_contents(old_contents)
|
||||
if self.name not in children:
|
||||
if self.must_exist:
|
||||
@ -42,7 +42,7 @@ class MetadataSetter:
|
||||
self.name = name
|
||||
self.metadata = metadata
|
||||
|
||||
def modify(self, old_contents):
|
||||
def modify(self, old_contents, servermap, first_time):
|
||||
children = self.node._unpack_contents(old_contents)
|
||||
if self.name not in children:
|
||||
raise NoSuchChildError(self.name)
|
||||
@ -62,7 +62,7 @@ class Adder:
|
||||
def set_node(self, name, node, metadata):
|
||||
self.entries.append( [name, node, metadata] )
|
||||
|
||||
def modify(self, old_contents):
|
||||
def modify(self, old_contents, servermap, first_time):
|
||||
children = self.node._unpack_contents(old_contents)
|
||||
now = time.time()
|
||||
for e in self.entries:
|
||||
|
@ -618,12 +618,14 @@ class IMutableFileNode(IFileNode, IMutableFilesystemNode):
|
||||
uploading the new version. I return a Deferred that fires (with a
|
||||
PublishStatus object) when the update is complete.
|
||||
|
||||
The modifier callable will be given two arguments: a string (with the
|
||||
old contents) and a servermap. As with download_best_version(), the
|
||||
old contents will be from the best recoverable version, but the
|
||||
modifier can use the servermap to make other decisions (such as
|
||||
refusing to apply the delta if there are multiple parallel versions,
|
||||
or if there is evidence of a newer unrecoverable version).
|
||||
The modifier callable will be given three arguments: a string (with
|
||||
the old contents), a 'first_time' boolean, and a servermap. As with
|
||||
download_best_version(), the old contents will be from the best
|
||||
recoverable version, but the modifier can use the servermap to make
|
||||
other decisions (such as refusing to apply the delta if there are
|
||||
multiple parallel versions, or if there is evidence of a newer
|
||||
unrecoverable version). 'first_time' will be True the first time the
|
||||
modifier is called, and False on any subsequent calls.
|
||||
|
||||
The callable should return a string with the new contents. The
|
||||
callable must be prepared to be called multiple times, and must
|
||||
|
@ -328,10 +328,12 @@ class MutableFileNode:
|
||||
I implement the following pseudocode::
|
||||
|
||||
obtain_mutable_filenode_lock()
|
||||
first_time = True
|
||||
while True:
|
||||
update_servermap(MODE_WRITE)
|
||||
old = retrieve_best_version()
|
||||
new = modifier(old, *args, **kwargs)
|
||||
new = modifier(old, servermap, first_time)
|
||||
first_time = False
|
||||
if new == old: break
|
||||
try:
|
||||
publish(new)
|
||||
@ -366,23 +368,23 @@ class MutableFileNode:
|
||||
servermap = ServerMap()
|
||||
if backoffer is None:
|
||||
backoffer = BackoffAgent().delay
|
||||
return self._modify_and_retry(servermap, modifier, backoffer)
|
||||
def _modify_and_retry(self, servermap, modifier, backoffer):
|
||||
d = self._modify_once(servermap, modifier)
|
||||
return self._modify_and_retry(servermap, modifier, backoffer, True)
|
||||
def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
|
||||
d = self._modify_once(servermap, modifier, first_time)
|
||||
def _retry(f):
|
||||
f.trap(UncoordinatedWriteError)
|
||||
d2 = defer.maybeDeferred(backoffer, self, f)
|
||||
d2.addCallback(lambda ignored:
|
||||
self._modify_and_retry(servermap, modifier,
|
||||
backoffer))
|
||||
backoffer, False))
|
||||
return d2
|
||||
d.addErrback(_retry)
|
||||
return d
|
||||
def _modify_once(self, servermap, modifier):
|
||||
def _modify_once(self, servermap, modifier, first_time):
|
||||
d = self._update_servermap(servermap, MODE_WRITE)
|
||||
d.addCallback(self._once_updated_download_best_version, servermap)
|
||||
def _apply(old_contents):
|
||||
new_contents = modifier(old_contents)
|
||||
new_contents = modifier(old_contents, servermap, first_time)
|
||||
if new_contents is None or new_contents == old_contents:
|
||||
# no changes need to be made
|
||||
return
|
||||
|
@ -256,7 +256,7 @@ class FakeMutableFileNode:
|
||||
def _modify(self, modifier):
|
||||
assert not self.is_readonly()
|
||||
old_contents = self.all_contents[self.storage_index]
|
||||
self.all_contents[self.storage_index] = modifier(old_contents)
|
||||
self.all_contents[self.storage_index] = modifier(old_contents, None, True)
|
||||
return None
|
||||
|
||||
def download(self, target):
|
||||
|
@ -382,18 +382,18 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
|
||||
return d
|
||||
|
||||
def test_modify(self):
|
||||
def _modifier(old_contents):
|
||||
def _modifier(old_contents, servermap, first_time):
|
||||
return old_contents + "line2"
|
||||
def _non_modifier(old_contents):
|
||||
def _non_modifier(old_contents, servermap, first_time):
|
||||
return old_contents
|
||||
def _none_modifier(old_contents):
|
||||
def _none_modifier(old_contents, servermap, first_time):
|
||||
return None
|
||||
def _error_modifier(old_contents):
|
||||
def _error_modifier(old_contents, servermap, first_time):
|
||||
raise ValueError("oops")
|
||||
def _toobig_modifier(old_contents):
|
||||
def _toobig_modifier(old_contents, servermap, first_time):
|
||||
return "b" * (Publish.MAX_SEGMENT_SIZE+1)
|
||||
calls = []
|
||||
def _ucw_error_modifier(old_contents):
|
||||
def _ucw_error_modifier(old_contents, servermap, first_time):
|
||||
# simulate an UncoordinatedWriteError once
|
||||
calls.append(1)
|
||||
if len(calls) <= 1:
|
||||
@ -444,16 +444,16 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
|
||||
return d
|
||||
|
||||
def test_modify_backoffer(self):
|
||||
def _modifier(old_contents):
|
||||
def _modifier(old_contents, servermap, first_time):
|
||||
return old_contents + "line2"
|
||||
calls = []
|
||||
def _ucw_error_modifier(old_contents):
|
||||
def _ucw_error_modifier(old_contents, servermap, first_time):
|
||||
# simulate an UncoordinatedWriteError once
|
||||
calls.append(1)
|
||||
if len(calls) <= 1:
|
||||
raise UncoordinatedWriteError("simulated")
|
||||
return old_contents + "line3"
|
||||
def _always_ucw_error_modifier(old_contents):
|
||||
def _always_ucw_error_modifier(old_contents, servermap, first_time):
|
||||
raise UncoordinatedWriteError("simulated")
|
||||
def _backoff_stopper(node, f):
|
||||
return f
|
||||
@ -1658,7 +1658,7 @@ class MultipleVersions(unittest.TestCase, PublishMixin, CheckerMixin):
|
||||
target[0] = 3 # seqnum4
|
||||
self._set_versions(target)
|
||||
|
||||
def _modify(oldversion):
|
||||
def _modify(oldversion, servermap, first_time):
|
||||
return oldversion + " modified"
|
||||
d = self._fn.modify(_modify)
|
||||
d.addCallback(lambda res: self._fn.download_best_version())
|
||||
|
Loading…
Reference in New Issue
Block a user