import re
import json
import os.path, shutil
from twisted.trial import unittest
from twisted.internet import defer
from nevow.inevow import IRequest
from zope.interface import implementer
from twisted.web.server import Request
from twisted.web.test.requesthelper import DummyChannel
from twisted.web.template import flattenString
from allmydata import check_results, uri
from allmydata import uri as tahoe_uri
from allmydata.util import base32
from allmydata.web import check_results as web_check_results
from allmydata.storage_client import StorageFarmBroker, NativeStorageServer
from allmydata.storage.server import storage_index_to_dir
from allmydata.monitor import Monitor
from allmydata.test.no_network import GridTestMixin
from allmydata.immutable.upload import Data
from allmydata.test.common_web import WebRenderingMixin
from allmydata.mutable.publish import MutableData
from .common import (
EMPTY_CLIENT_CONFIG,
)
class FakeClient(object):
def get_storage_broker(self):
return self.storage_broker
# XXX: We have to have this class because `common.get_arg()` expects a
# `nevow.inevow.IRequest`, which `twisted.web.server.Request` isn't.
# Also, the request needs to have `args` and `fields` properties so
# that `allmydata.web.common.get_arg()` won't complain.
@implementer(IRequest)
class TestRequest(Request):
def __init__(self, args=None, fields=None):
Request.__init__(self, DummyChannel())
self.args = args or {}
self.fields = fields or {}
class WebResultsRendering(unittest.TestCase):
def remove_tags(self, s):
s = re.sub(r'<[^>]*>', ' ', s)
s = re.sub(r'\s+', ' ', s)
return s
def create_fake_client(self):
sb = StorageFarmBroker(True, None, EMPTY_CLIENT_CONFIG)
# s.get_name() (the "short description") will be "v0-00000000".
# s.get_longname() will include the -long suffix.
servers = [("v0-00000000-long", "\x00"*20, "peer-0"),
("v0-ffffffff-long", "\xff"*20, "peer-f"),
("v0-11111111-long", "\x11"*20, "peer-11")]
for (key_s, binary_tubid, nickname) in servers:
server_id = key_s
tubid_b32 = base32.b2a(binary_tubid)
furl = "pb://%s@nowhere/fake" % tubid_b32
ann = { "version": 0,
"service-name": "storage",
"anonymous-storage-FURL": furl,
"permutation-seed-base32": "",
"nickname": unicode(nickname),
"app-versions": {}, # need #466 and v2 introducer
"my-version": "ver",
"oldest-supported": "oldest",
}
s = NativeStorageServer(server_id, ann, None, None, None)
sb.test_add_server(server_id, s)
c = FakeClient()
c.storage_broker = sb
return c
def render_json(self, resource):
return resource.render(TestRequest(args={"output": ["json"]}))
def render_element(self, element, args=None):
d = flattenString(TestRequest(args), element)
return unittest.TestCase().successResultOf(d)
def test_literal(self):
lcr = web_check_results.LiteralCheckResultsElement()
html = self.render_element(lcr)
s = self.remove_tags(html)
self.failUnlessIn("Literal files are always healthy", s)
html = self.render_element(lcr, args={"return_to": ["FOOURL"]})
s = self.remove_tags(html)
self.failUnlessIn("Literal files are always healthy", s)
self.failUnlessIn('Return to file.', html)
c = self.create_fake_client()
lcr = web_check_results.LiteralCheckResultsRenderer(c)
js = self.render_json(lcr)
j = json.loads(js)
self.failUnlessEqual(j["storage-index"], "")
self.failUnlessEqual(j["results"]["healthy"], True)
def test_check(self):
c = self.create_fake_client()
sb = c.storage_broker
serverid_1 = "\x00"*20
serverid_f = "\xff"*20
server_1 = sb.get_stub_server(serverid_1)
server_f = sb.get_stub_server(serverid_f)
u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
data = { "count_happiness": 8,
"count_shares_needed": 3,
"count_shares_expected": 9,
"count_shares_good": 10,
"count_good_share_hosts": 11,
"count_recoverable_versions": 1,
"count_unrecoverable_versions": 0,
"servers_responding": [],
"sharemap": {"shareid1": [server_1, server_f]},
"count_wrong_shares": 0,
"list_corrupt_shares": [],
"count_corrupt_shares": 0,
"list_incompatible_shares": [],
"count_incompatible_shares": 0,
"report": [], "share_problems": [], "servermap": None,
}
cr = check_results.CheckResults(u, u.get_storage_index(),
healthy=True, recoverable=True,
summary="groovy",
**data)
w = web_check_results.CheckResultsRendererElement(c, cr)
html = self.render_element(w)
s = self.remove_tags(html)
self.failUnlessIn("File Check Results for SI=2k6avp", s) # abbreviated
self.failUnlessIn("Healthy : groovy", s)
self.failUnlessIn("Share Counts: need 3-of-9, have 10", s)
self.failUnlessIn("Happiness Level: 8", s)
self.failUnlessIn("Hosts with good shares: 11", s)
self.failUnlessIn("Corrupt shares: none", s)
self.failUnlessIn("Wrong Shares: 0", s)
self.failUnlessIn("Recoverable Versions: 1", s)
self.failUnlessIn("Unrecoverable Versions: 0", s)
self.failUnlessIn("Good Shares (sorted in share order): Share ID Nickname Node ID shareid1 peer-0 00000000 peer-f ffffffff", s)
cr = check_results.CheckResults(u, u.get_storage_index(),
healthy=False, recoverable=True,
summary="ungroovy",
**data)
w = web_check_results.CheckResultsRendererElement(c, cr)
html = self.render_element(w)
s = self.remove_tags(html)
self.failUnlessIn("File Check Results for SI=2k6avp", s) # abbreviated
self.failUnlessIn("Not Healthy! : ungroovy", s)
data["count_corrupt_shares"] = 1
data["list_corrupt_shares"] = [(server_1, u.get_storage_index(), 2)]
cr = check_results.CheckResults(u, u.get_storage_index(),
healthy=False, recoverable=False,
summary="rather dead",
**data)
w = web_check_results.CheckResultsRendererElement(c, cr)
html = self.render_element(w)
s = self.remove_tags(html)
self.failUnlessIn("File Check Results for SI=2k6avp", s) # abbreviated
self.failUnlessIn("Not Recoverable! : rather dead", s)
self.failUnlessIn("Corrupt shares: Share ID Nickname Node ID sh#2 peer-0 00000000", s)
html = self.render_element(w)
s = self.remove_tags(html)
self.failUnlessIn("File Check Results for SI=2k6avp", s) # abbreviated
self.failUnlessIn("Not Recoverable! : rather dead", s)
html = self.render_element(w, args={"return_to": ["FOOURL"]})
self.failUnlessIn('Return to file/directory.',
html)
w = web_check_results.CheckResultsRenderer(c, cr)
d = self.render_json(w)
def _check_json(jdata):
j = json.loads(jdata)
self.failUnlessEqual(j["summary"], "rather dead")
self.failUnlessEqual(j["storage-index"],
"2k6avpjga3dho3zsjo6nnkt7n4")
expected = {'count-happiness': 8,
'count-shares-expected': 9,
'healthy': False,
'count-unrecoverable-versions': 0,
'count-shares-needed': 3,
'sharemap': {"shareid1":
["v0-00000000-long", "v0-ffffffff-long"]},
'count-recoverable-versions': 1,
'list-corrupt-shares':
[["v0-00000000-long", "2k6avpjga3dho3zsjo6nnkt7n4", 2]],
'count-good-share-hosts': 11,
'count-wrong-shares': 0,
'count-shares-good': 10,
'count-corrupt-shares': 1,
'servers-responding': [],
'recoverable': False,
}
self.failUnlessEqual(j["results"], expected)
_check_json(d)
w = web_check_results.CheckResultsRendererElement(c, cr)
d = self.render_element(w)
def _check(html):
s = self.remove_tags(html)
self.failUnlessIn("File Check Results for SI=2k6avp", s)
self.failUnlessIn("Not Recoverable! : rather dead", s)
_check(html)
def test_check_and_repair(self):
c = self.create_fake_client()
sb = c.storage_broker
serverid_1 = "\x00"*20
serverid_f = "\xff"*20
u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
data = { "count_happiness": 5,
"count_shares_needed": 3,
"count_shares_expected": 10,
"count_shares_good": 6,
"count_good_share_hosts": 7,
"count_recoverable_versions": 1,
"count_unrecoverable_versions": 0,
"servers_responding": [],
"sharemap": {"shareid1": [sb.get_stub_server(serverid_1),
sb.get_stub_server(serverid_f)]},
"count_wrong_shares": 0,
"list_corrupt_shares": [],
"count_corrupt_shares": 0,
"list_incompatible_shares": [],
"count_incompatible_shares": 0,
"report": [], "share_problems": [], "servermap": None,
}
pre_cr = check_results.CheckResults(u, u.get_storage_index(),
healthy=False, recoverable=True,
summary="illing",
**data)
data = { "count_happiness": 9,
"count_shares_needed": 3,
"count_shares_expected": 10,
"count_shares_good": 10,
"count_good_share_hosts": 11,
"count_recoverable_versions": 1,
"count_unrecoverable_versions": 0,
"servers_responding": [],
"sharemap": {"shareid1": [sb.get_stub_server(serverid_1),
sb.get_stub_server(serverid_f)]},
"count_wrong_shares": 0,
"count_corrupt_shares": 0,
"list_corrupt_shares": [],
"list_incompatible_shares": [],
"count_incompatible_shares": 0,
"report": [], "share_problems": [], "servermap": None,
}
post_cr = check_results.CheckResults(u, u.get_storage_index(),
healthy=True, recoverable=True,
summary="groovy",
**data)
crr = check_results.CheckAndRepairResults(u.get_storage_index())
crr.pre_repair_results = pre_cr
crr.post_repair_results = post_cr
crr.repair_attempted = False
w = web_check_results.CheckAndRepairResultsRendererElement(c, crr)
html = self.render_element(w)
s = self.remove_tags(html)
self.failUnlessIn("File Check-And-Repair Results for SI=2k6avp", s)
self.failUnlessIn("Healthy : groovy", s)
self.failUnlessIn("No repair necessary", s)
self.failUnlessIn("Post-Repair Checker Results:", s)
self.failUnlessIn("Share Counts: need 3-of-10, have 10", s)
crr.repair_attempted = True
crr.repair_successful = True
html = self.render_element(w)
s = self.remove_tags(html)
self.failUnlessIn("File Check-And-Repair Results for SI=2k6avp", s)
self.failUnlessIn("Healthy : groovy", s)
self.failUnlessIn("Repair successful", s)
self.failUnlessIn("Post-Repair Checker Results:", s)
crr.repair_attempted = True
crr.repair_successful = False
post_cr = check_results.CheckResults(u, u.get_storage_index(),
healthy=False, recoverable=True,
summary="better",
**data)
crr.post_repair_results = post_cr
html = self.render_element(w)
s = self.remove_tags(html)
self.failUnlessIn("File Check-And-Repair Results for SI=2k6avp", s)
self.failUnlessIn("Not Healthy! : better", s)
self.failUnlessIn("Repair unsuccessful", s)
self.failUnlessIn("Post-Repair Checker Results:", s)
crr.repair_attempted = True
crr.repair_successful = False
post_cr = check_results.CheckResults(u, u.get_storage_index(),
healthy=False, recoverable=False,
summary="worse",
**data)
crr.post_repair_results = post_cr
html = self.render_element(w)
s = self.remove_tags(html)
self.failUnlessIn("File Check-And-Repair Results for SI=2k6avp", s)
self.failUnlessIn("Not Recoverable! : worse", s)
self.failUnlessIn("Repair unsuccessful", s)
self.failUnlessIn("Post-Repair Checker Results:", s)
w2 = web_check_results.CheckAndRepairResultsRenderer(c, crr)
d = self.render_json(w2)
def _got_json(data):
j = json.loads(data)
self.failUnlessEqual(j["repair-attempted"], True)
self.failUnlessEqual(j["storage-index"],
"2k6avpjga3dho3zsjo6nnkt7n4")
self.failUnlessEqual(j["pre-repair-results"]["summary"], "illing")
self.failUnlessEqual(j["post-repair-results"]["summary"], "worse")
_got_json(d)
w3 = web_check_results.CheckAndRepairResultsRenderer(c, None)
d = self.render_json(w3)
def _got_lit_results(data):
j = json.loads(data)
self.failUnlessEqual(j["repair-attempted"], False)
self.failUnlessEqual(j["storage-index"], "")
_got_lit_results(d)
class BalancingAct(GridTestMixin, unittest.TestCase):
# test for #1115 regarding the 'count-good-share-hosts' metric
def add_server(self, server_number, readonly=False):
assert self.g, "I tried to find a grid at self.g, but failed"
ss = self.g.make_server(server_number, readonly)
#log.msg("just created a server, number: %s => %s" % (server_number, ss,))
self.g.add_server(server_number, ss)
def add_server_with_share(self, server_number, uri, share_number=None,
readonly=False):
self.add_server(server_number, readonly)
if share_number is not None:
self.copy_share_to_server(uri, share_number, server_number)
def copy_share_to_server(self, uri, share_number, server_number):
ss = self.g.servers_by_number[server_number]
# Copy share i from the directory associated with the first
# storage server to the directory associated with this one.
assert self.g, "I tried to find a grid at self.g, but failed"
assert self.shares, "I tried to find shares at self.shares, but failed"
old_share_location = self.shares[share_number][2]
new_share_location = os.path.join(ss.storedir, "shares")
si = tahoe_uri.from_string(self.uri).get_storage_index()
new_share_location = os.path.join(new_share_location,
storage_index_to_dir(si))
if not os.path.exists(new_share_location):
os.makedirs(new_share_location)
new_share_location = os.path.join(new_share_location,
str(share_number))
if old_share_location != new_share_location:
shutil.copy(old_share_location, new_share_location)
shares = self.find_uri_shares(uri)
# Make sure that the storage server has the share.
self.failUnless((share_number, ss.my_nodeid, new_share_location)
in shares)
def _pretty_shares_chart(self, uri):
# Servers are labeled A-Z, shares are labeled 0-9
letters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
assert len(self.g.servers_by_number) < len(letters), \
"This little printing function is only meant for < 26 servers"
shares_chart = {}
names = dict(zip([ss.my_nodeid
for _,ss in self.g.servers_by_number.iteritems()],
letters))
for shnum, serverid, _ in self.find_uri_shares(uri):
shares_chart.setdefault(shnum, []).append(names[serverid])
return shares_chart
def test_good_share_hosts(self):
self.basedir = "checker/BalancingAct/1115"
self.set_up_grid(num_servers=1)
c0 = self.g.clients[0]
c0.encoding_params['happy'] = 1
c0.encoding_params['n'] = 4
c0.encoding_params['k'] = 3
DATA = "data" * 100
d = c0.upload(Data(DATA, convergence=""))
def _stash_immutable(ur):
self.imm = c0.create_node_from_uri(ur.get_uri())
self.uri = self.imm.get_uri()
d.addCallback(_stash_immutable)
d.addCallback(lambda ign:
self.find_uri_shares(self.uri))
def _store_shares(shares):
self.shares = shares
d.addCallback(_store_shares)
def add_three(_, i):
# Add a new server with just share 3
self.add_server_with_share(i, self.uri, 3)
#print self._pretty_shares_chart(self.uri)
for i in range(1,5):
d.addCallback(add_three, i)
def _check_and_repair(_):
return self.imm.check_and_repair(Monitor())
def _check_counts(crr, shares_good, good_share_hosts):
prr = crr.get_post_repair_results()
self.failUnlessEqual(prr.get_share_counter_good(), shares_good)
self.failUnlessEqual(prr.get_host_counter_good_shares(),
good_share_hosts)
"""
Initial sharemap:
0:[A] 1:[A] 2:[A] 3:[A,B,C,D,E]
4 good shares, but 5 good hosts
After deleting all instances of share #3 and repairing:
0:[A], 1:[A,B], 2:[C,A], 3:[E]
# actually: {0: ['E', 'A'], 1: ['C', 'A'], 2: ['A', 'B'], 3: ['D']}
Still 4 good shares but now 4 good hosts
"""
d.addCallback(_check_and_repair)
d.addCallback(_check_counts, 4, 5)
d.addCallback(lambda _: self.delete_shares_numbered(self.uri, [3]))
d.addCallback(_check_and_repair)
# it can happen that our uploader will choose, e.g., to upload
# to servers B, C, D, E .. which will mean that all 5 serves
# now contain our shares (and thus "respond").
def _check_happy(crr):
prr = crr.get_post_repair_results()
self.assertTrue(prr.get_host_counter_good_shares() >= 4)
return crr
d.addCallback(_check_happy)
d.addCallback(lambda _: all([self.g.break_server(sid)
for sid in self.g.get_all_serverids()]))
d.addCallback(_check_and_repair)
d.addCallback(_check_counts, 0, 0)
return d
class AddLease(GridTestMixin, unittest.TestCase):
# test for #875, in which failures in the add-lease call cause
# false-negatives in the checker
def test_875(self):
self.basedir = "checker/AddLease/875"
self.set_up_grid(num_servers=1)
c0 = self.g.clients[0]
c0.encoding_params['happy'] = 1
self.uris = {}
DATA = "data" * 100
d = c0.upload(Data(DATA, convergence=""))
def _stash_immutable(ur):
self.imm = c0.create_node_from_uri(ur.get_uri())
d.addCallback(_stash_immutable)
d.addCallback(lambda ign:
c0.create_mutable_file(MutableData("contents")))
def _stash_mutable(node):
self.mut = node
d.addCallback(_stash_mutable)
def _check_cr(cr, which):
self.failUnless(cr.is_healthy(), which)
# these two should work normally
d.addCallback(lambda ign: self.imm.check(Monitor(), add_lease=True))
d.addCallback(_check_cr, "immutable-normal")
d.addCallback(lambda ign: self.mut.check(Monitor(), add_lease=True))
d.addCallback(_check_cr, "mutable-normal")
really_did_break = []
# now break the server's remote_add_lease call
def _break_add_lease(ign):
def broken_add_lease(*args, **kwargs):
really_did_break.append(1)
raise KeyError("intentional failure, should be ignored")
assert self.g.servers_by_number[0].remote_add_lease
self.g.servers_by_number[0].remote_add_lease = broken_add_lease
d.addCallback(_break_add_lease)
# and confirm that the files still look healthy
d.addCallback(lambda ign: self.mut.check(Monitor(), add_lease=True))
d.addCallback(_check_cr, "mutable-broken")
d.addCallback(lambda ign: self.imm.check(Monitor(), add_lease=True))
d.addCallback(_check_cr, "immutable-broken")
d.addCallback(lambda ign: self.failUnless(really_did_break))
return d
class CounterHolder(object):
def __init__(self):
self._num_active_block_fetches = 0
self._max_active_block_fetches = 0
from allmydata.immutable.checker import ValidatedReadBucketProxy
class MockVRBP(ValidatedReadBucketProxy):
def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder):
ValidatedReadBucketProxy.__init__(self, sharenum, bucket,
share_hash_tree, num_blocks,
block_size, share_size)
self.counterholder = counterholder
def get_block(self, blocknum):
self.counterholder._num_active_block_fetches += 1
if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches:
self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches
d = ValidatedReadBucketProxy.get_block(self, blocknum)
def _mark_no_longer_active(res):
self.counterholder._num_active_block_fetches -= 1
return res
d.addBoth(_mark_no_longer_active)
return d
class TooParallel(GridTestMixin, unittest.TestCase):
# bug #1395: immutable verifier was aggressively parallized, checking all
# blocks of all shares at the same time, blowing our memory budget and
# crashing with MemoryErrors on >1GB files.
def test_immutable(self):
import allmydata.immutable.checker
origVRBP = allmydata.immutable.checker.ValidatedReadBucketProxy
self.basedir = "checker/TooParallel/immutable"
# If any code asks to instantiate a ValidatedReadBucketProxy,
# we give them a MockVRBP which is configured to use our
# CounterHolder.
counterholder = CounterHolder()
def make_mock_VRBP(*args, **kwargs):
return MockVRBP(counterholder=counterholder, *args, **kwargs)
allmydata.immutable.checker.ValidatedReadBucketProxy = make_mock_VRBP
d = defer.succeed(None)
def _start(ign):
self.set_up_grid(num_servers=4)
self.c0 = self.g.clients[0]
self.c0.encoding_params = { "k": 1,
"happy": 4,
"n": 4,
"max_segment_size": 5,
}
self.uris = {}
DATA = "data" * 100 # 400/5 = 80 blocks
return self.c0.upload(Data(DATA, convergence=""))
d.addCallback(_start)
def _do_check(ur):
n = self.c0.create_node_from_uri(ur.get_uri())
return n.check(Monitor(), verify=True)
d.addCallback(_do_check)
def _check(cr):
# the verifier works on all 4 shares in parallel, but only
# fetches one block from each share at a time, so we expect to
# see 4 parallel fetches
self.failUnlessEqual(counterholder._max_active_block_fetches, 4)
d.addCallback(_check)
def _clean_up(res):
allmydata.immutable.checker.ValidatedReadBucketProxy = origVRBP
return res
d.addBoth(_clean_up)
return d