mutable: test write failures, uncoordinated write detection

Brian Warner 2008-04-22 11:49:53 -07:00
parent 7e1f9761e8
commit e6074f5dfc
4 changed files with 263 additions and 87 deletions

View File

@@ -30,6 +30,10 @@ class UncoordinatedWriteError(Exception):
class UnrecoverableFileError(Exception):
pass
class NotEnoughServersError(Exception):
"""There were not enough functioning servers available to place shares
upon."""
class CorruptShareError(Exception):
def __init__(self, peerid, shnum, reason):
self.args = (peerid, shnum, reason)

View File

@@ -101,23 +101,24 @@ class MutableFileNode:
contents. Returns a Deferred that fires (with the MutableFileNode
instance you should use) when it completes.
"""
self._required_shares, self._total_shares = self.DEFAULT_ENCODING
d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator)
def _generated( (pubkey, privkey) ):
self._pubkey, self._privkey = pubkey, privkey
pubkey_s = self._pubkey.serialize()
privkey_s = self._privkey.serialize()
self._writekey = hashutil.ssk_writekey_hash(privkey_s)
self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
self._readkey = self._uri.readkey
self._storage_index = self._uri.storage_index
return self._upload(initial_contents, None)
d.addCallback(_generated)
d.addCallback(self._generated)
d.addCallback(lambda res: self._upload(initial_contents, None))
return d
def _generated(self, (pubkey, privkey) ):
self._pubkey, self._privkey = pubkey, privkey
pubkey_s = self._pubkey.serialize()
privkey_s = self._privkey.serialize()
self._writekey = hashutil.ssk_writekey_hash(privkey_s)
self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
self._readkey = self._uri.readkey
self._storage_index = self._uri.storage_index
self._required_shares, self._total_shares = self.DEFAULT_ENCODING
def _generate_pubprivkeys(self, keypair_generator):
if keypair_generator:
return keypair_generator(self.SIGNATURE_KEY_SIZE)
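
Splitting the old inline _generated callback out into a named method means a test can run key generation and key setup as separate Deferred steps, without immediately triggering the initial upload. A minimal sketch of that interleaving (mirroring test_bad_server in the new tests below; n is a MutableFileNode):

    # generate keys first, postpone the upload until the test has
    # arranged its failure conditions (e.g. broken a server)
    d = defer.succeed(None)
    d.addCallback(n._generate_pubprivkeys)  # res=None -> default key generator
    d.addCallback(n._generated)             # derives writekey/readkey/storage_index
    # ... look up n.get_storage_index(), break a peer, etc. ...
    d.addCallback(lambda res: n._upload("initial contents", None))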

View File

@@ -11,7 +11,8 @@ from allmydata import hashtree, codec, storage
from pycryptopp.cipher.aes import AES
from foolscap.eventual import eventually
from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets
from common import MODE_WRITE, DictOfSets, \
UncoordinatedWriteError, NotEnoughServersError
from servermap import ServerMap
from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
unpack_checkstring, SIGNED_PREFIX
@@ -110,16 +111,14 @@ class Publish:
self._status.set_progress(0.0)
self._status.set_active(True)
def get_status(self):
return self._status
def log(self, *args, **kwargs):
if 'parent' not in kwargs:
kwargs['parent'] = self._log_number
return log.msg(*args, **kwargs)
def log_err(self, *args, **kwargs):
if 'parent' not in kwargs:
kwargs['parent'] = self._log_number
return log.err(*args, **kwargs)
def publish(self, newdata):
"""Publish the filenode's current contents. Returns a Deferred that
fires (with None) when the publish has done as much work as it's ever
@@ -191,8 +190,16 @@ class Publish:
self.setup_encoding_parameters()
# if we experience any surprises (writes which were rejected because
# our test vector did not match, or shares which we didn't expect to
# see), we set this flag and report an UncoordinatedWriteError at the
# end of the publish process.
self.surprised = False
# as a failsafe, refuse to iterate through self.loop more than a
# thousand times.
self.looplimit = 1000
# we keep track of three tables. The first is our goal: which share
# we want to see on which servers. This is initially populated by the
# existing servermap.
@@ -264,19 +271,28 @@ class Publish:
self.log("entering loop", level=log.NOISY)
if not self._running:
return
self.update_goal()
# how far are we from our goal?
needed = self.goal - self.placed - self.outstanding
self._update_status()
if needed:
# we need to send out new shares
self.log(format="need to send %(needed)d new shares",
needed=len(needed), level=log.NOISY)
d = self._send_shares(needed)
d.addCallback(self.loop)
d.addErrback(self._fatal_error)
return
self.looplimit -= 1
if self.looplimit <= 0:
raise RuntimeError("loop limit exceeded")
if self.surprised:
# don't send out any new shares, just wait for the outstanding
# ones to be retired.
self.log("currently surprised, so don't send any new shares",
level=log.NOISY)
else:
self.update_goal()
# how far are we from our goal?
needed = self.goal - self.placed - self.outstanding
self._update_status()
if needed:
# we need to send out new shares
self.log(format="need to send %(needed)d new shares",
needed=len(needed), level=log.NOISY)
self._send_shares(needed)
return
if self.outstanding:
# queries are still pending, keep waiting
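
The goal/placed/outstanding tables are plain sets of (peerid, shnum) pairs, so "how far are we from our goal" is ordinary set subtraction. A minimal illustration with invented values:

    goal = set([("peerA", 0), ("peerA", 1), ("peerB", 2)])
    placed = set([("peerA", 0)])          # writes already acknowledged
    outstanding = set([("peerA", 1)])     # requests still in flight
    needed = goal - placed - outstanding  # -> set([("peerB", 2)]), still to send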
@@ -293,9 +309,9 @@ class Publish:
self._status.timings["push"] = elapsed
return self._done(None)
def log_goal(self, goal):
logmsg = []
for (peerid, shnum) in goal:
def log_goal(self, goal, message=""):
logmsg = [message]
for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
logmsg.append("sh%d to [%s]" % (shnum,
idlib.shortnodeid_b2a(peerid)))
self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
@@ -303,6 +319,10 @@ class Publish:
level=log.NOISY)
def update_goal(self):
# if log.recording_noisy
if True:
self.log_goal(self.goal, "before update: ")
# first, remove any bad peers from our goal
self.goal = set([ (peerid, shnum)
for (peerid, shnum) in self.goal
@@ -318,10 +338,6 @@ class Publish:
if not homeless_shares:
return
# if log.recording_noisy
if False:
self.log_goal(self.goal)
# if an old share X is on a node, put the new share X there too.
# TODO: 1: redistribute shares to achieve one-per-peer, by copying
# shares from existing peers to new (less-crowded) ones. The
@@ -340,21 +356,35 @@ class Publish:
peerlist = []
for i, (peerid, ss) in enumerate(self.full_peerlist):
if peerid in self.bad_peers:
continue
entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
peerlist.append(entry)
peerlist.sort()
if not peerlist:
raise NotEnoughServersError("Ran out of non-bad servers")
new_assignments = []
# we then index this peerlist with an integer, because we may have to
# wrap. We update the goal as we go.
i = 0
for shnum in homeless_shares:
(ignored1, ignored2, peerid, ss) = peerlist[i]
# TODO: if we are forced to send a share to a server that already
# has one, we may have two write requests in flight, and the
# servermap (which was computed before either request was sent)
# won't reflect the new shares, so the second response will cause
# us to be surprised ("unexpected share on peer"), causing the
# publish to fail with an UncoordinatedWriteError. This is
# troublesome but not really a big problem. Fix it at some point.
self.goal.add( (peerid, shnum) )
self.connections[peerid] = ss
i += 1
if i >= len(peerlist):
i = 0
if True:
self.log_goal(self.goal, "after update: ")
@@ -564,8 +594,8 @@ class Publish:
read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
# ok, send the messages!
self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
started = time.time()
dl = []
for (peerid, tw_vectors) in all_tw_vectors.items():
write_enabler = self._node.get_write_enabler(peerid)
@@ -574,16 +604,19 @@ class Publish:
secrets = (write_enabler, renew_secret, cancel_secret)
shnums = tw_vectors.keys()
for shnum in shnums:
self.outstanding.add( (peerid, shnum) )
d = self._do_testreadwrite(peerid, secrets,
tw_vectors, read_vector)
d.addCallbacks(self._got_write_answer, self._got_write_error,
callbackArgs=(peerid, shnums, started),
errbackArgs=(peerid, shnums, started))
d.addCallback(self.loop)
d.addErrback(self._fatal_error)
dl.append(d)
self._update_status()
return defer.DeferredList(dl, fireOnOneErrback=True) # just for testing
self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)
def _do_testreadwrite(self, peerid, secrets,
tw_vectors, read_vector):
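
Each tw_vectors entry follows the storage server's test-and-set shape: a test vector that must match the share's current contents for the write to proceed, a write vector, an optional new length, plus a read vector applied to every share on that peer. A hedged sketch of the shapes (layout as I read the storage interface; values invented):

    testv = [(0, len(checkstring), "eq", checkstring)]   # (offset, length, op, specimen)
    datav = [(0, new_share_data)]                        # (offset, data) to write
    tw_vectors = {shnum: (testv, datav, None)}           # None: leave share size alone
    read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]  # read back each signed prefix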
@@ -610,7 +643,29 @@ class Publish:
wrote, read_data = answer
surprise_shares = set(read_data.keys()) - set(shnums)
if surprise_shares:
self.log("they had shares %s that we didn't know about" %
(list(surprise_shares),),
parent=lp, level=log.WEIRD)
self.surprised = True
if not wrote:
# TODO: there are two possibilities. The first is that the server
# is full (or just doesn't want to give us any room), which means
# we shouldn't ask them again, but is *not* an indication of an
# uncoordinated write. The second is that our testv failed, which
# *does* indicate an uncoordinated write. We currently don't have
# a way to tell these two apart (in fact, the storage server code
# doesn't have the option of refusing our share).
#
# If the server is full, mark the peer as bad (so we don't ask
# them again), but don't set self.surprised. The loop() will find
# a new server.
#
# If the testv failed, log it, set self.surprised, but don't
# bother adding to self.bad_peers .
self.log("our testv failed, so the write did not happen",
parent=lp, level=log.WEIRD)
self.surprised = True
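
If the storage protocol ever grows a way to distinguish the two cases, the handling this comment describes might look like the following (purely hypothetical: answer_said_full is an invented flag, not something the current server reports):

    if not wrote:
        if answer_said_full:            # hypothetical "server full" signal
            self.bad_peers.add(peerid)  # stop asking this server; loop() re-places
        else:
            self.surprised = True       # testv mismatch: a real uncoordinated write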
@@ -623,15 +678,19 @@ class Publish:
other_salt) = unpack_checkstring(checkstring)
expected_version = self._servermap.version_on_peer(peerid,
shnum)
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = expected_version
self.log("somebody modified the share on us:"
" shnum=%d: I thought they had #%d:R=%s,"
" but testv reported #%d:R=%s" %
(shnum,
seqnum, base32.b2a(root_hash)[:4],
other_seqnum, base32.b2a(other_roothash)[:4]),
parent=lp, level=log.NOISY)
if expected_version:
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = expected_version
self.log("somebody modified the share on us:"
" shnum=%d: I thought they had #%d:R=%s,"
" but testv reported #%d:R=%s" %
(shnum,
seqnum, base32.b2a(root_hash)[:4],
other_seqnum, base32.b2a(other_roothash)[:4]),
parent=lp, level=log.NOISY)
# if expected_version==None, then we didn't expect to see a
# share on that peer, and the 'surprise_shares' clause above
# will have logged it.
# self.loop() will take care of finding new homes
return
@@ -641,14 +700,6 @@ class Publish:
self._servermap.add_new_share(peerid, shnum,
self.versioninfo, started)
surprise_shares = set(read_data.keys()) - set(shnums)
if surprise_shares:
self.log("they had shares %s that we didn't know about" %
(list(surprise_shares),),
parent=lp, level=log.WEIRD)
self.surprised = True
return
# self.loop() will take care of checking to see if we're done
return
@@ -664,28 +715,6 @@ class Publish:
return
def _log_dispatch_map(self, dispatch_map):
for shnum, places in dispatch_map.items():
sent_to = [(idlib.shortnodeid_b2a(peerid),
seqnum,
base32.b2a(root_hash)[:4])
for (peerid,seqnum,root_hash) in places]
self.log(" share %d sent to: %s" % (shnum, sent_to),
level=log.NOISY)
def _maybe_recover(self, (surprised, dispatch_map)):
self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised,
level=log.NOISY)
self._log_dispatch_map(dispatch_map)
if not surprised:
self.log(" no recovery needed")
return
self.log("We need recovery!", level=log.WEIRD)
print "RECOVERY NOT YET IMPLEMENTED"
# but dispatch_map will help us do it
raise UncoordinatedWriteError("I was surprised!")
def _done(self, res):
if not self._running:
return
@@ -694,14 +723,18 @@ class Publish:
self._status.timings["total"] = now - self._started
self._status.set_active(False)
if isinstance(res, failure.Failure):
self.log("Retrieve done, with failure", failure=res)
self.log("Publish done, with failure", failure=res, level=log.WEIRD)
self._status.set_status("Failed")
elif self.surprised:
self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
self._status.set_status("UncoordinatedWriteError")
# deliver a failure
res = failure.Failure(UncoordinatedWriteError())
# TODO: recovery
else:
self.log("Publish done, success")
self._status.set_status("Done")
self._status.set_progress(1.0)
eventually(self.done_deferred.callback, res)
def get_status(self):
return self._status

View File

@@ -1,12 +1,13 @@
import struct
import os, struct
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from allmydata import uri, download
from allmydata.util import base32, testutil
from allmydata import uri, download, storage
from allmydata.util import base32, testutil, idlib
from allmydata.util.idlib import shortnodeid_b2a
from allmydata.util.hashutil import tagged_hash
from allmydata.util.fileutil import make_dirs
from allmydata.encode import NotEnoughSharesError
from allmydata.interfaces import IURI, IMutableFileURI, IUploadable
from foolscap.eventual import eventually, fireEventually
@@ -1331,3 +1332,140 @@ class Exceptions(unittest.TestCase):
ucwe = UncoordinatedWriteError()
self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
# we can't do this test with a FakeClient, since it uses FakeStorageServer
# instances which always succeed. So we need a less-fake one.
class IntentionalError(Exception):
pass
class LocalWrapper:
def __init__(self, original):
self.original = original
self.broken = False
def callRemote(self, methname, *args, **kwargs):
def _call():
if self.broken:
raise IntentionalError("I was asked to break")
meth = getattr(self.original, "remote_" + methname)
return meth(*args, **kwargs)
d = fireEventually()
d.addCallback(lambda res: _call())
return d
class LessFakeClient(FakeClient):
def __init__(self, basedir, num_peers=10):
self._num_peers = num_peers
self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(self._num_peers)]
self._connections = {}
for peerid in self._peerids:
peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
make_dirs(peerdir)
ss = storage.StorageServer(peerdir)
ss.setNodeID(peerid)
lw = LocalWrapper(ss)
self._connections[peerid] = lw
self.nodeid = "fakenodeid"
class Problems(unittest.TestCase, testutil.ShouldFailMixin):
def test_surprise(self):
basedir = os.path.join("mutable/CollidingWrites/test_surprise")
self.client = LessFakeClient(basedir)
d = self.client.create_mutable_file("contents 1")
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
def _got_smap1(smap):
# stash the old state of the file
self.old_map = smap
d.addCallback(_got_smap1)
# then modify the file, leaving the old map untouched
d.addCallback(lambda res: log.msg("starting winning write"))
d.addCallback(lambda res: n.overwrite("contents 2"))
# now attempt to modify the file with the old servermap. This
# will look just like an uncoordinated write, in which every
# single share got updated between our mapupdate and our publish
d.addCallback(lambda res: log.msg("starting doomed write"))
d.addCallback(lambda res:
self.shouldFail(UncoordinatedWriteError,
"test_surprise", None,
n.upload,
"contents 2a", self.old_map))
return d
d.addCallback(_created)
return d
def test_unexpected_shares(self):
# upload the file, take a servermap, shut down one of the servers,
# upload it again (causing shares to appear on a new server), then
# upload using the old servermap. The last upload should fail with an
# UncoordinatedWriteError, because of the shares that didn't appear
# in the servermap.
basedir = os.path.join("mutable/CollidingWrites/test_unexpexted_shares")
self.client = LessFakeClient(basedir)
d = self.client.create_mutable_file("contents 1")
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
def _got_smap1(smap):
# stash the old state of the file
self.old_map = smap
# now shut down one of the servers
peer0 = list(smap.make_sharemap()[0])[0]
self.client._connections.pop(peer0)
# then modify the file, leaving the old map untouched
log.msg("starting winning write")
return n.overwrite("contents 2")
d.addCallback(_got_smap1)
# now attempt to modify the file with the old servermap. This
# will look just like an uncoordinated write, in which every
# single share got updated between our mapupdate and our publish
d.addCallback(lambda res: log.msg("starting doomed write"))
d.addCallback(lambda res:
self.shouldFail(UncoordinatedWriteError,
"test_surprise", None,
n.upload,
"contents 2a", self.old_map))
return d
d.addCallback(_created)
return d
def test_bad_server(self):
# Break one server, then create the file: the initial publish should
# complete with an alternate server. Breaking a second server should
# not prevent an update from succeeding either.
basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
self.client = LessFakeClient(basedir, 20)
# to make sure that one of the initial peers is broken, we have to
# get creative. We create the keys, so we can figure out the storage
# index, but we hold off on doing the initial publish until we've
# broken the server on which the first share wants to be stored.
n = FastMutableFileNode(self.client)
d = defer.succeed(None)
d.addCallback(n._generate_pubprivkeys)
d.addCallback(n._generated)
def _break_peer0(res):
si = n.get_storage_index()
peerlist = self.client.get_permuted_peers("storage", si)
peerid0, connection0 = peerlist[0]
peerid1, connection1 = peerlist[1]
connection0.broken = True
self.connection1 = connection1
d.addCallback(_break_peer0)
# now let the initial publish finally happen
d.addCallback(lambda res: n._upload("contents 1", None))
# that ought to work
d.addCallback(lambda res: n.download_best_version())
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
# now break the second peer
def _break_peer1(res):
self.connection1.broken = True
d.addCallback(_break_peer1)
d.addCallback(lambda res: n.overwrite("contents 2"))
# that ought to work too
d.addCallback(lambda res: n.download_best_version())
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
return d
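
The LocalWrapper pattern gives each fake connection an independently flippable failure switch while still delivering results through an eventual-send, like a real Foolscap call. A minimal usage sketch with an invented stub server:

    class StubServer:
        def remote_ping(self):  # any remote_-prefixed method is reachable
            return "pong"

    lw = LocalWrapper(StubServer())
    d = lw.callRemote("ping")   # fires (eventually) with "pong"
    lw.broken = True            # subsequent calls errback with IntentionalError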