verifier: serialize the fetching of blocks within a share so that we don't use too much RAM

Shares are still verified in parallel, but within a share, don't request a
block until the previous block has been verified and the memory we used to hold
it has been freed up.

Patch originally due to Brian. This version has a mockery-patchery-style test
which is "low tech" (it implements the patching inline in the test code instead
of using an extension of the mock.patch() function from the mock library) and
which unpatches in case of exception.

fixes #1395
This commit is contained in:
Zooko O'Whielacronx 2011-08-01 23:37:03 -07:00
parent 4028908b2e
commit f426e82287
2 changed files with 84 additions and 6 deletions

View File

@ -616,14 +616,18 @@ class Checker(log.PrefixingLogMixin):
# to free up the RAM
return None
def _get_blocks(vrbp):
    """Download and verify every block of this share, one at a time.

    Blocks are requested serially: each get_block() is issued only
    after the previous block has been verified and the memory holding
    it has been released (via _discard_result), so the verifier's RAM
    usage stays bounded (ticket #1395). Shares themselves are still
    verified in parallel by our caller.
    """
    def _get_block(ign, blocknum):
        db = vrbp.get_block(blocknum)
        # discard the verified block data immediately to free the RAM
        db.addCallback(_discard_result)
        return db
    dbs = defer.succeed(None)
    for blocknum in range(veup.num_segments):
        dbs.addCallback(_get_block, blocknum)
    # The Deferred we return will fire after every block of this
    # share has been downloaded and verified successfully, or else it
    # will errback as soon as the first error is observed.
    return dbs
d.addCallback(_get_blocks)
# if none of those errbacked, the blocks (and the hashes above them)

View File

@ -1,6 +1,7 @@
import simplejson
from twisted.trial import unittest
from twisted.internet import defer
from allmydata import check_results, uri
from allmydata.web import check_results as web_check_results
from allmydata.storage_client import StorageFarmBroker, NativeStorageServer
@ -319,3 +320,76 @@ class AddLease(GridTestMixin, unittest.TestCase):
d.addCallback(lambda ign: self.failUnless(really_did_break))
return d
class CounterHolder(object):
    """Shared mutable record of block-fetch concurrency.

    All MockVRBP instances in a test share one holder, so the test can
    observe the peak number of simultaneously-outstanding get_block()
    calls across every share.
    """
    def __init__(self):
        # current in-flight fetch count, and the highest value it has
        # ever reached
        self._num_active_block_fetches = self._max_active_block_fetches = 0
from allmydata.immutable.checker import ValidatedReadBucketProxy
class MockVRBP(ValidatedReadBucketProxy):
    """A ValidatedReadBucketProxy that counts concurrent get_block() calls.

    Each call bumps the shared CounterHolder's active-fetch counter on
    the way in, records the peak concurrency seen so far, and
    decrements the counter once the fetch finishes (success or
    failure).
    """
    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
                 block_size, share_size, counterholder):
        ValidatedReadBucketProxy.__init__(self, sharenum, bucket,
                                          share_hash_tree, num_blocks,
                                          block_size, share_size)
        self.counterholder = counterholder

    def get_block(self, blocknum):
        holder = self.counterholder
        holder._num_active_block_fetches += 1
        holder._max_active_block_fetches = max(
            holder._max_active_block_fetches,
            holder._num_active_block_fetches)
        d = ValidatedReadBucketProxy.get_block(self, blocknum)
        def _fetch_done(res):
            # runs on both success and failure, so the count can't leak
            holder._num_active_block_fetches -= 1
            return res
        d.addBoth(_fetch_done)
        return d
class TooParallel(GridTestMixin, unittest.TestCase):
    # bug #1395: the immutable verifier used to be aggressively
    # parallelized, checking all blocks of all shares at the same time,
    # which blew our memory budget and crashed with MemoryErrors on
    # >1GB files.

    def test_immutable(self):
        import allmydata.immutable.checker
        origVRBP = allmydata.immutable.checker.ValidatedReadBucketProxy
        self.basedir = "checker/TooParallel/immutable"

        # Any code that asks to instantiate a ValidatedReadBucketProxy
        # gets a MockVRBP wired up to our shared CounterHolder instead.
        counterholder = CounterHolder()
        def make_mock_VRBP(*args, **kwargs):
            return MockVRBP(counterholder=counterholder, *args, **kwargs)
        allmydata.immutable.checker.ValidatedReadBucketProxy = make_mock_VRBP

        d = defer.succeed(None)
        def _set_up(ign):
            self.set_up_grid(num_servers=4)
            self.c0 = self.g.clients[0]
            self.c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1,
                                                    "happy": 4,
                                                    "n": 4,
                                                    "max_segment_size": 5,
                                                  }
            self.uris = {}
            DATA = "data" * 100 # 400/5 = 80 blocks
            return self.c0.upload(Data(DATA, convergence=""))
        d.addCallback(_set_up)
        def _verify(ur):
            filenode = self.c0.create_node_from_uri(ur.uri)
            return filenode.check(Monitor(), verify=True)
        d.addCallback(_verify)
        def _check_counters(cr):
            # The verifier works on all 4 shares in parallel, but only
            # fetches one block from each share at a time, so the peak
            # we expect to see is exactly 4 parallel fetches.
            self.failUnlessEqual(counterholder._max_active_block_fetches, 4)
        d.addCallback(_check_counters)
        def _restore_patch(res):
            # unpatch even if an earlier step errbacked
            allmydata.immutable.checker.ValidatedReadBucketProxy = origVRBP
            return res
        d.addBoth(_restore_patch)
        return d
    test_immutable.timeout = 10