mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-16 15:08:58 +00:00
mutable publish: if we are surprised by shares that match what we would have written anyways, don't be surprised. This should fix one of the two #546 problems, in which we re-use a server and forget that we already sent them a share.
This commit is contained in:
parent
d51c6fa4e7
commit
278c47b9bd
@ -386,13 +386,14 @@ class Publish:
|
||||
i = 0
|
||||
for shnum in homeless_shares:
|
||||
(ignored1, ignored2, peerid, ss) = peerlist[i]
|
||||
# TODO: if we are forced to send a share to a server that already
|
||||
# has one, we may have two write requests in flight, and the
|
||||
# if we are forced to send a share to a server that already has
|
||||
# one, we may have two write requests in flight, and the
|
||||
# servermap (which was computed before either request was sent)
|
||||
# won't reflect the new shares, so the second response will cause
|
||||
# us to be surprised ("unexpected share on peer"), causing the
|
||||
# publish to fail with an UncoordinatedWriteError. This is
|
||||
# troublesome but not really a big problem. Fix it at some point.
|
||||
# won't reflect the new shares, so the second response will be
|
||||
# surprising. There is code in _got_write_answer() to tolerate
|
||||
# this, otherwise it would cause the publish to fail with an
|
||||
# UncoordinatedWriteError. See #546 for details of the trouble
|
||||
# this used to cause.
|
||||
self.goal.add( (peerid, shnum) )
|
||||
self.connections[peerid] = ss
|
||||
i += 1
|
||||
@ -483,6 +484,7 @@ class Publish:
|
||||
root_hash = share_hash_tree[0]
|
||||
assert len(root_hash) == 32
|
||||
self.log("my new root_hash is %s" % base32.b2a(root_hash))
|
||||
self._new_version_info = (self._new_seqnum, root_hash, self.salt)
|
||||
|
||||
prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
|
||||
self.required_shares, self.total_shares,
|
||||
@ -672,7 +674,67 @@ class Publish:
|
||||
wrote, read_data = answer
|
||||
|
||||
surprise_shares = set(read_data.keys()) - set(shnums)
|
||||
if surprise_shares:
|
||||
|
||||
surprised = False
|
||||
for shnum in surprise_shares:
|
||||
# read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
|
||||
checkstring = read_data[shnum][0]
|
||||
their_version_info = unpack_checkstring(checkstring)
|
||||
if their_version_info == self._new_version_info:
|
||||
# they have the right share, somehow
|
||||
|
||||
if (peerid,shnum) in self.goal:
|
||||
# and we want them to have it, so we probably sent them a
|
||||
# copy in an earlier write. This is ok, and avoids the
|
||||
# #546 problem.
|
||||
continue
|
||||
|
||||
# They aren't in our goal, but they are still for the right
|
||||
# version. Somebody else wrote them, and it's a convergent
|
||||
# uncoordinated write. Pretend this is ok (don't be
|
||||
# surprised), since I suspect there's a decent chance that
|
||||
# we'll hit this in normal operation.
|
||||
continue
|
||||
|
||||
else:
|
||||
# the new shares are of a different version
|
||||
if peerid in self._servermap.reachable_peers:
|
||||
# we asked them about their shares, so we had knowledge
|
||||
# of what they used to have. Any surprising shares must
|
||||
# have come from someone else, so UCW.
|
||||
surprised = True
|
||||
else:
|
||||
# we didn't ask them, and now we've discovered that they
|
||||
# have a share we didn't know about. This indicates that
|
||||
# mapupdate should have worked harder and asked more
|
||||
# servers before concluding that it knew about them all.
|
||||
|
||||
# signal UCW, but make sure to ask this peer next time,
|
||||
# so we'll remember to update it if/when we retry.
|
||||
surprised = True
|
||||
# TODO: ask this peer next time. I don't yet have a good
|
||||
# way to do this. Two insufficient possibilities are:
|
||||
#
|
||||
# self._servermap.add_new_share(peerid, shnum, verinfo, now)
|
||||
# but that requires fetching/validating/parsing the whole
|
||||
# version string, and all we have is the checkstring
|
||||
# self._servermap.mark_bad_share(peerid, shnum, checkstring)
|
||||
# that will make publish overwrite the share next time,
|
||||
# but it won't re-query the server, and it won't make
|
||||
# mapupdate search further
|
||||
|
||||
# TODO later: when publish starts, do
|
||||
# servermap.get_best_version(), extract the seqnum,
|
||||
# subtract one, and store as highest-replaceable-seqnum.
|
||||
# Then, if this surprise-because-we-didn't-ask share is
|
||||
# of highest-replaceable-seqnum or lower, we're allowed
|
||||
# to replace it: send out a new writev (or rather add it
|
||||
# to self.goal and loop).
|
||||
pass
|
||||
|
||||
surprised = True
|
||||
|
||||
if surprised:
|
||||
self.log("they had shares %s that we didn't know about" %
|
||||
(list(surprise_shares),),
|
||||
parent=lp, level=log.WEIRD, umid="un9CSQ")
|
||||
|
@ -1959,6 +1959,38 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
|
||||
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
|
||||
return d
|
||||
|
||||
def test_bad_server_overlap(self):
|
||||
# like test_bad_server, but with no extra unused servers to fall back
|
||||
# upon. This means that we must re-use a server which we've already
|
||||
# used. If we don't remember the fact that we sent them one share
|
||||
# already, we'll mistakenly think we're experiencing an
|
||||
# UncoordinatedWriteError.
|
||||
|
||||
# Break one server, then create the file: the initial publish should
|
||||
# complete with an alternate server. Breaking a second server should
|
||||
# not prevent an update from succeeding either.
|
||||
basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
|
||||
self.client = LessFakeClient(basedir, 10)
|
||||
|
||||
peerids = sorted(self.client._connections.keys())
|
||||
self.client._connections[peerids[0]].broken = True
|
||||
|
||||
d = self.client.create_mutable_file("contents 1")
|
||||
def _created(n):
|
||||
d = n.download_best_version()
|
||||
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
|
||||
# now break one of the remaining servers
|
||||
def _break_second_server(res):
|
||||
self.client._connections[peerids[1]].broken = True
|
||||
d.addCallback(_break_second_server)
|
||||
d.addCallback(lambda res: n.overwrite("contents 2"))
|
||||
# that ought to work too
|
||||
d.addCallback(lambda res: n.download_best_version())
|
||||
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
|
||||
return d
|
||||
d.addCallback(_created)
|
||||
return d
|
||||
|
||||
def test_publish_all_servers_bad(self):
|
||||
# Break all servers: the publish should fail
|
||||
basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad")
|
||||
|
Loading…
x
Reference in New Issue
Block a user