mirror of https://github.com/tahoe-lafs/tahoe-lafs.git (synced 2025-01-31 16:36:20 +00:00)
mutable: replace MutableFileNode API, update tests. Changed all callers to use overwrite(), but that will change soon
parent 157073d8d8
commit a379690b04
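At the caller level, this commit boils down to two renames plus a richer write path: reads move from download_to_data() to download_best_version(), and blind writes move from update() to overwrite(), with get_servermap()/upload() available for coordinated writes. A minimal sketch of a migrated caller, using only the methods introduced in this diff (the helper name and arguments are illustrative, not part of the commit):

    def replace_contents(n, new_contents):
        # old API (removed by this commit):
        #   d = n.download_to_data()
        #   d.addCallback(lambda old: n.update(new_contents))
        # new API: read the best recoverable version, then replace it
        # unconditionally, which is what the updated callers below do for now.
        d = n.download_best_version()
        d.addCallback(lambda old_contents: n.overwrite(new_contents))
        return d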
@@ -69,7 +69,7 @@ class NewDirectoryNode:
         return self
 
     def _read(self):
-        d = self._node.download_to_data()
+        d = self._node.download_best_version()
         d.addCallback(self._unpack_contents)
         return d
 
@@ -203,7 +203,7 @@ class NewDirectoryNode:
         def _update(children):
             children[name] = (children[name][0], metadata)
             new_contents = self._pack_contents(children)
-            return self._node.update(new_contents)
+            return self._node.overwrite(new_contents)
         d.addCallback(_update)
         d.addCallback(lambda res: self)
         return d
@@ -305,7 +305,7 @@ class NewDirectoryNode:
             metadata = new_metadata.copy()
             children[name] = (child, metadata)
             new_contents = self._pack_contents(children)
-            return self._node.update(new_contents)
+            return self._node.overwrite(new_contents)
         d.addCallback(_add)
         d.addCallback(lambda res: None)
         return d
@@ -336,7 +336,7 @@ class NewDirectoryNode:
             old_child, metadata = children[name]
             del children[name]
             new_contents = self._pack_contents(children)
-            d = self._node.update(new_contents)
+            d = self._node.overwrite(new_contents)
             def _done(res):
                 return old_child
             d.addCallback(_done)
@@ -566,54 +566,155 @@ class IFileNode(IFilesystemNode):
         """Return the length (in bytes) of the data this node represents."""
 
 class IMutableFileNode(IFileNode, IMutableFilesystemNode):
-    def download_to_data():
-        """Download the file's contents. Return a Deferred that fires with
-        those contents. If there are multiple retrievable versions in the
-        grid (because you failed to avoid simultaneous writes, see
-        docs/mutable.txt), this will return the first version that it can
-        reconstruct, and will silently ignore the others. In the future, a
-        more advanced API will signal and provide access to the multiple
-        heads."""
-
-    def update(newdata):
-        """Attempt to replace the old contents with the new data.
-
-        download_to_data() must have been called before calling update().
-
-        Returns a Deferred. If the Deferred fires successfully, the update
-        appeared to succeed. However, another writer (who read before your
-        changes were published) might still clobber your changes: they will
-        discover a problem but you will not. (see ticket #347 for details).
-
-        If the mutable file has been changed (by some other writer) since the
-        last call to download_to_data(), this will raise
-        UncoordinatedWriteError and the file will be left in an inconsistent
-        state (possibly the version you provided, possibly the old version,
-        possibly somebody else's version, and possibly a mix of shares from
-        all of these). The recommended response to UncoordinatedWriteError is
-        to either return it to the caller (since they failed to coordinate
-        their writes), or to do a new download_to_data() / modify-data /
-        update() loop.
-
-        update() is appropriate to use in a read-modify-write sequence, such
-        as a directory modification.
-        """
-
-    def overwrite(newdata):
-        """Attempt to replace the old contents with the new data.
-
-        Unlike update(), overwrite() does not require a previous call to
-        download_to_data(). It will unconditionally replace the old contents
-        with new data.
-
-        overwrite() is implemented by doing download_to_data() and update()
-        in rapid succession, so there remains a (smaller) possibility of
-        UncoordinatedWriteError. A future version will remove the full
-        download_to_data step, making this faster than update().
-
-        overwrite() is only appropriate to use when the new contents of the
-        mutable file are completely unrelated to the old ones, and you do not
-        care about other clients changes to the file.
+    """I provide access to a 'mutable file', which retains its identity
+    regardless of what contents are put in it.
+
+    The consistency-vs-availability problem means that there might be
+    multiple versions of a file present in the grid, some of which might be
+    unrecoverable (i.e. have fewer than 'k' shares). These versions are
+    loosely ordered: each has a sequence number and a hash, and any version
+    with seqnum=N was uploaded by a node which has seen at least one version
+    with seqnum=N-1.
+
+    The 'servermap' (an instance of IMutableFileServerMap) is used to
+    describe the versions that are known to be present in the grid, and which
+    servers are hosting their shares. It is used to represent the 'state of
+    the world', and is used for this purpose by my test-and-set operations.
+    Downloading the contents of the mutable file will also return a
+    servermap. Uploading a new version into the mutable file requires a
+    servermap as input, and the semantics of the replace operation is
+    'replace the file with my new version if it looks like nobody else has
+    changed the file since my previous download'. Because the file is
+    distributed, this is not a perfect test-and-set operation, but it will do
+    its best. If the replace process sees evidence of a simultaneous write,
+    it will signal an UncoordinatedWriteError, so that the caller can take
+    corrective action.
+
+    Most readers will want to use the 'best' current version of the file, and
+    should use my 'download_best_version()' method.
+
+    To unconditionally replace the file, callers should use overwrite(). This
+    is the mode that user-visible mutable files will probably use.
+
+    To apply some delta to the file, call modify() with a callable modifier
+    function that can apply the modification that you want to make. This is
+    the mode that dirnodes will use, since most directory modification
+    operations can be expressed in terms of deltas to the directory state.
+
+
+    Three methods are available for users who need to perform more complex
+    operations. The first is get_servermap(), which returns an up-to-date
+    servermap using a specified mode. The second is download_version(), which
+    downloads a specific version (not necessarily the 'best' one). The third
+    is 'upload', which accepts new contents and a servermap (which must have
+    been updated with MODE_WRITE). The upload method will attempt to apply
+    the new contents as long as no other node has modified the file since the
+    servermap was updated. This might be useful to a caller who wants to
+    merge multiple versions into a single new one.
+
+    Note that each time the servermap is updated, a specific 'mode' is used,
+    which determines how many peers are queried. To use a servermap for my
+    replace() method, that servermap must have been updated in MODE_WRITE.
+    These modes are defined in allmydata.mutable.common, and consist of
+    MODE_READ, MODE_WRITE, MODE_ANYTHING, and MODE_CHECK. Please look in
+    allmydata/mutable/servermap.py for details about the differences.
+
+    Mutable files are currently limited in size (about 3.5MB max) and can
+    only be retrieved and updated all-at-once, as a single big string. Future
+    versions of our mutable files will remove this restriction.
+    """
+
+    def download_best_version():
+        """Download the 'best' available version of the file, meaning one of
+        the recoverable versions with the highest sequence number. If no
+        uncoordinated writes have occurred, and if enough shares are
+        available, then this will be the most recent version that has been
+        uploaded.
+
+        I return a Deferred that fires with a (contents, servermap) pair. The
+        servermap is updated with MODE_READ. The contents will be the version
+        of the file indicated by servermap.best_recoverable_version(). If no
+        version is recoverable, the Deferred will errback with
+        UnrecoverableFileError.
+        """
+
+    def overwrite(new_contents):
+        """Unconditionally replace the contents of the mutable file with new
+        ones. This simply chains get_servermap(MODE_WRITE) and upload(). This
+        is only appropriate to use when the new contents of the file are
+        completely unrelated to the old ones, and you do not care about other
+        clients' changes.
+
+        I return a Deferred that fires (with a PublishStatus object) when the
+        update has completed.
+        """
+
+    def modify(modifier_cb):
+        """Modify the contents of the file, by downloading the current
+        version, applying the modifier function (or bound method), then
+        uploading the new version. I return a Deferred that fires (with a
+        PublishStatus object) when the update is complete.
+
+        The modifier callable will be given two arguments: a string (with the
+        old contents) and a servermap. As with download_best_version(), the
+        old contents will be from the best recoverable version, but the
+        modifier can use the servermap to make other decisions (such as
+        refusing to apply the delta if there are multiple parallel versions,
+        or if there is evidence of a newer unrecoverable version).
+
+        The callable should return a string with the new contents. The
+        callable must be prepared to be called multiple times, and must
+        examine the input string to see if the change that it wants to make
+        is already present in the old version. If it does not need to make
+        any changes, it can either return None, or return its input string.
+
+        If the modifier raises an exception, it will be returned in the
+        errback.
+        """
+
+
+    def get_servermap(mode):
+        """Return a Deferred that fires with an IMutableFileServerMap
+        instance, updated using the given mode.
+        """
+
+    def download_version(servermap, version):
+        """Download a specific version of the file, using the servermap
+        as a guide to where the shares are located.
+
+        I return a Deferred that fires with the requested contents, or
+        errbacks with UnrecoverableFileError. Note that a servermap which was
+        updated with MODE_ANYTHING or MODE_READ may not know about shares for
+        all versions (those modes stop querying servers as soon as they can
+        fulfil their goals), so you may want to use MODE_CHECK (which checks
+        everything) to get increased visibility.
+        """
+
+    def upload(new_contents, servermap):
+        """Replace the contents of the file with new ones. This requires a
+        servermap that was previously updated with MODE_WRITE.
+
+        I attempt to provide test-and-set semantics, in that I will avoid
+        modifying any share that is different than the version I saw in the
+        servermap. However, if another node is writing to the file at the
+        same time as me, I may manage to update some shares while they update
+        others. If I see any evidence of this, I will signal
+        UncoordinatedWriteError, and the file will be left in an inconsistent
+        state (possibly the version you provided, possibly the old version,
+        possibly somebody else's version, and possibly a mix of shares from
+        all of these).
+
+        The recommended response to UncoordinatedWriteError is to either
+        return it to the caller (since they failed to coordinate their
+        writes), or to attempt some sort of recovery. It may be sufficient to
+        wait a random interval (with exponential backoff) and repeat your
+        operation. If I do not signal UncoordinatedWriteError, then I was
+        able to write the new version without incident.
+
+        I return a Deferred that fires (with a PublishStatus object) when the
+        publish has completed. I will update the servermap in-place with the
+        location of all new shares.
         """
 
     def get_writekey():
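As a rough illustration of the three usage styles the docstring above describes (read the best version, apply a delta, or do an explicit test-and-set), the sketch below assumes a node n providing IMutableFileNode and that the MODE_* constants live in allmydata.mutable.common as stated; note that modify() is still unimplemented on MutableFileNode at this commit, so the middle helper is aspirational:

    from allmydata.mutable.common import MODE_WRITE

    def read_best(n):
        # simplest read path: best recoverable version (MODE_READ servermap)
        return n.download_best_version()

    def append_line(n, line):
        # delta-style update: the modifier may be called several times, so it
        # must check whether its change is already present
        def _modifier(old_contents, servermap):
            if old_contents.endswith(line):
                return None                  # nothing to do
            return old_contents + line
        return n.modify(_modifier)

    def replace_if_unchanged(n, new_contents):
        # explicit test-and-set: MODE_WRITE servermap, then upload()
        d = n.get_servermap(MODE_WRITE)
        d.addCallback(lambda smap: n.upload(new_contents, smap))
        return d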
@@ -40,6 +40,12 @@ class MutableFileNode:
         self._current_roothash = None # ditto
         self._current_seqnum = None # ditto
 
+        # all users of this MutableFileNode go through the serializer. This
+        # takes advantage of the fact that Deferreds discard the callbacks
+        # that they're done with, so we can keep using the same Deferred
+        # forever without consuming more and more memory.
+        self._serializer = defer.succeed(None)
+
     def __repr__(self):
         return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', hasattr(self, '_uri') and self._uri.abbrev())
 
@@ -198,40 +204,73 @@ class MutableFileNode:
     def get_verifier(self):
         return IMutableFileURI(self._uri).get_verifier()
 
-    def obtain_lock(self, res=None):
-        # stub, get real version from zooko's #265 patch
+    def _do_serialized(self, cb, *args, **kwargs):
+        # note: to avoid deadlock, this callable is *not* allowed to invoke
+        # other serialized methods within this (or any other)
+        # MutableFileNode. The callable should be a bound method of this same
+        # MFN instance.
         d = defer.Deferred()
-        d.callback(res)
+        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
+        self._serializer.addBoth(d.callback)
         return d
 
-    def release_lock(self, res):
-        # stub
-        return res
+    #################################
 
-    ############################
+    def check(self):
+        verifier = self.get_verifier()
+        return self._client.getServiceNamed("checker").check(verifier)
 
-    # methods exposed to the higher-layer application
-
-    def update_servermap(self, old_map=None, mode=MODE_READ):
-        servermap = old_map or ServerMap()
-        d = self.obtain_lock()
-        d.addCallback(lambda res: self._update_servermap(servermap, mode))
-        d.addBoth(self.release_lock)
+    # allow the use of IDownloadTarget
+    def download(self, target):
+        # fake it. TODO: make this cleaner.
+        d = self.download_best_version()
+        def _done(data):
+            target.open(len(data))
+            target.write(data)
+            target.close()
+            return target.finish()
+        d.addCallback(_done)
         return d
 
-    def download_version(self, servermap, versionid):
-        """Returns a Deferred that fires with a string."""
-        d = self.obtain_lock()
-        d.addCallback(lambda res: self._retrieve(servermap, versionid))
-        d.addBoth(self.release_lock)
+    # new API
+
+    def download_best_version(self):
+        return self._do_serialized(self._download_best_version)
+    def _download_best_version(self):
+        servermap = ServerMap()
+        d = self._try_once_to_download_best_version(servermap, MODE_READ)
+        def _maybe_retry(f):
+            f.trap(NotEnoughSharesError)
+            # the download is worth retrying once. Make sure to use the
+            # old servermap, since it is what remembers the bad shares,
+            # but use MODE_WRITE to make it look for even more shares.
+            # TODO: consider allowing this to retry multiple times.. this
+            # approach will let us tolerate about 8 bad shares, I think.
+            return self._try_once_to_download_best_version(servermap,
+                                                            MODE_WRITE)
+        d.addErrback(_maybe_retry)
+        return d
+    def _try_once_to_download_best_version(self, servermap, mode):
+        d = self._update_servermap(servermap, mode)
+        def _updated(ignored):
+            goal = servermap.best_recoverable_version()
+            if not goal:
+                raise UnrecoverableFileError("no recoverable versions")
+            return self._try_once_to_download_version(servermap, goal)
+        d.addCallback(_updated)
         return d
 
-    def publish(self, servermap, new_contents):
-        d = self.obtain_lock()
-        d.addCallback(lambda res: self._publish(servermap, new_contents))
-        d.addBoth(self.release_lock)
+    def overwrite(self, new_contents):
+        return self._do_serialized(self._overwrite, new_contents)
+    def _overwrite(self, new_contents):
+        servermap = ServerMap()
+        d = self._update_servermap(servermap, mode=MODE_WRITE)
+        d.addCallback(lambda ignored: self._upload(new_contents, servermap))
         return d
 
 
     def modify(self, modifier, *args, **kwargs):
         """I use a modifier callback to apply a change to the mutable file.
         I implement the following pseudocode::
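The serializer used by _do_serialized() above is just a long-lived Deferred chain acting as a lock: each serialized call is appended as a callback, so calls run one at a time in submission order, and since Deferreds discard callbacks they have already run, the chain never grows. The same pattern in isolation (a sketch, not part of this commit; the class and method names are illustrative):

    from twisted.internet import defer

    class SerializedCalls(object):
        def __init__(self):
            # one Deferred reused forever; finished callbacks are discarded
            self._serializer = defer.succeed(None)

        def do_serialized(self, cb, *args, **kwargs):
            # NOTE: cb must not call do_serialized() on this same object,
            # or the chain will deadlock waiting on itself
            d = defer.Deferred()
            self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
            self._serializer.addBoth(d.callback)
            return d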
@@ -253,78 +292,39 @@ class MutableFileNode:
         sort, and it will be re-run as necessary until it succeeds. The
         modifier must inspect the old version to see whether its delta has
         already been applied: if so it should return the contents unmodified.
 
+        Note that the modifier is required to run synchronously, and must not
+        invoke any methods on this MutableFileNode instance.
         """
         NotImplementedError
 
-    #################################
-
-    def check(self):
-        verifier = self.get_verifier()
-        return self._client.getServiceNamed("checker").check(verifier)
-
-    def _update_servermap(self, old_map, mode):
-        u = ServermapUpdater(self, old_map, mode)
+    def get_servermap(self, mode):
+        return self._do_serialized(self._get_servermap, mode)
+    def _get_servermap(self, mode):
+        servermap = ServerMap()
+        return self._update_servermap(servermap, mode)
+    def _update_servermap(self, servermap, mode):
+        u = ServermapUpdater(self, servermap, mode)
         self._client.notify_mapupdate(u.get_status())
         return u.update()
 
-    def _retrieve(self, servermap, verinfo):
-        r = Retrieve(self, servermap, verinfo)
+    def download_version(self, servermap, version):
+        return self._do_serialized(self._try_once_to_download_version,
+                                   servermap, version)
+    def _try_once_to_download_version(self, servermap, version):
+        r = Retrieve(self, servermap, version)
         self._client.notify_retrieve(r.get_status())
         return r.download()
 
-    def _update_and_retrieve_best(self, old_map=None, mode=MODE_READ):
-        d = self.update_servermap(old_map=old_map, mode=mode)
-        def _updated(smap):
-            goal = smap.best_recoverable_version()
-            if not goal:
-                raise UnrecoverableFileError("no recoverable versions")
-            return self.download_version(smap, goal)
-        d.addCallback(_updated)
-        return d
-
-    def download_to_data(self):
-        d = self.obtain_lock()
-        d.addCallback(lambda res: self._update_and_retrieve_best())
-        def _maybe_retry(f):
-            f.trap(NotEnoughSharesError)
-            e = f.value
-            # the download is worth retrying once. Make sure to use the old
-            # servermap, since it is what remembers the bad shares, but use
-            # MODE_WRITE to make it look for even more shares. TODO: consider
-            # allowing this to retry multiple times.. this approach will let
-            # us tolerate about 8 bad shares, I think.
-            return self._update_and_retrieve_best(e.servermap, mode=MODE_WRITE)
-        d.addErrback(_maybe_retry)
-        d.addBoth(self.release_lock)
-        return d
-
-    def download(self, target):
-        # fake it. TODO: make this cleaner.
-        d = self.download_to_data()
-        def _done(data):
-            target.open(len(data))
-            target.write(data)
-            target.close()
-            return target.finish()
-        d.addCallback(_done)
-        return d
-
-
-    def _publish(self, servermap, new_contents):
+    def upload(self, new_contents, servermap):
+        return self._do_serialized(self._upload, new_contents, servermap)
+    def _upload(self, new_contents, servermap):
         assert self._pubkey, "update_servermap must be called before publish"
         p = Publish(self, servermap)
         self._client.notify_publish(p.get_status())
         return p.publish(new_contents)
 
-    def update(self, new_contents):
-        d = self.obtain_lock()
-        d.addCallback(lambda res: self.update_servermap(mode=MODE_WRITE))
-        d.addCallback(self._publish, new_contents)
-        d.addBoth(self.release_lock)
-        return d
-
-    def overwrite(self, new_contents):
-        return self.update(new_contents)
-
 
 class MutableWatcher(service.MultiService):
@@ -90,21 +90,18 @@ class FakeMutableFileNode:
         return self.my_uri.is_readonly()
     def is_mutable(self):
         return self.my_uri.is_mutable()
-    def download_to_data(self):
-        return defer.succeed(self.all_contents[self.storage_index])
     def get_writekey(self):
         return "\x00"*16
     def get_size(self):
         return "?" # TODO: see mutable.MutableFileNode.get_size
 
-    def update(self, new_contents):
+    def download_best_version(self):
+        return defer.succeed(self.all_contents[self.storage_index])
+    def overwrite(self, new_contents):
         assert not self.is_readonly()
         self.all_contents[self.storage_index] = new_contents
         return defer.succeed(None)
 
-    def overwrite(self, new_contents):
-        return self.update(new_contents)
-
 
 def make_mutable_file_uri():
     return uri.WriteableSSKFileURI(writekey=os.urandom(16),
@@ -258,21 +258,27 @@ class Filenode(unittest.TestCase):
         d = self.client.create_mutable_file()
         def _created(n):
             d = defer.succeed(None)
-            d.addCallback(lambda res: n.update_servermap())
+            d.addCallback(lambda res: n.get_servermap(MODE_READ))
             d.addCallback(lambda smap: smap.dump(StringIO()))
             d.addCallback(lambda sio:
                           self.failUnless("3-of-10" in sio.getvalue()))
             d.addCallback(lambda res: n.overwrite("contents 1"))
             d.addCallback(lambda res: self.failUnlessIdentical(res, None))
-            d.addCallback(lambda res: n.download_to_data())
+            d.addCallback(lambda res: n.download_best_version())
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
             d.addCallback(lambda res: n.overwrite("contents 2"))
-            d.addCallback(lambda res: n.download_to_data())
+            d.addCallback(lambda res: n.download_best_version())
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
             d.addCallback(lambda res: n.download(download.Data()))
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
-            d.addCallback(lambda res: n.update("contents 3"))
-            d.addCallback(lambda res: n.download_to_data())
+            d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
+            d.addCallback(lambda smap: n.upload("contents 3", smap))
+            d.addCallback(lambda res: n.download_best_version())
+            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
+            d.addCallback(lambda res: n.get_servermap(MODE_ANYTHING))
+            d.addCallback(lambda smap:
+                          n.download_version(smap,
+                                             smap.best_recoverable_version()))
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
             return d
         d.addCallback(_created)
@@ -281,10 +287,10 @@ class Filenode(unittest.TestCase):
     def test_create_with_initial_contents(self):
         d = self.client.create_mutable_file("contents 1")
         def _created(n):
-            d = n.download_to_data()
+            d = n.download_best_version()
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
             d.addCallback(lambda res: n.overwrite("contents 2"))
-            d.addCallback(lambda res: n.download_to_data())
+            d.addCallback(lambda res: n.download_best_version())
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
             return d
         d.addCallback(_created)
@@ -295,21 +301,27 @@ class Filenode(unittest.TestCase):
         d = self.client.create_mutable_file()
         def _created(n):
             d = defer.succeed(None)
-            d.addCallback(lambda res: n.update_servermap())
+            d.addCallback(lambda res: n.get_servermap(MODE_READ))
             d.addCallback(lambda smap: smap.dump(StringIO()))
             d.addCallback(lambda sio:
                           self.failUnless("3-of-10" in sio.getvalue()))
             d.addCallback(lambda res: n.overwrite("contents 1"))
             d.addCallback(lambda res: self.failUnlessIdentical(res, None))
-            d.addCallback(lambda res: n.download_to_data())
+            d.addCallback(lambda res: n.download_best_version())
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
             d.addCallback(lambda res: n.overwrite("contents 2"))
-            d.addCallback(lambda res: n.download_to_data())
+            d.addCallback(lambda res: n.download_best_version())
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
             d.addCallback(lambda res: n.download(download.Data()))
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
-            d.addCallback(lambda res: n.update("contents 3"))
-            d.addCallback(lambda res: n.download_to_data())
+            d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
+            d.addCallback(lambda smap: n.upload("contents 3", smap))
+            d.addCallback(lambda res: n.download_best_version())
+            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
+            d.addCallback(lambda res: n.get_servermap(MODE_ANYTHING))
+            d.addCallback(lambda smap:
+                          n.download_version(smap,
+                                             smap.best_recoverable_version()))
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
             return d
         d.addCallback(_created)
@@ -679,7 +691,7 @@ class Roundtrip(unittest.TestCase):
                 self.failUnless(substring in "".join(allproblems))
                 return
             if should_succeed:
-                d1 = self._fn.download_to_data()
+                d1 = self._fn.download_best_version()
                 d1.addCallback(lambda new_contents:
                                self.failUnlessEqual(new_contents, self.CONTENTS))
                 return d1
@@ -687,7 +699,7 @@ class Roundtrip(unittest.TestCase):
             return self.shouldFail(NotEnoughSharesError,
                                    "_corrupt_all(offset=%s)" % (offset,),
                                    substring,
-                                   self._fn.download_to_data)
+                                   self._fn.download_best_version)
         d.addCallback(_do_retrieve)
         return d
 
@@ -797,7 +809,7 @@ class Roundtrip(unittest.TestCase):
         def _do_retrieve(servermap):
             ver = servermap.best_recoverable_version()
             self.failUnless(ver)
-            return self._fn.download_to_data()
+            return self._fn.download_best_version()
         d.addCallback(_do_retrieve)
         d.addCallback(lambda new_contents:
                       self.failUnlessEqual(new_contents, self.CONTENTS))
@@ -807,7 +819,7 @@ class Roundtrip(unittest.TestCase):
         corrupt(None, self._storage, "signature")
         d = self.shouldFail(UnrecoverableFileError, "test_download_anyway",
                             "no recoverable versions",
-                            self._fn.download_to_data)
+                            self._fn.download_best_version)
         return d
 
 
@@ -948,7 +960,7 @@ class MultipleEncodings(unittest.TestCase):
             self._client._storage._sequence = new_sequence
             log.msg("merge done")
         d.addCallback(_merge)
-        d.addCallback(lambda res: fn3.download_to_data())
+        d.addCallback(lambda res: fn3.download_best_version())
         def _retrieved(new_contents):
             # the current specified behavior is "first version recoverable"
             self.failUnlessEqual(new_contents, contents1)
@@ -692,7 +692,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
         # contents. This allows it to use the cached pubkey and maybe the
         # latest-known sharemap.
 
-        d.addCallback(lambda res: self._mutable_node_1.download_to_data())
+        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
         def _check_download_1(res):
             self.failUnlessEqual(res, DATA)
             # now we see if we can retrieve the data from a new node,
@@ -701,7 +701,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
             uri = self._mutable_node_1.get_uri()
             log.msg("starting retrieve1")
             newnode = self.clients[0].create_node_from_uri(uri)
-            return newnode.download_to_data()
+            return newnode.download_best_version()
         d.addCallback(_check_download_1)
 
         def _check_download_2(res):
@@ -710,7 +710,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
             uri = self._mutable_node_1.get_uri()
             newnode = self.clients[1].create_node_from_uri(uri)
             log.msg("starting retrieve2")
-            d1 = newnode.download_to_data()
+            d1 = newnode.download_best_version()
             d1.addCallback(lambda res: (res, newnode))
             return d1
         d.addCallback(_check_download_2)
@@ -719,8 +719,8 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
             self.failUnlessEqual(res, DATA)
             # replace the data
             log.msg("starting replace1")
-            d1 = newnode.update(NEWDATA)
-            d1.addCallback(lambda res: newnode.download_to_data())
+            d1 = newnode.overwrite(NEWDATA)
+            d1.addCallback(lambda res: newnode.download_best_version())
             return d1
         d.addCallback(_check_download_3)
 
@@ -734,7 +734,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
             self._newnode3 = self.clients[3].create_node_from_uri(uri)
             log.msg("starting replace2")
             d1 = newnode1.overwrite(NEWERDATA)
-            d1.addCallback(lambda res: newnode2.download_to_data())
+            d1.addCallback(lambda res: newnode2.download_best_version())
             return d1
         d.addCallback(_check_download_4)
 
@@ -797,14 +797,14 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
         # pubkey mangling
         d.addCallback(_corrupt_shares)
 
-        d.addCallback(lambda res: self._newnode3.download_to_data())
+        d.addCallback(lambda res: self._newnode3.download_best_version())
         d.addCallback(_check_download_5)
 
         def _check_empty_file(res):
             # make sure we can create empty files, this usually screws up the
             # segsize math
             d1 = self.clients[2].create_mutable_file("")
-            d1.addCallback(lambda newnode: newnode.download_to_data())
+            d1.addCallback(lambda newnode: newnode.download_best_version())
             d1.addCallback(lambda res: self.failUnlessEqual("", res))
             return d1
         d.addCallback(_check_empty_file)
@@ -985,6 +985,15 @@ class Web(WebMixin, unittest.TestCase):
         d.addCallback(_check)
         return d
 
+    def failUnlessMutableChildContentsAre(self, node, name, expected_contents):
+        assert isinstance(name, unicode)
+        d = node.get_child_at_path(name)
+        d.addCallback(lambda node: node.download_best_version())
+        def _check(contents):
+            self.failUnlessEqual(contents, expected_contents)
+        d.addCallback(_check)
+        return d
+
     def failUnlessChildURIIs(self, node, name, expected_uri):
         assert isinstance(name, unicode)
         d = node.get_child_at_path(name)
@@ -1152,7 +1161,7 @@ class Web(WebMixin, unittest.TestCase):
             self.failUnless(IMutableFileURI.providedBy(u))
             self.failUnless(u.storage_index in FakeMutableFileNode.all_contents)
             n = self.s.create_node_from_uri(new_uri)
-            return n.download_to_data()
+            return n.download_best_version()
         d.addCallback(_check)
         def _check2(data):
             self.failUnlessEqual(data, self.NEWFILE_CONTENTS)
@@ -1166,7 +1175,7 @@ class Web(WebMixin, unittest.TestCase):
         fn = self._foo_node
         d.addCallback(self.failUnlessURIMatchesChild, fn, u"new.txt")
         d.addCallback(lambda res:
-                      self.failUnlessChildContentsAre(fn, u"new.txt",
+                      self.failUnlessMutableChildContentsAre(fn, u"new.txt",
                                                       self.NEWFILE_CONTENTS))
         d.addCallback(lambda res: self._foo_node.get(u"new.txt"))
         def _got(newnode):
@@ -1184,7 +1193,7 @@ class Web(WebMixin, unittest.TestCase):
                                                   file=("new.txt", NEWER_CONTENTS)))
         d.addCallback(self.failUnlessURIMatchesChild, fn, u"new.txt")
         d.addCallback(lambda res:
-                      self.failUnlessChildContentsAre(fn, u"new.txt",
+                      self.failUnlessMutableChildContentsAre(fn, u"new.txt",
                                                       NEWER_CONTENTS))
         d.addCallback(lambda res: self._foo_node.get(u"new.txt"))
         def _got2(newnode):
@@ -1223,7 +1232,7 @@ class Web(WebMixin, unittest.TestCase):
         d.addCallback(_parse_overwrite_form_and_submit)
         d.addBoth(self.shouldRedirect, urllib.quote(self.public_url + "/foo/"))
         d.addCallback(lambda res:
-                      self.failUnlessChildContentsAre(fn, u"new.txt",
+                      self.failUnlessMutableChildContentsAre(fn, u"new.txt",
                                                       EVEN_NEWER_CONTENTS))
         d.addCallback(lambda res: self._foo_node.get(u"new.txt"))
         def _got3(newnode):
@@ -1735,7 +1744,7 @@ class Web(WebMixin, unittest.TestCase):
             self.failUnless(IMutableFileURI.providedBy(u))
             self.failUnless(u.storage_index in FakeMutableFileNode.all_contents)
             n = self.s.create_node_from_uri(uri)
-            return n.download_to_data()
+            return n.download_best_version()
         d.addCallback(_check_mutable)
         def _check2_mutable(data):
            self.failUnlessEqual(data, file_contents)