storage: introduce v2 immutable shares, with 8-byte offset fields, to remove two of the three size limitations in #346. This code handles v2 shares but does not generate them. We'll make a release with this v2-tolerance, wait a while, then make a second release that actually generates v2 shares, to avoid compatibility problems.

commit 7031a69bee (parent a0250e320b)
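The practical difference between the two layouts is the width of each header field. A v1 header stores every offset as a four-byte big-endian unsigned integer (struct format ">L"), which cannot represent positions at or beyond 2**32 bytes; v2 widens those fields to eight bytes (">Q"). A minimal illustration of the limit, not part of the patch:

    import struct

    # ">L" is a 4-byte big-endian unsigned int: largest value is 2**32 - 1 (~4 GiB)
    struct.pack(">L", 2**32 - 1)   # fits
    # struct.pack(">L", 2**32)     # would raise struct.error: value out of range

    # ">Q" is an 8-byte big-endian unsigned int: largest value is 2**64 - 1
    struct.pack(">Q", 2**40)       # a terabyte-scale offset fits easily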
@@ -34,15 +34,37 @@ section starts. Each offset is measured from the beginning of the file.
  ?   : start of uri_extension
 """
 
+"""
+v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size
+limitations described in #346.
+
+0x00: version number (=00 00 00 02)
+0x04: segment size
+0x0c: data size
+0x14: offset of data (=00 00 00 00 00 00 00 44)
+0x1c: offset of plaintext_hash_tree
+0x24: offset of crypttext_hash_tree
+0x2c: offset of block_hashes
+0x34: offset of share_hashes
+0x3c: offset of uri_extension_length + uri_extension
+0x44: start of data
+    : rest of share is the same as v1, above
+ ...   ...
+  ?  : start of uri_extension_length (eight-byte big-endian value)
+"""
 
 def allocated_size(data_size, num_segments, num_share_hashes,
                    uri_extension_size):
     wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
                            uri_extension_size, None)
     uri_extension_starts_at = wbp._offsets['uri_extension']
-    return uri_extension_starts_at + 4 + uri_extension_size
+    return uri_extension_starts_at + wbp.fieldsize + uri_extension_size
 
 class WriteBucketProxy:
     implements(IStorageBucketWriter)
+    fieldsize = 4
+    fieldstruct = ">L"
 
     def __init__(self, rref, data_size, segment_size, num_segments,
                  num_share_hashes, uri_extension_size, nodeid):
         self._rref = rref
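A quick consistency check on the two header sizes (an illustrative sketch; the format-string names are made up, only ">LQQQQQQQQ" appears in the patch): the v1 header is nine four-byte fields, while the v2 header keeps a four-byte version number followed by eight eight-byte fields, which is where the 0x24 and 0x44 data offsets come from.

    import struct

    V1_HEADER = ">LLLLLLLLL"   # version, segment size, data size, six offsets
    V2_HEADER = ">LQQQQQQQQ"   # 4-byte version, then eight 8-byte fields

    assert struct.calcsize(V1_HEADER) == 0x24   # 36 bytes: v1 data starts here
    assert struct.calcsize(V2_HEADER) == 0x44   # 68 bytes: v2 data starts here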
@@ -51,9 +73,6 @@ class WriteBucketProxy:
         self._num_segments = num_segments
         self._nodeid = nodeid
 
-        if segment_size >= 2**32 or data_size >= 2**32:
-            raise FileTooLargeError("This file is too large to be uploaded (data_size).")
-
         effective_segments = mathutil.next_power_of_k(num_segments,2)
         self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
         # how many share hashes are included in each share? This will be
@@ -62,6 +81,12 @@ class WriteBucketProxy:
         # we commit to not sending a uri extension larger than this
         self._uri_extension_size = uri_extension_size
 
+        self._create_offsets(segment_size, data_size)
+
+    def _create_offsets(self, segment_size, data_size):
+        if segment_size >= 2**32 or data_size >= 2**32:
+            raise FileTooLargeError("This file is too large to be uploaded (data_size).")
+
         offsets = self._offsets = {}
         x = 0x24
         offsets['data'] = x
@@ -169,7 +194,7 @@ class WriteBucketProxy:
         assert isinstance(data, str)
         precondition(len(data) <= self._uri_extension_size,
                      len(data), self._uri_extension_size)
-        length = struct.pack(">L", len(data))
+        length = struct.pack(self.fieldstruct, len(data))
         return self._write(offset, length+data)
 
     def _write(self, offset, data):
@@ -182,6 +207,45 @@ class WriteBucketProxy:
     def abort(self):
         return self._rref.callRemoteOnly("abort")
 
+class WriteBucketProxy_v2(WriteBucketProxy):
+    fieldsize = 8
+    fieldstruct = ">Q"
+
+    def _create_offsets(self, segment_size, data_size):
+        if segment_size >= 2**64 or data_size >= 2**64:
+            raise FileTooLargeError("This file is too large to be uploaded (data_size).")
+
+        offsets = self._offsets = {}
+        x = 0x44
+        offsets['data'] = x
+        x += data_size
+        offsets['plaintext_hash_tree'] = x
+        x += self._segment_hash_size
+        offsets['crypttext_hash_tree'] = x
+        x += self._segment_hash_size
+        offsets['block_hashes'] = x
+        x += self._segment_hash_size
+        offsets['share_hashes'] = x
+        x += self._share_hash_size
+        offsets['uri_extension'] = x
+
+        if x >= 2**64:
+            raise FileTooLargeError("This file is too large to be uploaded (offsets).")
+
+        offset_data = struct.pack(">LQQQQQQQQ",
+                                  2, # version number
+                                  segment_size,
+                                  data_size,
+                                  offsets['data'],
+                                  offsets['plaintext_hash_tree'],
+                                  offsets['crypttext_hash_tree'],
+                                  offsets['block_hashes'],
+                                  offsets['share_hashes'],
+                                  offsets['uri_extension'],
+                                  )
+        assert len(offset_data) == 0x44, len(offset_data)
+        self._offset_data = offset_data
+
 class ReadBucketProxy:
     implements(IStorageBucketReader)
     def __init__(self, rref, peerid=None, storage_index_s=None):
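The v2 header packed by _create_offsets above can be read back symmetrically, which is what ReadBucketProxy does further down. A small round-trip sketch with made-up sizes, not code from the patch:

    import struct

    header = struct.pack(">LQQQQQQQQ",
                         2,            # version number
                         131072,       # hypothetical segment size
                         5 * 10**9,    # ~5 GB of data: too big for 4-byte fields
                         0x44,         # data starts right after the header
                         0, 0, 0, 0, 0)   # remaining offsets zeroed for brevity
    assert len(header) == 0x44

    (version,) = struct.unpack(">L", header[0:4])
    segment_size, data_size = struct.unpack(">QQ", header[4:0x14])
    assert (version, segment_size, data_size) == (2, 131072, 5 * 10**9)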
@@ -207,7 +271,7 @@ class ReadBucketProxy:
 
     def start(self):
         # TODO: for small shares, read the whole bucket in start()
-        d = self._read(0, 0x24)
+        d = self._read(0, 0x44)
         d.addCallback(self._parse_offsets)
         def _started(res):
             self._started = True
@@ -216,12 +280,30 @@ class ReadBucketProxy:
         return d
 
     def _parse_offsets(self, data):
-        precondition(len(data) == 0x24)
+        precondition(len(data) >= 0x4)
         self._offsets = {}
-        (version, self._segment_size, self._data_size) = \
-             struct.unpack(">LLL", data[0:0xc])
-        _assert(version == 1)
-        x = 0x0c
+        (version,) = struct.unpack(">L", data[0:4])
+        _assert(version in (1,2))
+
+        if version == 1:
+            precondition(len(data) >= 0x24)
+            x = 0x0c
+            fieldsize = 0x4
+            fieldstruct = ">L"
+            (self._segment_size,
+             self._data_size) = struct.unpack(">LL", data[0x4:0xc])
+        else:
+            precondition(len(data) >= 0x44)
+            x = 0x14
+            fieldsize = 0x8
+            fieldstruct = ">Q"
+            (self._segment_size,
+             self._data_size) = struct.unpack(">QQ", data[0x4:0x14])
+
+        self._version = version
+        self._fieldsize = fieldsize
+        self._fieldstruct = fieldstruct
+
         for field in ( 'data',
                        'plaintext_hash_tree',
                        'crypttext_hash_tree',
@@ -229,8 +311,8 @@ class ReadBucketProxy:
                        'share_hashes',
                        'uri_extension',
                        ):
-            offset = struct.unpack(">L", data[x:x+4])[0]
-            x += 4
+            offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
+            x += fieldsize
             self._offsets[field] = offset
         return self._offsets
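The version sniffing in _parse_offsets boils down to: read the four-byte big-endian version number, then choose the field width and the position of the first offset. The same logic as a standalone helper, a sketch for illustration only (the function name is made up):

    import struct

    def sniff_share_header(header):
        # Both layouts begin with a 4-byte big-endian version number.
        (version,) = struct.unpack(">L", header[:4])
        if version == 1:
            return dict(version=1, fieldsize=4, fieldstruct=">L", first_offset=0x0c)
        elif version == 2:
            return dict(version=2, fieldsize=8, fieldstruct=">Q", first_offset=0x14)
        raise ValueError("unknown share version %d" % version)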
@@ -289,10 +371,10 @@ class ReadBucketProxy:
 
     def get_uri_extension(self):
         offset = self._offsets['uri_extension']
-        d = self._read(offset, 4)
+        d = self._read(offset, self._fieldsize)
         def _got_length(data):
-            length = struct.unpack(">L", data)[0]
-            return self._read(offset+4, length)
+            length = struct.unpack(self._fieldstruct, data)[0]
+            return self._read(offset+self._fieldsize, length)
         d.addCallback(_got_length)
         return d
@@ -9,7 +9,8 @@ from allmydata.util import fileutil, hashutil
 from allmydata.storage import BucketWriter, BucketReader, \
      StorageServer, MutableShareFile, \
      storage_index_to_dir, DataTooLargeError, LeaseInfo
-from allmydata.immutable.layout import WriteBucketProxy, ReadBucketProxy
+from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
+     ReadBucketProxy
 from allmydata.interfaces import BadWriteEnablerError
 from allmydata.test.common import LoggingServiceParent
@@ -131,7 +132,7 @@ class BucketProxy(unittest.TestCase):
                               uri_extension_size=500, nodeid=None)
         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
 
-    def test_readwrite(self):
+    def _do_test_readwrite(self, header_size, wbp_class, rbp_class):
         # Let's pretend each share has 100 bytes of data, and that there are
         # 4 segments (25 bytes each), and 8 shares total. So the three
         # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
@@ -141,6 +142,9 @@ class BucketProxy(unittest.TestCase):
         # long. That should make the whole share:
         #
         # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
+        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
+
+        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
 
         plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                             for i in range(7)]
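Both byte counts in the comment check out; a quick arithmetic sketch, not part of the test:

    # header + 100 data bytes + three 7-node hash trees + 3 share-hash entries
    # (2-byte index + 32-byte hash each) + 4-byte length prefix + 500-byte uri_extension
    assert 0x24 + 100 + 3*(7*32) + 3*(2+32) + 4 + 500 == 1414   # v1 share
    assert 0x44 + 100 + 3*(7*32) + 3*(2+32) + 4 + 500 == 1446   # v2 share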
@@ -152,14 +156,14 @@ class BucketProxy(unittest.TestCase):
                          for i in (1,9,13)]
         uri_extension = "s" + "E"*498 + "e"
 
-        bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
-        bp = WriteBucketProxy(rb,
-                              data_size=95,
-                              segment_size=25,
-                              num_segments=4,
-                              num_share_hashes=3,
-                              uri_extension_size=len(uri_extension),
-                              nodeid=None)
+        bw, rb, sharefname = self.make_bucket("test_readwrite", sharesize)
+        bp = wbp_class(rb,
+                       data_size=95,
+                       segment_size=25,
+                       num_segments=4,
+                       num_share_hashes=3,
+                       uri_extension_size=len(uri_extension),
+                       nodeid=None)
 
         d = bp.start()
         d.addCallback(lambda res: bp.put_block(0, "a"*25))
@@ -178,7 +182,7 @@ class BucketProxy(unittest.TestCase):
         br = BucketReader(self, sharefname)
         rb = RemoteBucket()
         rb.target = br
-        rbp = ReadBucketProxy(rb, peerid="abc")
+        rbp = rbp_class(rb, peerid="abc")
         self.failUnless("to peer" in repr(rbp))
         self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
 
@@ -213,7 +217,11 @@ class BucketProxy(unittest.TestCase):
 
         return d
 
+    def test_readwrite_v1(self):
+        return self._do_test_readwrite(0x24, WriteBucketProxy, ReadBucketProxy)
+
+    def test_readwrite_v2(self):
+        return self._do_test_readwrite(0x44, WriteBucketProxy_v2, ReadBucketProxy)
+
 class Server(unittest.TestCase):
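As the commit message says, this release only tolerates v2 shares; nothing generates them yet. When a later release does start writing them, the upload side would presumably pick the proxy class from the sizes involved. A hypothetical selection helper, purely an assumption about future wiring and not part of this commit:

    def choose_write_proxy_class(segment_size, data_size):
        # v1's 4-byte ">L" offset fields cannot describe values at or above 2**32,
        # so anything larger needs the 8-byte v2 layout.
        if segment_size < 2**32 and data_size < 2**32:
            return WriteBucketProxy      # assumes the classes from layout.py are in scope
        return WriteBucketProxy_v2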