mutable.py: log more information during publish, specifically the sharemap, and the reason for an UncoordinatedWriteError

This commit is contained in:
Brian Warner 2008-01-10 22:16:23 -07:00
parent 0af1a9e1c5
commit 689c71f946

View File

@ -716,16 +716,16 @@ class Publish:
num = self._node._client.log("Publish(%s): starting" % prefix)
self._log_number = num
def log(self, msg, parent=None, **kwargs):
prefix = self._log_prefix
if parent is None:
parent = self._log_number
num = self._node._client.log("Publish(%s): %s" % (prefix, msg),
parent=parent, **kwargs)
def log(self, *args, **kwargs):
if 'parent' not in kwargs:
kwargs['parent'] = self._log_number
num = log.msg(*args, **kwargs)
return num
def log_err(self, f):
num = log.err(f, parent=self._log_number)
def log_err(self, *args, **kwargs):
if 'parent' not in kwargs:
kwargs['parent'] = self._log_number
num = log.err(*args, **kwargs)
return num
def publish(self, newdata, wait_for_numpeers=None):
@ -881,14 +881,14 @@ class Publish:
def _got_query_results(self, datavs, peerid, permutedid,
reachable_peers, current_share_peers):
lp = self.log("_got_query_results from %s" %
idlib.shortnodeid_b2a(peerid))
lp = self.log(format="_got_query_results from %(peerid)s",
peerid=idlib.shortnodeid_b2a(peerid))
assert isinstance(datavs, dict)
reachable_peers[peerid] = permutedid
if not datavs:
self.log("peer has no shares", parent=lp)
for shnum, datav in datavs.items():
self.log("peer has shnum %d" % shnum, parent=lp)
lp2 = self.log("peer has shnum %d" % shnum, parent=lp)
assert len(datav) == 1
data = datav[0]
# We want (seqnum, root_hash, IV) from all servers to know what
@ -907,10 +907,13 @@ class Publish:
# self._pubkey is present because we require read-before-replace
valid = self._pubkey.verify(prefix, signature)
if not valid:
self.log("bad signature from %s shnum %d" %
(shnum, idlib.shortnodeid_b2a(peerid)),
parent=lp, level=log.WEIRD)
self.log(format="bad signature from %(peerid)s shnum %(shnum)d",
peerid=idlib.shortnodeid_b2a(peerid), shnum=shnum,
parent=lp2, level=log.WEIRD)
continue
self.log(format="peer has goodsig shnum %(shnum)d seqnum %(seqnum)d",
shnum=shnum, seqnum=seqnum,
parent=lp2, level=log.NOISY)
share = (shnum, seqnum, root_hash)
current_share_peers.add(shnum, (peerid, seqnum, root_hash) )
@ -978,6 +981,19 @@ class Publish:
# TODO: 2: move those shares instead of copying them, to reduce future
# update work
# if log.recording_noisy
logmsg = []
for shnum in range(total_shares):
logmsg2 = []
for oldplace in current_share_peers.get(shnum, []):
(peerid, seqnum, R) = oldplace
logmsg2.append("%s:#%d:R=%s" % (idlib.shortnodeid_b2a(peerid),
seqnum, idlib.b2a(R)[:4]))
logmsg.append("sh%d on (%s)" % (shnum, "/".join(logmsg2)))
self.log("sharemap: %s" % (", ".join(logmsg)), level=log.NOISY)
self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
level=log.NOISY)
shares_needing_homes = range(total_shares)
target_map = DictOfSets() # maps shnum to set((peerid,oldseqnum,oldR))
shares_per_peer = DictOfSets()
@ -985,7 +1001,13 @@ class Publish:
for oldplace in current_share_peers.get(shnum, []):
(peerid, seqnum, R) = oldplace
if seqnum >= self._new_seqnum:
raise UncoordinatedWriteError()
self.log("somebody has a newer sequence number than what we were uploading",
level=log.WEIRD)
self.log(format="peerid=%(peerid)s, theirs=%(seqnum)d, mine=%(new_seqnum)d",
peerid=idlib.shortnodeid_b2a(peerid),
seqnum=seqnum,
new_seqnum=self._new_seqnum)
raise UncoordinatedWriteError("somebody has a newer sequence number than us")
target_map.add(shnum, oldplace)
shares_per_peer.add(peerid, shnum)
if shnum in shares_needing_homes:
@ -1224,7 +1246,8 @@ class Publish:
def _got_write_answer(self, answer, tw_vectors, my_checkstring,
peerid, expected_old_shares,
dispatch_map):
self.log("_got_write_answer from %s" % idlib.shortnodeid_b2a(peerid))
lp = self.log("_got_write_answer from %s" %
idlib.shortnodeid_b2a(peerid))
wrote, read_data = answer
surprised = False
@ -1235,6 +1258,8 @@ class Publish:
dispatch_map.add(shnum, (peerid, new_seqnum, new_root_hash))
else:
# surprise! our testv failed, so the write did not happen
self.log("our testv failed, that write did not happen",
parent=lp, level=log.WEIRD)
surprised = True
for shnum, (old_cs,) in read_data.items():
@ -1245,12 +1270,21 @@ class Publish:
if shnum not in expected_old_shares:
# surprise! there was a share we didn't know about
self.log("they had share %d that we didn't know about" % shnum,
parent=lp, level=log.WEIRD)
surprised = True
else:
seqnum, root_hash = expected_old_shares[shnum]
if seqnum is not None:
if seqnum != old_seqnum or root_hash != old_root_hash:
# surprise! somebody modified the share on us
self.log("somebody modified the share on us:"
" shnum=%d: I thought they had #%d:R=%s,"
" but testv reported #%d:R=%s" %
(shnum,
seqnum, idlib.b2a(root_hash)[:4],
old_seqnum, idlib.b2a(old_root_hash)[:4]),
parent=lp, level=log.WEIRD)
surprised = True
if surprised:
self._surprised = True
@ -1261,14 +1295,17 @@ class Publish:
seqnum,
idlib.b2a(root_hash)[:4])
for (peerid,seqnum,root_hash) in places]
self.log(" share %d sent to: %s" % (shnum, sent_to))
self.log(" share %d sent to: %s" % (shnum, sent_to),
level=log.NOISY)
def _maybe_recover(self, (surprised, dispatch_map)):
self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised)
self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised,
level=log.NOISY)
self._log_dispatch_map(dispatch_map)
if not surprised:
self.log(" no recovery needed")
return
self.log("We need recovery!", level=log.WEIRD)
print "RECOVERY NOT YET IMPLEMENTED"
# but dispatch_map will help us do it
raise UncoordinatedWriteError("I was surprised!")