mirror of https://github.com/tahoe-lafs/tahoe-lafs.git (synced 2025-06-22 16:58:58 +00:00)
storage: we must truncate short segments. Now most tests pass (except uri_extension)
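The core of the change: a share's data section is followed immediately by the hash-tree sections, so a reader that always asks for a full segment_size bytes will, on the final (short) segment, read past the real data and into the plaintext_hash_tree. Below is a small arithmetic sketch of that overrun, using the same numbers the updated tests use (data_size=95, segment_size=25); it is an illustration, not code from this commit.

    # Post-commit layout arithmetic for data_size=95, segment_size=25 (4 segments).
    header_size = 8 * 4                    # the enlarged offset header is 0x20 bytes
    data_start = header_size               # offsets['data'] == 0x20
    data_end = data_start + 95             # == 127; plaintext_hash_tree begins here
    block3_offset = data_start + 3 * 25    # == 107
    assert block3_offset + 25 > data_end   # a full-segment read would overrun by 5 bytes
    assert block3_offset + 20 == data_end  # reading only the 20 leftover bytes is exact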
@@ -119,7 +119,7 @@ class ValidatedBucket:
         if not self._share_hash:
             d1 = self.bucket.get_share_hashes()
         else:
-            d1 = defer.succeed(None)
+            d1 = defer.succeed([])

         # we might need to grab some elements of our block hash tree, to
         # validate the requested block up to the share hash
@@ -149,9 +149,12 @@ class ValidatedBucket:
             sht.set_hashes(sh)
             self._share_hash = sht.get_leaf(self.sharenum)

-        #log.msg("checking block_hash(shareid=%d, blocknum=%d) len=%d" %
-        #        (self.sharenum, blocknum, len(blockdata)))
         blockhash = hashutil.block_hash(blockdata)
+        #log.msg("checking block_hash(shareid=%d, blocknum=%d) len=%d "
+        #        "%r .. %r: %s" %
+        #        (self.sharenum, blocknum, len(blockdata),
+        #         blockdata[:50], blockdata[-50:], idlib.b2a(blockhash)))
+
         # we always validate the blockhash
         bh = dict(enumerate(blockhashes))
         # replace blockhash root with validated value
@@ -163,20 +166,33 @@ class ValidatedBucket:
             # likely a programming error
             log.msg("hash failure in block=%d, shnum=%d on %s" %
                     (blocknum, self.sharenum, self.bucket))
-            #log.msg(" block length: %d" % len(blockdata))
-            #log.msg(" block hash: %s" % idlib.b2a_or_none(blockhash)) # not safe
-            #log.msg(" block data: %r" % (blockdata,))
-            #log.msg(" root hash: %s" % idlib.b2a(self._roothash))
-            #log.msg(" share hash tree:\n" + self.share_hash_tree.dump())
-            #log.msg(" block hash tree:\n" + self.block_hash_tree.dump())
-            #lines = []
-            #for i,h in sorted(sharehashes):
-            #    lines.append("%3d: %s" % (i, idlib.b2a_or_none(h)))
-            #log.msg(" sharehashes:\n" + "\n".join(lines) + "\n")
-            #lines = []
-            #for i,h in enumerate(blockhashes):
-            #    lines.append("%3d: %s" % (i, idlib.b2a_or_none(h)))
-            #log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
+            if self._share_hash:
+                log.msg(""" failure occurred when checking the block_hash_tree.
+                This suggests that either the block data was bad, or that the
+                block hashes we received along with it were bad.""")
+            else:
+                log.msg(""" the failure probably occurred when checking the
+                share_hash_tree, which suggests that the share hashes we
+                received from the remote peer were bad.""")
+            log.msg(" have self._share_hash: %s" % bool(self._share_hash))
+            log.msg(" block length: %d" % len(blockdata))
+            log.msg(" block hash: %s" % idlib.b2a_or_none(blockhash)) # not safe
+            if len(blockdata) < 100:
+                log.msg(" block data: %r" % (blockdata,))
+            else:
+                log.msg(" block data start/end: %r .. %r" %
+                        (blockdata[:50], blockdata[-50:]))
+            log.msg(" root hash: %s" % idlib.b2a(self._roothash))
+            log.msg(" share hash tree:\n" + self.share_hash_tree.dump())
+            log.msg(" block hash tree:\n" + self.block_hash_tree.dump())
+            lines = []
+            for i,h in sorted(sharehashes):
+                lines.append("%3d: %s" % (i, idlib.b2a_or_none(h)))
+            log.msg(" sharehashes:\n" + "\n".join(lines) + "\n")
+            lines = []
+            for i,h in enumerate(blockhashes):
+                lines.append("%3d: %s" % (i, idlib.b2a_or_none(h)))
+            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
             raise

         # If we made it here, the block is good. If the hash trees didn't
@@ -302,6 +302,11 @@ class Encoder(object):
             d = self.send_subshare(shareid, segnum, subshare)
             dl.append(d)
             subshare_hash = hashutil.block_hash(subshare)
+            #from allmydata.util import idlib
+            #log.msg("creating block (shareid=%d, blocknum=%d) "
+            #        "len=%d %r .. %r: %s" %
+            #        (shareid, segnum, len(subshare),
+            #         subshare[:50], subshare[-50:], idlib.b2a(subshare_hash)))
             self.subshare_hashes[shareid].append(subshare_hash)
         dl = self._gather_responses(dl)
         def _logit(res):
@@ -6,9 +6,8 @@ from twisted.internet import defer

 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
-     RIBucketReader, IStorageBucketWriter, IStorageBucketReader
-from allmydata import interfaces
-from allmydata.util import fileutil, idlib
+     RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE
+from allmydata.util import fileutil, idlib, mathutil
 from allmydata.util.assertutil import precondition

 # store/
@@ -151,22 +150,23 @@ a series of four-byte big-endian offset values, which indicate where each
 section starts. Each offset is measured from the beginning of the file.

 0x00: segment size
-0x04: offset of data (=00 00 00 1c)
-0x08: offset of plaintext_hash_tree
-0x0c: offset of crypttext_hash_tree
-0x10: offset of block_hashes
-0x14: offset of share_hashes
-0x18: offset of uri_extension_length + uri_extension
-0x1c: start of data
-      start of plaintext_hash_tree
-      start of crypttext_hash_tree
-      start of block_hashes
-      start of share_hashes
+0x04: data size
+0x08: offset of data (=00 00 00 1c)
+0x0c: offset of plaintext_hash_tree
+0x10: offset of crypttext_hash_tree
+0x14: offset of block_hashes
+0x18: offset of share_hashes
+0x1c: offset of uri_extension_length + uri_extension
+0x20: start of data
+ ? :  start of plaintext_hash_tree
+ ? :  start of crypttext_hash_tree
+ ? :  start of block_hashes
+ ? :  start of share_hashes
       each share_hash is written as a two-byte (big-endian) hashnum
       followed by the 32-byte SHA-256 hash. We only store the hashes
       necessary to validate the share hash root
-      start of uri_extension_length (four-byte big-endian value)
-      start of uri_extension
+ ? :  start of uri_extension_length (four-byte big-endian value)
+ ? :  start of uri_extension
 """

 def allocated_size(data_size, num_segments, num_share_hashes,
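The docstring above now describes an eight-field header: segment size, data size, and six section offsets, each a four-byte big-endian integer. Here is a sketch of how such a header could be packed and read back with struct; the field and helper names are ours rather than the project's, and the offsets are dummy values.

    import struct

    FIELDS = ('data', 'plaintext_hash_tree', 'crypttext_hash_tree',
              'block_hashes', 'share_hashes', 'uri_extension')

    def pack_header(segment_size, data_size, offsets):
        return struct.pack(">LLLLLLLL", segment_size, data_size,
                           *[offsets[f] for f in FIELDS])

    def unpack_header(header):
        values = struct.unpack(">LLLLLLLL", header)
        return values[0], values[1], dict(zip(FIELDS, values[2:]))

    hdr = pack_header(25, 95, dict(zip(FIELDS, range(0x20, 0x20 + 6))))  # dummy offsets
    assert len(hdr) == 8 * 4                       # the header itself is 0x20 bytes
    seg, dsize, offs = unpack_header(hdr)
    assert (seg, dsize, offs['data']) == (25, 95, 0x20)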
@@ -181,10 +181,10 @@ class WriteBucketProxy:
     def __init__(self, rref, data_size, segment_size, num_segments,
                  num_share_hashes, uri_extension_size):
         self._rref = rref
+        self._data_size = data_size
         self._segment_size = segment_size
         self._num_segments = num_segments

-        HASH_SIZE = interfaces.HASH_SIZE
         self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE
         # how many share hashes are included in each share? This will be
         # about ln2(num_shares).
@@ -193,7 +193,7 @@ class WriteBucketProxy:
         self._uri_extension_size = uri_extension_size

         offsets = self._offsets = {}
-        x = 0x1c
+        x = 0x20
         offsets['data'] = x
         x += data_size
         offsets['plaintext_hash_tree'] = x
@@ -206,16 +206,17 @@ class WriteBucketProxy:
         x += self._share_hash_size
         offsets['uri_extension'] = x

-        offset_data = struct.pack(">LLLLLLL",
+        offset_data = struct.pack(">LLLLLLLL",
                                   segment_size,
+                                  data_size,
                                   offsets['data'],
                                   offsets['plaintext_hash_tree'],
                                   offsets['crypttext_hash_tree'],
                                   offsets['block_hashes'],
                                   offsets['share_hashes'],
-                                  offsets['uri_extension']
+                                  offsets['uri_extension'],
                                   )
-        assert len(offset_data) == 7*4
+        assert len(offset_data) == 8*4
         self._offset_data = offset_data

     def start(self):
@@ -229,7 +230,9 @@ class WriteBucketProxy:
             precondition(len(data) == self._segment_size,
                          len(data), self._segment_size)
         else:
-            precondition(len(data) <= self._segment_size,
+            precondition(len(data) == (self._data_size -
+                                       (self._segment_size *
+                                        (self._num_segments - 1))),
                          len(data), self._segment_size)
         return self._write(offset, data)

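With the proxy now remembering data_size, put_block can insist that the final block carry exactly the leftover bytes rather than merely "no more than a segment". A quick check of that arithmetic with the test's numbers (an illustration only, not the project's code):

    data_size, segment_size, num_segments = 95, 25, 4
    expected_tail = data_size - segment_size * (num_segments - 1)
    assert expected_tail == 20                   # put_block(3, "d"*25) would now fail
    assert 0 < expected_tail <= segment_size     # while put_block(3, "d"*20) passes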
@@ -298,17 +301,19 @@ class ReadBucketProxy:

     def start(self):
         # TODO: for small shares, read the whole bucket in start()
-        d = self._read(0, 7*4)
+        d = self._read(0, 8*4)
         self._offsets = {}
         def _got_offsets(data):
             self._segment_size = struct.unpack(">L", data[0:4])[0]
-            x = 4
+            self._data_size = struct.unpack(">L", data[4:8])[0]
+            x = 0x08
             for field in ( 'data',
                            'plaintext_hash_tree',
                            'crypttext_hash_tree',
                            'block_hashes',
                            'share_hashes',
-                           'uri_extension' ):
+                           'uri_extension',
+                           ):
                 offset = struct.unpack(">L", data[x:x+4])[0]
                 x += 4
                 self._offsets[field] = offset
@@ -316,13 +321,20 @@ class ReadBucketProxy:
         return d

     def get_block(self, blocknum):
+        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
+        if blocknum < num_segments-1:
+            size = self._segment_size
+        else:
+            size = self._data_size % self._segment_size
+            if size == 0:
+                size = self._segment_size
         offset = self._offsets['data'] + blocknum * self._segment_size
-        return self._read(offset, self._segment_size)
+        return self._read(offset, size)

     def _str2l(self, s):
         """ split string (pulled from storage) into a list of blockids """
-        return [ s[i:i+interfaces.HASH_SIZE]
-                 for i in range(0, len(s), interfaces.HASH_SIZE) ]
+        return [ s[i:i+HASH_SIZE]
+                 for i in range(0, len(s), HASH_SIZE) ]

     def get_plaintext_hashes(self):
         offset = self._offsets['plaintext_hash_tree']
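get_block above has to undo the same arithmetic on the read side, with one edge case: when data_size is an exact multiple of segment_size, the modulo comes out to zero and the final block really is a full segment. A standalone sketch of that size computation (not the project's code):

    def read_size(blocknum, data_size, segment_size):
        # number of segments, rounded up (what mathutil.div_ceil computes)
        num_segments = -(-data_size // segment_size)
        if blocknum < num_segments - 1:
            return segment_size
        tail = data_size % segment_size
        return tail or segment_size        # exact multiple: last block is full-size

    assert read_size(3, 95, 25) == 20      # short tail is truncated
    assert read_size(3, 100, 25) == 25     # exact multiple keeps the full segment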
@@ -348,7 +360,6 @@ class ReadBucketProxy:
     def get_share_hashes(self):
         offset = self._offsets['share_hashes']
         size = self._offsets['uri_extension'] - offset
-        HASH_SIZE = interfaces.HASH_SIZE
         assert size % (2+HASH_SIZE) == 0
         d = self._read(offset, size)
         def _unpack_share_hashes(data):
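For reference, the share_hashes section that get_share_hashes reads is, per the layout docstring, a sequence of pairs: a two-byte big-endian hash number followed by a 32-byte SHA-256 hash, which is why the code asserts size % (2+HASH_SIZE) == 0. A minimal decoding sketch under that assumption (the project's own _unpack_share_hashes may differ in detail):

    import struct

    HASH_SIZE = 32   # SHA-256, per the layout docstring

    def unpack_share_hashes(data):
        assert len(data) % (2 + HASH_SIZE) == 0
        pairs = []
        for i in range(0, len(data), 2 + HASH_SIZE):
            (hashnum,) = struct.unpack(">H", data[i:i+2])
            pairs.append((hashnum, data[i+2:i+2+HASH_SIZE]))
        return pairs

    sample = struct.pack(">H", 3) + b"\x00" * HASH_SIZE
    assert unpack_share_hashes(sample) == [(3, b"\x00" * HASH_SIZE)]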
@@ -102,7 +102,7 @@ class BucketProxy(unittest.TestCase):

         bw, rb, final = self.make_bucket("test_readwrite", 1406)
         bp = WriteBucketProxy(rb,
-                              data_size=100,
+                              data_size=95,
                               segment_size=25,
                               num_segments=4,
                               num_share_hashes=3,
@@ -112,7 +112,7 @@ class BucketProxy(unittest.TestCase):
         d.addCallback(lambda res: bp.put_block(0, "a"*25))
         d.addCallback(lambda res: bp.put_block(1, "b"*25))
         d.addCallback(lambda res: bp.put_block(2, "c"*25))
-        d.addCallback(lambda res: bp.put_block(3, "d"*25))
+        d.addCallback(lambda res: bp.put_block(3, "d"*20))
         d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
         d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
         d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
@@ -136,7 +136,7 @@ class BucketProxy(unittest.TestCase):
         d1.addCallback(lambda res: rbp.get_block(2))
         d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
         d1.addCallback(lambda res: rbp.get_block(3))
-        d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*25))
+        d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

         d1.addCallback(lambda res: rbp.get_plaintext_hashes())
         d1.addCallback(lambda res:
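The new test numbers line up with the tail rule: 95 bytes of data in 25-byte segments means three full blocks plus a 20-byte tail, which is why block 3 is now written and read back as "d"*20 rather than "d"*25. A one-line sanity check (illustration only):

    assert [min(25, 95 - 25*i) for i in range(4)] == [25, 25, 25, 20]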
@@ -4,7 +4,7 @@ from twisted.python.failure import Failure
 from twisted.internet import defer
 from cStringIO import StringIO

-from allmydata import upload, encode, storageserver
+from allmydata import upload, encode
 from allmydata.uri import unpack_uri, unpack_lit
 from allmydata.util.assertutil import precondition
 from foolscap import eventual
@@ -35,7 +35,7 @@ class FakeStorageServer:
         return d

     def allocate_buckets(self, crypttext_hash, sharenums,
-                         share_size, blocksize, canary):
+                         share_size, canary):
         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
         if self.mode == "full":
             return (set(), {},)