checker: remember checker results, but only in ram for now

This commit is contained in:
Brian Warner 2007-10-22 17:46:24 -07:00
parent 81c818f90c
commit 57f994fb02
3 changed files with 135 additions and 12 deletions

View File

@ -6,6 +6,7 @@ This does no verification of the shares whatsoever. If the peer claims to
have the share, we believe them.
"""
import time, os.path
from twisted.internet import defer
from twisted.application import service
from twisted.python import log
@ -176,10 +177,41 @@ class SimpleCHKFileVerifier(download.FileDownloader):
return d
class SQLiteCheckerResults:
def __init__(self, results_file):
pass
def add_results(self, uri_to_check, when, results):
pass
def get_results_for(self, uri_to_check):
return []
class InMemoryCheckerResults:
def __init__(self):
self.results = {} # indexed by uri
def add_results(self, uri_to_check, when, results):
if uri_to_check not in self.results:
self.results[uri_to_check] = []
self.results[uri_to_check].append( (when, results) )
def get_results_for(self, uri_to_check):
return self.results.get(uri_to_check, [])
class Checker(service.MultiService):
"""I am a service that helps perform file checks.
"""
name = "checker"
def __init__(self):
service.MultiService.__init__(self)
self.results = None
def startService(self):
service.MultiService.startService(self)
if self.parent:
results_file = os.path.join(self.parent.basedir,
"checker_results.db")
if os.path.exists(results_file):
self.results = SQLiteCheckerResults(results_file)
else:
self.results = InMemoryCheckerResults()
def check(self, uri_to_check):
uri_to_check = IVerifierURI(uri_to_check)
@ -188,14 +220,22 @@ class Checker(service.MultiService):
elif isinstance(uri_to_check, uri.CHKFileVerifierURI):
peer_getter = self.parent.get_permuted_peers
c = SimpleCHKFileChecker(peer_getter, uri_to_check)
return c.check()
d = c.check()
elif isinstance(uri_to_check, uri.DirnodeVerifierURI):
tub = self.parent.tub
c = SimpleDirnodeChecker(tub)
return c.check(uri_to_check)
d = c.check(uri_to_check)
else:
raise ValueError("I don't know how to check '%s'" % (uri_to_check,))
def _done(res):
# TODO: handle exceptions too, record something useful about them
if self.results:
self.results.add_results(uri_to_check, time.time(), res)
return res
d.addCallback(_done)
return d
def verify(self, uri_to_verify):
uri_to_verify = IVerifierURI(uri_to_verify)
if uri_to_verify is None:
@ -211,3 +251,9 @@ class Checker(service.MultiService):
else:
raise ValueError("I don't know how to verify '%s'" %
(uri_to_verify,))
def checker_results_for(self, uri_to_check):
if self.results:
return self.results.get_results_for(IVerifierURI(uri_to_check))
return []

View File

@ -992,6 +992,66 @@ class IUploader(Interface):
def upload_filehandle(filehane):
"""Like upload(), but accepts an open filehandle."""
class IChecker(Interface):
def check(uri_to_check):
"""Accepts an IVerifierURI, and checks upon the health of its target.
For now, uri_to_check must be an IVerifierURI. In the future we
expect to relax that to be anything that can be adapted to
IVerifierURI (like read-only or read-write dirnode/filenode URIs).
This returns a Deferred. For dirnodes, this fires with either True or
False (dirnodes are not distributed, so their health is a boolean).
For filenodes, this fires with a tuple of (needed_shares,
total_shares, found_shares, sharemap). The first three are ints. The
basic health of the file is found_shares / needed_shares: if less
than 1.0, the file is unrecoverable.
The sharemap has a key for each sharenum. The value is a list of
(binary) nodeids who hold that share. If two shares are kept on the
same nodeid, they will fail as a pair, and overall reliability is
decreased.
The IChecker instance remembers the results of the check. By default,
these results are stashed in RAM (and are forgotten at shutdown). If
a file named 'checker_results.db' exists in the node's basedir, it is
used as a sqlite database of results, making them persistent across
runs. To start using this feature, just 'touch checker_results.db',
and the node will initialize it properly the next time it is started.
"""
def verify(uri_to_check):
"""Accepts an IVerifierURI, and verifies the crypttext of the target.
This is a more-intensive form of checking. For verification, the
file's crypttext contents are retrieved, and the associated hash
checks are performed. If a storage server is holding a corrupted
share, verification will detect the problem, but checking will not.
This returns a Deferred that fires with True if the crypttext hashes
look good, and will probably raise an exception if anything goes
wrong.
For dirnodes, 'verify' is the same as 'check', so the Deferred will
fire with True or False.
Verification currently only uses a minimal subset of peers, so a lot
of share corruption will not be caught by it. We expect to improve
this in the future.
"""
def checker_results_for(uri_to_check):
"""Accepts an IVerifierURI, and returns a list of checker results.
Each element of the list is a two-entry tuple: (when, results).
The 'when' values are timestamps (float seconds since epoch), and the
results are as defined in the check() method.
Note: at the moment, this is specified to return synchronously. We
might need to back away from this in the future.
"""
class IVirtualDrive(Interface):
"""I am a service that may be available to a client.

View File

@ -1,6 +1,6 @@
from base64 import b32encode
import os, sys
import os, sys, time
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
@ -860,16 +860,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
def _test_checker(self, res):
vdrive0 = self.clients[0].getServiceNamed("vdrive")
checker1 = self.clients[1].getServiceNamed("checker")
d = vdrive0.get_node_at_path("~")
d.addCallback(lambda home: home.build_manifest())
def _check_all(manifest):
dl = []
for si in manifest:
dl.append(checker1.check(si))
return deferredutil.DeferredListShouldSucceed(dl)
d.addCallback(_check_all)
def _done(res):
d.addCallback(self._test_checker_2)
return d
def _test_checker_2(self, manifest):
checker1 = self.clients[1].getServiceNamed("checker")
dl = []
starting_time = time.time()
for si in manifest:
dl.append(checker1.check(si))
d = deferredutil.DeferredListShouldSucceed(dl)
def _check_checker_results(res):
for i in res:
if type(i) is bool:
self.failUnless(i is True)
@ -883,7 +887,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
for shpeers in sharemap.values():
peers.update(shpeers)
self.failUnlessEqual(len(peers), self.numclients-1)
d.addCallback(_done)
d.addCallback(_check_checker_results)
def _check_stored_results(res):
finish_time = time.time()
all_results = []
for si in manifest:
results = checker1.checker_results_for(si)
self.failUnlessEqual(len(results), 1)
when, those_results = results[0]
self.failUnless(isinstance(when, (int, float)))
self.failUnless(starting_time <= when <= finish_time)
all_results.append(those_results)
_check_checker_results(all_results)
d.addCallback(_check_stored_results)
return d
def _test_verifier(self, res):