mutable.py: split replace() into update() and overwrite(). Addresses #328.

This commit is contained in:
Brian Warner 2008-03-12 18:00:43 -07:00
parent 0aa0efa123
commit 2ef70ab814
9 changed files with 164 additions and 31 deletions

View File

@ -351,17 +351,101 @@ defined concretely at a later date.
=== Code Details === === Code Details ===
The current FileNode class will be renamed ImmutableFileNode, and a new The MutableFileNode class is used to manipulate mutable files (as opposed to
MutableFileNode class will be created. Instances of this class will contain a ImmutableFileNodes). These are initially generated with
URI and a reference to the client (for peer selection and connection). The client.create_mutable_file(), and later recreated from URIs with
methods of MutableFileNode are: client.create_node_from_uri(). Instances of this class will contain a URI and
a reference to the client (for peer selection and connection).
* replace(newdata) -> OK, ConsistencyError, NotEnoughPeersError The methods of MutableFileNode are:
* get() -> [deferred] newdata, NotEnoughPeersError
* download_to_data() -> [deferred] newdata, NotEnoughPeersError
* if there are multiple retrieveable versions in the grid, get() returns * if there are multiple retrieveable versions in the grid, get() returns
the first version it can reconstruct, and silently ignores the others. the first version it can reconstruct, and silently ignores the others.
In the future, a more advanced API will signal and provide access to In the future, a more advanced API will signal and provide access to
the multiple heads. the multiple heads.
* update(newdata) -> OK, UncoordinatedWriteError, NotEnoughPeersError
* overwrite(newdata) -> OK, UncoordinatedWriteError, NotEnoughPeersError
download_to_data() causes a new retrieval to occur, pulling the current
contents from the grid and returning them to the caller. At the same time,
this call caches information about the current version of the file. This
information will be used in a subsequent call to update(), and if another
change has occured between the two, this information will be out of date,
triggering the UncoordinatedWriteError.
update() is therefore intended to be used just after a download_to_data(), in
the following pattern:
d = mfn.download_to_data()
d.addCallback(apply_delta)
d.addCallback(mfn.update)
If the update() call raises UCW, then the application can simply return an
error to the user ("you violated the Prime Coordination Directive"), and they
can try again later. Alternatively, the application can attempt to retry on
its own. To accomplish this, the app needs to pause, download the new
(post-collision and post-recovery) form of the file, reapply their delta,
then submit the update request again. A randomized pause is necessary to
reduce the chances of colliding a second time with another client that is
doing exactly the same thing:
d = mfn.download_to_data()
d.addCallback(apply_delta)
d.addCallback(mfn.update)
def _retry(f):
f.trap(UncoordinatedWriteError)
d1 = pause(random.uniform(5, 20))
d1.addCallback(lambda res: mfn.download_to_data())
d1.addCallback(apply_delta)
d1.addCallback(mfn.update)
return d1
d.addErrback(_retry)
Enthusiastic applications can retry multiple times, using a randomized
exponential backoff between each. A particularly enthusiastic application can
retry forever, but such apps are encouraged to provide a means to the user of
giving up after a while.
UCW does not mean that the update was not applied, so it is also a good idea
to skip the retry-update step if the delta was already applied:
d = mfn.download_to_data()
d.addCallback(apply_delta)
d.addCallback(mfn.update)
def _retry(f):
f.trap(UncoordinatedWriteError)
d1 = pause(random.uniform(5, 20))
d1.addCallback(lambda res: mfn.download_to_data())
def _maybe_apply_delta(contents):
new_contents = apply_delta(contents)
if new_contents != contents:
return mfn.update(new_contents)
d1.addCallback(_maybe_apply_delta)
return d1
d.addErrback(_retry)
update() is the right interface to use for delta-application situations, like
directory nodes (in which apply_delta might be adding or removing child
entries from a serialized table).
Note that any uncoordinated write has the potential to lose data. We must do
more analysis to be sure, but it appears that two clients who write to the
same mutable file at the same time (even if both eventually retry) will, with
high probability, result in one client observing UCW and the other silently
losing their changes. It is also possible for both clients to observe UCW.
The moral of the story is that the Prime Coordination Directive is there for
a reason, and that recovery/UCW/retry is not a subsitute for write
coordination.
overwrite() tells the client to ignore this cached version information, and
to unconditionally replace the mutable file's contents with the new data.
This should not be used in delta application, but rather in situations where
you want to replace the file's contents with completely unrelated ones. When
raw files are uploaded into a mutable slot through the tahoe webapi (using
POST and the ?mutable=true argument), they are put in place with overwrite().
The peer-selection and data-structure manipulation (and signing/verification) The peer-selection and data-structure manipulation (and signing/verification)
steps will be implemented in a separate class in allmydata/mutable.py . steps will be implemented in a separate class in allmydata/mutable.py .

View File

@ -158,7 +158,7 @@ class SpeedTest:
d1.addCallback(lambda n: n.get_uri()) d1.addCallback(lambda n: n.get_uri())
elif self.mutable_mode == "upload": elif self.mutable_mode == "upload":
data = open(fn,"rb").read() data = open(fn,"rb").read()
d1 = self._n.replace(data) d1 = self._n.overwrite(data)
d1.addCallback(lambda res: self._n.get_uri()) d1.addCallback(lambda res: self._n.get_uri())
else: else:
up = upload.FileName(fn) up = upload.FileName(fn)

View File

@ -204,7 +204,7 @@ class NewDirectoryNode:
def _update(children): def _update(children):
children[name] = (children[name][0], metadata) children[name] = (children[name][0], metadata)
new_contents = self._pack_contents(children) new_contents = self._pack_contents(children)
return self._node.replace(new_contents) return self._node.update(new_contents)
d.addCallback(_update) d.addCallback(_update)
d.addCallback(lambda res: self) d.addCallback(lambda res: self)
return d return d
@ -306,7 +306,7 @@ class NewDirectoryNode:
metadata = new_metadata.copy() metadata = new_metadata.copy()
children[name] = (child, metadata) children[name] = (child, metadata)
new_contents = self._pack_contents(children) new_contents = self._pack_contents(children)
return self._node.replace(new_contents) return self._node.update(new_contents)
d.addCallback(_add) d.addCallback(_add)
d.addCallback(lambda res: None) d.addCallback(lambda res: None)
return d return d
@ -337,7 +337,7 @@ class NewDirectoryNode:
old_child, metadata = children[name] old_child, metadata = children[name]
del children[name] del children[name]
new_contents = self._pack_contents(children) new_contents = self._pack_contents(children)
d = self._node.replace(new_contents) d = self._node.update(new_contents)
def _done(res): def _done(res):
return old_child return old_child
d.addCallback(_done) d.addCallback(_done)

View File

@ -576,17 +576,45 @@ class IMutableFileNode(IFileNode, IMutableFilesystemNode):
more advanced API will signal and provide access to the multiple more advanced API will signal and provide access to the multiple
heads.""" heads."""
def replace(newdata): def update(newdata):
"""Replace the old contents with the new data. Returns a Deferred """Attempt to replace the old contents with the new data.
that fires (with None) when the operation is complete.
If the node detects that there are multiple outstanding versions of download_to_data() must have been called before calling update().
the file, this will raise ConsistencyError, and may leave the
distributed file in an unusual state (the node will try to ensure Returns a Deferred. If the Deferred fires successfully, the update
that at least one version of the file remains retrievable, but it may appeared to succeed. However, another writer (who read before your
or may not be the one you just tried to upload). You should respond changes were published) might still clobber your changes: they will
to this by downloading the current contents of the file and retrying discover a problem but you will not. (see ticket #347 for details).
the replace() operation.
If the mutable file has been changed (by some other writer) since the
last call to download_to_data(), this will raise
UncoordinatedWriteError and the file will be left in an inconsistent
state (possibly the version you provided, possibly the old version,
possibly somebody else's version, and possibly a mix of shares from
all of these). The recommended response to UncoordinatedWriteError is
to either return it to the caller (since they failed to coordinate
their writes), or to do a new download_to_data() / modify-data /
update() loop.
update() is appropriate to use in a read-modify-write sequence, such
as a directory modification.
"""
def overwrite(newdata):
"""Attempt to replace the old contents with the new data.
Unlike update(), overwrite() does not require a previous call to
download_to_data(). It will unconditionally replace the old contents
with new data.
overwrite() is implemented by doing download_to_data() and update()
in rapid succession, so there remains a (smaller) possibility of
UncoordinatedWriteError. A future version will remove the full
download_to_data step, making this faster than update().
overwrite() is only appropriate to use when the new contents of the
mutable file are completely unrelated to the old ones, and you do not
care about other clients changes to the file.
""" """
def get_writekey(): def get_writekey():

View File

@ -1805,11 +1805,21 @@ class MutableFileNode:
self._client.notify_retrieve(r) self._client.notify_retrieve(r)
return r.retrieve() return r.retrieve()
def replace(self, newdata): def update(self, newdata):
# this must be called after a retrieve
assert self._pubkey, "download_to_data() must be called before update()"
assert self._current_seqnum is not None, "download_to_data() must be called before update()"
return self._publish(newdata)
def overwrite(self, newdata):
# we do retrieve just to get the seqnum. We ignore the contents.
# TODO: use a smaller form of retrieve that doesn't try to fetch the
# data. Also, replace Publish with a form that uses the cached
# sharemap from the previous retrieval.
r = self.retrieve_class(self) r = self.retrieve_class(self)
self._client.notify_retrieve(r) self._client.notify_retrieve(r)
d = r.retrieve() d = r.retrieve()
d.addCallback(lambda res: self._publish(newdata)) d.addCallback(lambda ignored: self._publish(newdata))
return d return d
class MutableWatcher(service.MultiService): class MutableWatcher(service.MultiService):

View File

@ -97,11 +97,14 @@ class FakeMutableFileNode:
def get_size(self): def get_size(self):
return "?" # TODO: see mutable.MutableFileNode.get_size return "?" # TODO: see mutable.MutableFileNode.get_size
def replace(self, new_contents): def update(self, new_contents):
assert not self.is_readonly() assert not self.is_readonly()
self.all_contents[self.storage_index] = new_contents self.all_contents[self.storage_index] = new_contents
return defer.succeed(None) return defer.succeed(None)
def overwrite(self, new_contents):
return self.update(new_contents)
def make_mutable_file_uri(): def make_mutable_file_uri():
return uri.WriteableSSKFileURI(writekey=os.urandom(16), return uri.WriteableSSKFileURI(writekey=os.urandom(16),

View File

@ -46,9 +46,11 @@ class FakeFilenode(mutable.MutableFileNode):
return defer.succeed(self.all_contents[self.all_rw_friends[self.get_uri()]]) return defer.succeed(self.all_contents[self.all_rw_friends[self.get_uri()]])
else: else:
return defer.succeed(self.all_contents[self.get_uri()]) return defer.succeed(self.all_contents[self.get_uri()])
def replace(self, newdata): def update(self, newdata):
self.all_contents[self.get_uri()] = newdata self.all_contents[self.get_uri()] = newdata
return defer.succeed(None) return defer.succeed(None)
def overwrite(self, newdata):
return self.update(newdata)
class FakeStorage: class FakeStorage:
# this class replaces the collection of storage servers, allowing the # this class replaces the collection of storage servers, allowing the
@ -162,6 +164,9 @@ class FakeClient:
d.addCallback(lambda res: n) d.addCallback(lambda res: n)
return d return d
def notify_retrieve(self, r):
pass
def create_node_from_uri(self, u): def create_node_from_uri(self, u):
u = IURI(u) u = IURI(u)
if INewDirectoryURI.providedBy(u): if INewDirectoryURI.providedBy(u):
@ -233,15 +238,18 @@ class Filenode(unittest.TestCase):
def test_create(self): def test_create(self):
d = self.client.create_mutable_file() d = self.client.create_mutable_file()
def _created(n): def _created(n):
d = n.replace("contents 1") d = n.overwrite("contents 1")
d.addCallback(lambda res: self.failUnlessIdentical(res, None)) d.addCallback(lambda res: self.failUnlessIdentical(res, None))
d.addCallback(lambda res: n.download_to_data()) d.addCallback(lambda res: n.download_to_data())
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
d.addCallback(lambda res: n.replace("contents 2")) d.addCallback(lambda res: n.overwrite("contents 2"))
d.addCallback(lambda res: n.download_to_data()) d.addCallback(lambda res: n.download_to_data())
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
d.addCallback(lambda res: n.download(download.Data())) d.addCallback(lambda res: n.download(download.Data()))
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
d.addCallback(lambda res: n.update("contents 3"))
d.addCallback(lambda res: n.download_to_data())
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
return d return d
d.addCallback(_created) d.addCallback(_created)
return d return d
@ -251,7 +259,7 @@ class Filenode(unittest.TestCase):
def _created(n): def _created(n):
d = n.download_to_data() d = n.download_to_data()
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
d.addCallback(lambda res: n.replace("contents 2")) d.addCallback(lambda res: n.overwrite("contents 2"))
d.addCallback(lambda res: n.download_to_data()) d.addCallback(lambda res: n.download_to_data())
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
return d return d

View File

@ -695,7 +695,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
self.failUnlessEqual(res, DATA) self.failUnlessEqual(res, DATA)
# replace the data # replace the data
log.msg("starting replace1") log.msg("starting replace1")
d1 = newnode.replace(NEWDATA) d1 = newnode.update(NEWDATA)
d1.addCallback(lambda res: newnode.download_to_data()) d1.addCallback(lambda res: newnode.download_to_data())
return d1 return d1
d.addCallback(_check_download_3) d.addCallback(_check_download_3)
@ -709,7 +709,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
newnode2 = self.clients[3].create_node_from_uri(uri) newnode2 = self.clients[3].create_node_from_uri(uri)
self._newnode3 = self.clients[3].create_node_from_uri(uri) self._newnode3 = self.clients[3].create_node_from_uri(uri)
log.msg("starting replace2") log.msg("starting replace2")
d1 = newnode1.replace(NEWERDATA) d1 = newnode1.overwrite(NEWERDATA)
d1.addCallback(lambda res: newnode2.download_to_data()) d1.addCallback(lambda res: newnode2.download_to_data())
return d1 return d1
d.addCallback(_check_download_4) d.addCallback(_check_download_4)

View File

@ -829,7 +829,7 @@ class POSTHandler(rend.Page):
# one # one
d2 = self._node.get(name) d2 = self._node.get(name)
def _got_newnode(newnode): def _got_newnode(newnode):
d3 = newnode.replace(data) d3 = newnode.overwrite(data)
d3.addCallback(lambda res: newnode.get_uri()) d3.addCallback(lambda res: newnode.get_uri())
return d3 return d3
d2.addCallback(_got_newnode) d2.addCallback(_got_newnode)
@ -858,7 +858,7 @@ class POSTHandler(rend.Page):
# TODO: 'name' handling needs review # TODO: 'name' handling needs review
d = defer.succeed(self._node) d = defer.succeed(self._node)
def _got_child_overwrite(child_node): def _got_child_overwrite(child_node):
child_node.replace(data) child_node.overwrite(data)
return child_node.get_uri() return child_node.get_uri()
d.addCallback(_got_child_overwrite) d.addCallback(_got_child_overwrite)
return d return d