add logging around peer selection and upload

Jean-Paul Calderone 2019-04-03 10:32:04 -04:00
parent 83b520bd68
commit 3c44cb65a9


@@ -28,6 +28,115 @@ from pycryptopp.cipher.aes import AES
from six.moves import cStringIO as StringIO
from happiness_upload import share_placement, calculate_happiness
from ..util.eliotutil import (
log_call_deferred,
inline_callbacks,
)
from eliot import (
ActionType,
MessageType,
Message,
Field,
)
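# Structured log fields used by the Eliot actions and messages defined below.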
_TOTAL_SHARES = Field.for_types(
u"total_shares",
[int, long],
u"The total number of shares desired.",
)
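# Peer identifiers are rendered as base32 strings so they are readable in the logs.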
def _serialize_peers(peers):
return sorted(base32.b2a(p) for p in peers)
_PEERS = Field(
u"peers",
_serialize_peers,
u"The read/write peers being considered.",
)
_READONLY_PEERS = Field(
u"readonly_peers",
_serialize_peers,
u"The read-only peers being considered.",
)
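# Flatten the server-to-shares mapping into plain lists so it can be serialized.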
def _serialize_existing_shares(existing_shares):
return {
server: list(shares)
for (server, shares)
in existing_shares.iteritems()
}
_EXISTING_SHARES = Field(
u"existing_shares",
_serialize_existing_shares,
u"The shares that are believed to already have been placed.",
)
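# Record which share number is planned for which server, with server ids rendered as base32.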
def _serialize_happiness_mappings(happiness_mappings):
return {
sharenum: base32.b2a(serverid)
for (sharenum, serverid)
in happiness_mappings.iteritems()
}
_HAPPINESS_MAPPINGS = Field(
u"happiness_mappings",
_serialize_happiness_mappings,
u"The computed happiness mapping for a particular upload.",
)
_HAPPINESS = Field.for_types(
u"happiness",
[int, long],
u"The computed happiness of a certain placement.",
)
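# Summarize each upload tracker as its server name plus the share numbers it has agreed to hold.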
_UPLOAD_TRACKERS = Field(
u"upload_trackers",
lambda trackers: list(
dict(
server=tracker.get_name(),
shareids=sorted(tracker.buckets.keys()),
)
for tracker
in trackers
),
u"Some servers which have agreed to hold some shares for us.",
)
_ALREADY_SERVERIDS = Field(
u"already_serverids",
lambda d: d,
u"Some servers which are already holding some shares that we were interested in uploading.",
)
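# An action spanning the work of identifying existing shareholders ahead of an upload.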
LOCATE_ALL_SHAREHOLDERS = ActionType(
u"immutable:upload:locate-all-shareholders",
[],
[_UPLOAD_TRACKERS, _ALREADY_SERVERIDS],
u"Existing shareholders are being identified to plan upload actions.",
)
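# A message recording the inputs to and results of the share placement computation.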
GET_SHARE_PLACEMENTS = MessageType(
u"immutable:upload:get-share-placements",
[_TOTAL_SHARES, _PEERS, _READONLY_PEERS, _EXISTING_SHARES, _HAPPINESS_MAPPINGS, _HAPPINESS],
u"Share placement is being computed for an upload.",
)
_EFFECTIVE_HAPPINESS = Field.for_types(
u"effective_happiness",
[int, long],
u"The computed happiness value of a share placement map.",
)
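# A message emitted once the placement algorithm has converged on a final placement.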
CONVERGED_HAPPINESS = MessageType(
u"immutable:upload:get-shareholders:converged-happiness",
[_EFFECTIVE_HAPPINESS],
u"The share placement algorithm has converged and placements efforts are complete.",
)
# this wants to live in storage, not here
class TooFullError(Exception):
@@ -249,6 +358,14 @@ class PeerSelector(object):
shares = set(range(self.total_shares))
self.happiness_mappings = share_placement(self.peers, self.readonly_peers, shares, self.existing_shares)
self.happiness = calculate_happiness(self.happiness_mappings)
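# Log the placement inputs along with the computed mapping and happiness.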
GET_SHARE_PLACEMENTS.log(
total_shares=self.total_shares,
peers=self.peers,
readonly_peers=self.readonly_peers,
existing_shares=self.existing_shares,
happiness_mappings=self.happiness_mappings,
happiness=self.happiness,
)
return self.happiness_mappings
@@ -334,7 +451,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
return readonly_trackers, write_trackers
@defer.inlineCallbacks
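# inline_callbacks (from ..util.eliotutil) replaces defer.inlineCallbacks so the Eliot logging context is preserved across yields.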
@inline_callbacks
def get_shareholders(self, storage_broker, secret_holder,
storage_index, share_size, block_size,
num_segments, total_shares, needed_shares,
@@ -552,6 +669,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
# for k, v in merged.items():
# print(" {} -> {}".format(k, v))
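# Log the effective happiness achieved by the converged share placement.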
CONVERGED_HAPPINESS.log(
effective_happiness=effective_happiness,
)
if effective_happiness < min_happiness:
msg = failure_message(
peer_count=len(self.serverids_with_shares),
@@ -1060,6 +1181,7 @@ class CHKUploader(object):
kwargs["facility"] = "tahoe.upload"
return log.msg(*args, **kwargs)
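# Record start() as an Eliot action; the action finishes when the returned Deferred fires.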
@log_call_deferred(action_type=u"immutable:upload:chk:start")
def start(self, encrypted_uploadable):
"""Start uploading the file.
@@ -1087,7 +1209,8 @@ class CHKUploader(object):
return defer.succeed(None)
return self._encoder.abort()
@defer.inlineCallbacks
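# start_encrypted() now runs under its own Eliot action as well.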
@log_call_deferred(action_type=u"immutable:upload:chk:start-encrypted")
@inline_callbacks
def start_encrypted(self, encrypted):
"""
Returns a Deferred that will fire with the UploadResults instance.
@@ -1104,10 +1227,12 @@
)
# this just returns itself
yield self._encoder.set_encrypted_uploadable(eu)
(upload_trackers, already_serverids) = yield self.locate_all_shareholders(self._encoder, started)
yield self.set_shareholders(upload_trackers, already_serverids, self._encoder)
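# Locate shareholders inside the Eliot action and attach the resulting trackers and serverids as success fields.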
with LOCATE_ALL_SHAREHOLDERS() as action:
(upload_trackers, already_serverids) = yield self.locate_all_shareholders(self._encoder, started)
action.add_success_fields(upload_trackers=upload_trackers, already_serverids=already_serverids)
self.set_shareholders(upload_trackers, already_serverids, self._encoder)
verifycap = yield self._encoder.start()
results = yield self._encrypted_done(verifycap)
results = self._encrypted_done(verifycap)
defer.returnValue(results)
def locate_all_shareholders(self, encoder, started):
@@ -1181,7 +1306,9 @@ class CHKUploader(object):
encoder.set_shareholders(buckets, servermap)
def _encrypted_done(self, verifycap):
"""Returns a Deferred that will fire with the UploadResults instance."""
"""
:return UploadResults: A description of the outcome of the upload.
"""
e = self._encoder
sharemap = dictutil.DictOfSets()
servermap = dictutil.DictOfSets()