immutable: use new logging mixins to simplify logging
commit d67a3fe4b1
parent d511941136
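This change replaces the hand-rolled init_logging()/log() helpers in each class with allmydata.util.log.PrefixingLogMixin. For orientation, here is a minimal, self-contained sketch of the pattern the mixin provides. It is not the real implementation (which wraps foolscap's logging); the _emit() backend and message-id scheme below are illustrative stand-ins, and only the facility/prefix arguments and the self._parentmsgid attribute are taken from the diff itself:

    # Sketch only: NOT allmydata.util.log.PrefixingLogMixin. _emit() is a
    # stand-in logging backend so the example runs anywhere.
    import itertools

    _msgids = itertools.count(1)

    def _emit(msg, facility=None, parent=None, **kwargs):
        # Stand-in for log.msg(): record one message, return its id.
        msgid = next(_msgids)
        print("[%s] (parent=%r) %s" % (facility, parent, msg))
        return msgid

    class PrefixingLogMixin:
        """Tag every message from self.log() with a facility and an
        instance-specific prefix, and parent it under a single 'created'
        event, so subclasses can drop per-class init_logging()/log()."""
        def __init__(self, facility=None, prefix=None):
            self._facility = facility
            self._prefix = prefix
            # One creation event; later messages hang off this id.
            self._parentmsgid = _emit("%s: created" % (prefix,),
                                      facility=facility)

        def log(self, msg, parent=None, **kwargs):
            if parent is None:
                parent = self._parentmsgid
            return _emit("%s: %s" % (self._prefix, msg),
                         facility=self._facility, parent=parent, **kwargs)

    # Usage in the style of the classes below (prefix and facility strings
    # taken from the diff):
    class BlockDownloader(PrefixingLogMixin):
        def __init__(self, vbucket, blocknum):
            prefix = "%s-%d" % (vbucket, blocknum)
            PrefixingLogMixin.__init__(
                self, facility="tahoe.immutable.download", prefix=prefix)

    BlockDownloader("vb", 3).log("get_block(segnum=%d)" % 0)

The payoff is visible throughout the diff below: call sites shrink from log.msg(...) or self._client.log(...) to self.log(...), and the parent-message and facility plumbing lives in one place.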
@@ -166,6 +166,7 @@ class SimpleCHKFileVerifier(download.FileDownloader):
 
     def __init__(self, client, u, storage_index, k, N, size, ueb_hash):
         precondition(isinstance(u, CHKFileURI), u)
+        download.FileDownloader.__init__(self, client, u, None);
         self._client = client
 
         self._uri = u
@@ -335,7 +335,7 @@ class ValidatedExtendedURIProxy:
         d.addCallback(self._parse_and_validate)
         return d
 
-class ValidatedReadBucketProxy:
+class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for
    retrieving and validating data from that bucket.
 
@@ -346,6 +346,8 @@ class ValidatedReadBucketProxy:
                  share_hash_tree, share_root_hash,
                  num_blocks):
        """ share_root_hash is the root of the share hash tree; share_root_hash is stored in the UEB """
+        prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
+        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self._share_hash = None # None means not validated yet
@@ -401,7 +403,7 @@ class ValidatedReadBucketProxy:
            self._share_hash = sht.get_leaf(self.sharenum)
 
        blockhash = hashutil.block_hash(blockdata)
-        #log.msg("checking block_hash(shareid=%d, blocknum=%d) len=%d "
+        #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
        #        "%r .. %r: %s" %
        #        (self.sharenum, blocknum, len(blockdata),
        #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
@@ -415,31 +417,31 @@ class ValidatedReadBucketProxy:
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError):
            # log.WEIRD: indicates undetected disk/network error, or more
            # likely a programming error
-            log.msg("hash failure in block=%d, shnum=%d on %s" %
+            self.log("hash failure in block=%d, shnum=%d on %s" %
                    (blocknum, self.sharenum, self.bucket))
            if self._share_hash:
-                log.msg(""" failure occurred when checking the block_hash_tree.
+                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
-                log.msg(""" the failure probably occurred when checking the
+                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
-            log.msg(" have self._share_hash: %s" % bool(self._share_hash))
-            log.msg(" block length: %d" % len(blockdata))
-            log.msg(" block hash: %s" % base32.b2a_or_none(blockhash))
+            self.log(" have self._share_hash: %s" % bool(self._share_hash))
+            self.log(" block length: %d" % len(blockdata))
+            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
-                log.msg(" block data: %r" % (blockdata,))
+                self.log(" block data: %r" % (blockdata,))
            else:
-                log.msg(" block data start/end: %r .. %r" %
+                self.log(" block data start/end: %r .. %r" %
                        (blockdata[:50], blockdata[-50:]))
-            log.msg(" root hash: %s" % base32.b2a(self._share_root_hash))
-            log.msg(" share hash tree:\n" + self.share_hash_tree.dump())
-            log.msg(" block hash tree:\n" + self.block_hash_tree.dump())
+            self.log(" root hash: %s" % base32.b2a(self._share_root_hash))
+            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
+            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = []
            for i,h in sorted(sharehashes):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
-            log.msg(" sharehashes:\n" + "\n".join(lines) + "\n")
+            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = []
            for i,h in enumerate(blockhashes):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
@@ -454,7 +456,7 @@ class ValidatedReadBucketProxy:
 
 
 
-class BlockDownloader:
+class BlockDownloader(log.PrefixingLogMixin):
    """I am responsible for downloading a single block (from a single bucket)
    for a single segment.
 
@@ -462,41 +464,37 @@ class BlockDownloader:
    """
 
    def __init__(self, vbucket, blocknum, parent, results):
+        prefix = "%s-%d" % (vbucket, blocknum)
+        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.vbucket = vbucket
        self.blocknum = blocknum
        self.parent = parent
        self.results = results
-        self._log_number = self.parent.log("starting block %d" % blocknum)
-
-    def log(self, *args, **kwargs):
-        if "parent" not in kwargs:
-            kwargs["parent"] = self._log_number
-        return self.parent.log(*args, **kwargs)
 
    def start(self, segnum):
-        lognum = self.log("get_block(segnum=%d)" % segnum)
+        self.log("get_block(segnum=%d)" % segnum)
        started = time.time()
        d = self.vbucket.get_block(segnum)
        d.addCallbacks(self._hold_block, self._got_block_error,
-                       callbackArgs=(started, lognum,), errbackArgs=(lognum,))
+                       callbackArgs=(started,))
        return d
 
-    def _hold_block(self, data, started, lognum):
+    def _hold_block(self, data, started):
        if self.results:
            elapsed = time.time() - started
            peerid = self.vbucket.bucket.get_peerid()
            if peerid not in self.results.timings["fetch_per_server"]:
                self.results.timings["fetch_per_server"][peerid] = []
            self.results.timings["fetch_per_server"][peerid].append(elapsed)
-        self.log("got block", parent=lognum)
+        self.log("got block")
        self.parent.hold_block(self.blocknum, data)
 
-    def _got_block_error(self, f, lognum):
+    def _got_block_error(self, f):
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log("BlockDownloader[%d] got error" % self.blocknum,
-                 failure=f, level=level, parent=lognum, umid="5Z4uHQ")
+                 failure=f, level=level, umid="5Z4uHQ")
        if self.results:
            peerid = self.vbucket.bucket.get_peerid()
            self.results.server_problems[peerid] = str(f)
@@ -626,22 +624,22 @@ class DownloadStatus:
    def set_results(self, value):
        self.results = value
 
-class FileDownloader:
+class FileDownloader(log.PrefixingLogMixin):
    implements(IPushProducer)
    _status = None
 
    def __init__(self, client, u, downloadable):
        precondition(isinstance(u, uri.CHKFileURI), u)
+        prefix=base32.b2a_l(u.get_storage_index()[:8], 60)
+        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self._client = client
 
        self._uri = u
-        self._storage_index = u.storage_index
+        self._storage_index = u.get_storage_index()
        self._uri_extension_hash = u.uri_extension_hash
        self._vup = None # ValidatedExtendedURIProxy
 
        self._si_s = storage.si_b2a(self._storage_index)
-        self.init_logging()
-
        self._started = time.time()
        self._status = s = DownloadStatus()
        s.set_status("Starting")
@@ -665,7 +663,7 @@ class FileDownloader:
        if IConsumer.providedBy(downloadable):
            downloadable.registerProducer(self, True)
        self._downloadable = downloadable
-        self._output = Output(downloadable, u.key, self._uri.size, self._log_number,
+        self._output = Output(downloadable, u.key, self._uri.size, self._parentmsgid,
                              self._status)
 
        self.active_buckets = {} # k: shnum, v: bucket
@@ -676,19 +674,6 @@ class FileDownloader:
 
        self._crypttext_hash_tree = None
 
-    def init_logging(self):
-        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
-        num = self._client.log(format="FileDownloader(%(si)s): starting",
-                               si=storage.si_b2a(self._storage_index))
-        self._log_number = num
-
-    def log(self, *args, **kwargs):
-        if "parent" not in kwargs:
-            kwargs["parent"] = self._log_number
-        if "facility" not in kwargs:
-            kwargs["facility"] = "tahoe.download"
-        return log.msg(*args, **kwargs)
-
    def pauseProducing(self):
        if self._paused:
            return
@@ -788,7 +773,7 @@ class FileDownloader:
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
-        self._client.log("Error during get_buckets", failure=f, level=level,
+        self.log("Error during get_buckets", failure=f, level=level,
                         umid="3uuBUQ")
 
    def bucket_failed(self, vbucket):
@@ -831,7 +816,7 @@ class FileDownloader:
        vups = []
        for sharenum, bucket in self._share_buckets:
            vups.append(ValidatedExtendedURIProxy(bucket, self._uri.get_verify_cap(), self._fetch_failures))
-        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._log_number)
+        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
        d = vto.start()
 
        def _got_uri_extension(vup):
@@ -866,7 +851,7 @@ class FileDownloader:
        if self._status:
            self._status.set_status("Retrieving crypttext hash tree")
 
-        vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._log_number)
+        vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._parentmsgid)
        d = vto.start()
 
        def _got_crypttext_hash_tree(res):