mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-01 08:48:01 +00:00
storage: add remote_advise_corrupt_share, for clients to tell storage servers about share corruption that they've discovered. The server logs the report.
This commit is contained in:
parent
6dbef907ac
commit
db37c14ab7
@ -55,6 +55,18 @@ class RIBucketReader(RemoteInterface):
|
|||||||
def read(offset=Offset, length=ReadSize):
|
def read(offset=Offset, length=ReadSize):
|
||||||
return ShareData
|
return ShareData
|
||||||
|
|
||||||
|
def advise_corrupt_share(reason=str):
|
||||||
|
"""Clients who discover hash failures in shares that they have
|
||||||
|
downloaded from me will use this method to inform me about the
|
||||||
|
failures. I will record their concern so that my operator can
|
||||||
|
manually inspect the shares in question. I return None.
|
||||||
|
|
||||||
|
This is a wrapper around RIStorageServer.advise_corrupt_share(),
|
||||||
|
which is tied to a specific share, and therefore does not need the
|
||||||
|
extra share-identifying arguments. Please see that method for full
|
||||||
|
documentation.
|
||||||
|
"""
|
||||||
|
|
||||||
TestVector = ListOf(TupleOf(Offset, ReadSize, str, str))
|
TestVector = ListOf(TupleOf(Offset, ReadSize, str, str))
|
||||||
# elements are (offset, length, operator, specimen)
|
# elements are (offset, length, operator, specimen)
|
||||||
# operator is one of "lt, le, eq, ne, ge, gt"
|
# operator is one of "lt, le, eq, ne, ge, gt"
|
||||||
@ -230,6 +242,23 @@ class RIStorageServer(RemoteInterface):
|
|||||||
"""
|
"""
|
||||||
return TupleOf(bool, DictOf(int, ReadData))
|
return TupleOf(bool, DictOf(int, ReadData))
|
||||||
|
|
||||||
|
def advise_corrupt_share(share_type=str, storage_index=StorageIndex,
|
||||||
|
shnum=int, reason=str):
|
||||||
|
"""Clients who discover hash failures in shares that they have
|
||||||
|
downloaded from me will use this method to inform me about the
|
||||||
|
failures. I will record their concern so that my operator can
|
||||||
|
manually inspect the shares in question. I return None.
|
||||||
|
|
||||||
|
'share_type' is either 'mutable' or 'immutable'. 'storage_index' is a
|
||||||
|
(binary) storage index string, and 'shnum' is the integer share
|
||||||
|
number. 'reason' is a human-readable explanation of the problem,
|
||||||
|
probably including some expected hash values and the computed ones
|
||||||
|
which did not match. Corruption advisories for mutable shares should
|
||||||
|
include a hash of the public key (the same value that appears in the
|
||||||
|
mutable-file verify-cap), since the current share format does not
|
||||||
|
store that on disk.
|
||||||
|
"""
|
||||||
|
|
||||||
class IStorageBucketWriter(Interface):
|
class IStorageBucketWriter(Interface):
|
||||||
"""
|
"""
|
||||||
Objects of this kind live on the client side.
|
Objects of this kind live on the client side.
|
||||||
|
@ -7,7 +7,7 @@ from twisted.application import service
|
|||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
|
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
|
||||||
RIBucketReader, BadWriteEnablerError, IStatsProducer
|
RIBucketReader, BadWriteEnablerError, IStatsProducer
|
||||||
from allmydata.util import base32, fileutil, idlib, log
|
from allmydata.util import base32, fileutil, idlib, log, time_format
|
||||||
from allmydata.util.assertutil import precondition
|
from allmydata.util.assertutil import precondition
|
||||||
import allmydata # for __version__
|
import allmydata # for __version__
|
||||||
|
|
||||||
@ -322,9 +322,11 @@ class BucketWriter(Referenceable):
|
|||||||
class BucketReader(Referenceable):
|
class BucketReader(Referenceable):
|
||||||
implements(RIBucketReader)
|
implements(RIBucketReader)
|
||||||
|
|
||||||
def __init__(self, ss, sharefname):
|
def __init__(self, ss, sharefname, storage_index=None, shnum=None):
|
||||||
self.ss = ss
|
self.ss = ss
|
||||||
self._share_file = ShareFile(sharefname)
|
self._share_file = ShareFile(sharefname)
|
||||||
|
self.storage_index = storage_index
|
||||||
|
self.shnum = shnum
|
||||||
|
|
||||||
def remote_read(self, offset, length):
|
def remote_read(self, offset, length):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
@ -333,6 +335,11 @@ class BucketReader(Referenceable):
|
|||||||
self.ss.count("read")
|
self.ss.count("read")
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
def remote_advise_corrupt_share(self, reason):
|
||||||
|
return self.ss.remote_advise_corrupt_share("immutable",
|
||||||
|
self.storage_index,
|
||||||
|
self.shnum,
|
||||||
|
reason)
|
||||||
|
|
||||||
# the MutableShareFile is like the ShareFile, but used for mutable data. It
|
# the MutableShareFile is like the ShareFile, but used for mutable data. It
|
||||||
# has a different layout. See docs/mutable.txt for more details.
|
# has a different layout. See docs/mutable.txt for more details.
|
||||||
@ -770,6 +777,9 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
sharedir = os.path.join(storedir, "shares")
|
sharedir = os.path.join(storedir, "shares")
|
||||||
fileutil.make_dirs(sharedir)
|
fileutil.make_dirs(sharedir)
|
||||||
self.sharedir = sharedir
|
self.sharedir = sharedir
|
||||||
|
# we don't actually create the corruption-advisory dir until necessary
|
||||||
|
self.corruption_advisory_dir = os.path.join(storedir,
|
||||||
|
"corruption-advisories")
|
||||||
self.sizelimit = sizelimit
|
self.sizelimit = sizelimit
|
||||||
self.no_storage = discard_storage
|
self.no_storage = discard_storage
|
||||||
self.readonly_storage = readonly_storage
|
self.readonly_storage = readonly_storage
|
||||||
@ -1075,7 +1085,8 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
log.msg("storage: get_buckets %s" % si_s)
|
log.msg("storage: get_buckets %s" % si_s)
|
||||||
bucketreaders = {} # k: sharenum, v: BucketReader
|
bucketreaders = {} # k: sharenum, v: BucketReader
|
||||||
for shnum, filename in self._get_bucket_shares(storage_index):
|
for shnum, filename in self._get_bucket_shares(storage_index):
|
||||||
bucketreaders[shnum] = BucketReader(self, filename)
|
bucketreaders[shnum] = BucketReader(self, filename,
|
||||||
|
storage_index, shnum)
|
||||||
self.add_latency("get", time.time() - start)
|
self.add_latency("get", time.time() - start)
|
||||||
return bucketreaders
|
return bucketreaders
|
||||||
|
|
||||||
@ -1206,3 +1217,25 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
facility="tahoe.storage", level=log.NOISY, parent=lp)
|
facility="tahoe.storage", level=log.NOISY, parent=lp)
|
||||||
self.add_latency("readv", time.time() - start)
|
self.add_latency("readv", time.time() - start)
|
||||||
return datavs
|
return datavs
|
||||||
|
|
||||||
|
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
|
||||||
|
reason):
|
||||||
|
fileutil.make_dirs(self.corruption_advisory_dir)
|
||||||
|
now = time_format.iso_utc(sep="T")
|
||||||
|
si_s = base32.b2a(storage_index)
|
||||||
|
fn = os.path.join(self.corruption_advisory_dir,
|
||||||
|
"%s--%s-%d" % (now, si_s, shnum))
|
||||||
|
f = open(fn, "w")
|
||||||
|
f.write("report: Share Corruption\n")
|
||||||
|
f.write("type: %s\n" % share_type)
|
||||||
|
f.write("storage_index: %s\n" % si_s)
|
||||||
|
f.write("share_number: %d\n" % shnum)
|
||||||
|
f.write("\n")
|
||||||
|
f.write(reason)
|
||||||
|
f.write("\n")
|
||||||
|
f.close()
|
||||||
|
log.msg(format=("client claims corruption in (%(share_type)s) " +
|
||||||
|
"%(si)s-%(shnum)d: %(reason)s"),
|
||||||
|
share_type=share_type, si=si_s, shnum=shnum, reason=reason,
|
||||||
|
level=log.SCARY, umid="SGx2fA")
|
||||||
|
return None
|
||||||
|
@ -5,7 +5,7 @@ from twisted.internet import defer
|
|||||||
import time, os.path, stat
|
import time, os.path, stat
|
||||||
import itertools
|
import itertools
|
||||||
from allmydata import interfaces
|
from allmydata import interfaces
|
||||||
from allmydata.util import fileutil, hashutil
|
from allmydata.util import fileutil, hashutil, base32
|
||||||
from allmydata.storage import BucketWriter, BucketReader, \
|
from allmydata.storage import BucketWriter, BucketReader, \
|
||||||
StorageServer, MutableShareFile, \
|
StorageServer, MutableShareFile, \
|
||||||
storage_index_to_dir, DataTooLargeError, LeaseInfo
|
storage_index_to_dir, DataTooLargeError, LeaseInfo
|
||||||
@ -592,6 +592,51 @@ class Server(unittest.TestCase):
|
|||||||
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
|
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
|
||||||
self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
|
self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
|
||||||
|
|
||||||
|
def test_advise_corruption(self):
|
||||||
|
workdir = self.workdir("test_advise_corruption")
|
||||||
|
ss = StorageServer(workdir, discard_storage=True)
|
||||||
|
ss.setNodeID("\x00" * 20)
|
||||||
|
ss.setServiceParent(self.sparent)
|
||||||
|
|
||||||
|
si0_s = base32.b2a("si0")
|
||||||
|
ss.remote_advise_corrupt_share("immutable", "si0", 0,
|
||||||
|
"This share smells funny.\n")
|
||||||
|
reportdir = os.path.join(workdir, "corruption-advisories")
|
||||||
|
reports = os.listdir(reportdir)
|
||||||
|
self.failUnlessEqual(len(reports), 1)
|
||||||
|
report_si0 = reports[0]
|
||||||
|
self.failUnless(si0_s in report_si0, report_si0)
|
||||||
|
f = open(os.path.join(reportdir, report_si0), "r")
|
||||||
|
report = f.read()
|
||||||
|
f.close()
|
||||||
|
self.failUnless("type: immutable" in report)
|
||||||
|
self.failUnless(("storage_index: %s" % si0_s) in report)
|
||||||
|
self.failUnless("share_number: 0" in report)
|
||||||
|
self.failUnless("This share smells funny." in report)
|
||||||
|
|
||||||
|
# test the RIBucketWriter version too
|
||||||
|
si1_s = base32.b2a("si1")
|
||||||
|
already,writers = self.allocate(ss, "si1", [1], 75)
|
||||||
|
self.failUnlessEqual(already, set())
|
||||||
|
self.failUnlessEqual(set(writers.keys()), set([1]))
|
||||||
|
writers[1].remote_write(0, "data")
|
||||||
|
writers[1].remote_close()
|
||||||
|
|
||||||
|
b = ss.remote_get_buckets("si1")
|
||||||
|
self.failUnlessEqual(set(b.keys()), set([1]))
|
||||||
|
b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
|
||||||
|
|
||||||
|
reports = os.listdir(reportdir)
|
||||||
|
self.failUnlessEqual(len(reports), 2)
|
||||||
|
report_si1 = [r for r in reports if si1_s in r][0]
|
||||||
|
f = open(os.path.join(reportdir, report_si1), "r")
|
||||||
|
report = f.read()
|
||||||
|
f.close()
|
||||||
|
self.failUnless("type: immutable" in report)
|
||||||
|
self.failUnless(("storage_index: %s" % si1_s) in report)
|
||||||
|
self.failUnless("share_number: 1" in report)
|
||||||
|
self.failUnless("This share tastes like dust." in report)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class MutableServer(unittest.TestCase):
|
class MutableServer(unittest.TestCase):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user