immutable: use PrefixingLogMixin to organize logging in Tahoe2PeerSelector and add more detailed messages about peers

Zooko O'Whielacronx 2010-07-19 01:20:00 -07:00
parent 2ebe2c2ff3
commit 0f94923f22
3 changed files with 34 additions and 25 deletions

src/allmydata/immutable/upload.py

@@ -74,6 +74,9 @@ EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
+def pretty_print_shnum_to_servers(s):
+    return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
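
For reference, the new helper renders a map from share number to the set of peers holding that share as one compact string. A minimal usage sketch (the import path is real once this commit lands; the two 20-byte node IDs are made up):

    from allmydata.immutable.upload import pretty_print_shnum_to_servers

    peer_a = "\x01" * 20   # hypothetical nodeid
    peer_b = "\x02" * 20   # hypothetical nodeid
    s = {0: set([peer_a]), 1: set([peer_a, peer_b])}
    # prints something like "sh0: <a>, sh1: <a>+<b>", where <a> and <b>
    # are the short base32 forms produced by idlib.shortnodeid_b2a()
    print pretty_print_shnum_to_servers(s)
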
class PeerTracker:
def __init__(self, peerid, storage_server,
sharesize, blocksize, num_segments, num_share_hashes,
@@ -152,7 +155,7 @@ class PeerTracker:
del self.buckets[sharenum]
-class Tahoe2PeerSelector:
+class Tahoe2PeerSelector(log.PrefixingLogMixin):
def __init__(self, upload_id, logparent=None, upload_status=None):
self.upload_id = upload_id
@@ -163,7 +166,8 @@ class Tahoe2PeerSelector:
self.num_peers_contacted = 0
self.last_failure_msg = None
self._status = IUploadStatus(upload_status)
-        self._log_parent = log.msg("%s starting" % self, parent=logparent)
+        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
+        self.log("starting", level=log.OPERATIONAL)
def __repr__(self):
return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
@@ -268,10 +272,9 @@ class Tahoe2PeerSelector:
ds.append(d)
self.num_peers_contacted += 1
self.query_count += 1
log.msg("asking peer %s for any existing shares for "
"upload id %s"
% (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
level=log.NOISY, parent=self._log_parent)
self.log("asking peer %s for any existing shares" %
(idlib.shortnodeid_b2a(peer.peerid),),
level=log.NOISY)
dl = defer.DeferredList(ds)
dl.addCallback(lambda ign: self._loop())
return dl
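
The query fan-out above follows the standard Twisted pattern: fire one deferred per peer, handle each response (success or failure) per-peer with addBoth, collect everything in a DeferredList, and resume the selection loop once all answers are in. A minimal standalone sketch of that pattern (names are hypothetical, not Tahoe's API):

    from twisted.internet import defer

    def query_all(peers, ask_one):
        # ask_one(peer) -> Deferred; errors are absorbed per-peer by
        # addBoth, so the DeferredList itself always fires successfully
        ds = []
        for peer in peers:
            d = ask_one(peer)
            d.addBoth(lambda res, p=peer: (p, res))
            ds.append(d)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda results: [r for (ok, r) in results])
        return dl
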
@@ -283,18 +286,18 @@ class Tahoe2PeerSelector:
Tahoe2PeerSelector._existing_shares.
"""
if isinstance(res, failure.Failure):
log.msg("%s got error during existing shares check: %s"
self.log("%s got error during existing shares check: %s"
% (idlib.shortnodeid_b2a(peer), res),
level=log.UNUSUAL, parent=self._log_parent)
level=log.UNUSUAL)
self.error_count += 1
self.bad_query_count += 1
else:
buckets = res
if buckets:
self.peers_with_shares.add(peer)
log.msg("response from peer %s: alreadygot=%s"
self.log("response to get_buckets() from peer %s: alreadygot=%s"
% (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
level=log.NOISY, parent=self._log_parent)
level=log.NOISY)
for bucket in buckets:
self.preexisting_shares.setdefault(bucket, set()).add(peer)
if self.homeless_shares and bucket in self.homeless_shares:
@@ -328,9 +331,8 @@ class Tahoe2PeerSelector:
merged = merge_peers(self.preexisting_shares, self.use_peers)
effective_happiness = servers_of_happiness(merged)
if self.servers_of_happiness <= effective_happiness:
msg = ("peer selection successful for %s: %s" % (self,
self._get_progress_message()))
log.msg(msg, parent=self._log_parent)
msg = ("server selection successful for %s: %s: pretty_print_merged: %s, self.use_peers: %s, self.preexisting_shares: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged), [', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in self.use_peers], pretty_print_shnum_to_servers(self.preexisting_shares)))
self.log(msg, level=log.OPERATIONAL)
return (self.use_peers, self.preexisting_shares)
else:
# We're not okay right now, but maybe we can fix it by
@@ -374,8 +376,7 @@
self.needed_shares,
self.servers_of_happiness,
effective_happiness)
log.msg("server selection unsuccessful for %r: %s (%s), merged=%r"
% (self, msg, self._get_progress_message(), merged), level=log.INFREQUENT)
self.log("server selection unsuccessful for %r: %s (%s), merged=%s" % (self, msg, self._get_progress_message(), pretty_print_shnum_to_servers(merged)), level=log.INFREQUENT)
return self._failed("%s (%s)" % (msg, self._get_progress_message()))
if self.uncontacted_peers:
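
For context on the happiness check in this method: servers_of_happiness(merged) is the size of a maximum matching between servers and shares, i.e. how many distinct servers can each be credited with a distinct share. A small worked example (peer IDs hypothetical):

    from allmydata.util.happinessutil import servers_of_happiness

    # sharenum -> set of peerids believed to hold that share
    merged = {0: set(["peerA"]), 1: set(["peerA"]), 2: set(["peerB"])}
    # peerA can be matched to sh0 or sh1, but not both, and peerB to
    # sh2, so the maximum matching -- the happiness -- is 2
    assert servers_of_happiness(merged) == 2
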
@@ -398,7 +399,7 @@
elif self.contacted_peers:
# ask a peer that we've already asked.
if not self._started_second_pass:
log.msg("starting second pass", parent=self._log_parent,
self.log("starting second pass",
level=log.NOISY)
self._started_second_pass = True
num_shares = mathutil.div_ceil(len(self.homeless_shares),
@@ -436,20 +437,23 @@
self._get_progress_message()))
if self.last_failure_msg:
msg += " (%s)" % (self.last_failure_msg,)
-            log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
+            self.log(msg, level=log.UNUSUAL)
return self._failed(msg)
else:
# we placed enough to be happy, so we're done
if self._status:
self._status.set_status("Placed all shares")
msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
self.log(msg, level=log.OPERATIONAL)
return (self.use_peers, self.preexisting_shares)
def _got_response(self, res, peer, shares_to_ask, put_peer_here):
if isinstance(res, failure.Failure):
# This is unusual, and probably indicates a bug or a network
# problem.
log.msg("%s got error during peer selection: %s" % (peer, res),
level=log.UNUSUAL, parent=self._log_parent)
self.log("%s got error during peer selection: %s" % (peer, res),
level=log.UNUSUAL)
self.error_count += 1
self.bad_query_count += 1
self.homeless_shares = list(shares_to_ask) + self.homeless_shares
@@ -468,10 +472,10 @@
self.last_failure_msg = msg
else:
(alreadygot, allocated) = res
log.msg("response from peer %s: alreadygot=%s, allocated=%s"
self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
% (idlib.shortnodeid_b2a(peer.peerid),
tuple(sorted(alreadygot)), tuple(sorted(allocated))),
level=log.NOISY, parent=self._log_parent)
level=log.NOISY)
progress = False
for s in alreadygot:
self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
@@ -914,7 +918,7 @@ class CHKUploader:
@param already_peers: a dict mapping sharenum to a set of peerids
that claim to already have this share
"""
self.log("_send_shares, upload_servers is %s" % (upload_servers,))
self.log("set_shareholders; upload_servers is %s, already_peers is %s" % ([', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in upload_servers], already_peers))
# record already-present shares in self._results
self._results.preexisting_shares = len(already_peers)
@@ -928,6 +932,7 @@
for shnum in peer.buckets:
self._peer_trackers[shnum] = peer
servermap.setdefault(shnum, set()).add(peer.peerid)
self.log("set_shareholders; %s (%s) == %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers]))
assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])
encoder.set_shareholders(buckets, servermap)
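
For orientation, here is the bookkeeping that the new debug line and the assertion check, reduced to a self-contained toy model (the tracker class and all values are hypothetical):

    class FakeTracker:
        def __init__(self, peerid, buckets):
            self.peerid = peerid
            self.buckets = buckets  # sharenum -> bucket writer

    upload_servers = [FakeTracker("A", {0: "bw0", 3: "bw3"}),
                      FakeTracker("B", {1: "bw1"})]
    buckets, servermap = {}, {}
    for peer in upload_servers:
        buckets.update(peer.buckets)
        for shnum in peer.buckets:
            servermap.setdefault(shnum, set()).add(peer.peerid)
    # holds as long as no two trackers claim the same sharenum
    assert len(buckets) == sum([len(p.buckets) for p in upload_servers])
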

src/allmydata/storage/server.py

@@ -5,7 +5,7 @@ from twisted.application import service
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, IStatsProducer
-from allmydata.util import fileutil, log, time_format
+from allmydata.util import fileutil, idlib, log, time_format
import allmydata # for __full_version__
from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
@@ -106,6 +106,9 @@ class StorageServer(service.MultiService, Referenceable):
expiration_sharetypes)
self.lease_checker.setServiceParent(self)
+    def __repr__(self):
+        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
def add_bucket_counter(self):
statefile = os.path.join(self.storedir, "bucket_counter.state")
self.bucket_counter = BucketCountingCrawler(self, statefile)
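
With the new __repr__, a storage server shows up in log lines roughly as follows (the 8-character base32 node-ID abbreviation here is hypothetical):

    <StorageServer xgru5adv>
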

src/allmydata/test/test_upload.py

@@ -11,6 +11,7 @@ import allmydata # for __full_version__
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
+from allmydata.util import log
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.test.no_network import GridTestMixin
@@ -710,6 +711,7 @@ def combinations(iterable, r):
def is_happy_enough(servertoshnums, h, k):
""" I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
print "servertoshnums: ", servertoshnums, "h: ", h, "k: ", k
if len(servertoshnums) < h:
return False
# print "servertoshnums: ", servertoshnums, h, k
@@ -798,9 +800,9 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
def _add_server(self, server_number, readonly=False):
assert self.g, "I tried to find a grid at self.g, but failed"
ss = self.g.make_server(server_number, readonly)
log.msg("just created a server, number: %s => %s" % (server_number, ss,))
self.g.add_server(server_number, ss)
def _add_server_with_share(self, server_number, share_number=None,
readonly=False):
self._add_server(server_number, readonly)
@@ -861,7 +863,6 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
d.addCallback(_store_shares)
return d
def test_configure_parameters(self):
self.basedir = self.mktemp()
hooks = {0: self._set_up_nodes_extra_config}