mutable: call remove_advise_corrupt_share when we see share corruption in mapupdate/download/check, tolerate servers that do not implement it

This commit is contained in:
Brian Warner 2008-10-24 13:21:28 -07:00
parent db37c14ab7
commit 9f21f7cf65
4 changed files with 25 additions and 0 deletions

View File

@ -90,6 +90,8 @@ class MutableChecker:
self.bad_shares.append( (peerid, shnum, f) ) self.bad_shares.append( (peerid, shnum, f) )
prefix = data[:SIGNED_PREFIX_LENGTH] prefix = data[:SIGNED_PREFIX_LENGTH]
servermap.mark_bad_share(peerid, shnum, prefix) servermap.mark_bad_share(peerid, shnum, prefix)
ss = servermap.connections[peerid]
self.notify_server_corruption(ss, shnum, str(f.value))
def check_prefix(self, peerid, shnum, data): def check_prefix(self, peerid, shnum, data):
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix, (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
@ -135,6 +137,10 @@ class MutableChecker:
if alleged_writekey != self._node.get_writekey(): if alleged_writekey != self._node.get_writekey():
raise CorruptShareError(peerid, shnum, "invalid privkey") raise CorruptShareError(peerid, shnum, "invalid privkey")
def notify_server_corruption(self, ss, shnum, reason):
ss.callRemoteOnly("advise_corrupt_share",
"mutable", self._storage_index, shnum, reason)
def _count_shares(self, smap, version): def _count_shares(self, smap, version):
available_shares = smap.shares_available() available_shares = smap.shares_available()
(num_distinct_shares, k, N) = available_shares[version] (num_distinct_shares, k, N) = available_shares[version]

View File

@ -268,6 +268,7 @@ class Retrieve:
self.log(format="bad share: %(f_value)s", self.log(format="bad share: %(f_value)s",
f_value=str(f.value), failure=f, f_value=str(f.value), failure=f,
level=log.WEIRD, umid="7fzWZw") level=log.WEIRD, umid="7fzWZw")
self.notify_server_corruption(peerid, shnum, str(e))
self.remove_peer(peerid) self.remove_peer(peerid)
self.servermap.mark_bad_share(peerid, shnum, prefix) self.servermap.mark_bad_share(peerid, shnum, prefix)
self._bad_shares.add( (peerid, shnum) ) self._bad_shares.add( (peerid, shnum) )
@ -279,6 +280,11 @@ class Retrieve:
self._try_to_validate_privkey(datav[2], peerid, shnum, lp) self._try_to_validate_privkey(datav[2], peerid, shnum, lp)
# all done! # all done!
def notify_server_corruption(self, peerid, shnum, reason):
ss = self.servermap.connections[peerid]
ss.callRemoteOnly("advise_corrupt_share",
"mutable", self._storage_index, shnum, reason)
def _got_results_one_share(self, shnum, peerid, def _got_results_one_share(self, shnum, peerid,
got_prefix, got_hash_and_data): got_prefix, got_hash_and_data):
self.log("_got_results: got shnum #%d from peerid %s" self.log("_got_results: got shnum #%d from peerid %s"

View File

@ -575,6 +575,7 @@ class ServermapUpdater:
f = failure.Failure() f = failure.Failure()
self.log(format="bad share: %(f_value)s", f_value=str(f.value), self.log(format="bad share: %(f_value)s", f_value=str(f.value),
failure=f, parent=lp, level=log.WEIRD, umid="h5llHg") failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
self.notify_server_corruption(peerid, shnum, str(e))
self._bad_peers.add(peerid) self._bad_peers.add(peerid)
self._last_failure = f self._last_failure = f
checkstring = data[:SIGNED_PREFIX_LENGTH] checkstring = data[:SIGNED_PREFIX_LENGTH]
@ -609,6 +610,11 @@ class ServermapUpdater:
# all done! # all done!
self.log("_got_results done", parent=lp, level=log.NOISY) self.log("_got_results done", parent=lp, level=log.NOISY)
def notify_server_corruption(self, peerid, shnum, reason):
ss = self._servermap.connections[peerid]
ss.callRemoteOnly("advise_corrupt_share",
"mutable", self._storage_index, shnum, reason)
def _got_results_one_share(self, shnum, data, peerid, lp): def _got_results_one_share(self, shnum, data, peerid, lp):
self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s", self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
shnum=shnum, shnum=shnum,

View File

@ -117,6 +117,13 @@ class FakeStorageServer:
d = fireEventually() d = fireEventually()
d.addCallback(lambda res: _call()) d.addCallback(lambda res: _call())
return d return d
def callRemoteOnly(self, methname, *args, **kwargs):
d = self.callRemote(methname, *args, **kwargs)
d.addBoth(lambda ignore: None)
pass
def advise_corrupt_share(self, share_type, storage_index, shnum, reason):
pass
def slot_readv(self, storage_index, shnums, readv): def slot_readv(self, storage_index, shnums, readv):
d = self.storage.read(self.peerid, storage_index) d = self.storage.read(self.peerid, storage_index)