mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-26 13:59:59 +00:00
add a simple checker, for both files and directories
This commit is contained in:
parent
81c56b8e53
commit
9da1d70676
103
src/allmydata/checker.py
Normal file
103
src/allmydata/checker.py
Normal file
@ -0,0 +1,103 @@
|
||||
|
||||
"""
|
||||
Given a StorageIndex, count how many shares we can find.
|
||||
|
||||
This does no verification of the shares whatsoever. If the peer claims to
|
||||
have the share, we believe them.
|
||||
"""
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.application import service
|
||||
from twisted.python import log
|
||||
from allmydata.interfaces import IVerifierURI
|
||||
from allmydata import uri
|
||||
|
||||
class SimpleCHKFileChecker:
|
||||
|
||||
def __init__(self, peer_getter):
|
||||
self.peer_getter = peer_getter
|
||||
self.found_shares = set()
|
||||
|
||||
'''
|
||||
def check_synchronously(self, si):
|
||||
# this is how we would write this class if we were using synchronous
|
||||
# messages (or if we used promises).
|
||||
found = set()
|
||||
for (pmpeerid, peerid, connection) in self.peer_getter(storage_index):
|
||||
buckets = connection.get_service("storageserver").get_buckets(si)
|
||||
found.update(buckets.keys())
|
||||
return len(found)
|
||||
'''
|
||||
|
||||
def check(self, uri_to_check):
|
||||
d = self._get_all_shareholders(uri_to_check.storage_index)
|
||||
d.addCallback(self._done)
|
||||
return d
|
||||
|
||||
def _get_all_shareholders(self, storage_index):
|
||||
dl = []
|
||||
for (pmpeerid, peerid, connection) in self.peer_getter(storage_index):
|
||||
d = connection.callRemote("get_service", "storageserver")
|
||||
d.addCallback(lambda ss: ss.callRemote("get_buckets",
|
||||
storage_index))
|
||||
d.addCallbacks(self._got_response, self._got_error)
|
||||
dl.append(d)
|
||||
return defer.DeferredList(dl)
|
||||
|
||||
def _got_response(self, buckets):
|
||||
# buckets is a dict: maps shum to an rref of the server who holds it
|
||||
self.found_shares.update(buckets.keys())
|
||||
|
||||
def _got_error(self, f):
|
||||
if f.check(KeyError):
|
||||
pass
|
||||
log.err(f)
|
||||
pass
|
||||
|
||||
def _done(self, res):
|
||||
return len(self.found_shares)
|
||||
|
||||
class SimpleDirnodeChecker:
|
||||
|
||||
def __init__(self, tub):
|
||||
self.tub = tub
|
||||
|
||||
def check(self, node):
|
||||
si = node.storage_index
|
||||
d = self.tub.getReference(node.furl)
|
||||
d.addCallback(self._get_dirnode, node.storage_index)
|
||||
d.addCallbacks(self._success, self._failed)
|
||||
return d
|
||||
|
||||
def _get_dirnode(self, rref, storage_index):
|
||||
d = rref.callRemote("list", storage_index)
|
||||
return d
|
||||
|
||||
def _success(self, res):
|
||||
return True
|
||||
def _failed(self, f):
|
||||
if f.check(IndexError):
|
||||
return False
|
||||
log.err(f)
|
||||
return False
|
||||
|
||||
class Checker(service.MultiService):
|
||||
"""I am a service that helps perform file checks.
|
||||
"""
|
||||
name = "checker"
|
||||
|
||||
def check(self, uri_to_check):
|
||||
uri_to_check = IVerifierURI(uri_to_check)
|
||||
if uri_to_check is None:
|
||||
return defer.succeed(True)
|
||||
elif isinstance(uri_to_check, uri.CHKFileVerifierURI):
|
||||
peer_getter = self.parent.get_permuted_peers
|
||||
c = SimpleCHKFileChecker(peer_getter)
|
||||
return c.check(uri_to_check)
|
||||
elif isinstance(uri_to_check, uri.DirnodeVerifierURI):
|
||||
tub = self.parent.tub
|
||||
c = SimpleDirnodeChecker(tub)
|
||||
return c.check(uri_to_check)
|
||||
else:
|
||||
raise ValueError("I don't know how to check '%s'" % (uri_to_check,))
|
||||
|
@ -14,6 +14,7 @@ from allmydata.Crypto.Util.number import bytes_to_long
|
||||
from allmydata.storage import StorageServer
|
||||
from allmydata.upload import Uploader
|
||||
from allmydata.download import Downloader
|
||||
from allmydata.checker import Checker
|
||||
from allmydata.control import ControlServer
|
||||
from allmydata.introducer import IntroducerClient
|
||||
from allmydata.vdrive import VirtualDrive
|
||||
@ -39,6 +40,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
|
||||
self.init_options()
|
||||
self.add_service(Uploader())
|
||||
self.add_service(Downloader())
|
||||
self.add_service(Checker())
|
||||
self.add_service(VirtualDrive())
|
||||
webport = self.get_config("webport")
|
||||
if webport:
|
||||
|
@ -340,22 +340,26 @@ class ImmutableDirectoryNode:
|
||||
return d
|
||||
|
||||
def build_manifest(self):
|
||||
# given a dirnode, construct a list refresh-capabilities for all the
|
||||
# nodes it references.
|
||||
# given a dirnode, construct a frozenset of verifier-capabilities for
|
||||
# all the nodes it references.
|
||||
|
||||
# this is just a tree-walker, except that following each edge
|
||||
# requires a Deferred.
|
||||
|
||||
manifest = set()
|
||||
manifest.add(self.get_refresh_capability())
|
||||
manifest.add(self.get_verifier())
|
||||
|
||||
d = self._build_manifest_from_node(self, manifest)
|
||||
# LIT nodes have no refresh-capability: their data is stored inside
|
||||
# the URI itself, so there is no need to refresh anything. They
|
||||
# indicate this by returning None from their get_refresh_capability
|
||||
# method. We need to remove any such Nones from our set.
|
||||
d.addCallback(lambda res: manifest.discard(None))
|
||||
d.addCallback(lambda res: manifest)
|
||||
def _done(res):
|
||||
# LIT nodes have no verifier-capability: their data is stored
|
||||
# inside the URI itself, so there is no need to refresh anything.
|
||||
# They indicate this by returning None from their get_verifier
|
||||
# method. We need to remove any such Nones from our set. We also
|
||||
# want to convert all these caps into strings.
|
||||
return frozenset([cap.to_string()
|
||||
for cap in manifest
|
||||
if cap is not None])
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
def _build_manifest_from_node(self, node, manifest):
|
||||
@ -363,17 +367,19 @@ class ImmutableDirectoryNode:
|
||||
def _got_list(res):
|
||||
dl = []
|
||||
for name, child in res.iteritems():
|
||||
manifest.add(child.get_refresh_capability())
|
||||
if IDirectoryNode.providedBy(child) and child not in manifest:
|
||||
dl.append(self._build_manifest_from_node(child, manifest))
|
||||
verifier = child.get_verifier()
|
||||
if verifier not in manifest:
|
||||
manifest.add(verifier)
|
||||
if IDirectoryNode.providedBy(child):
|
||||
dl.append(self._build_manifest_from_node(child,
|
||||
manifest))
|
||||
if dl:
|
||||
return defer.DeferredList(dl)
|
||||
d.addCallback(_got_list)
|
||||
return d
|
||||
|
||||
def get_refresh_capability(self):
|
||||
u = IDirnodeURI(self._uri).get_readonly()
|
||||
return "DIR-REFRESH:%s" % idlib.b2a(u.storage_index)
|
||||
def get_verifier(self):
|
||||
return IDirnodeURI(self._uri).get_verifier()
|
||||
|
||||
def get_child_at_path(self, path):
|
||||
if not path:
|
||||
@ -441,11 +447,8 @@ class FileNode:
|
||||
return cmp(self.__class__, them.__class__)
|
||||
return cmp(self.uri, them.uri)
|
||||
|
||||
def get_refresh_capability(self):
|
||||
u = IFileURI(self.uri)
|
||||
if isinstance(u, uri.CHKFileURI):
|
||||
return "CHK-REFRESH:%s" % idlib.b2a(u.storage_index)
|
||||
return None
|
||||
def get_verifier(self):
|
||||
return IFileURI(self.uri).get_verifier()
|
||||
|
||||
def download(self, target):
|
||||
downloader = self._client.getServiceNamed("downloader")
|
||||
|
@ -311,6 +311,25 @@ class IURI(Interface):
|
||||
"""Return another IURI instance, which represents a read-only form of
|
||||
this one. If is_readonly() is True, this returns self."""
|
||||
|
||||
def get_verifier():
|
||||
"""Return an instance that provides IVerifierURI, which can be used
|
||||
to check on the availability of the file or directory, without
|
||||
providing enough capabilities to actually read or modify the
|
||||
contents. This may return None if the file does not need checking or
|
||||
verification (e.g. LIT URIs).
|
||||
"""
|
||||
|
||||
def to_string():
|
||||
"""Return a string of printable ASCII characters, suitable for
|
||||
passing into init_from_string."""
|
||||
|
||||
class IVerifierURI(Interface):
|
||||
def init_from_string(uri):
|
||||
"""Accept a string (as created by my to_string() method) and populate
|
||||
this instance with its data. I am not normally called directly,
|
||||
please use the module-level uri.from_string() function to convert
|
||||
arbitrary URI strings into IURI-providing instances."""
|
||||
|
||||
def to_string():
|
||||
"""Return a string of printable ASCII characters, suitable for
|
||||
passing into init_from_string."""
|
||||
@ -318,6 +337,7 @@ class IURI(Interface):
|
||||
class IDirnodeURI(Interface):
|
||||
"""I am a URI which represents a dirnode."""
|
||||
|
||||
|
||||
class IFileURI(Interface):
|
||||
"""I am a URI which represents a filenode."""
|
||||
def get_size():
|
||||
@ -338,10 +358,12 @@ class IFileNode(Interface):
|
||||
def get_size():
|
||||
"""Return the length (in bytes) of the data this node represents."""
|
||||
|
||||
def get_refresh_capability():
|
||||
"""Return a string that represents the 'refresh capability' for this
|
||||
node. The holder of this capability will be able to renew the lease
|
||||
for this node, protecting it from garbage-collection.
|
||||
def get_verifier():
|
||||
"""Return an IVerifierURI instance that represents the
|
||||
'verifiy/refresh capability' for this node. The holder of this
|
||||
capability will be able to renew the lease for this node, protecting
|
||||
it from garbage-collection. They will also be able to ask a server if
|
||||
it holds a share for the file or directory.
|
||||
"""
|
||||
|
||||
class IDirectoryNode(Interface):
|
||||
@ -374,10 +396,12 @@ class IDirectoryNode(Interface):
|
||||
get_immutable_uri() will return the same thing as get_uri().
|
||||
"""
|
||||
|
||||
def get_refresh_capability():
|
||||
"""Return a string that represents the 'refresh capability' for this
|
||||
node. The holder of this capability will be able to renew the lease
|
||||
for this node, protecting it from garbage-collection.
|
||||
def get_verifier():
|
||||
"""Return an IVerifierURI instance that represents the
|
||||
'verifiy/refresh capability' for this node. The holder of this
|
||||
capability will be able to renew the lease for this node, protecting
|
||||
it from garbage-collection. They will also be able to ask a server if
|
||||
it holds a share for the file or directory.
|
||||
"""
|
||||
|
||||
def list():
|
||||
@ -444,8 +468,8 @@ class IDirectoryNode(Interface):
|
||||
Deferred that fires when the operation finishes."""
|
||||
|
||||
def build_manifest():
|
||||
"""Return a set of refresh-capabilities for all nodes (directories
|
||||
and files) reachable from this one."""
|
||||
"""Return a frozenset of verifier-capability strings for all nodes
|
||||
(directories and files) reachable from this one."""
|
||||
|
||||
class ICodecEncoder(Interface):
|
||||
def set_params(data_size, required_shares, max_shares):
|
||||
|
@ -288,11 +288,11 @@ class Test(unittest.TestCase):
|
||||
def _check_manifest(manifest):
|
||||
manifest = sorted(list(manifest))
|
||||
self.failUnlessEqual(len(manifest), 5)
|
||||
expected = [self.rootnode.get_refresh_capability(),
|
||||
self.bar_node.get_refresh_capability(),
|
||||
self.file1_node.get_refresh_capability(),
|
||||
file2_node.get_refresh_capability(),
|
||||
self.baz_node.get_refresh_capability(),
|
||||
expected = [self.rootnode.get_verifier().to_string(),
|
||||
self.bar_node.get_verifier().to_string(),
|
||||
self.file1_node.get_verifier().to_string(),
|
||||
file2_node.get_verifier().to_string(),
|
||||
self.baz_node.get_verifier().to_string(),
|
||||
]
|
||||
expected.sort()
|
||||
self.failUnlessEqual(manifest, expected)
|
||||
@ -387,10 +387,10 @@ class Test(unittest.TestCase):
|
||||
def _check_manifest2(manifest):
|
||||
manifest = sorted(list(manifest))
|
||||
self.failUnlessEqual(len(manifest), 4)
|
||||
expected = [self.rootnode.get_refresh_capability(),
|
||||
self.bar_node.get_refresh_capability(),
|
||||
file2_node.get_refresh_capability(),
|
||||
self.baz_node.get_refresh_capability(),
|
||||
expected = [self.rootnode.get_verifier().to_string(),
|
||||
self.bar_node.get_verifier().to_string(),
|
||||
file2_node.get_verifier().to_string(),
|
||||
self.baz_node.get_verifier().to_string(),
|
||||
]
|
||||
expected.sort()
|
||||
self.failUnlessEqual(manifest, expected)
|
||||
|
@ -8,7 +8,7 @@ from twisted.internet import threads # CLI tests use deferToThread
|
||||
from twisted.application import service
|
||||
from allmydata import client, uri, download, upload
|
||||
from allmydata.introducer_and_vdrive import IntroducerAndVdrive
|
||||
from allmydata.util import fileutil, testutil
|
||||
from allmydata.util import fileutil, testutil, deferredutil
|
||||
from allmydata.scripts import runner
|
||||
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
|
||||
from allmydata.dirnode import NotMutableError
|
||||
@ -103,6 +103,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
|
||||
return d
|
||||
|
||||
def wait_for_connections(self, ignored=None):
|
||||
# TODO: replace this with something that takes a list of peerids and
|
||||
# fires when they've all been heard from, instead of using a count
|
||||
# and a threshold
|
||||
for c in self.clients:
|
||||
if (not c.introducer_client or
|
||||
len(list(c.get_all_peerids())) != self.numclients):
|
||||
@ -291,6 +294,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
|
||||
d.addCallback(self._test_web_start)
|
||||
d.addCallback(self._test_control)
|
||||
d.addCallback(self._test_cli)
|
||||
d.addCallback(self._test_checker)
|
||||
return d
|
||||
test_vdrive.timeout = 1100
|
||||
|
||||
@ -788,3 +792,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
def _test_checker(self, res):
|
||||
vdrive0 = self.clients[0].getServiceNamed("vdrive")
|
||||
checker1 = self.clients[1].getServiceNamed("checker")
|
||||
d = vdrive0.get_node_at_path("~")
|
||||
d.addCallback(lambda home: home.build_manifest())
|
||||
def _check_all(manifest):
|
||||
dl = []
|
||||
for si in manifest:
|
||||
dl.append(checker1.check(si))
|
||||
return deferredutil.DeferredListShouldSucceed(dl)
|
||||
d.addCallback(_check_all)
|
||||
def _done(res):
|
||||
for i in res:
|
||||
self.failUnless(i is True or i == 10)
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
|
@ -3,7 +3,7 @@ import re
|
||||
from zope.interface import implements
|
||||
from twisted.python.components import registerAdapter
|
||||
from allmydata.util import idlib, hashutil
|
||||
from allmydata.interfaces import IURI, IDirnodeURI, IFileURI
|
||||
from allmydata.interfaces import IURI, IDirnodeURI, IFileURI, IVerifierURI
|
||||
|
||||
# the URI shall be an ascii representation of the file. It shall contain
|
||||
# enough information to retrieve and validate the contents. It shall be
|
||||
@ -86,6 +86,66 @@ class CHKFileURI(_BaseURI):
|
||||
def get_size(self):
|
||||
return self.size
|
||||
|
||||
def get_verifier(self):
|
||||
return CHKFileVerifierURI(storage_index=self.storage_index,
|
||||
uri_extension_hash=self.uri_extension_hash,
|
||||
needed_shares=self.needed_shares,
|
||||
total_shares=self.total_shares,
|
||||
size=self.size)
|
||||
|
||||
class CHKFileVerifierURI(_BaseURI):
|
||||
implements(IVerifierURI)
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
# construct me with kwargs, since there are so many of them
|
||||
if not kwargs:
|
||||
return
|
||||
keys = ("storage_index", "uri_extension_hash",
|
||||
"needed_shares", "total_shares", "size")
|
||||
for name in kwargs:
|
||||
if name in keys:
|
||||
value = kwargs[name]
|
||||
setattr(self, name, value)
|
||||
else:
|
||||
raise TypeError("CHKFileVerifierURI does not accept "
|
||||
"'%s=' argument"
|
||||
% name)
|
||||
|
||||
def init_from_string(self, uri):
|
||||
assert uri.startswith("URI:CHK-Verifier:"), uri
|
||||
d = {}
|
||||
(header_uri, header_chk,
|
||||
storage_index_s, uri_extension_hash_s,
|
||||
needed_shares_s, total_shares_s, size_s) = uri.split(":")
|
||||
assert header_uri == "URI"
|
||||
assert header_chk == "CHK-Verifier"
|
||||
|
||||
self.storage_index = idlib.a2b(storage_index_s)
|
||||
assert isinstance(self.storage_index, str)
|
||||
assert len(self.storage_index) == 16 # sha256 hash truncated to 128
|
||||
|
||||
self.uri_extension_hash = idlib.a2b(uri_extension_hash_s)
|
||||
assert isinstance(self.uri_extension_hash, str)
|
||||
assert len(self.uri_extension_hash) == 32 # sha56 hash
|
||||
|
||||
self.needed_shares = int(needed_shares_s)
|
||||
self.total_shares = int(total_shares_s)
|
||||
self.size = int(size_s)
|
||||
return self
|
||||
|
||||
def to_string(self):
|
||||
assert isinstance(self.needed_shares, int)
|
||||
assert isinstance(self.total_shares, int)
|
||||
assert isinstance(self.size, (int,long))
|
||||
|
||||
return ("URI:CHK-Verifier:%s:%s:%d:%d:%d" %
|
||||
(idlib.b2a(self.storage_index),
|
||||
idlib.b2a(self.uri_extension_hash),
|
||||
self.needed_shares,
|
||||
self.total_shares,
|
||||
self.size))
|
||||
|
||||
|
||||
class LiteralFileURI(_BaseURI):
|
||||
implements(IURI, IFileURI)
|
||||
|
||||
@ -109,6 +169,10 @@ class LiteralFileURI(_BaseURI):
|
||||
def get_readonly(self):
|
||||
return self
|
||||
|
||||
def get_verifier(self):
|
||||
# LIT files need no verification, all the data is present in the URI
|
||||
return None
|
||||
|
||||
def get_size(self):
|
||||
return len(self.data)
|
||||
|
||||
@ -151,6 +215,8 @@ class DirnodeURI(_BaseURI):
|
||||
return True
|
||||
def get_readonly(self):
|
||||
return ReadOnlyDirnodeURI(self.furl, self.readkey)
|
||||
def get_verifier(self):
|
||||
return DirnodeVerifierURI(self.furl, self.storage_index)
|
||||
|
||||
class ReadOnlyDirnodeURI(_BaseURI):
|
||||
implements(IURI, IDirnodeURI)
|
||||
@ -191,16 +257,49 @@ class ReadOnlyDirnodeURI(_BaseURI):
|
||||
return True
|
||||
def get_readonly(self):
|
||||
return self
|
||||
def get_verifier(self):
|
||||
return DirnodeVerifierURI(self.furl, self.storage_index)
|
||||
|
||||
class DirnodeVerifierURI(_BaseURI):
|
||||
implements(IVerifierURI)
|
||||
|
||||
def __init__(self, furl=None, storage_index=None):
|
||||
if furl is not None or storage_index is not None:
|
||||
assert furl is not None
|
||||
assert storage_index is not None
|
||||
self.furl = furl
|
||||
self.storage_index = storage_index
|
||||
|
||||
def init_from_string(self, uri):
|
||||
# URI:DIR-Verifier:furl:storageindex
|
||||
# but note that the furl contains colons
|
||||
prefix = "URI:DIR-Verifier:"
|
||||
assert uri.startswith(prefix)
|
||||
uri = uri[len(prefix):]
|
||||
colon = uri.rindex(":")
|
||||
self.furl = uri[:colon]
|
||||
self.storage_index = idlib.a2b(uri[colon+1:])
|
||||
return self
|
||||
|
||||
def to_string(self):
|
||||
return "URI:DIR-Verifier:%s:%s" % (self.furl,
|
||||
idlib.b2a(self.storage_index))
|
||||
|
||||
|
||||
|
||||
def from_string(s):
|
||||
if s.startswith("URI:CHK:"):
|
||||
return CHKFileURI().init_from_string(s)
|
||||
elif s.startswith("URI:CHK-Verifier:"):
|
||||
return CHKFileVerifierURI().init_from_string(s)
|
||||
elif s.startswith("URI:LIT:"):
|
||||
return LiteralFileURI().init_from_string(s)
|
||||
elif s.startswith("URI:DIR:"):
|
||||
return DirnodeURI().init_from_string(s)
|
||||
elif s.startswith("URI:DIR-RO:"):
|
||||
return ReadOnlyDirnodeURI().init_from_string(s)
|
||||
elif s.startswith("URI:DIR-Verifier:"):
|
||||
return DirnodeVerifierURI().init_from_string(s)
|
||||
else:
|
||||
raise TypeError("unknown URI type: %s.." % s[:10])
|
||||
|
||||
@ -220,6 +319,12 @@ def from_string_filenode(s):
|
||||
|
||||
registerAdapter(from_string_filenode, str, IFileURI)
|
||||
|
||||
def from_string_verifier(s):
|
||||
u = from_string(s)
|
||||
assert IVerifierURI.providedBy(u)
|
||||
return u
|
||||
registerAdapter(from_string_verifier, str, IVerifierURI)
|
||||
|
||||
|
||||
def pack_extension(data):
|
||||
pieces = []
|
||||
|
Loading…
x
Reference in New Issue
Block a user