mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-07 10:56:49 +00:00
mutable WIP: improve logging a bit
This commit is contained in:
parent
c0c8d72b44
commit
4f07d96667
@ -1,10 +1,11 @@
|
||||
|
||||
from allmydata.util import idlib
|
||||
|
||||
MODE_CHECK = "query all peers"
|
||||
MODE_ANYTHING = "one recoverable version"
|
||||
MODE_WRITE = "replace all shares, probably" # not for initial creation
|
||||
MODE_ENOUGH = "enough"
|
||||
MODE_CHECK = "MODE_CHECK" # query all peers
|
||||
MODE_ANYTHING = "MODE_ANYTHING" # one recoverable version
|
||||
MODE_WRITE = "MODE_WRITE" # replace all shares, probably.. not for initial
|
||||
# creation
|
||||
MODE_READ = "MODE_READ"
|
||||
|
||||
class NotMutableError(Exception):
|
||||
pass
|
||||
|
@ -12,7 +12,7 @@ from pycryptopp.publickey import rsa
|
||||
from pycryptopp.cipher.aes import AES
|
||||
|
||||
from publish import Publish
|
||||
from common import MODE_ENOUGH, MODE_WRITE, UnrecoverableFileError, \
|
||||
from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
|
||||
ResponseCache
|
||||
from servermap import ServerMap, ServermapUpdater
|
||||
from retrieve import Retrieve
|
||||
@ -214,7 +214,7 @@ class MutableFileNode:
|
||||
|
||||
# methods exposed to the higher-layer application
|
||||
|
||||
def update_servermap(self, old_map=None, mode=MODE_ENOUGH):
|
||||
def update_servermap(self, old_map=None, mode=MODE_READ):
|
||||
servermap = old_map or ServerMap()
|
||||
d = self.obtain_lock()
|
||||
d.addCallback(lambda res:
|
||||
@ -280,7 +280,7 @@ class MutableFileNode:
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
def _update_and_retrieve_best(self, old_map=None, mode=MODE_ENOUGH):
|
||||
def _update_and_retrieve_best(self, old_map=None, mode=MODE_READ):
|
||||
d = self.update_servermap(old_map=old_map, mode=mode)
|
||||
def _updated(smap):
|
||||
goal = smap.best_recoverable_version()
|
||||
|
@ -452,7 +452,7 @@ class Retrieve:
|
||||
if isinstance(res, failure.Failure):
|
||||
self.log("Retrieve done, with failure", failure=res)
|
||||
else:
|
||||
self.log("Retrieve done, success!: res=%s" % (res,))
|
||||
self.log("Retrieve done, success!")
|
||||
# remember the encoding parameters, use them again next time
|
||||
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
|
||||
offsets_tuple) = self.verinfo
|
||||
|
@ -7,7 +7,7 @@ from allmydata.util import base32, hashutil, idlib, log
|
||||
from allmydata import storage
|
||||
from pycryptopp.publickey import rsa
|
||||
|
||||
from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_ENOUGH, \
|
||||
from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
|
||||
DictOfSets, CorruptShareError, NeedMoreDataError
|
||||
from layout import unpack_prefix_and_signature, unpack_header, unpack_share
|
||||
|
||||
@ -128,6 +128,18 @@ class ServerMap:
|
||||
seqnums.append(0)
|
||||
return max(seqnums)
|
||||
|
||||
def summarize_versions(self):
|
||||
"""Return a string describing which versions we know about."""
|
||||
versionmap = self.make_versionmap()
|
||||
bits = []
|
||||
for (verinfo, shares) in versionmap.items():
|
||||
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
|
||||
offsets_tuple) = verinfo
|
||||
shnums = set([shnum for (shnum, peerid, timestamp) in shares])
|
||||
bits.append("%d*seq%d-%s" %
|
||||
(len(shnums), seqnum, base32.b2a(root_hash)[:4]))
|
||||
return "/".join(bits)
|
||||
|
||||
def recoverable_versions(self):
|
||||
"""Return a set of versionids, one for each version that is currently
|
||||
recoverable."""
|
||||
@ -183,7 +195,7 @@ class ServerMap:
|
||||
pass
|
||||
|
||||
class ServermapUpdater:
|
||||
def __init__(self, filenode, servermap, mode=MODE_ENOUGH):
|
||||
def __init__(self, filenode, servermap, mode=MODE_READ):
|
||||
"""I update a servermap, locating a sufficient number of useful
|
||||
shares and remembering where they are located.
|
||||
|
||||
@ -218,7 +230,8 @@ class ServermapUpdater:
|
||||
self._need_privkey = True
|
||||
|
||||
prefix = storage.si_b2a(self._storage_index)[:5]
|
||||
self._log_number = log.msg("SharemapUpdater(%s): starting" % prefix)
|
||||
self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
|
||||
si=prefix, mode=mode)
|
||||
|
||||
def log(self, *args, **kwargs):
|
||||
if "parent" not in kwargs:
|
||||
@ -599,7 +612,7 @@ class ServermapUpdater:
|
||||
|
||||
if self.mode == MODE_ANYTHING:
|
||||
if recoverable_versions:
|
||||
self.log("MODE_ANYTHING and %d recoverable versions: done"
|
||||
self.log("%d recoverable versions: done"
|
||||
% len(recoverable_versions),
|
||||
parent=lp)
|
||||
return self._done()
|
||||
@ -607,24 +620,23 @@ class ServermapUpdater:
|
||||
if self.mode == MODE_CHECK:
|
||||
# we used self._must_query, and we know there aren't any
|
||||
# responses still waiting, so that means we must be done
|
||||
self.log("MODE_CHECK: done",
|
||||
parent=lp)
|
||||
self.log("done", parent=lp)
|
||||
return self._done()
|
||||
|
||||
MAX_IN_FLIGHT = 5
|
||||
if self.mode == MODE_ENOUGH:
|
||||
if self.mode == MODE_READ:
|
||||
# if we've queried k+epsilon servers, and we see a recoverable
|
||||
# version, and we haven't seen any unrecoverable higher-seqnum'ed
|
||||
# versions, then we're done.
|
||||
|
||||
if self._queries_completed < self.num_peers_to_query:
|
||||
self.log(format="ENOUGH, %(completed)d completed, %(query)d to query: need more",
|
||||
self.log(format="%(completed)d completed, %(query)d to query: need more",
|
||||
completed=self._queries_completed,
|
||||
query=self.num_peers_to_query,
|
||||
parent=lp)
|
||||
return self._send_more_queries(MAX_IN_FLIGHT)
|
||||
if not recoverable_versions:
|
||||
self.log("ENOUGH, no recoverable versions: need more",
|
||||
self.log("no recoverable versions: need more",
|
||||
parent=lp)
|
||||
return self._send_more_queries(MAX_IN_FLIGHT)
|
||||
highest_recoverable = max(recoverable_versions)
|
||||
@ -635,12 +647,11 @@ class ServermapUpdater:
|
||||
# don't yet see enough shares to recover it. Try harder.
|
||||
# TODO: consider sending more queries.
|
||||
# TODO: consider limiting the search distance
|
||||
self.log("ENOUGH, evidence of higher seqnum: need more")
|
||||
self.log("evidence of higher seqnum: need more")
|
||||
return self._send_more_queries(MAX_IN_FLIGHT)
|
||||
# all the unrecoverable versions were old or concurrent with a
|
||||
# recoverable version. Good enough.
|
||||
self.log("ENOUGH: no higher-seqnum: done",
|
||||
parent=lp)
|
||||
self.log("no higher-seqnum: done", parent=lp)
|
||||
return self._done()
|
||||
|
||||
if self.mode == MODE_WRITE:
|
||||
@ -650,8 +661,7 @@ class ServermapUpdater:
|
||||
# every server in the world.
|
||||
|
||||
if not recoverable_versions:
|
||||
self.log("WRITE, no recoverable versions: need more",
|
||||
parent=lp)
|
||||
self.log("no recoverable versions: need more", parent=lp)
|
||||
return self._send_more_queries(MAX_IN_FLIGHT)
|
||||
|
||||
last_found = -1
|
||||
@ -673,7 +683,7 @@ class ServermapUpdater:
|
||||
if last_found != -1:
|
||||
num_not_found += 1
|
||||
if num_not_found >= self.EPSILON:
|
||||
self.log("MODE_WRITE: found our boundary, %s" %
|
||||
self.log("found our boundary, %s" %
|
||||
"".join(states),
|
||||
parent=lp)
|
||||
found_boundary = True
|
||||
@ -715,8 +725,7 @@ class ServermapUpdater:
|
||||
|
||||
# if we hit here, we didn't find our boundary, so we're still
|
||||
# waiting for peers
|
||||
self.log("MODE_WRITE: no boundary yet, %s" % "".join(states),
|
||||
parent=lp)
|
||||
self.log("no boundary yet, %s" % "".join(states), parent=lp)
|
||||
return self._send_more_queries(MAX_IN_FLIGHT)
|
||||
|
||||
# otherwise, keep up to 5 queries in flight. TODO: this is pretty
|
||||
@ -756,6 +765,7 @@ class ServermapUpdater:
|
||||
self._servermap.last_update_mode = self.mode
|
||||
self._servermap.last_update_time = self._started
|
||||
# the servermap will not be touched after this
|
||||
self.log("servermap: %s" % self._servermap.summarize_versions())
|
||||
eventually(self._done_deferred.callback, self._servermap)
|
||||
|
||||
def _fatal_error(self, f):
|
||||
|
@ -16,7 +16,7 @@ import sha
|
||||
|
||||
from allmydata.mutable.node import MutableFileNode
|
||||
from allmydata.mutable.common import DictOfSets, ResponseCache, \
|
||||
MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_ENOUGH, UnrecoverableFileError
|
||||
MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, UnrecoverableFileError
|
||||
from allmydata.mutable.retrieve import Retrieve
|
||||
from allmydata.mutable.publish import Publish
|
||||
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
|
||||
@ -444,7 +444,7 @@ class Servermap(unittest.TestCase):
|
||||
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
|
||||
d.addCallback(lambda res: ms(mode=MODE_WRITE))
|
||||
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
|
||||
d.addCallback(lambda res: ms(mode=MODE_ENOUGH))
|
||||
d.addCallback(lambda res: ms(mode=MODE_READ))
|
||||
# this more stops at k+epsilon, and epsilon=k, so 6 shares
|
||||
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
|
||||
d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
|
||||
@ -455,7 +455,7 @@ class Servermap(unittest.TestCase):
|
||||
# increasing order of number of servers queried, since once a server
|
||||
# gets into the servermap, we'll always ask it for an update.
|
||||
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
|
||||
d.addCallback(lambda sm: us(sm, mode=MODE_ENOUGH))
|
||||
d.addCallback(lambda sm: us(sm, mode=MODE_READ))
|
||||
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
|
||||
d.addCallback(lambda sm: us(sm, mode=MODE_WRITE))
|
||||
d.addCallback(lambda sm: us(sm, mode=MODE_CHECK))
|
||||
@ -470,7 +470,7 @@ class Servermap(unittest.TestCase):
|
||||
ms = self.make_servermap
|
||||
us = self.update_servermap
|
||||
|
||||
d.addCallback(lambda res: ms(mode=MODE_ENOUGH))
|
||||
d.addCallback(lambda res: ms(mode=MODE_READ))
|
||||
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
|
||||
def _made_map(sm):
|
||||
v = sm.best_recoverable_version()
|
||||
@ -521,7 +521,7 @@ class Servermap(unittest.TestCase):
|
||||
d.addCallback(lambda res: ms(mode=MODE_WRITE))
|
||||
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
|
||||
|
||||
d.addCallback(lambda res: ms(mode=MODE_ENOUGH))
|
||||
d.addCallback(lambda res: ms(mode=MODE_READ))
|
||||
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
|
||||
|
||||
return d
|
||||
@ -554,7 +554,7 @@ class Servermap(unittest.TestCase):
|
||||
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
|
||||
d.addCallback(lambda res: ms(mode=MODE_WRITE))
|
||||
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
|
||||
d.addCallback(lambda res: ms(mode=MODE_ENOUGH))
|
||||
d.addCallback(lambda res: ms(mode=MODE_READ))
|
||||
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
|
||||
|
||||
return d
|
||||
@ -575,7 +575,7 @@ class Roundtrip(unittest.TestCase):
|
||||
d.addCallback(_created)
|
||||
return d
|
||||
|
||||
def make_servermap(self, mode=MODE_ENOUGH, oldmap=None):
|
||||
def make_servermap(self, mode=MODE_READ, oldmap=None):
|
||||
if oldmap is None:
|
||||
oldmap = ServerMap()
|
||||
smu = ServermapUpdater(self._fn, oldmap, mode)
|
||||
@ -784,7 +784,7 @@ class Roundtrip(unittest.TestCase):
|
||||
def test_corrupt_some(self):
|
||||
# corrupt the data of first five shares (so the servermap thinks
|
||||
# they're good but retrieve marks them as bad), so that the
|
||||
# MODE_ENOUGH set of 6 will be insufficient, forcing node.download to
|
||||
# MODE_READ set of 6 will be insufficient, forcing node.download to
|
||||
# retry with more servers.
|
||||
corrupt(None, self._storage, "share_data", range(5))
|
||||
d = self.make_servermap()
|
||||
@ -847,7 +847,7 @@ class MultipleEncodings(unittest.TestCase):
|
||||
d.addCallback(_published)
|
||||
return d
|
||||
|
||||
def make_servermap(self, mode=MODE_ENOUGH, oldmap=None):
|
||||
def make_servermap(self, mode=MODE_READ, oldmap=None):
|
||||
if oldmap is None:
|
||||
oldmap = ServerMap()
|
||||
smu = ServermapUpdater(self._fn, oldmap, mode)
|
||||
|
Loading…
x
Reference in New Issue
Block a user