log more peerinfo in download/upload/checker problems

This commit is contained in:
Brian Warner 2008-02-26 17:33:14 -07:00
parent 1a02f38cf3
commit d96f90e1fb
4 changed files with 27 additions and 10 deletions

View File

@ -115,6 +115,7 @@ class SimpleCHKFileVerifier(download.FileDownloader):
self._size = u.size
self._num_needed_shares = u.needed_shares
self._si_s = storage.si_b2a(self._storage_index)
self.init_logging()
self._output = VerifyingOutput(self._size)

View File

@ -6,7 +6,7 @@ from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.application import service
from foolscap.eventual import eventually
from allmydata.util import base32, mathutil, hashutil, log
from allmydata.util import base32, mathutil, hashutil, log, idlib
from allmydata.util.assertutil import _assert
from allmydata import codec, hashtree, storage, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
@ -389,6 +389,7 @@ class FileDownloader:
self._size = u.size
self._num_needed_shares = u.needed_shares
self._si_s = storage.si_b2a(self._storage_index)
self.init_logging()
self._status = s = DownloadStatus()
@ -490,8 +491,10 @@ class FileDownloader:
dl = []
for (peerid,ss) in self._client.get_permuted_peers("storage",
self._storage_index):
peerid_s = idlib.shortnodeid_b2a(peerid)
d = ss.callRemote("get_buckets", self._storage_index)
d.addCallbacks(self._got_response, self._got_error)
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(peerid_s,))
dl.append(d)
self._responses_received = 0
self._queries_sent = len(dl)
@ -501,14 +504,14 @@ class FileDownloader:
self._queries_sent))
return defer.DeferredList(dl)
def _got_response(self, buckets):
def _got_response(self, buckets, peerid_s):
self._responses_received += 1
if self._status:
self._status.set_status("Locating Shares (%d/%d)" %
(self._responses_received,
self._queries_sent))
for sharenum, bucket in buckets.iteritems():
b = storage.ReadBucketProxy(bucket)
b = storage.ReadBucketProxy(bucket, peerid_s, self._si_s)
self.add_share_bucket(sharenum, b)
self._uri_extension_sources.append(b)
@ -556,7 +559,11 @@ class FileDownloader:
if h != self._uri_extension_hash:
self._fetch_failures["uri_extension"] += 1
msg = ("The copy of uri_extension we received from "
"%s was bad" % bucket)
"%s was bad: wanted %s, got %s" %
(bucket,
base32.b2a(self._uri_extension_hash),
base32.b2a(h)))
self.log(msg, level=log.SCARY)
raise BadURIExtensionHashValue(msg)
return self._unpack_uri_extension_data(proposal)
return self._obtain_validated_thing(None,
@ -576,7 +583,8 @@ class FileDownloader:
d.addCallback(lambda res: getattr(bucket, methname)(*args))
d.addCallback(validatorfunc, bucket)
def _bad(f):
self.log("WEIRD: %s from vbucket %s failed: %s" % (name, bucket, f))
self.log("%s from vbucket %s failed:" % (name, bucket),
failure=f, level=log.WEIRD)
if not sources:
raise NotEnoughPeersError("ran out of peers, last error was %s"
% (f,))

View File

@ -68,7 +68,8 @@ class CHKCheckerAndUEBFetcher:
if k not in self._sharemap:
self._sharemap[k] = []
self._sharemap[k].append(peerid)
self._readers.update(buckets.values())
self._readers.update( [ (bucket, peerid)
for bucket in buckets.values() ] )
def _got_error(self, f):
if f.check(KeyError):
@ -82,8 +83,9 @@ class CHKCheckerAndUEBFetcher:
if not self._readers:
self.log("no readers, so no UEB", level=log.NOISY)
return
b = self._readers.pop()
rbp = storage.ReadBucketProxy(b)
b,peerid = self._readers.pop()
rbp = storage.ReadBucketProxy(b, idlib.shortnodeid_b2a(peerid),
storage.si_b2a(self._storage_index))
d = rbp.startIfNecessary()
d.addCallback(lambda res: rbp.get_uri_extension())
d.addCallback(self._got_uri_extension)

View File

@ -1201,10 +1201,16 @@ class WriteBucketProxy:
class ReadBucketProxy:
implements(IStorageBucketReader)
def __init__(self, rref):
def __init__(self, rref, peerid_s=None, storage_index_s=None):
self._rref = rref
self._peerid_s = peerid_s
self._si_s = storage_index_s
self._started = False
def __repr__(self):
return "<ReadBucketProxy to peer [%s] SI %s>" % (self._peerid_s,
self._si_s)
def startIfNecessary(self):
if self._started:
return defer.succeed(self)