diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py index 2f6de07f3..b43d6b154 100644 --- a/src/allmydata/immutable/checker.py +++ b/src/allmydata/immutable/checker.py @@ -497,7 +497,6 @@ class Checker(log.PrefixingLogMixin): rref = s.get_rref() lease_seed = s.get_lease_seed() - serverid = s.get_serverid() if self._add_lease: renew_secret = self._get_renewal_secret(lease_seed) cancel_secret = self._get_cancel_secret(lease_seed) @@ -507,7 +506,7 @@ class Checker(log.PrefixingLogMixin): d = rref.callRemote("get_buckets", storageindex) def _wrap_results(res): - return (res, serverid, True) + return (res, True) def _trap_errs(f): level = log.WEIRD @@ -516,7 +515,7 @@ class Checker(log.PrefixingLogMixin): self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="AX7wZQ") - return ({}, serverid, False) + return ({}, False) d.addCallbacks(_wrap_results, _trap_errs) return d @@ -555,7 +554,7 @@ class Checker(log.PrefixingLogMixin): level=log.WEIRD, umid="hEGuQg") - def _download_and_verify(self, serverid, sharenum, bucket): + def _download_and_verify(self, server, sharenum, bucket): """Start an attempt to download and verify every block in this bucket and return a deferred that will eventually fire once the attempt completes. @@ -575,7 +574,7 @@ class Checker(log.PrefixingLogMixin): results.""" vcap = self._verifycap - b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index()) + b = layout.ReadBucketProxy(bucket, server, vcap.get_storage_index()) veup = ValidatedExtendedURIProxy(b, vcap) d = veup.start() @@ -658,7 +657,7 @@ class Checker(log.PrefixingLogMixin): def _verify_server_shares(self, s): """ Return a deferred which eventually fires with a tuple of - (set(sharenum), serverid, set(corruptsharenum), + (set(sharenum), server, set(corruptsharenum), set(incompatiblesharenum), success) showing all the shares verified to be served by this server, and all the corrupt shares served by the server, and all the incompatible shares served by the server. In case @@ -682,11 +681,11 @@ class Checker(log.PrefixingLogMixin): d = self._get_buckets(s, self._verifycap.get_storage_index()) def _got_buckets(result): - bucketdict, serverid, success = result + bucketdict, success = result shareverds = [] for (sharenum, bucket) in bucketdict.items(): - d = self._download_and_verify(serverid, sharenum, bucket) + d = self._download_and_verify(s, sharenum, bucket) shareverds.append(d) dl = deferredutil.gatherResults(shareverds) @@ -703,29 +702,29 @@ class Checker(log.PrefixingLogMixin): corrupt.add(sharenum) elif whynot == 'incompatible': incompatible.add(sharenum) - return (verified, serverid, corrupt, incompatible, success) + return (verified, s, corrupt, incompatible, success) dl.addCallback(collect) return dl def _err(f): f.trap(RemoteException, DeadReferenceError) - return (set(), s.get_serverid(), set(), set(), False) + return (set(), s, set(), set(), False) d.addCallbacks(_got_buckets, _err) return d def _check_server_shares(self, s): """Return a deferred which eventually fires with a tuple of - (set(sharenum), serverid, set(), set(), responded) showing all the + (set(sharenum), server, set(), set(), responded) showing all the shares claimed to be served by this server. In case the server is - disconnected then it fires with (set() serverid, set(), set(), False) + disconnected then it fires with (set(), server, set(), set(), False) (a server disconnecting when we ask it for buckets is the same, for our purposes, as a server that says it has none, except that we want to track and report whether or not each server responded.)""" def _curry_empty_corrupted(res): - buckets, serverid, responded = res - return (set(buckets), serverid, set(), set(), responded) + buckets, responded = res + return (set(buckets), s, set(), set(), responded) d = self._get_buckets(s, self._verifycap.get_storage_index()) d.addCallback(_curry_empty_corrupted) return d @@ -741,7 +740,8 @@ class Checker(log.PrefixingLogMixin): corruptsharelocators = [] # (serverid, storageindex, sharenum) incompatiblesharelocators = [] # (serverid, storageindex, sharenum) - for theseverifiedshares, thisserverid, thesecorruptshares, theseincompatibleshares, thisresponded in results: + for theseverifiedshares, thisserver, thesecorruptshares, theseincompatibleshares, thisresponded in results: + thisserverid = thisserver.get_serverid() servers.setdefault(thisserverid, set()).update(theseverifiedshares) for sharenum in theseverifiedshares: verifiedshares.setdefault(sharenum, set()).add(thisserverid) diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py index 27fb84450..30956780f 100644 --- a/src/allmydata/immutable/layout.py +++ b/src/allmydata/immutable/layout.py @@ -3,7 +3,7 @@ from zope.interface import implements from twisted.internet import defer from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \ FileTooLargeError, HASH_SIZE -from allmydata.util import mathutil, idlib, observer, pipeline +from allmydata.util import mathutil, observer, pipeline from allmydata.util.assertutil import precondition from allmydata.storage.server import si_b2a @@ -296,20 +296,19 @@ class ReadBucketProxy: MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes - def __init__(self, rref, peerid, storage_index): + def __init__(self, rref, server, storage_index): self._rref = rref - self._peerid = peerid - peer_id_s = idlib.shortnodeid_b2a(peerid) - storage_index_s = si_b2a(storage_index) - self._reprstr = "" % (id(self), peer_id_s, storage_index_s) + self._server = server + self._storage_index = storage_index self._started = False # sent request to server self._ready = observer.OneShotObserverList() # got response from server def get_peerid(self): - return self._peerid + return self._server.get_serverid() def __repr__(self): - return self._reprstr + return "" % \ + (id(self), self._server.get_name(), si_b2a(self._storage_index)) def _start_if_needed(self): """ Returns a deferred that will be fired when I'm ready to return diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py index 94f5cdd0a..02b1386e4 100644 --- a/src/allmydata/immutable/offloaded.py +++ b/src/allmydata/immutable/offloaded.py @@ -85,7 +85,7 @@ class CHKCheckerAndUEBFetcher: self.log("no readers, so no UEB", level=log.NOISY) return b,server = self._readers.pop() - rbp = ReadBucketProxy(b, server.get_serverid(), si_b2a(self._storage_index)) + rbp = ReadBucketProxy(b, server, si_b2a(self._storage_index)) d = rbp.get_uri_extension() d.addCallback(self._got_uri_extension) d.addErrback(self._ueb_error) diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py index ac9b28a54..a046578b7 100644 --- a/src/allmydata/scripts/debug.py +++ b/src/allmydata/scripts/debug.py @@ -68,7 +68,7 @@ def dump_immutable_chk_share(f, out, options): from allmydata.util.encodingutil import quote_output, to_str # use a ReadBucketProxy to parse the bucket and find the uri extension - bp = ReadBucketProxy(None, '', '') + bp = ReadBucketProxy(None, None, '') offsets = bp._parse_offsets(f.read_share_data(0, 0x44)) print >>out, "%20s: %d" % ("version", bp._version) seek = offsets['uri_extension'] @@ -610,7 +610,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): class ImmediateReadBucketProxy(ReadBucketProxy): def __init__(self, sf): self.sf = sf - ReadBucketProxy.__init__(self, "", "", "") + ReadBucketProxy.__init__(self, None, None, "") def __repr__(self): return "" def _read(self, offset, size): @@ -768,7 +768,7 @@ def corrupt_share(options): else: # otherwise assume it's immutable f = ShareFile(fn) - bp = ReadBucketProxy(None, '', '') + bp = ReadBucketProxy(None, None, '') offsets = bp._parse_offsets(f.read_share_data(0, 0x24)) start = f._data_offset + offsets["data"] end = f._data_offset + offsets["plaintext_hash_tree"] diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index afe5824f4..91c2fdc17 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -23,6 +23,7 @@ from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \ from allmydata.interfaces import BadWriteEnablerError from allmydata.test.common import LoggingServiceParent from allmydata.test.common_web import WebRenderingMixin +from allmydata.test.no_network import NoNetworkServer from allmydata.web.storage import StorageStatus, remove_prefix class Marker: @@ -190,7 +191,8 @@ class BucketProxy(unittest.TestCase): br = BucketReader(self, sharefname) rb = RemoteBucket() rb.target = br - rbp = rbp_class(rb, peerid="abc", storage_index="") + server = NoNetworkServer("abc", None) + rbp = rbp_class(rb, server, storage_index="") self.failUnlessIn("to peer", repr(rbp)) self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)