Remove ResponseCache in favor of MDMFSlotReadProxy's cache. Closes #1240.

This contains several merged patches. Individual messages follow, latest first:

* Fix a warning from check-miscaptures.
* In retrieve.py, explicitly test whether a key is in self.servermap.proxies
  rather than catching KeyError.
* Added a comment to the MDMF counterpart of the removed test, explaining
  why the SDMF version was removed.
* Removed test_corrupt_all_block_hash_tree_late, since the entire block_hash_tree
  is cached in the servermap for an SDMF file.
* Fixed several tests to use files larger than the servermap cache.
* Remove unused test_response_cache_memory_leak().
* Exercise the cache.
* Test infrastructure for counting cache misses on MDMF files.
* Removed the ResponseCache. Instead, each MDMFSlotReadProxy initialized
  by ServerMap is kept around so that Retrieve can reuse it. The ReadProxy
  caches the first 1000 bytes initially read from each share by the
  ServerMap, and many subsequent requests can be satisfied out of this
  cache, reducing roundtrips from 84 to 60 in test_deepcheck_mdmf (a
  minimal sketch of the new flow follows this list). It is not yet clear
  under what conditions the cache holds fewer than 1000 bytes. This also
  breaks some existing unit tests that depend on the internal behavior
  of ResponseCache.
* The servermap.proxies (a cache of SlotReadProxies) is now keyed by
  (verinfo, serverid, storage_index, shnum) rather than just
  (serverid, shnum)
* Minor cosmetic changes
* Added a test failure if the number of cache misses is too high.
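
A minimal sketch of the new flow (hedged: servermap.proxies, the
four-part key, and the fallback to a fresh proxy are taken from the
diff below; the simplified classes here are illustrative only):

    class SlotReadProxy(object):
        """Stands in for MDMFSlotReadProxy; holds the share prefix
        fetched during the servermap update."""
        def __init__(self, cached_prefix=""):
            self.cached_prefix = cached_prefix  # first ~1000 bytes

    class ServerMap(object):
        def __init__(self):
            # keyed by (verinfo, serverid, storage_index, shnum)
            self.proxies = {}

    def get_reader(servermap, verinfo, serverid, storage_index, shnum):
        # Retrieve reuses the proxy built during the servermap update;
        # on a miss it falls back to a fresh, cache-less proxy.
        key = (verinfo, serverid, storage_index, shnum)
        if key in servermap.proxies:
            return servermap.proxies[key]
        return SlotReadProxy()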

Author: Andrew Miller <amiller@dappervision.com>
Signed-off-by: David-Sarah Hopwood <davidsarah@jacaranda.org>
David-Sarah Hopwood 2012-12-27 00:00:17 +00:00
parent 8618929833
commit 4563ba456b
8 changed files with 87 additions and 185 deletions

View File

@@ -1,6 +1,4 @@
from allmydata.util.spans import DataSpans
MODE_CHECK = "MODE_CHECK" # query all peers
MODE_ANYTHING = "MODE_ANYTHING" # one recoverable version
MODE_WRITE = "MODE_WRITE" # replace all shares, probably.. not for initial
@@ -59,56 +57,3 @@ class CorruptShareError(BadShareError):
class UnknownVersionError(BadShareError):
"""The share we received was of a version we don't recognize."""
class ResponseCache:
"""I cache share data, to reduce the number of round trips used during
mutable file operations. All of the data in my cache is for a single
storage index, but I will keep information on multiple shares for
that storage index.
I maintain a highest-seen sequence number, and will flush all entries
each time this number increases (this doesn't necessarily imply that
all entries have the same sequence number).
My cache is indexed by a (verinfo, shnum) tuple.
My cache entries are DataSpans instances, each representing a set of
non-overlapping byteranges.
"""
def __init__(self):
self.cache = {}
self.seqnum = None
def _clear(self):
# also used by unit tests
self.cache = {}
def add(self, verinfo, shnum, offset, data):
seqnum = verinfo[0]
if seqnum > self.seqnum:
self._clear()
self.seqnum = seqnum
index = (verinfo, shnum)
if index in self.cache:
self.cache[index].add(offset, data)
else:
spans = DataSpans()
spans.add(offset, data)
self.cache[index] = spans
def read(self, verinfo, shnum, offset, length):
"""Try to satisfy a read request from cache.
Returns data, or None if the cache did not hold the entire requested span.
"""
# TODO: perhaps return a DataSpans object representing the fragments
# that we have, instead of only returning a hit if we can satisfy the
# whole request from cache.
index = (verinfo, shnum)
if index in self.cache:
return self.cache[index].get(offset, length)
else:
return None
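
For reference, a usage sketch of the semantics removed here (the
verinfo tuples (1,) and (2,) are placeholders whose first element is
the sequence number):

    c = ResponseCache()
    c.add((1,), 3, 0, "x" * 100)    # cache bytes 0..99 of share 3
    assert c.read((1,), 3, 10, 20) == "x" * 20  # whole span cached: hit
    assert c.read((1,), 3, 90, 20) is None      # runs past the cache: miss
    c.add((2,), 3, 0, "y" * 100)    # a higher seqnum flushes older entries
    assert c.read((1,), 3, 0, 10) is None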

View File

@@ -17,7 +17,7 @@ from pycryptopp.cipher.aes import AES
from allmydata.mutable.publish import Publish, MutableData,\
TransformingUploadable
from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
ResponseCache, UncoordinatedWriteError
UncoordinatedWriteError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve
from allmydata.mutable.checker import MutableChecker, MutableCheckAndRepairer
@@ -65,7 +65,6 @@ class MutableFileNode:
self._required_shares = default_encoding_parameters["k"]
self._total_shares = default_encoding_parameters["n"]
self._sharemap = {} # known shares, shnum-to-[nodeids]
self._cache = ResponseCache()
self._most_recent_size = None
# filled in after __init__ if we're being created for the first time;
# filled in by the servermap updater before publishing, otherwise.
@@ -180,10 +179,6 @@ class MutableFileNode:
self._privkey = privkey
def _populate_encprivkey(self, encprivkey):
self._encprivkey = encprivkey
def _add_to_cache(self, verinfo, shnum, offset, data):
self._cache.add(verinfo, shnum, offset, data)
def _read_from_cache(self, verinfo, shnum, offset, length):
return self._cache.read(verinfo, shnum, offset, length)
def get_write_enabler(self, server):
seed = server.get_foolscap_write_enabler_seed()

View File

@@ -1192,7 +1192,8 @@ class MDMFSlotReadProxy:
rref,
storage_index,
shnum,
data=""):
data="",
data_is_everything=False):
# Start the initialization process.
self._rref = rref
self._storage_index = storage_index
@@ -1223,8 +1224,14 @@ class MDMFSlotReadProxy:
# If the user has chosen to initialize us with some data, we'll
# try to satisfy subsequent data requests with that data before
# asking the storage server for it. If
# asking the storage server for it.
self._data = data
# If the provided data is known to be complete, then we know there's
# nothing to be gained by querying the server, so we should just
# partially satisfy requests with what we have.
self._data_is_everything = data_is_everything
# The way callers interact with cache in the filenode returns
# None if there isn't any cached data, but the way we index the
# cached data requires a string, so convert None to "".
@@ -1738,7 +1745,8 @@ class MDMFSlotReadProxy:
# TODO: It's entirely possible to tweak this so that it just
# fulfills the requests that it can, and not demand that all
# requests are satisfiable before running it.
if not unsatisfiable and not force_remote:
if not unsatisfiable or self._data_is_everything:
results = [self._data[offset:offset+length]
for (offset, length) in readvs]
results = {self.shnum: results}
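
The dispatch rule above, in miniature (a hedged sketch, not the real
_read method; readvs is a list of (offset, length) pairs as in the
diff, and fetch_remote stands in for the storage-server round trip):

    def read_spans(data, data_is_everything, readvs, fetch_remote):
        # unsatisfiable: some requested span runs past the local data
        unsatisfiable = any(off + length > len(data)
                            for (off, length) in readvs)
        # If every span is local, answer locally. If the local data is
        # known to be the entire share, a remote fetch cannot add
        # anything, so answer (possibly short) locally in that case too.
        if not unsatisfiable or data_is_everything:
            return [data[off:off + length] for (off, length) in readvs]
        return fetch_remote(readvs)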

View File

@@ -286,16 +286,14 @@ class Retrieve:
self.remaining_sharemap = DictOfSets()
for (shnum, server, timestamp) in shares:
self.remaining_sharemap.add(shnum, server)
# If the servermap update fetched anything, it fetched at least 1
# KiB, so we ask for that much.
# TODO: Change the cache methods to allow us to fetch all of the
# data that they have, then change this method to do that.
any_cache = self._node._read_from_cache(self.verinfo, shnum,
0, 1000)
reader = MDMFSlotReadProxy(server.get_rref(),
self._storage_index,
shnum,
any_cache)
# Reuse the SlotReader from the servermap.
key = (self.verinfo, server.get_serverid(),
self._storage_index, shnum)
if key in self.servermap.proxies:
reader = self.servermap.proxies[key]
else:
reader = MDMFSlotReadProxy(server.get_rref(),
self._storage_index, shnum, None)
reader.server = server
self.readers[shnum] = reader
assert len(self.remaining_sharemap) >= k
@@ -766,6 +764,7 @@ class Retrieve:
block_and_salt, blockhashes, sharehashes = results
block, salt = block_and_salt
assert type(block) is str, (block, salt)
blockhashes = dict(enumerate(blockhashes))
self.log("the reader gave me the following blockhashes: %s" % \
@@ -838,12 +837,13 @@ class Retrieve:
#needed.discard(0)
self.log("getting blockhashes for segment %d, share %d: %s" % \
(segnum, reader.shnum, str(needed)))
d1 = reader.get_blockhashes(needed, force_remote=True)
# TODO is force_remote necessary here?
d1 = reader.get_blockhashes(needed, force_remote=False)
if self.share_hash_tree.needed_hashes(reader.shnum):
need = self.share_hash_tree.needed_hashes(reader.shnum)
self.log("also need sharehashes for share %d: %s" % (reader.shnum,
str(need)))
d2 = reader.get_sharehashes(need, force_remote=True)
d2 = reader.get_sharehashes(need, force_remote=False)
else:
d2 = defer.succeed({}) # the logic in the next method
# expects a dict

View File

@@ -119,6 +119,7 @@ class ServerMap:
self._bad_shares = {} # maps (server,shnum) to old checkstring
self._last_update_mode = None
self._last_update_time = 0
self.proxies = {}
self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
# where blockhashes is a list of bytestrings (the result of
# layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
@@ -631,19 +632,6 @@ class ServermapUpdater:
self._servermap.add_problem(f)
def _cache_good_sharedata(self, verinfo, shnum, now, data):
"""
If one of my queries returns successfully (which means that we
were able to and successfully did validate the signature), I
cache the data that we initially fetched from the storage
server. This will help reduce the number of roundtrips that need
to occur when the file is downloaded, or when the file is
updated.
"""
if verinfo:
self._node._add_to_cache(verinfo, shnum, 0, data)
def _got_results(self, datavs, server, readsize, storage_index, started):
lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
name=server.get_name(),
@@ -675,7 +663,9 @@ class ServermapUpdater:
reader = MDMFSlotReadProxy(ss,
storage_index,
shnum,
data)
data,
data_is_everything=(len(data) < readsize))
# our goal, with each response, is to validate the version
# information and share data as best we can at this point --
# we do this by validating the signature. To do this, we
@@ -747,13 +737,21 @@ class ServermapUpdater:
d5 = defer.succeed(None)
dl = defer.DeferredList([d, d2, d3, d4, d5])
def _append_proxy(passthrough, shnum=shnum, reader=reader):
# Store the proxy (with its cache) keyed by serverid and
# version.
_, (_,verinfo), _, _, _ = passthrough
verinfo = self._make_verinfo_hashable(verinfo)
self._servermap.proxies[(verinfo,
server.get_serverid(),
storage_index, shnum)] = reader
return passthrough
dl.addCallback(_append_proxy)
dl.addBoth(self._turn_barrier)
dl.addCallback(lambda results, shnum=shnum:
self._got_signature_one_share(results, shnum, server, lp))
dl.addErrback(lambda error, shnum=shnum, data=data:
self._got_corrupt_share(error, shnum, server, data, lp))
dl.addCallback(lambda verinfo, shnum=shnum, data=data:
self._cache_good_sharedata(verinfo, shnum, now, data))
ds.append(dl)
# dl is a deferred list that will fire when all of the shares
# that we found on this server are done processing. When dl fires,
@@ -817,6 +815,10 @@ class ServermapUpdater:
return None
_, verinfo, signature, __, ___ = results
verinfo = self._make_verinfo_hashable(verinfo[1])
# This tuple uniquely identifies a share on the grid; we use it
# to keep track of the ones that we've already seen.
(seqnum,
root_hash,
saltish,
@@ -825,22 +827,8 @@ class ServermapUpdater:
k,
n,
prefix,
offsets) = verinfo[1]
offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
offsets_tuple) = verinfo
# XXX: This should be done for us in the method, so
# presumably you can go in there and fix it.
verinfo = (seqnum,
root_hash,
saltish,
segsize,
datalen,
k,
n,
prefix,
offsets_tuple)
# This tuple uniquely identifies a share on the grid; we use it
# to keep track of the ones that we've already seen.
if verinfo not in self._valid_versions:
# This is a new version tuple, and we need to validate it
@@ -879,13 +867,7 @@ class ServermapUpdater:
return verinfo
def _got_update_results_one_share(self, results, share):
"""
I record the update results in results.
"""
assert len(results) == 4
verinfo, blockhashes, start, end = results
def _make_verinfo_hashable(self, verinfo):
(seqnum,
root_hash,
saltish,
@@ -895,10 +877,9 @@ class ServermapUpdater:
n,
prefix,
offsets) = verinfo
offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
# XXX: This should be done for us in the method, so
# presumably you can go in there and fix it.
verinfo = (seqnum,
root_hash,
saltish,
@@ -908,7 +889,15 @@ class ServermapUpdater:
n,
prefix,
offsets_tuple)
return verinfo
def _got_update_results_one_share(self, results, share):
"""
I record the update results in results.
"""
assert len(results) == 4
verinfo, blockhashes, start, end = results
verinfo = self._make_verinfo_hashable(verinfo)
update_data = (blockhashes, start, end)
self._servermap.set_update_data_for_share_and_verinfo(share,
verinfo,
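
This helper exists because verinfo now serves as part of a dict key
(servermap.proxies), and its offsets field arrives as a dict, which is
unhashable. A small illustration (the offset names are made up):

    offsets = {"signature": 9, "share_data": 141}
    try:
        hash(offsets)
    except TypeError:
        pass  # a dict cannot be part of a dict key
    # _make_verinfo_hashable converts it to a tuple of items:
    offsets_tuple = tuple([(k, v) for (k, v) in offsets.items()])
    hash(offsets_tuple)  # fine; verinfo can now key servermap.proxies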

View File

@@ -43,6 +43,10 @@ class LocalWrapper:
self.hung_until = None
self.post_call_notifier = None
self.disconnectors = {}
self.counter_by_methname = {}
def _clear_counters(self):
self.counter_by_methname = {}
def callRemoteOnly(self, methname, *args, **kwargs):
d = self.callRemote(methname, *args, **kwargs)
@@ -62,6 +66,8 @@ class LocalWrapper:
kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
def _really_call():
def incr(d, k): d[k] = d.setdefault(k, 0) + 1
incr(self.counter_by_methname, methname)
meth = getattr(self.original, "remote_" + methname)
return meth(*args, **kwargs)
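
The counting hook follows a standard wrapper pattern; a standalone
sketch (the remote_ prefix matches the diff; CountingWrapper itself is
illustrative):

    class CountingWrapper(object):
        """Counts calls per method name before forwarding them,
        mimicking LocalWrapper's counter_by_methname."""
        def __init__(self, original):
            self.original = original
            self.counter_by_methname = {}

        def callRemote(self, methname, *args, **kwargs):
            counts = self.counter_by_methname
            counts[methname] = counts.get(methname, 0) + 1
            meth = getattr(self.original, "remote_" + methname)
            return meth(*args, **kwargs)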

View File

@@ -1097,6 +1097,23 @@ class Dirnode(GridTestMixin, unittest.TestCase,
d.addCallback(_check_results)
return d
def test_deepcheck_cachemisses(self):
self.basedir = "dirnode/Dirnode/test_mdmf_cachemisses"
self.set_up_grid()
d = self._test_deepcheck_create()
# Clear the counters and set the rootnode
d.addCallback(lambda rootnode:
not [ss._clear_counters() for ss
in self.g.wrappers_by_id.values()] or rootnode)
d.addCallback(lambda rootnode: rootnode.start_deep_check().when_done())
def _check(ign):
count = sum([ss.counter_by_methname['slot_readv']
for ss in self.g.wrappers_by_id.values()])
self.failIf(count > 60, 'Expected at most 60 cache misses, '
'unfortunately there were %d' % (count,))
d.addCallback(_check)
return d
def test_deepcheck_mdmf(self):
self.basedir = "dirnode/Dirnode/test_deepcheck_mdmf"
self.set_up_grid()

View File

@@ -21,7 +21,7 @@ from allmydata.storage.common import storage_index_to_dir
from allmydata.scripts import debug
from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
from allmydata.mutable.common import ResponseCache, \
from allmydata.mutable.common import \
MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
NotEnoughServersError, CorruptShareError
@@ -639,25 +639,6 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
d.addCallback(_created)
return d
def test_response_cache_memory_leak(self):
d = self.nodemaker.create_mutable_file("contents")
def _created(n):
d = n.download_best_version()
d.addCallback(lambda res: self.failUnlessEqual(res, "contents"))
d.addCallback(lambda ign: self.failUnless(isinstance(n._cache, ResponseCache)))
def _check_cache(expected):
# The total size of cache entries should not increase on the second download;
# in fact the cache contents should be identical.
d2 = n.download_best_version()
d2.addCallback(lambda rep: self.failUnlessEqual(repr(n._cache.cache), expected))
return d2
d.addCallback(lambda ign: _check_cache(repr(n._cache.cache)))
return d
d.addCallback(_created)
return d
def test_create_with_initial_contents_function(self):
data = "initial contents"
def _make_contents(n):
@@ -1528,15 +1509,6 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
corrupt_early=False,
failure_checker=_check)
def test_corrupt_all_block_hash_tree_late(self):
def _check(res):
f = res[0]
self.failUnless(f.check(NotEnoughSharesError))
return self._test_corrupt_all("block_hash_tree",
"block hash tree failure",
corrupt_early=False,
failure_checker=_check)
def test_corrupt_all_block_late(self):
def _check(res):
@@ -1618,17 +1590,20 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
d.addCallback(lambda ignored:
self._test_corrupt_all(("block_hash_tree", 12 * 32),
"block hash tree failure",
corrupt_early=False,
corrupt_early=True,
should_succeed=False))
return d
def test_corrupt_mdmf_block_hash_tree_late(self):
# Note - there is no SDMF counterpart to this test, as the SDMF
# files are guaranteed to have exactly one block, and therefore
# the block hash tree fits within the initial read (#1240).
d = self.publish_mdmf()
d.addCallback(lambda ignored:
self._test_corrupt_all(("block_hash_tree", 12 * 32),
"block hash tree failure",
corrupt_early=True,
corrupt_early=False,
should_succeed=False))
return d
@@ -2233,9 +2208,9 @@ class MultipleEncodings(unittest.TestCase):
# then mix up the shares, to make sure that download survives seeing
# a variety of encodings. This is actually kind of tricky to set up.
contents1 = "Contents for encoding 1 (3-of-10) go here"
contents2 = "Contents for encoding 2 (4-of-9) go here"
contents3 = "Contents for encoding 3 (4-of-7) go here"
contents1 = "Contents for encoding 1 (3-of-10) go here"*1000
contents2 = "Contents for encoding 2 (4-of-9) go here"*1000
contents3 = "Contents for encoding 3 (4-of-7) go here"*1000
# we make a retrieval object that doesn't know what encoding
# parameters to use
@@ -2403,39 +2378,6 @@ class MultipleVersions(unittest.TestCase, PublishMixin, CheckerMixin):
return d
class Utils(unittest.TestCase):
def test_cache(self):
c = ResponseCache()
# xdata = base62.b2a(os.urandom(100))[:100]
xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
c.add("v1", 1, 0, xdata)
c.add("v1", 1, 2000, ydata)
self.failUnlessEqual(c.read("v2", 1, 10, 11), None)
self.failUnlessEqual(c.read("v1", 2, 10, 11), None)
self.failUnlessEqual(c.read("v1", 1, 0, 10), xdata[:10])
self.failUnlessEqual(c.read("v1", 1, 90, 10), xdata[90:])
self.failUnlessEqual(c.read("v1", 1, 300, 10), None)
self.failUnlessEqual(c.read("v1", 1, 2050, 5), ydata[50:55])
self.failUnlessEqual(c.read("v1", 1, 0, 101), None)
self.failUnlessEqual(c.read("v1", 1, 99, 1), xdata[99:100])
self.failUnlessEqual(c.read("v1", 1, 100, 1), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 9), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 10), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 11), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 15), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 19), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 20), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 21), None)
self.failUnlessEqual(c.read("v1", 1, 1990, 25), None)
self.failUnlessEqual(c.read("v1", 1, 1999, 25), None)
# test joining fragments
c = ResponseCache()
c.add("v1", 1, 0, xdata[:10])
c.add("v1", 1, 10, xdata[10:20])
self.failUnlessEqual(c.read("v1", 1, 0, 20), xdata[:20])
class Exceptions(unittest.TestCase):
def test_repr(self):
nmde = NeedMoreDataError(100, 50, 100)
@@ -2443,6 +2385,7 @@ class Exceptions(unittest.TestCase):
ucwe = UncoordinatedWriteError()
self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
class SameKeyGenerator:
def __init__(self, pubkey, privkey):
self.pubkey = pubkey
@@ -2514,7 +2457,7 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
self.basedir = "mutable/Problems/test_retrieve_surprise"
self.set_up_grid()
nm = self.g.clients[0].nodemaker
d = nm.create_mutable_file(MutableData("contents 1"))
d = nm.create_mutable_file(MutableData("contents 1"*4000))
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_READ))
@@ -2528,7 +2471,6 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
# now attempt to retrieve the old version with the old servermap.
# This will look like someone has changed the file since we
# updated the servermap.
d.addCallback(lambda res: n._cache._clear())
d.addCallback(lambda res: log.msg("starting doomed read"))
d.addCallback(lambda res:
self.shouldFail(NotEnoughSharesError,