mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-20 17:52:50 +00:00
offloaded: improve logging across the board
This commit is contained in:
parent
51321944f0
commit
c597e67c2b
@ -1,13 +1,12 @@
|
||||
|
||||
import os, random
|
||||
from zope.interface import implements
|
||||
from twisted.python import log
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.interfaces import IPushProducer, IConsumer
|
||||
from twisted.application import service
|
||||
from foolscap.eventual import eventually
|
||||
|
||||
from allmydata.util import idlib, mathutil, hashutil
|
||||
from allmydata.util import idlib, mathutil, hashutil, log
|
||||
from allmydata.util.assertutil import _assert
|
||||
from allmydata import codec, hashtree, storage, uri
|
||||
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI
|
||||
@ -29,7 +28,7 @@ class DownloadStopped(Exception):
|
||||
pass
|
||||
|
||||
class Output:
|
||||
def __init__(self, downloadable, key, total_length):
|
||||
def __init__(self, downloadable, key, total_length, log_parent):
|
||||
self.downloadable = downloadable
|
||||
self._decryptor = AES(key)
|
||||
self._crypttext_hasher = hashutil.crypttext_hasher()
|
||||
@ -40,6 +39,14 @@ class Output:
|
||||
self._plaintext_hash_tree = None
|
||||
self._crypttext_hash_tree = None
|
||||
self._opened = False
|
||||
self._log_parent = log_parent
|
||||
|
||||
def log(self, *args, **kwargs):
|
||||
if "parent" not in kwargs:
|
||||
kwargs["parent"] = self._log_parent
|
||||
if "facility" not in kwargs:
|
||||
kwargs["facility"] = "download.output"
|
||||
return log.msg(*args, **kwargs)
|
||||
|
||||
def setup_hashtrees(self, plaintext_hashtree, crypttext_hashtree):
|
||||
self._plaintext_hash_tree = plaintext_hashtree
|
||||
@ -56,6 +63,10 @@ class Output:
|
||||
ch = hashutil.crypttext_segment_hasher()
|
||||
ch.update(crypttext)
|
||||
crypttext_leaves = {self._segment_number: ch.digest()}
|
||||
self.log(format="crypttext leaf hash (%(bytes)sB) [%(segnum)d] is %(hash)s",
|
||||
bytes=len(crypttext),
|
||||
segnum=self._segment_number, hash=idlib.b2a(ch.digest()),
|
||||
level=log.NOISY)
|
||||
self._crypttext_hash_tree.set_hashes(leaves=crypttext_leaves)
|
||||
|
||||
plaintext = self._decryptor.process(crypttext)
|
||||
@ -68,6 +79,10 @@ class Output:
|
||||
ph = hashutil.plaintext_segment_hasher()
|
||||
ph.update(plaintext)
|
||||
plaintext_leaves = {self._segment_number: ph.digest()}
|
||||
self.log(format="plaintext leaf hash (%(bytes)sB) [%(segnum)d] is %(hash)s",
|
||||
bytes=len(plaintext),
|
||||
segnum=self._segment_number, hash=idlib.b2a(ph.digest()),
|
||||
level=log.NOISY)
|
||||
self._plaintext_hash_tree.set_hashes(leaves=plaintext_leaves)
|
||||
|
||||
self._segment_number += 1
|
||||
@ -79,12 +94,14 @@ class Output:
|
||||
self.downloadable.write(plaintext)
|
||||
|
||||
def fail(self, why):
|
||||
log.msg("UNUSUAL: download failed: %s" % why)
|
||||
# this is really unusual, and deserves maximum forensics
|
||||
self.log("download failed!", failure=why, level=log.SCARY)
|
||||
self.downloadable.fail(why)
|
||||
|
||||
def close(self):
|
||||
self.crypttext_hash = self._crypttext_hasher.digest()
|
||||
self.plaintext_hash = self._plaintext_hasher.digest()
|
||||
self.log("download finished, closing IDownloadable", level=log.NOISY)
|
||||
self.downloadable.close()
|
||||
|
||||
def finish(self):
|
||||
@ -322,7 +339,7 @@ class FileDownloader:
|
||||
if IConsumer.providedBy(downloadable):
|
||||
downloadable.registerProducer(self, True)
|
||||
self._downloadable = downloadable
|
||||
self._output = Output(downloadable, u.key, self._size)
|
||||
self._output = Output(downloadable, u.key, self._size, self._log_number)
|
||||
self._paused = False
|
||||
self._stopped = False
|
||||
|
||||
@ -342,15 +359,16 @@ class FileDownloader:
|
||||
|
||||
def init_logging(self):
|
||||
self._log_prefix = prefix = idlib.b2a(self._storage_index)[:6]
|
||||
num = self._client.log("FileDownloader(%s): starting" % prefix)
|
||||
num = self._client.log(format="FileDownloader(%(si)s): starting",
|
||||
si=idlib.b2a(self._storage_index))
|
||||
self._log_number = num
|
||||
|
||||
def log(self, msg, parent=None):
|
||||
if parent is None:
|
||||
parent = self._log_number
|
||||
return self._client.log("FileDownloader(%s): %s" % (self._log_prefix,
|
||||
msg),
|
||||
parent=parent)
|
||||
def log(self, *args, **kwargs):
|
||||
if "parent" not in kwargs:
|
||||
kwargs["parent"] = self._log_number
|
||||
if "facility" not in kwargs:
|
||||
kwargs["facility"] = "tahoe.download"
|
||||
return log.msg(*args, **kwargs)
|
||||
|
||||
def pauseProducing(self):
|
||||
if self._paused:
|
||||
|
@ -6,7 +6,7 @@ from foolscap import eventual
|
||||
from allmydata import uri
|
||||
from allmydata.hashtree import HashTree
|
||||
from allmydata.util import mathutil, hashutil, idlib, log
|
||||
from allmydata.util.assertutil import _assert
|
||||
from allmydata.util.assertutil import _assert, precondition
|
||||
from allmydata.codec import CRSEncoder
|
||||
from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
|
||||
IEncryptedUploadable
|
||||
@ -73,13 +73,14 @@ PiB=1024*TiB
|
||||
class Encoder(object):
|
||||
implements(IEncoder)
|
||||
|
||||
def __init__(self, parent=None):
|
||||
def __init__(self, log_parent=None):
|
||||
object.__init__(self)
|
||||
self.uri_extension_data = {}
|
||||
self._codec = None
|
||||
self._parent = parent
|
||||
if self._parent:
|
||||
self._log_number = self._parent.log("creating Encoder %s" % self)
|
||||
precondition(log_parent is None or isinstance(log_parent, int),
|
||||
log_parent)
|
||||
self._log_number = log.msg("creating Encoder %s" % self,
|
||||
facility="tahoe.encoder", parent=log_parent)
|
||||
self._aborted = False
|
||||
|
||||
def __repr__(self):
|
||||
@ -88,16 +89,17 @@ class Encoder(object):
|
||||
return "<Encoder for unknown storage index>"
|
||||
|
||||
def log(self, *args, **kwargs):
|
||||
if not self._parent:
|
||||
return
|
||||
if "parent" not in kwargs:
|
||||
kwargs["parent"] = self._log_number
|
||||
return self._parent.log(*args, **kwargs)
|
||||
if "facility" not in kwargs:
|
||||
kwargs["facility"] = "tahoe.encoder"
|
||||
return log.msg(*args, **kwargs)
|
||||
|
||||
def set_encrypted_uploadable(self, uploadable):
|
||||
eu = self._uploadable = IEncryptedUploadable(uploadable)
|
||||
d = eu.get_size()
|
||||
def _got_size(size):
|
||||
self.log(format="file size: %(size)d", size=size)
|
||||
self.file_size = size
|
||||
d.addCallback(_got_size)
|
||||
d.addCallback(lambda res: eu.get_all_encoding_parameters())
|
||||
@ -193,8 +195,7 @@ class Encoder(object):
|
||||
self.landlords = landlords.copy()
|
||||
|
||||
def start(self):
|
||||
if self._parent:
|
||||
self._log_number = self._parent.log("%s starting" % (self,))
|
||||
self.log("%s starting" % (self,))
|
||||
#paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
|
||||
assert self._codec
|
||||
self._crypttext_hasher = hashutil.crypttext_hasher()
|
||||
@ -455,6 +456,8 @@ class Encoder(object):
|
||||
self.num_segments)
|
||||
d.addCallback(_got)
|
||||
def _got_hashtree_leaves(leaves):
|
||||
self.log("Encoder: got plaintext_hashtree_leaves: %s" %
|
||||
(",".join([idlib.b2a(h) for h in leaves]),))
|
||||
ht = list(HashTree(list(leaves)))
|
||||
self.uri_extension_data["plaintext_root_hash"] = ht[0]
|
||||
self._plaintext_hashtree_nodes = ht
|
||||
|
@ -32,7 +32,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
parent=log_number)
|
||||
|
||||
self._client = helper.parent
|
||||
self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file)
|
||||
self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
|
||||
self._log_number)
|
||||
self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
|
||||
self._finished_observers = observer.OneShotObserverList()
|
||||
|
||||
@ -102,16 +103,18 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
|
||||
class AskUntilSuccessMixin:
|
||||
# create me with a _reader array
|
||||
_last_failure = None
|
||||
|
||||
def add_reader(self, reader):
|
||||
self._readers.append(reader)
|
||||
|
||||
def call(self, *args, **kwargs):
|
||||
if not self._readers:
|
||||
raise NotEnoughWritersError("ran out of assisted uploaders")
|
||||
raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
|
||||
rr = self._readers[0]
|
||||
d = rr.callRemote(*args, **kwargs)
|
||||
def _err(f):
|
||||
self._last_failure = f
|
||||
if rr in self._readers:
|
||||
self._readers.remove(rr)
|
||||
self._upload_helper.log("call to assisted uploader %s failed" % rr,
|
||||
@ -135,15 +138,23 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
|
||||
the ciphertext to 'encoded_file'.
|
||||
"""
|
||||
|
||||
def __init__(self, helper, incoming_file, encoded_file):
|
||||
def __init__(self, helper, incoming_file, encoded_file, logparent):
|
||||
self._upload_helper = helper
|
||||
self._incoming_file = incoming_file
|
||||
self._encoding_file = encoded_file
|
||||
self._log_parent = logparent
|
||||
self._done_observers = observer.OneShotObserverList()
|
||||
self._readers = []
|
||||
self._started = False
|
||||
self._f = None
|
||||
|
||||
def log(self, *args, **kwargs):
|
||||
if "facility" not in kwargs:
|
||||
kwargs["facility"] = "tahoe.helper.chkupload.fetch"
|
||||
if "parent" not in kwargs:
|
||||
kwargs["parent"] = self._log_parent
|
||||
return log.msg(*args, **kwargs)
|
||||
|
||||
def add_reader(self, reader):
|
||||
AskUntilSuccessMixin.add_reader(self, reader)
|
||||
self._start()
|
||||
@ -161,12 +172,14 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
|
||||
d.addErrback(self._failed)
|
||||
|
||||
def _got_size(self, size):
|
||||
self.log("total size is %d bytes" % size, level=log.NOISY)
|
||||
self._expected_size = size
|
||||
|
||||
def _start_reading(self, res):
|
||||
# then find out how much crypttext we have on disk
|
||||
if os.path.exists(self._incoming_file):
|
||||
self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
|
||||
self.log("we already have %d bytes" % self._have, level=log.NOISY)
|
||||
else:
|
||||
self._have = 0
|
||||
self._f = open(self._incoming_file, "wb")
|
||||
@ -200,10 +213,12 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
|
||||
d = defer.maybeDeferred(self._fetch)
|
||||
def _done(finished):
|
||||
if finished:
|
||||
self.log("finished reading ciphertext", level=log.NOISY)
|
||||
fire_when_done.callback(None)
|
||||
else:
|
||||
self._loop(fire_when_done)
|
||||
def _err(f):
|
||||
self.log("ciphertext read failed", failure=f, level=log.UNUSUAL)
|
||||
fire_when_done.errback(f)
|
||||
d.addCallbacks(_done, _err)
|
||||
return None
|
||||
@ -213,6 +228,8 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
|
||||
fetch_size = min(needed, self.CHUNK_SIZE)
|
||||
if fetch_size == 0:
|
||||
return True # all done
|
||||
self.log("fetching %d-%d" % (self._have, self._have+fetch_size),
|
||||
level=log.NOISY)
|
||||
d = self.call("read_encrypted", self._have, fetch_size)
|
||||
def _got_data(ciphertext_v):
|
||||
for data in ciphertext_v:
|
||||
@ -241,6 +258,9 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
|
||||
self._f.close()
|
||||
self._f = None
|
||||
self._readers = []
|
||||
self.log(format="done fetching ciphertext, size=%(size)d",
|
||||
size=os.stat(self._incoming_file)[stat.ST_SIZE],
|
||||
level=log.NOISY)
|
||||
os.rename(self._incoming_file, self._encoding_file)
|
||||
self._done_observers.fire(None)
|
||||
|
||||
|
@ -340,6 +340,11 @@ class EncryptAnUploadable:
|
||||
self._encoding_parameters = None
|
||||
self._file_size = None
|
||||
|
||||
def log(self, *args, **kwargs):
|
||||
if "facility" not in kwargs:
|
||||
kwargs["facility"] = "upload.encryption"
|
||||
return log.msg(*args, **kwargs)
|
||||
|
||||
def get_size(self):
|
||||
if self._file_size is not None:
|
||||
return defer.succeed(self._file_size)
|
||||
@ -381,6 +386,8 @@ class EncryptAnUploadable:
|
||||
segsize = mathutil.next_multiple(segsize, k)
|
||||
self._segment_size = segsize # used by segment hashers
|
||||
self._encoding_parameters = (k, happy, n, segsize)
|
||||
self.log("my encoding parameters: %s" %
|
||||
(self._encoding_parameters,), level=log.NOISY)
|
||||
return self._encoding_parameters
|
||||
d.addCallback(_got_pieces)
|
||||
return d
|
||||
@ -433,6 +440,14 @@ class EncryptAnUploadable:
|
||||
# we've filled this segment
|
||||
self._plaintext_segment_hashes.append(p.digest())
|
||||
self._plaintext_segment_hasher = None
|
||||
self.log("closed hash [%d]: %dB" %
|
||||
(len(self._plaintext_segment_hashes)-1,
|
||||
self._plaintext_segment_hashed_bytes),
|
||||
level=log.NOISY)
|
||||
self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
|
||||
segnum=len(self._plaintext_segment_hashes)-1,
|
||||
hash=idlib.b2a(p.digest()),
|
||||
level=log.NOISY)
|
||||
|
||||
offset += this_segment
|
||||
|
||||
@ -452,6 +467,8 @@ class EncryptAnUploadable:
|
||||
# memory: each chunk is destroyed as soon as we're done with it.
|
||||
while data:
|
||||
chunk = data.pop(0)
|
||||
log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk),
|
||||
level=log.NOISY)
|
||||
self._plaintext_hasher.update(chunk)
|
||||
self._update_segment_hash(chunk)
|
||||
cryptdata.append(self._encryptor.process(chunk))
|
||||
@ -467,6 +484,13 @@ class EncryptAnUploadable:
|
||||
p, segment_left = self._get_segment_hasher()
|
||||
self._plaintext_segment_hashes.append(p.digest())
|
||||
del self._plaintext_segment_hasher
|
||||
self.log("closing plaintext leaf hasher, hashed %d bytes" %
|
||||
self._plaintext_segment_hashed_bytes,
|
||||
level=log.NOISY)
|
||||
self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
|
||||
segnum=len(self._plaintext_segment_hashes)-1,
|
||||
hash=idlib.b2a(p.digest()),
|
||||
level=log.NOISY)
|
||||
assert len(self._plaintext_segment_hashes) == num_segments
|
||||
return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
|
||||
|
||||
@ -522,7 +546,7 @@ class CHKUploader:
|
||||
def start_encrypted(self, encrypted):
|
||||
eu = IEncryptedUploadable(encrypted)
|
||||
|
||||
self._encoder = e = encode.Encoder(self)
|
||||
self._encoder = e = encode.Encoder(self._log_number)
|
||||
d = e.set_encrypted_uploadable(eu)
|
||||
d.addCallback(self.locate_all_shareholders)
|
||||
d.addCallback(self.set_shareholders, e)
|
||||
@ -637,6 +661,9 @@ class RemoteEncryptedUploadable(Referenceable):
|
||||
d.addCallback(_read)
|
||||
return d
|
||||
def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
|
||||
log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
|
||||
(first, last-1, num_segments),
|
||||
level=log.NOISY)
|
||||
d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
|
||||
d.addCallback(list)
|
||||
return d
|
||||
|
Loading…
x
Reference in New Issue
Block a user