mutable publish: track multiple servers-per-share. Fixes some of #1628.

The remaining work is to write additional tests.

src/allmydata/test/no_network.py:

 This supports tests in which servers leave the grid only to return with
 their shares intact at a later time.

src/allmydata/test/test_mutable.py:

 The UCWEs in the incident reports associated with #1628 all seem to be
 associated with shares that the servermap knows about, but which aren't
 accounted for during the publish process for whatever reason. Specifically,
 it looks like the publisher is only capable of keeping track of a single
 storage server for a given share. This makes the repair process worse than
 it was pre-MDMF at updating all of the shares of a particular file to the
 newest version, and can also cause spurious UCWEs. This test simulates such
 a layout and fails if an UCWE is thrown. We need to write another test to
 ensure that all copies of a share are updated to the latest version (or
 alter this test to do that), so that the test suite doesn't pass unless both
 regressions are fixed.

 We want the publisher to follow the existing share placement when uploading
 a new version of a mutable file, and we don't want this test to pass unless
 it does.

src/allmydata/mutable/publish.py:

 Before this commit, the publisher only kept track of a single writer for
 each share. This is insufficient to handle updates in which a single share
 may live on multiple servers. In the best case, an update will only update
 one of the existing shares instead of all of them. In some cases, the update
 will encounter the existing shares when publishing some other share,
 interpret it as a sign of an uncoordinated update, and fail. Keeping track
 of all of the writers helps ensure that all existing shares are updated, and
 helps avoid spurious uncoordinated write errors.
This commit is contained in:
Kevan Carstensen 2011-12-27 21:33:58 -08:00 committed by Brian Warner
parent ab9859d6a5
commit e29323f68f
3 changed files with 118 additions and 62 deletions

View File

@ -269,31 +269,33 @@ class Publish:
cancel_secret = self._node.get_cancel_secret(server)
secrets = (write_enabler, renew_secret, cancel_secret)
self.writers[shnum] = writer_class(shnum,
server.get_rref(),
self._storage_index,
secrets,
self._new_seqnum,
self.required_shares,
self.total_shares,
self.segment_size,
self.datalength)
self.writers[shnum].server = server
writer = writer_class(shnum,
server.get_rref(),
self._storage_index,
secrets,
self._new_seqnum,
self.required_shares,
self.total_shares,
self.segment_size,
self.datalength)
self.writers.setdefault(shnum, []).append(writer)
writer.server = server
known_shares = self._servermap.get_known_shares()
assert (server, shnum) in known_shares
old_versionid, old_timestamp = known_shares[(server,shnum)]
(old_seqnum, old_root_hash, old_salt, old_segsize,
old_datalength, old_k, old_N, old_prefix,
old_offsets_tuple) = old_versionid
self.writers[shnum].set_checkstring(old_seqnum,
old_root_hash,
old_salt)
writer.set_checkstring(old_seqnum,
old_root_hash,
old_salt)
# Our remote shares will not have a complete checkstring until
# after we are done writing share data and have started to write
# blocks. In the meantime, we need to know what to look for when
# writing, so that we can detect UncoordinatedWriteErrors.
self._checkstring = self.writers.values()[0].get_checkstring()
self._checkstring = self.writers.values()[0][0].get_checkstring()
# Now, we start pushing shares.
self._status.timings["setup"] = time.time() - self._started
@ -466,34 +468,35 @@ class Publish:
cancel_secret = self._node.get_cancel_secret(server)
secrets = (write_enabler, renew_secret, cancel_secret)
self.writers[shnum] = writer_class(shnum,
server.get_rref(),
self._storage_index,
secrets,
self._new_seqnum,
self.required_shares,
self.total_shares,
self.segment_size,
self.datalength)
self.writers[shnum].server = server
writer = writer_class(shnum,
server.get_rref(),
self._storage_index,
secrets,
self._new_seqnum,
self.required_shares,
self.total_shares,
self.segment_size,
self.datalength)
self.writers.setdefault(shnum, []).append(writer)
writer.server = server
known_shares = self._servermap.get_known_shares()
if (server, shnum) in known_shares:
old_versionid, old_timestamp = known_shares[(server,shnum)]
(old_seqnum, old_root_hash, old_salt, old_segsize,
old_datalength, old_k, old_N, old_prefix,
old_offsets_tuple) = old_versionid
self.writers[shnum].set_checkstring(old_seqnum,
old_root_hash,
old_salt)
writer.set_checkstring(old_seqnum,
old_root_hash,
old_salt)
elif (server, shnum) in self.bad_share_checkstrings:
old_checkstring = self.bad_share_checkstrings[(server, shnum)]
self.writers[shnum].set_checkstring(old_checkstring)
writer.set_checkstring(old_checkstring)
# Our remote shares will not have a complete checkstring until
# after we are done writing share data and have started to write
# blocks. In the meantime, we need to know what to look for when
# writing, so that we can detect UncoordinatedWriteErrors.
self._checkstring = self.writers.values()[0].get_checkstring()
self._checkstring = self.writers.values()[0][0].get_checkstring()
# Now, we start pushing shares.
self._status.timings["setup"] = time.time() - self._started
@ -620,7 +623,10 @@ class Publish:
# Can we still successfully publish this file?
# TODO: Keep track of outstanding queries before aborting the
# process.
if len(self.writers) < self.required_shares or self.surprised:
all_writers = []
for shnum, writers in self.writers.iteritems():
all_writers.extend(writers)
if len(all_writers) < self.required_shares or self.surprised:
return self._failure()
# Figure out what we need to do next. Each of these needs to
@ -675,8 +681,9 @@ class Publish:
salt = os.urandom(16)
assert self._version == SDMF_VERSION
for writer in self.writers.itervalues():
writer.put_salt(salt)
for shnum, writers in self.writers.iteritems():
for writer in writers:
writer.put_salt(salt)
def _encode_segment(self, segnum):
@ -751,8 +758,9 @@ class Publish:
block_hash = hashutil.block_hash(hashed)
self.blockhashes[shareid][segnum] = block_hash
# find the writer for this share
writer = self.writers[shareid]
writer.put_block(sharedata, segnum, salt)
writers = self.writers[shareid]
for writer in writers:
writer.put_block(sharedata, segnum, salt)
def push_everything_else(self):
@ -775,8 +783,9 @@ class Publish:
def push_encprivkey(self):
encprivkey = self._encprivkey
self._status.set_status("Pushing encrypted private key")
for writer in self.writers.itervalues():
writer.put_encprivkey(encprivkey)
for shnum, writers in self.writers.iteritems():
for writer in writers:
writer.put_encprivkey(encprivkey)
def push_blockhashes(self):
@ -788,8 +797,9 @@ class Publish:
# set the leaf for future use.
self.sharehash_leaves[shnum] = t[0]
writer = self.writers[shnum]
writer.put_blockhashes(self.blockhashes[shnum])
writers = self.writers[shnum]
for writer in writers:
writer.put_blockhashes(self.blockhashes[shnum])
def push_sharehashes(self):
@ -799,8 +809,9 @@ class Publish:
needed_indices = share_hash_tree.needed_hashes(shnum)
self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
for i in needed_indices] )
writer = self.writers[shnum]
writer.put_sharehashes(self.sharehashes[shnum])
writers = self.writers[shnum]
for writer in writers:
writer.put_sharehashes(self.sharehashes[shnum])
self.root_hash = share_hash_tree[0]
@ -811,8 +822,9 @@ class Publish:
# - Push the signature
self._status.set_status("Pushing root hashes and signature")
for shnum in xrange(self.total_shares):
writer = self.writers[shnum]
writer.put_root_hash(self.root_hash)
writers = self.writers[shnum]
for writer in writers:
writer.put_root_hash(self.root_hash)
self._update_checkstring()
self._make_and_place_signature()
@ -825,7 +837,7 @@ class Publish:
uncoordinated writes. SDMF files will have the same checkstring,
so we need not do anything.
"""
self._checkstring = self.writers.values()[0].get_checkstring()
self._checkstring = self.writers.values()[0][0].get_checkstring()
def _make_and_place_signature(self):
@ -834,11 +846,12 @@ class Publish:
"""
started = time.time()
self._status.set_status("Signing prefix")
signable = self.writers[0].get_signable()
signable = self.writers.values()[0][0].get_signable()
self.signature = self._privkey.sign(signable)
for (shnum, writer) in self.writers.iteritems():
writer.put_signature(self.signature)
for (shnum, writers) in self.writers.iteritems():
for writer in writers:
writer.put_signature(self.signature)
self._status.timings['sign'] = time.time() - started
@ -851,25 +864,26 @@ class Publish:
ds = []
verification_key = self._pubkey.serialize()
for (shnum, writer) in self.writers.copy().iteritems():
writer.put_verification_key(verification_key)
self.num_outstanding += 1
def _no_longer_outstanding(res):
self.num_outstanding -= 1
return res
for (shnum, writers) in self.writers.copy().iteritems():
for writer in writers:
writer.put_verification_key(verification_key)
self.num_outstanding += 1
def _no_longer_outstanding(res):
self.num_outstanding -= 1
return res
d = writer.finish_publishing()
d.addBoth(_no_longer_outstanding)
d.addErrback(self._connection_problem, writer)
d.addCallback(self._got_write_answer, writer, started)
ds.append(d)
d = writer.finish_publishing()
d.addBoth(_no_longer_outstanding)
d.addErrback(self._connection_problem, writer)
d.addCallback(self._got_write_answer, writer, started)
ds.append(d)
self._record_verinfo()
self._status.timings['pack'] = time.time() - started
return defer.DeferredList(ds)
def _record_verinfo(self):
self.versioninfo = self.writers.values()[0].get_verinfo()
self.versioninfo = self.writers.values()[0][0].get_verinfo()
def _connection_problem(self, f, writer):
@ -879,7 +893,7 @@ class Publish:
"""
self.log("found problem: %s" % str(f))
self._last_failure = f
del(self.writers[writer.shnum])
self.writers[writer.shnum].remove(writer)
def log_goal(self, goal, message=""):
@ -988,9 +1002,11 @@ class Publish:
# knowingly also writing to that server from other writers.
# TODO: Precompute this.
known_shnums = [x.shnum for x in self.writers.values()
if x.server == server]
surprise_shares -= set(known_shnums)
shares = []
for shnum, writers in self.writers.iteritems():
shares.extend([x.shnum for x in writers if x.server == server])
known_shnums = set(shares)
surprise_shares -= known_shnums
self.log("found the following surprise shares: %s" %
str(surprise_shares))

View File

@ -289,6 +289,7 @@ class NoNetworkGrid(service.MultiService):
del self.wrappers_by_id[serverid]
del self.proxies_by_id[serverid]
self.rebuild_serverlist()
return ss
def break_server(self, serverid):
# mark the given server as broken, so it will throw exceptions when

View File

@ -2534,6 +2534,45 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
d.addCallback(_created)
return d
def test_multiply_placed_shares(self):
self.basedir = "mutable/Problems/test_multiply_placed_shares"
self.set_up_grid()
self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['n'] = 75
nm = self.g.clients[0].nodemaker
d = nm.create_mutable_file(MutableData("contents 1"))
# remove one of the servers and reupload the file.
def _created(n):
self._node = n
servers = self.g.get_all_serverids()
self.ss = self.g.remove_server(servers[len(servers)-1])
new_server = self.g.make_server(len(servers)-1)
self.g.add_server(len(servers)-1, new_server)
return self._node.download_best_version()
d.addCallback(_created)
d.addCallback(lambda data: MutableData(data))
d.addCallback(lambda data: self._node.overwrite(data))
# restore the server we removed earlier, then download+upload
# the file again
def _overwritten(ign):
self.g.add_server(len(self.g.servers_by_number), self.ss)
return self._node.download_best_version()
d.addCallback(_overwritten)
d.addCallback(lambda data: MutableData(data))
d.addCallback(lambda data: self._node.overwrite(data))
d.addCallback(lambda ignored:
self._node.get_servermap(MODE_CHECK))
def _overwritten_again(smap):
# Make sure that all shares were updated by making sure that
# there aren't any other versions in the sharemap.
self.failUnlessEqual(len(smap.recoverable_versions()), 1)
self.failUnlessEqual(len(smap.unrecoverable_versions()), 0)
d.addCallback(_overwritten_again)
return d
def test_bad_server(self):
# Break one server, then create the file: the initial publish should
# complete with an alternate server. Breaking a second server should