mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-23 23:02:25 +00:00
remove get_serverid() from ReadBucketProxy and customers, including Checker
and debug.py dump-share commands refs #1363
This commit is contained in:
parent
e5c4e83f4c
commit
550d67f51f
@ -497,7 +497,6 @@ class Checker(log.PrefixingLogMixin):
|
||||
|
||||
rref = s.get_rref()
|
||||
lease_seed = s.get_lease_seed()
|
||||
serverid = s.get_serverid()
|
||||
if self._add_lease:
|
||||
renew_secret = self._get_renewal_secret(lease_seed)
|
||||
cancel_secret = self._get_cancel_secret(lease_seed)
|
||||
@ -507,7 +506,7 @@ class Checker(log.PrefixingLogMixin):
|
||||
|
||||
d = rref.callRemote("get_buckets", storageindex)
|
||||
def _wrap_results(res):
|
||||
return (res, serverid, True)
|
||||
return (res, True)
|
||||
|
||||
def _trap_errs(f):
|
||||
level = log.WEIRD
|
||||
@ -516,7 +515,7 @@ class Checker(log.PrefixingLogMixin):
|
||||
self.log("failure from server on 'get_buckets' the REMOTE failure was:",
|
||||
facility="tahoe.immutable.checker",
|
||||
failure=f, level=level, umid="AX7wZQ")
|
||||
return ({}, serverid, False)
|
||||
return ({}, False)
|
||||
|
||||
d.addCallbacks(_wrap_results, _trap_errs)
|
||||
return d
|
||||
@ -555,7 +554,7 @@ class Checker(log.PrefixingLogMixin):
|
||||
level=log.WEIRD, umid="hEGuQg")
|
||||
|
||||
|
||||
def _download_and_verify(self, serverid, sharenum, bucket):
|
||||
def _download_and_verify(self, server, sharenum, bucket):
|
||||
"""Start an attempt to download and verify every block in this bucket
|
||||
and return a deferred that will eventually fire once the attempt
|
||||
completes.
|
||||
@ -575,7 +574,7 @@ class Checker(log.PrefixingLogMixin):
|
||||
results."""
|
||||
|
||||
vcap = self._verifycap
|
||||
b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index())
|
||||
b = layout.ReadBucketProxy(bucket, server, vcap.get_storage_index())
|
||||
veup = ValidatedExtendedURIProxy(b, vcap)
|
||||
d = veup.start()
|
||||
|
||||
@ -658,7 +657,7 @@ class Checker(log.PrefixingLogMixin):
|
||||
|
||||
def _verify_server_shares(self, s):
|
||||
""" Return a deferred which eventually fires with a tuple of
|
||||
(set(sharenum), serverid, set(corruptsharenum),
|
||||
(set(sharenum), server, set(corruptsharenum),
|
||||
set(incompatiblesharenum), success) showing all the shares verified
|
||||
to be served by this server, and all the corrupt shares served by the
|
||||
server, and all the incompatible shares served by the server. In case
|
||||
@ -682,11 +681,11 @@ class Checker(log.PrefixingLogMixin):
|
||||
d = self._get_buckets(s, self._verifycap.get_storage_index())
|
||||
|
||||
def _got_buckets(result):
|
||||
bucketdict, serverid, success = result
|
||||
bucketdict, success = result
|
||||
|
||||
shareverds = []
|
||||
for (sharenum, bucket) in bucketdict.items():
|
||||
d = self._download_and_verify(serverid, sharenum, bucket)
|
||||
d = self._download_and_verify(s, sharenum, bucket)
|
||||
shareverds.append(d)
|
||||
|
||||
dl = deferredutil.gatherResults(shareverds)
|
||||
@ -703,29 +702,29 @@ class Checker(log.PrefixingLogMixin):
|
||||
corrupt.add(sharenum)
|
||||
elif whynot == 'incompatible':
|
||||
incompatible.add(sharenum)
|
||||
return (verified, serverid, corrupt, incompatible, success)
|
||||
return (verified, s, corrupt, incompatible, success)
|
||||
|
||||
dl.addCallback(collect)
|
||||
return dl
|
||||
|
||||
def _err(f):
|
||||
f.trap(RemoteException, DeadReferenceError)
|
||||
return (set(), s.get_serverid(), set(), set(), False)
|
||||
return (set(), s, set(), set(), False)
|
||||
|
||||
d.addCallbacks(_got_buckets, _err)
|
||||
return d
|
||||
|
||||
def _check_server_shares(self, s):
|
||||
"""Return a deferred which eventually fires with a tuple of
|
||||
(set(sharenum), serverid, set(), set(), responded) showing all the
|
||||
(set(sharenum), server, set(), set(), responded) showing all the
|
||||
shares claimed to be served by this server. In case the server is
|
||||
disconnected then it fires with (set() serverid, set(), set(), False)
|
||||
disconnected then it fires with (set(), server, set(), set(), False)
|
||||
(a server disconnecting when we ask it for buckets is the same, for
|
||||
our purposes, as a server that says it has none, except that we want
|
||||
to track and report whether or not each server responded.)"""
|
||||
def _curry_empty_corrupted(res):
|
||||
buckets, serverid, responded = res
|
||||
return (set(buckets), serverid, set(), set(), responded)
|
||||
buckets, responded = res
|
||||
return (set(buckets), s, set(), set(), responded)
|
||||
d = self._get_buckets(s, self._verifycap.get_storage_index())
|
||||
d.addCallback(_curry_empty_corrupted)
|
||||
return d
|
||||
@ -741,7 +740,8 @@ class Checker(log.PrefixingLogMixin):
|
||||
corruptsharelocators = [] # (serverid, storageindex, sharenum)
|
||||
incompatiblesharelocators = [] # (serverid, storageindex, sharenum)
|
||||
|
||||
for theseverifiedshares, thisserverid, thesecorruptshares, theseincompatibleshares, thisresponded in results:
|
||||
for theseverifiedshares, thisserver, thesecorruptshares, theseincompatibleshares, thisresponded in results:
|
||||
thisserverid = thisserver.get_serverid()
|
||||
servers.setdefault(thisserverid, set()).update(theseverifiedshares)
|
||||
for sharenum in theseverifiedshares:
|
||||
verifiedshares.setdefault(sharenum, set()).add(thisserverid)
|
||||
|
@ -3,7 +3,7 @@ from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
|
||||
FileTooLargeError, HASH_SIZE
|
||||
from allmydata.util import mathutil, idlib, observer, pipeline
|
||||
from allmydata.util import mathutil, observer, pipeline
|
||||
from allmydata.util.assertutil import precondition
|
||||
from allmydata.storage.server import si_b2a
|
||||
|
||||
@ -296,20 +296,19 @@ class ReadBucketProxy:
|
||||
|
||||
MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes
|
||||
|
||||
def __init__(self, rref, peerid, storage_index):
|
||||
def __init__(self, rref, server, storage_index):
|
||||
self._rref = rref
|
||||
self._peerid = peerid
|
||||
peer_id_s = idlib.shortnodeid_b2a(peerid)
|
||||
storage_index_s = si_b2a(storage_index)
|
||||
self._reprstr = "<ReadBucketProxy %s to peer [%s] SI %s>" % (id(self), peer_id_s, storage_index_s)
|
||||
self._server = server
|
||||
self._storage_index = storage_index
|
||||
self._started = False # sent request to server
|
||||
self._ready = observer.OneShotObserverList() # got response from server
|
||||
|
||||
def get_peerid(self):
|
||||
return self._peerid
|
||||
return self._server.get_serverid()
|
||||
|
||||
def __repr__(self):
|
||||
return self._reprstr
|
||||
return "<ReadBucketProxy %s to peer [%s] SI %s>" % \
|
||||
(id(self), self._server.get_name(), si_b2a(self._storage_index))
|
||||
|
||||
def _start_if_needed(self):
|
||||
""" Returns a deferred that will be fired when I'm ready to return
|
||||
|
@ -85,7 +85,7 @@ class CHKCheckerAndUEBFetcher:
|
||||
self.log("no readers, so no UEB", level=log.NOISY)
|
||||
return
|
||||
b,server = self._readers.pop()
|
||||
rbp = ReadBucketProxy(b, server.get_serverid(), si_b2a(self._storage_index))
|
||||
rbp = ReadBucketProxy(b, server, si_b2a(self._storage_index))
|
||||
d = rbp.get_uri_extension()
|
||||
d.addCallback(self._got_uri_extension)
|
||||
d.addErrback(self._ueb_error)
|
||||
|
@ -68,7 +68,7 @@ def dump_immutable_chk_share(f, out, options):
|
||||
from allmydata.util.encodingutil import quote_output, to_str
|
||||
|
||||
# use a ReadBucketProxy to parse the bucket and find the uri extension
|
||||
bp = ReadBucketProxy(None, '', '')
|
||||
bp = ReadBucketProxy(None, None, '')
|
||||
offsets = bp._parse_offsets(f.read_share_data(0, 0x44))
|
||||
print >>out, "%20s: %d" % ("version", bp._version)
|
||||
seek = offsets['uri_extension']
|
||||
@ -610,7 +610,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
|
||||
class ImmediateReadBucketProxy(ReadBucketProxy):
|
||||
def __init__(self, sf):
|
||||
self.sf = sf
|
||||
ReadBucketProxy.__init__(self, "", "", "")
|
||||
ReadBucketProxy.__init__(self, None, None, "")
|
||||
def __repr__(self):
|
||||
return "<ImmediateReadBucketProxy>"
|
||||
def _read(self, offset, size):
|
||||
@ -768,7 +768,7 @@ def corrupt_share(options):
|
||||
else:
|
||||
# otherwise assume it's immutable
|
||||
f = ShareFile(fn)
|
||||
bp = ReadBucketProxy(None, '', '')
|
||||
bp = ReadBucketProxy(None, None, '')
|
||||
offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
|
||||
start = f._data_offset + offsets["data"]
|
||||
end = f._data_offset + offsets["plaintext_hash_tree"]
|
||||
|
@ -23,6 +23,7 @@ from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
|
||||
from allmydata.interfaces import BadWriteEnablerError
|
||||
from allmydata.test.common import LoggingServiceParent
|
||||
from allmydata.test.common_web import WebRenderingMixin
|
||||
from allmydata.test.no_network import NoNetworkServer
|
||||
from allmydata.web.storage import StorageStatus, remove_prefix
|
||||
|
||||
class Marker:
|
||||
@ -190,7 +191,8 @@ class BucketProxy(unittest.TestCase):
|
||||
br = BucketReader(self, sharefname)
|
||||
rb = RemoteBucket()
|
||||
rb.target = br
|
||||
rbp = rbp_class(rb, peerid="abc", storage_index="")
|
||||
server = NoNetworkServer("abc", None)
|
||||
rbp = rbp_class(rb, server, storage_index="")
|
||||
self.failUnlessIn("to peer", repr(rbp))
|
||||
self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user