add --add-lease to 'tahoe check', 'tahoe deep-check', and webapi.

This commit is contained in:
Brian Warner 2009-02-17 19:32:43 -07:00
parent e9563ebc02
commit bce4a5385b
16 changed files with 331 additions and 64 deletions

View File

@ -767,6 +767,12 @@ POST $URL?t=check
If a verify=true argument is provided, the node will perform a more
intensive check, downloading and verifying every single bit of every share.
If an add-lease=true argument is provided, the node will also add (or
renew) a lease to every share it encounters. Each lease will keep the share
alive for a certain period of time (one month by default). Once the last
lease expires or is explicitly cancelled, the storage server is allowed to
delete the share.
If an output=JSON argument is provided, the response will be
machine-readable JSON instead of human-oriented HTML. The data is a
dictionary with the following keys:
@ -837,7 +843,7 @@ POST $URL?t=start-deep-check (must add &ophandle=XYZ)
BAD_REQUEST) will be signalled if it is invoked on a file. The recursive
walker will deal with loops safely.
This accepts the same verify= argument as t=check.
This accepts the same verify= and add-lease= arguments as t=check.
Since this operation can take a long time (perhaps a second per object),
the ophandle= argument is required (see "Slow Operations, Progress, and
@ -931,9 +937,9 @@ POST $URL?t=check&repair=true
or corrupted), it will perform a "repair". During repair, any missing
shares will be regenerated and uploaded to new servers.
This accepts the same verify=true argument as t=check. When an output=JSON
argument is provided, the machine-readable JSON response will contain the
following keys:
This accepts the same verify=true and add-lease= arguments as t=check. When
an output=JSON argument is provided, the machine-readable JSON response
will contain the following keys:
storage-index: a base32-encoded string with the objects's storage index,
or an empty string for LIT files
@ -961,9 +967,10 @@ POST $URL?t=start-deep-check&repair=true (must add &ophandle=XYZ)
invoked on a directory. An error (400 BAD_REQUEST) will be signalled if it
is invoked on a file. The recursive walker will deal with loops safely.
This accepts the same verify=true argument as t=start-deep-check. It uses
the same ophandle= mechanism as start-deep-check. When an output=JSON
argument is provided, the response will contain the following keys:
This accepts the same verify= and add-lease= arguments as
t=start-deep-check. It uses the same ophandle= mechanism as
start-deep-check. When an output=JSON argument is provided, the response
will contain the following keys:
finished: (bool) True if the operation has completed, else False
root-storage-index: a base32-encoded string with the storage index of the

View File

@ -234,11 +234,11 @@ class NewDirectoryNode:
def get_storage_index(self):
return self._uri._filenode_uri.storage_index
def check(self, monitor, verify=False):
def check(self, monitor, verify=False, add_lease=False):
"""Perform a file check. See IChecker.check for details."""
return self._node.check(monitor, verify)
def check_and_repair(self, monitor, verify=False):
return self._node.check_and_repair(monitor, verify)
return self._node.check(monitor, verify, add_lease)
def check_and_repair(self, monitor, verify=False, add_lease=False):
return self._node.check_and_repair(monitor, verify, add_lease)
def list(self):
"""I return a Deferred that fires with a dictionary mapping child
@ -560,11 +560,11 @@ class NewDirectoryNode:
# children for which we've got both a write-cap and a read-cap
return self.deep_traverse(DeepStats(self))
def start_deep_check(self, verify=False):
return self.deep_traverse(DeepChecker(self, verify, repair=False))
def start_deep_check(self, verify=False, add_lease=False):
return self.deep_traverse(DeepChecker(self, verify, repair=False, add_lease=add_lease))
def start_deep_check_and_repair(self, verify=False):
return self.deep_traverse(DeepChecker(self, verify, repair=True))
def start_deep_check_and_repair(self, verify=False, add_lease=False):
return self.deep_traverse(DeepChecker(self, verify, repair=True, add_lease=add_lease))
@ -695,13 +695,14 @@ class ManifestWalker(DeepStats):
class DeepChecker:
def __init__(self, root, verify, repair):
def __init__(self, root, verify, repair, add_lease):
root_si = root.get_storage_index()
self._lp = log.msg(format="deep-check starting (%(si)s),"
" verify=%(verify)s, repair=%(repair)s",
si=base32.b2a(root_si), verify=verify, repair=repair)
self._verify = verify
self._repair = repair
self._add_lease = add_lease
if repair:
self._results = DeepCheckAndRepairResults(root_si)
else:
@ -714,10 +715,10 @@ class DeepChecker:
def add_node(self, node, childpath):
if self._repair:
d = node.check_and_repair(self.monitor, self._verify)
d = node.check_and_repair(self.monitor, self._verify, self._add_lease)
d.addCallback(self._results.add_check_and_repair, childpath)
else:
d = node.check(self.monitor, self._verify)
d = node.check(self.monitor, self._verify, self._add_lease)
d.addCallback(self._results.add_check, childpath)
d.addCallback(lambda ignored: self._stats.add_node(node, childpath))
return d

View File

@ -1,10 +1,14 @@
from foolscap import DeadReferenceError
from twisted.internet import defer
from allmydata import hashtree
from allmydata.check_results import CheckResults
from allmydata.immutable import download
from allmydata.uri import CHKFileVerifierURI
from allmydata.util.assertutil import precondition
from allmydata.util import base32, deferredutil, dictutil, log, rrefutil
from allmydata.util.hashutil import file_renewal_secret_hash, \
file_cancel_secret_hash, bucket_renewal_secret_hash, \
bucket_cancel_secret_hash
from allmydata.immutable import layout
@ -29,7 +33,7 @@ class Checker(log.PrefixingLogMixin):
object that was passed into my constructor whether this task has been
cancelled (by invoking its raise_if_cancelled() method).
"""
def __init__(self, client, verifycap, servers, verify, monitor):
def __init__(self, client, verifycap, servers, verify, add_lease, monitor):
assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
assert precondition(isinstance(servers, (set, frozenset)), servers)
for (serverid, serverrref) in servers:
@ -45,9 +49,22 @@ class Checker(log.PrefixingLogMixin):
self._monitor = monitor
self._servers = servers
self._verify = verify # bool: verify what the servers claim, or not?
self._add_lease = add_lease
self._share_hash_tree = None
frs = file_renewal_secret_hash(client.get_renewal_secret(),
self._verifycap.storage_index)
self.file_renewal_secret = frs
fcs = file_cancel_secret_hash(client.get_cancel_secret(),
self._verifycap.storage_index)
self.file_cancel_secret = fcs
def _get_renewal_secret(self, peerid):
return bucket_renewal_secret_hash(self.file_renewal_secret, peerid)
def _get_cancel_secret(self, peerid):
return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)
def _get_buckets(self, server, storageindex, serverid):
"""Return a deferred that eventually fires with ({sharenum: bucket},
serverid, success). In case the server is disconnected or returns a
@ -58,6 +75,24 @@ class Checker(log.PrefixingLogMixin):
responded.)"""
d = server.callRemote("get_buckets", storageindex)
if self._add_lease:
renew_secret = self._get_renewal_secret(serverid)
cancel_secret = self._get_cancel_secret(serverid)
d2 = server.callRemote("add_lease", storageindex,
renew_secret, cancel_secret)
dl = defer.DeferredList([d, d2])
def _done(res):
[(get_success, get_result),
(addlease_success, addlease_result)] = res
if (not addlease_success and
not addlease_result.check(IndexError)):
# tahoe=1.3.0 raised IndexError on non-existant buckets,
# which we ignore. But report others, including the
# unfortunate internal KeyError bug that <1.3.0 had.
return addlease_result # propagate error
return get_result
dl.addCallback(_done)
d = dl
def _wrap_results(res):
for k in res:

View File

@ -199,11 +199,12 @@ class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
def get_storage_index(self):
return self.u.storage_index
def check_and_repair(self, monitor, verify=False):
def check_and_repair(self, monitor, verify=False, add_lease=False):
verifycap = self.get_verify_cap()
servers = self._client.get_servers("storage")
c = Checker(client=self._client, verifycap=verifycap, servers=servers, verify=verify, monitor=monitor)
c = Checker(client=self._client, verifycap=verifycap, servers=servers,
verify=verify, add_lease=add_lease, monitor=monitor)
d = c.start()
def _maybe_repair(cr):
crr = CheckAndRepairResults(self.u.storage_index)
@ -251,8 +252,10 @@ class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
d.addCallback(_maybe_repair)
return d
def check(self, monitor, verify=False):
v = Checker(client=self._client, verifycap=self.get_verify_cap(), servers=self._client.get_servers("storage"), verify=verify, monitor=monitor)
def check(self, monitor, verify=False, add_lease=False):
v = Checker(client=self._client, verifycap=self.get_verify_cap(),
servers=self._client.get_servers("storage"),
verify=verify, add_lease=add_lease, monitor=monitor)
return v.start()
def read(self, consumer, offset=0, size=None):
@ -310,10 +313,10 @@ class LiteralFileNode(_ImmutableFileNodeBase):
def get_storage_index(self):
return None
def check(self, monitor, verify=False):
def check(self, monitor, verify=False, add_lease=False):
return defer.succeed(None)
def check_and_repair(self, monitor, verify=False):
def check_and_repair(self, monitor, verify=False, add_lease=False):
return defer.succeed(None)
def read(self, consumer, offset=0, size=None):

View File

@ -1551,7 +1551,7 @@ class IUploader(Interface):
"""TODO: how should this work?"""
class ICheckable(Interface):
def check(monitor, verify=False):
def check(monitor, verify=False, add_lease=False):
"""Check upon my health, optionally repairing any problems.
This returns a Deferred that fires with an instance that provides
@ -1585,13 +1585,21 @@ class ICheckable(Interface):
failures during retrieval, or is malicious or buggy, then
verification will detect the problem, but checking will not.
If add_lease=True, I will ensure that an up-to-date lease is present
on each share. The lease secrets will be derived from by node secret
(in BASEDIR/private/secret), so either I will add a new lease to the
share, or I will merely renew the lease that I already had. In a
future version of the storage-server protocol (once Accounting has
been implemented), there may be additional options here to define the
kind of lease that is obtained (which account number to claim, etc).
TODO: any problems seen during checking will be reported to the
health-manager.furl, a centralized object which is responsible for
figuring out why files are unhealthy so corrective action can be
taken.
"""
def check_and_repair(monitor, verify=False):
def check_and_repair(monitor, verify=False, add_lease=False):
"""Like check(), but if the file/directory is not healthy, attempt to
repair the damage.
@ -1605,7 +1613,7 @@ class ICheckable(Interface):
ICheckAndRepairResults."""
class IDeepCheckable(Interface):
def start_deep_check(verify=False):
def start_deep_check(verify=False, add_lease=False):
"""Check upon the health of me and everything I can reach.
This is a recursive form of check(), useable only on dirnodes.
@ -1614,7 +1622,7 @@ class IDeepCheckable(Interface):
object.
"""
def start_deep_check_and_repair(verify=False):
def start_deep_check_and_repair(verify=False, add_lease=False):
"""Check upon the health of me and everything I can reach. Repair
anything that isn't healthy.

View File

@ -21,9 +21,10 @@ class MutableChecker:
self.need_repair = False
self.responded = set() # set of (binary) nodeids
def check(self, verify=False):
def check(self, verify=False, add_lease=False):
servermap = ServerMap()
u = ServermapUpdater(self._node, self._monitor, servermap, MODE_CHECK)
u = ServermapUpdater(self._node, self._monitor, servermap, MODE_CHECK,
add_lease=add_lease)
history = self._node._client.get_history()
if history:
history.notify_mapupdate(u.get_status())
@ -285,8 +286,8 @@ class MutableCheckAndRepairer(MutableChecker):
self.cr_results.pre_repair_results = self.results
self.need_repair = False
def check(self, verify=False):
d = MutableChecker.check(self, verify)
def check(self, verify=False, add_lease=False):
d = MutableChecker.check(self, verify, add_lease)
d.addCallback(self._maybe_repair)
d.addCallback(lambda res: self.cr_results)
return d

View File

@ -246,13 +246,13 @@ class MutableFileNode:
#################################
# ICheckable
def check(self, monitor, verify=False):
def check(self, monitor, verify=False, add_lease=False):
checker = self.checker_class(self, monitor)
return checker.check(verify)
return checker.check(verify, add_lease)
def check_and_repair(self, monitor, verify=False):
def check_and_repair(self, monitor, verify=False, add_lease=False):
checker = self.check_and_repairer_class(self, monitor)
return checker.check(verify)
return checker.check(verify, add_lease)
#################################
# IRepairable

View File

@ -338,7 +338,8 @@ class ServerMap:
class ServermapUpdater:
def __init__(self, filenode, monitor, servermap, mode=MODE_READ):
def __init__(self, filenode, monitor, servermap, mode=MODE_READ,
add_lease=False):
"""I update a servermap, locating a sufficient number of useful
shares and remembering where they are located.
@ -348,6 +349,7 @@ class ServermapUpdater:
self._monitor = monitor
self._servermap = servermap
self.mode = mode
self._add_lease = add_lease
self._running = True
self._storage_index = filenode.get_storage_index()
@ -536,6 +538,24 @@ class ServermapUpdater:
def _do_read(self, ss, peerid, storage_index, shnums, readv):
d = ss.callRemote("slot_readv", storage_index, shnums, readv)
if self._add_lease:
renew_secret = self._node.get_renewal_secret(peerid)
cancel_secret = self._node.get_cancel_secret(peerid)
d2 = ss.callRemote("add_lease", storage_index,
renew_secret, cancel_secret)
dl = defer.DeferredList([d, d2])
def _done(res):
[(readv_success, readv_result),
(addlease_success, addlease_result)] = res
if (not addlease_success and
not addlease_result.check(IndexError)):
# tahoe 1.3.0 raised IndexError on non-existant buckets,
# which we ignore. Unfortunately tahoe <1.3.0 had a bug
# and raised KeyError, which we report.
return addlease_result # propagate error
return readv_result
dl.addCallback(_done)
return dl
return d
def _got_results(self, datavs, peerid, readsize, stuff, started):

View File

@ -247,6 +247,7 @@ class CheckOptions(VDriveOptions):
("raw", None, "Display raw JSON data instead of parsed"),
("verify", None, "Verify all hashes, instead of merely querying share presence"),
("repair", None, "Automatically repair any problems found"),
("add-lease", None, "Add/renew lease on all shares"),
]
def parseArgs(self, where=''):
self.where = where
@ -261,6 +262,7 @@ class DeepCheckOptions(VDriveOptions):
("raw", None, "Display raw JSON data instead of parsed"),
("verify", None, "Verify all hashes, instead of merely querying share presence"),
("repair", None, "Automatically repair any problems found"),
("add-lease", None, "Add/renew lease on all shares"),
("verbose", "v", "Be noisy about what is happening."),
]
def parseArgs(self, where=''):

View File

@ -27,6 +27,8 @@ def check(options):
url += "&verify=true"
if options["repair"]:
url += "&repair=true"
if options["add-lease"]:
url += "&add-lease=true"
resp = do_http("POST", url)
if resp.status != 200:
@ -248,6 +250,8 @@ class DeepCheckStreamer(LineOnlyReceiver):
output = DeepCheckAndRepairOutput(options)
else:
output = DeepCheckOutput(options)
if options["add-lease"]:
url += "&add-lease=true"
resp = do_http("POST", url)
if resp.status not in (200, 302):
print >>stderr, "ERROR", resp.status, resp.reason, resp.read()

View File

@ -53,7 +53,7 @@ class FakeCHKFileNode:
def get_storage_index(self):
return self.storage_index
def check(self, monitor, verify=False):
def check(self, monitor, verify=False, add_lease=False):
r = CheckResults(self.my_uri, self.storage_index)
is_bad = self.bad_shares.get(self.storage_index, None)
data = {}
@ -81,7 +81,7 @@ class FakeCHKFileNode:
r.set_data(data)
r.set_needs_rebalancing(False)
return defer.succeed(r)
def check_and_repair(self, monitor, verify=False):
def check_and_repair(self, monitor, verify=False, add_lease=False):
d = self.check(verify)
def _got(cr):
r = CheckAndRepairResults(self.storage_index)
@ -189,7 +189,7 @@ class FakeMutableFileNode:
def get_storage_index(self):
return self.storage_index
def check(self, monitor, verify=False):
def check(self, monitor, verify=False, add_lease=False):
r = CheckResults(self.my_uri, self.storage_index)
is_bad = self.bad_shares.get(self.storage_index, None)
data = {}
@ -219,7 +219,7 @@ class FakeMutableFileNode:
r.set_needs_rebalancing(False)
return defer.succeed(r)
def check_and_repair(self, monitor, verify=False):
def check_and_repair(self, monitor, verify=False, add_lease=False):
d = self.check(verify)
def _got(cr):
r = CheckAndRepairResults(self.storage_index)
@ -228,7 +228,7 @@ class FakeMutableFileNode:
d.addCallback(_got)
return d
def deep_check(self, verify=False):
def deep_check(self, verify=False, add_lease=False):
d = self.check(verify)
def _done(r):
dr = DeepCheckResults(self.storage_index)
@ -237,7 +237,7 @@ class FakeMutableFileNode:
d.addCallback(_done)
return d
def deep_check_and_repair(self, verify=False):
def deep_check_and_repair(self, verify=False, add_lease=False):
d = self.check_and_repair(verify)
def _done(r):
dr = DeepCheckAndRepairResults(self.storage_index)

View File

@ -200,9 +200,9 @@ class GridTestMixin:
def tearDown(self):
return self.s.stopService()
def set_up_grid(self, client_config_hooks={}):
def set_up_grid(self, num_clients=1, client_config_hooks={}):
# self.basedir must be set
self.g = NoNetworkGrid(self.basedir,
self.g = NoNetworkGrid(self.basedir, num_clients=num_clients,
client_config_hooks=client_config_hooks)
self.g.setServiceParent(self.s)
self.client_webports = [c.getServiceNamed("webish").listener._port.getHost().port

View File

@ -42,13 +42,13 @@ class Marker:
def get_storage_index(self):
return self.storage_index
def check(self, monitor, verify=False):
def check(self, monitor, verify=False, add_lease=False):
r = CheckResults(uri.from_string(self.nodeuri), None)
r.set_healthy(True)
r.set_recoverable(True)
return defer.succeed(r)
def check_and_repair(self, monitor, verify=False):
def check_and_repair(self, monitor, verify=False, add_lease=False):
d = self.check(verify)
def _got(cr):
r = CheckAndRepairResults(None)

View File

@ -6,7 +6,7 @@ from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.web import client, error, http
from twisted.python import failure, log
from allmydata import interfaces, uri, webish
from allmydata import interfaces, uri, webish, storage
from allmydata.immutable import upload, download
from allmydata.web import status, common
from allmydata.scripts.debug import CorruptShareOptions, corrupt_share
@ -2526,14 +2526,14 @@ class Util(unittest.TestCase):
class Grid(GridTestMixin, WebErrorMixin, unittest.TestCase):
def GET(self, urlpath, followRedirect=False, return_response=False,
method="GET", **kwargs):
method="GET", clientnum=0, **kwargs):
# if return_response=True, this fires with (data, statuscode,
# respheaders) instead of just data.
assert not isinstance(urlpath, unicode)
url = self.client_baseurls[0] + urlpath
url = self.client_baseurls[clientnum] + urlpath
factory = HTTPClientGETFactory(url, method=method,
followRedirect=followRedirect, **kwargs)
reactor.connectTCP("localhost", self.client_webports[0], factory)
reactor.connectTCP("localhost", self.client_webports[clientnum],factory)
d = factory.deferred
def _got_data(data):
return (data, factory.status, factory.response_headers)
@ -2541,10 +2541,10 @@ class Grid(GridTestMixin, WebErrorMixin, unittest.TestCase):
d.addCallback(_got_data)
return factory.deferred
def CHECK(self, ign, which, args):
def CHECK(self, ign, which, args, clientnum=0):
fileurl = self.fileurls[which]
url = fileurl + "?" + args
return self.GET(url, method="POST")
return self.GET(url, method="POST", clientnum=clientnum)
def test_filecheck(self):
self.basedir = "web/Grid/filecheck"
@ -2940,3 +2940,184 @@ class Grid(GridTestMixin, WebErrorMixin, unittest.TestCase):
d.addErrback(self.explain_web_error)
return d
def _count_leases(self, ignored, which):
u = self.uris[which]
shares = self.find_shares(u)
lease_counts = []
for shnum, serverid, fn in shares:
if u.startswith("URI:SSK") or u.startswith("URI:DIR2"):
sf = storage.MutableShareFile(fn)
num_leases = len(sf.debug_get_leases())
elif u.startswith("URI:CHK"):
sf = storage.ShareFile(fn)
num_leases = len(list(sf.iter_leases()))
else:
raise RuntimeError("can't get leases on %s" % u)
lease_counts.append( (fn, num_leases) )
return lease_counts
def _assert_leasecount(self, lease_counts, expected):
for (fn, num_leases) in lease_counts:
if num_leases != expected:
self.fail("expected %d leases, have %d, on %s" %
(expected, num_leases, fn))
def test_add_lease(self):
self.basedir = "web/Grid/add_lease"
self.set_up_grid(num_clients=2)
c0 = self.g.clients[0]
self.uris = {}
DATA = "data" * 100
d = c0.upload(upload.Data(DATA, convergence=""))
def _stash_uri(ur, which):
self.uris[which] = ur.uri
d.addCallback(_stash_uri, "one")
d.addCallback(lambda ign:
c0.upload(upload.Data(DATA+"1", convergence="")))
d.addCallback(_stash_uri, "two")
def _stash_mutable_uri(n, which):
self.uris[which] = n.get_uri()
assert isinstance(self.uris[which], str)
d.addCallback(lambda ign: c0.create_mutable_file(DATA+"2"))
d.addCallback(_stash_mutable_uri, "mutable")
def _compute_fileurls(ignored):
self.fileurls = {}
for which in self.uris:
self.fileurls[which] = "uri/" + urllib.quote(self.uris[which])
d.addCallback(_compute_fileurls)
d.addCallback(self._count_leases, "one")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "two")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "mutable")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self.CHECK, "one", "t=check") # no add-lease
def _got_html_good(res):
self.failUnless("Healthy" in res, res)
self.failIf("Not Healthy" in res, res)
d.addCallback(_got_html_good)
d.addCallback(self._count_leases, "one")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "two")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "mutable")
d.addCallback(self._assert_leasecount, 1)
# this CHECK uses the original client, which uses the same
# lease-secrets, so it will just renew the original lease
d.addCallback(self.CHECK, "one", "t=check&add-lease=true")
d.addCallback(_got_html_good)
d.addCallback(self._count_leases, "one")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "two")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "mutable")
d.addCallback(self._assert_leasecount, 1)
# this CHECK uses an alternate client, which adds a second lease
d.addCallback(self.CHECK, "one", "t=check&add-lease=true", clientnum=1)
d.addCallback(_got_html_good)
d.addCallback(self._count_leases, "one")
d.addCallback(self._assert_leasecount, 2)
d.addCallback(self._count_leases, "two")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "mutable")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self.CHECK, "mutable", "t=check&add-lease=true")
d.addCallback(_got_html_good)
d.addCallback(self._count_leases, "one")
d.addCallback(self._assert_leasecount, 2)
d.addCallback(self._count_leases, "two")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "mutable")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self.CHECK, "mutable", "t=check&add-lease=true",
clientnum=1)
d.addCallback(_got_html_good)
d.addCallback(self._count_leases, "one")
d.addCallback(self._assert_leasecount, 2)
d.addCallback(self._count_leases, "two")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "mutable")
d.addCallback(self._assert_leasecount, 2)
d.addErrback(self.explain_web_error)
return d
def test_deep_add_lease(self):
self.basedir = "web/Grid/deep_add_lease"
self.set_up_grid(num_clients=2)
c0 = self.g.clients[0]
self.uris = {}
self.fileurls = {}
DATA = "data" * 100
d = c0.create_empty_dirnode()
def _stash_root_and_create_file(n):
self.rootnode = n
self.uris["root"] = n.get_uri()
self.fileurls["root"] = "uri/" + urllib.quote(n.get_uri()) + "/"
return n.add_file(u"one", upload.Data(DATA, convergence=""))
d.addCallback(_stash_root_and_create_file)
def _stash_uri(fn, which):
self.uris[which] = fn.get_uri()
d.addCallback(_stash_uri, "one")
d.addCallback(lambda ign:
self.rootnode.add_file(u"small",
upload.Data("literal",
convergence="")))
d.addCallback(_stash_uri, "small")
d.addCallback(lambda ign: c0.create_mutable_file("mutable"))
d.addCallback(lambda fn: self.rootnode.set_node(u"mutable", fn))
d.addCallback(_stash_uri, "mutable")
d.addCallback(self.CHECK, "root", "t=stream-deep-check") # no add-lease
def _done(res):
units = [simplejson.loads(line)
for line in res.splitlines()
if line]
# root, one, small, mutable, stats
self.failUnlessEqual(len(units), 4+1)
d.addCallback(_done)
d.addCallback(self._count_leases, "root")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "one")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "mutable")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self.CHECK, "root", "t=stream-deep-check&add-lease=true")
d.addCallback(_done)
d.addCallback(self._count_leases, "root")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "one")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self._count_leases, "mutable")
d.addCallback(self._assert_leasecount, 1)
d.addCallback(self.CHECK, "root", "t=stream-deep-check&add-lease=true",
clientnum=1)
d.addCallback(_done)
d.addCallback(self._count_leases, "root")
d.addCallback(self._assert_leasecount, 2)
d.addCallback(self._count_leases, "one")
d.addCallback(self._assert_leasecount, 2)
d.addCallback(self._count_leases, "mutable")
d.addCallback(self._assert_leasecount, 2)
d.addErrback(self.explain_web_error)
return d

View File

@ -355,11 +355,12 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
# check this directory
verify = boolean_of_arg(get_arg(req, "verify", "false"))
repair = boolean_of_arg(get_arg(req, "repair", "false"))
add_lease = boolean_of_arg(get_arg(req, "add-lease", "false"))
if repair:
d = self.node.check_and_repair(Monitor(), verify)
d = self.node.check_and_repair(Monitor(), verify, add_lease)
d.addCallback(lambda res: CheckAndRepairResults(res))
else:
d = self.node.check(Monitor(), verify)
d = self.node.check(Monitor(), verify, add_lease)
d.addCallback(lambda res: CheckResults(res))
return d
@ -374,18 +375,20 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
raise NeedOperationHandleError("slow operation requires ophandle=")
verify = boolean_of_arg(get_arg(ctx, "verify", "false"))
repair = boolean_of_arg(get_arg(ctx, "repair", "false"))
add_lease = boolean_of_arg(get_arg(ctx, "add-lease", "false"))
if repair:
monitor = self.node.start_deep_check_and_repair(verify)
monitor = self.node.start_deep_check_and_repair(verify, add_lease)
renderer = DeepCheckAndRepairResults(monitor)
else:
monitor = self.node.start_deep_check(verify)
monitor = self.node.start_deep_check(verify, add_lease)
renderer = DeepCheckResults(monitor)
return self._start_operation(monitor, renderer, ctx)
def _POST_stream_deep_check(self, ctx):
verify = boolean_of_arg(get_arg(ctx, "verify", "false"))
repair = boolean_of_arg(get_arg(ctx, "repair", "false"))
walker = DeepCheckStreamer(ctx, self.node, verify, repair)
add_lease = boolean_of_arg(get_arg(ctx, "add-lease", "false"))
walker = DeepCheckStreamer(ctx, self.node, verify, repair, add_lease)
monitor = self.node.deep_traverse(walker)
walker.setMonitor(monitor)
# register to hear stopProducing. The walker ignores pauseProducing.
@ -930,11 +933,12 @@ class ManifestStreamer(dirnode.DeepStats):
class DeepCheckStreamer(dirnode.DeepStats):
implements(IPushProducer)
def __init__(self, ctx, origin, verify, repair):
def __init__(self, ctx, origin, verify, repair, add_lease):
dirnode.DeepStats.__init__(self, origin)
self.req = IRequest(ctx)
self.verify = verify
self.repair = repair
self.add_lease = add_lease
def setMonitor(self, monitor):
self.monitor = monitor
@ -971,10 +975,10 @@ class DeepCheckStreamer(dirnode.DeepStats):
data["storage-index"] = si
if self.repair:
d = node.check_and_repair(self.monitor, self.verify)
d = node.check_and_repair(self.monitor, self.verify, self.add_lease)
d.addCallback(self.add_check_and_repair, data)
else:
d = node.check(self.monitor, self.verify)
d = node.check(self.monitor, self.verify, self.add_lease)
d.addCallback(self.add_check, data)
d.addCallback(self.write_line)
return d

View File

@ -256,13 +256,14 @@ class FileNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
def _POST_check(self, req):
verify = boolean_of_arg(get_arg(req, "verify", "false"))
repair = boolean_of_arg(get_arg(req, "repair", "false"))
add_lease = boolean_of_arg(get_arg(req, "add-lease", "false"))
if isinstance(self.node, LiteralFileNode):
return defer.succeed(LiteralCheckResults())
if repair:
d = self.node.check_and_repair(Monitor(), verify)
d = self.node.check_and_repair(Monitor(), verify, add_lease)
d.addCallback(lambda res: CheckAndRepairResults(res))
else:
d = self.node.check(Monitor(), verify)
d = self.node.check(Monitor(), verify, add_lease)
d.addCallback(lambda res: CheckResults(res))
return d