more hierarchical logging: download/upload/encode

This commit is contained in:
Brian Warner 2007-11-19 19:33:41 -07:00
parent 869b690378
commit 33a5f8ba6b
4 changed files with 74 additions and 32 deletions

View File

@ -219,17 +219,27 @@ class BlockDownloader:
self.vbucket = vbucket
self.blocknum = blocknum
self.parent = parent
self._log_number = self.parent.log("starting block %d" % blocknum)
def log(self, msg, parent=None):
if parent is None:
parent = self._log_number
return self.parent.log(msg, parent=parent)
def start(self, segnum):
lognum = self.log("get_block(segnum=%d)" % segnum)
d = self.vbucket.get_block(segnum)
d.addCallbacks(self._hold_block, self._got_block_error)
d.addCallbacks(self._hold_block, self._got_block_error,
callbackArgs=(lognum,), errbackArgs=(lognum,))
return d
def _hold_block(self, data):
def _hold_block(self, data, lognum):
self.log("got block", parent=lognum)
self.parent.hold_block(self.blocknum, data)
def _got_block_error(self, f):
log.msg("BlockDownloader[%d] got error: %s" % (self.blocknum, f))
def _got_block_error(self, f, lognum):
self.log("BlockDownloader[%d] got error: %s" % (self.blocknum, f),
parent=lognum)
self.parent.bucket_failed(self.vbucket)
class SegmentDownloader:
@ -244,6 +254,13 @@ class SegmentDownloader:
self.segmentnumber = segmentnumber
self.needed_blocks = needed_shares
self.blocks = {} # k: blocknum, v: data
self._log_number = self.parent.log("starting segment %d" %
segmentnumber)
def log(self, msg, parent=None):
if parent is None:
parent = self._log_number
return self.parent.log(msg, parent=parent)
def start(self):
return self._download()

View File

@ -2,7 +2,6 @@
from zope.interface import implements
from twisted.internet import defer
from twisted.python import log
from foolscap import eventual
from allmydata import uri
from allmydata.hashtree import HashTree
@ -76,7 +75,7 @@ class Encoder(object):
TOTAL_SHARES = 10
MAX_SEGMENT_SIZE = 1*MiB
def __init__(self, options={}):
def __init__(self, options={}, parent=None):
object.__init__(self)
self.MAX_SEGMENT_SIZE = options.get("max_segment_size",
self.MAX_SEGMENT_SIZE)
@ -89,12 +88,22 @@ class Encoder(object):
self.TOTAL_SHARES = n
self.uri_extension_data = {}
self._codec = None
self._parent = parent
if self._parent:
self._log_number = self._parent.log("starting Encoder %s" % self)
def __repr__(self):
if hasattr(self, "_storage_index"):
return "<Encoder for %s>" % idlib.b2a(self._storage_index)[:6]
return "<Encoder for unknown storage index>"
def log(self, msg, parent=None):
if not self._parent:
return
if parent is None:
parent = self._log_number
return self._parent.log(msg, parent=parent)
def set_size(self, size):
assert not self._codec
self.file_size = size
@ -105,6 +114,7 @@ class Encoder(object):
self.NEEDED_SHARES = k
self.SHARES_OF_HAPPINESS = d
self.TOTAL_SHARES = n
self.log("set_params: %d,%d,%d" % (k, d, n))
def _setup_codec(self):
self.num_shares = self.TOTAL_SHARES
@ -205,6 +215,7 @@ class Encoder(object):
self.landlords = landlords.copy()
def start(self):
self.log("starting")
#paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
if not self._codec:
self._setup_codec()
@ -372,10 +383,11 @@ class Encoder(object):
_assert(set(self.landlords.keys()).issubset(set(shareids)),
shareids=shareids, landlords=self.landlords)
dl = []
lognum = self.log("send_segment(%d)" % segnum)
for i in range(len(shares)):
subshare = shares[i]
shareid = shareids[i]
d = self.send_subshare(shareid, segnum, subshare)
d = self.send_subshare(shareid, segnum, subshare, lognum)
dl.append(d)
subshare_hash = hashutil.block_hash(subshare)
#from allmydata.util import idlib
@ -387,39 +399,46 @@ class Encoder(object):
dl = self._gather_responses(dl)
def _logit(res):
log.msg("%s uploaded %s / %s bytes (%d%%) of your file." %
(self,
self.segment_size*(segnum+1),
self.segment_size*self.num_segments,
100 * (segnum+1) / self.num_segments,
))
self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
(self,
self.segment_size*(segnum+1),
self.segment_size*self.num_segments,
100 * (segnum+1) / self.num_segments,
))
return res
dl.addCallback(_logit)
return dl
def send_subshare(self, shareid, segment_num, subshare):
def send_subshare(self, shareid, segment_num, subshare, lognum):
if shareid not in self.landlords:
return defer.succeed(None)
sh = self.landlords[shareid]
lognum2 = self.log("put_block to %s" % self.landlords[shareid],
parent=lognum)
d = sh.put_block(segment_num, subshare)
def _done(res):
self.log("put_block done", parent=lognum2)
return res
d.addCallback(_done)
d.addErrback(self._remove_shareholder, shareid,
"segnum=%d" % segment_num)
return d
def _remove_shareholder(self, why, shareid, where):
log.msg("error while sending %s to shareholder=%d: %s" %
(where, shareid, why)) # UNUSUAL
ln = self.log("UNUSUAL: error while sending %s to shareholder=%d: %s" %
(where, shareid, why))
if shareid in self.landlords:
del self.landlords[shareid]
else:
# even more UNUSUAL
log.msg(" weird, they weren't in our list of landlords")
self.log("WEIRD: they weren't in our list of landlords", parent=ln)
if len(self.landlords) < self.shares_of_happiness:
msg = "lost too many shareholders during upload: %s" % why
raise NotEnoughPeersError(msg)
log.msg("but we can still continue with %s shares, we'll be happy "
"with at least %s" % (len(self.landlords),
self.shares_of_happiness))
self.log("but we can still continue with %s shares, we'll be happy "
"with at least %s" % (len(self.landlords),
self.shares_of_happiness),
parent=ln)
def _gather_responses(self, dl):
d = defer.DeferredList(dl, fireOnOneErrback=True)
@ -452,7 +471,7 @@ class Encoder(object):
return d
def send_plaintext_hash_tree_to_all_shareholders(self):
log.msg("%s sending plaintext hash tree" % self)
self.log("sending plaintext hash tree")
dl = []
for shareid in self.landlords.keys():
d = self.send_plaintext_hash_tree(shareid,
@ -469,7 +488,7 @@ class Encoder(object):
return d
def send_crypttext_hash_tree_to_all_shareholders(self):
log.msg("%s sending crypttext hash tree" % self)
self.log("sending crypttext hash tree")
t = HashTree(self._crypttext_hashes)
all_hashes = list(t)
self.uri_extension_data["crypttext_root_hash"] = t[0]
@ -487,7 +506,7 @@ class Encoder(object):
return d
def send_all_subshare_hash_trees(self):
log.msg("%s sending subshare hash trees" % self)
self.log("sending subshare hash trees")
dl = []
for shareid,hashes in enumerate(self.subshare_hashes):
# hashes is a list of the hashes of all subshares that were sent
@ -514,7 +533,7 @@ class Encoder(object):
# validate their share. This includes the share hash itself, but does
# not include the top-level hash root (which is stored securely in
# the URI instead).
log.msg("%s sending all share hash trees" % self)
self.log("sending all share hash trees")
dl = []
for h in self.share_root_hashes:
assert h
@ -540,7 +559,7 @@ class Encoder(object):
return d
def send_uri_extension_to_all_shareholders(self):
log.msg("%s: sending uri_extension" % self)
self.log("sending uri_extension")
for k in ('crypttext_root_hash', 'crypttext_hash',
'plaintext_root_hash', 'plaintext_hash',
):
@ -559,7 +578,7 @@ class Encoder(object):
return d
def close_all_shareholders(self):
log.msg("%s: closing shareholders" % self)
self.log("closing shareholders")
dl = []
for shareid in self.landlords:
d = self.landlords[shareid].close()
@ -568,12 +587,12 @@ class Encoder(object):
return self._gather_responses(dl)
def done(self):
log.msg("%s: upload done" % self)
self.log("upload done")
return (self.uri_extension_hash, self.required_shares,
self.num_shares, self.file_size)
def err(self, f):
log.msg("%s: upload failed: %s" % (self, f)) # UNUSUAL
self.log("UNUSUAL: %s: upload failed: %s" % (self, f))
if f.check(defer.FirstError):
return f.value.subFailure
return f

View File

@ -139,7 +139,6 @@ class FakeClient:
def __init__(self, mode="good", num_servers=50):
self.mode = mode
self.num_servers = num_servers
self.introducer_client = FakeIntroducerClient()
def get_permuted_peers(self, storage_index, include_myself):
peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
for fakeid in range(self.num_servers) ]

View File

@ -420,10 +420,16 @@ class CHKUploader:
self._client = client
self._wait_for_numpeers = wait_for_numpeers
self._options = options
self._log_number = self._client.log("CHKUploader starting")
def set_params(self, encoding_parameters):
self._encoding_parameters = encoding_parameters
def log(self, msg, parent=None):
if parent is None:
parent = self._log_number
return self._client.log(msg, parent=parent)
def start(self, uploadable):
"""Start uploading the file.
@ -431,7 +437,7 @@ class CHKUploader:
string)."""
uploadable = IUploadable(uploadable)
log.msg("starting upload of %s" % uploadable)
self.log("starting upload of %s" % uploadable)
eu = EncryptAnUploadable(uploadable)
d = self.start_encrypted(eu)
@ -445,7 +451,7 @@ class CHKUploader:
def start_encrypted(self, encrypted):
eu = IEncryptedUploadable(encrypted)
e = encode.Encoder(self._options)
e = encode.Encoder(self._options, self)
e.set_params(self._encoding_parameters)
d = e.set_encrypted_uploadable(eu)
def _wait_for_peers(res):
@ -467,6 +473,7 @@ class CHKUploader:
def locate_all_shareholders(self, encoder):
storage_index = encoder.get_param("storage_index")
upload_id = idlib.b2a(storage_index)[:6]
self.log("using storage index %s" % upload_id)
peer_selector = self.peer_selector_class(upload_id)
share_size = encoder.get_param("share_size")
@ -484,7 +491,7 @@ class CHKUploader:
"""
@param used_peers: a sequence of PeerTracker objects
"""
log.msg("_send_shares, used_peers is %s" % (used_peers,))
self.log("_send_shares, used_peers is %s" % (used_peers,))
for peer in used_peers:
assert isinstance(peer, PeerTracker)
buckets = {}