From 3c44cb65a9c7a3435b205620160bb0a885842eea Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Wed, 3 Apr 2019 10:32:04 -0400 Subject: [PATCH] add logging around peer selection and upload --- src/allmydata/immutable/upload.py | 139 ++++++++++++++++++++++++++++-- 1 file changed, 133 insertions(+), 6 deletions(-) diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 6bad4ce9f..ee887d0c8 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -28,6 +28,115 @@ from pycryptopp.cipher.aes import AES from six.moves import cStringIO as StringIO from happiness_upload import share_placement, calculate_happiness +from ..util.eliotutil import ( + log_call_deferred, + inline_callbacks, +) + +from eliot import ( + ActionType, + MessageType, + Message, + Field, +) + +_TOTAL_SHARES = Field.for_types( + u"total_shares", + [int, long], + u"The total number of shares desired.", +) + +def _serialize_peers(peers): + return sorted(base32.b2a(p) for p in peers) + +_PEERS = Field( + u"peers", + _serialize_peers, + u"The read/write peers being considered.", +) + +_READONLY_PEERS = Field( + u"readonly_peers", + _serialize_peers, + u"The read-only peers being considered.", +) + +def _serialize_existing_shares(existing_shares): + return { + server: list(shares) + for (server, shares) + in existing_shares.iteritems() + } + +_EXISTING_SHARES = Field( + u"existing_shares", + _serialize_existing_shares, + u"The shares that are believed to already have been placed.", +) + +def _serialize_happiness_mappings(happiness_mappings): + return { + sharenum: base32.b2a(serverid) + for (sharenum, serverid) + in happiness_mappings.iteritems() + } + +_HAPPINESS_MAPPINGS = Field( + u"happiness_mappings", + _serialize_happiness_mappings, + u"The computed happiness mapping for a particular upload.", +) + +_HAPPINESS = Field.for_types( + u"happiness", + [int, long], + u"The computed happiness of a certain placement.", +) + +_UPLOAD_TRACKERS = Field( + u"upload_trackers", + lambda trackers: list( + dict( + server=tracker.get_name(), + shareids=sorted(tracker.buckets.keys()), + ) + for tracker + in trackers + ), + u"Some servers which have agreed to hold some shares for us.", +) + +_ALREADY_SERVERIDS = Field( + u"already_serverids", + lambda d: d, + u"Some servers which are already holding some shares that we were interested in uploading.", +) + +LOCATE_ALL_SHAREHOLDERS = ActionType( + u"immutable:upload:locate-all-shareholders", + [], + [_UPLOAD_TRACKERS, _ALREADY_SERVERIDS], + u"Existing shareholders are being identified to plan upload actions.", +) + +GET_SHARE_PLACEMENTS = MessageType( + u"immutable:upload:get-share-placements", + [_TOTAL_SHARES, _PEERS, _READONLY_PEERS, _EXISTING_SHARES, _HAPPINESS_MAPPINGS, _HAPPINESS], + u"Share placement is being computed for an upload.", +) + +_EFFECTIVE_HAPPINESS = Field.for_types( + u"effective_happiness", + [int, long], + u"The computed happiness value of a share placement map.", +) + +CONVERGED_HAPPINESS = MessageType( + u"immutable:upload:get-shareholders:converged-happiness", + [_EFFECTIVE_HAPPINESS], + u"The share placement algorithm has converged and placements efforts are complete.", +) + # this wants to live in storage, not here class TooFullError(Exception): @@ -249,6 +358,14 @@ class PeerSelector(object): shares = set(range(self.total_shares)) self.happiness_mappings = share_placement(self.peers, self.readonly_peers, shares, self.existing_shares) self.happiness = calculate_happiness(self.happiness_mappings) + GET_SHARE_PLACEMENTS.log( + total_shares=self.total_shares, + peers=self.peers, + readonly_peers=self.readonly_peers, + existing_shares=self.existing_shares, + happiness_mappings=self.happiness_mappings, + happiness=self.happiness, + ) return self.happiness_mappings @@ -334,7 +451,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): return readonly_trackers, write_trackers - @defer.inlineCallbacks + @inline_callbacks def get_shareholders(self, storage_broker, secret_holder, storage_index, share_size, block_size, num_segments, total_shares, needed_shares, @@ -552,6 +669,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): # for k, v in merged.items(): # print(" {} -> {}".format(k, v)) + CONVERGED_HAPPINESS.log( + effective_happiness=effective_happiness, + ) + if effective_happiness < min_happiness: msg = failure_message( peer_count=len(self.serverids_with_shares), @@ -1060,6 +1181,7 @@ class CHKUploader(object): kwargs["facility"] = "tahoe.upload" return log.msg(*args, **kwargs) + @log_call_deferred(action_type=u"immutable:upload:chk:start") def start(self, encrypted_uploadable): """Start uploading the file. @@ -1087,7 +1209,8 @@ class CHKUploader(object): return defer.succeed(None) return self._encoder.abort() - @defer.inlineCallbacks + @log_call_deferred(action_type=u"immutable:upload:chk:start-encrypted") + @inline_callbacks def start_encrypted(self, encrypted): """ Returns a Deferred that will fire with the UploadResults instance. @@ -1104,10 +1227,12 @@ class CHKUploader(object): ) # this just returns itself yield self._encoder.set_encrypted_uploadable(eu) - (upload_trackers, already_serverids) = yield self.locate_all_shareholders(self._encoder, started) - yield self.set_shareholders(upload_trackers, already_serverids, self._encoder) + with LOCATE_ALL_SHAREHOLDERS() as action: + (upload_trackers, already_serverids) = yield self.locate_all_shareholders(self._encoder, started) + action.add_success_fields(upload_trackers=upload_trackers, already_serverids=already_serverids) + self.set_shareholders(upload_trackers, already_serverids, self._encoder) verifycap = yield self._encoder.start() - results = yield self._encrypted_done(verifycap) + results = self._encrypted_done(verifycap) defer.returnValue(results) def locate_all_shareholders(self, encoder, started): @@ -1181,7 +1306,9 @@ class CHKUploader(object): encoder.set_shareholders(buckets, servermap) def _encrypted_done(self, verifycap): - """Returns a Deferred that will fire with the UploadResults instance.""" + """ + :return UploadResults: A description of the outcome of the upload. + """ e = self._encoder sharemap = dictutil.DictOfSets() servermap = dictutil.DictOfSets()