tahoe-lafs/src/allmydata/immutable/layout.py
2011-08-01 15:43:17 -07:00

504 lines
20 KiB
Python

import struct
from zope.interface import implements
from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
FileTooLargeError, HASH_SIZE
from allmydata.util import mathutil, observer, pipeline
from allmydata.util.assertutil import precondition
from allmydata.storage.server import si_b2a
class LayoutInvalid(Exception):
""" There is something wrong with these bytes so they can't be
interpreted as the kind of immutable file that I know how to download."""
pass
class RidiculouslyLargeURIExtensionBlock(LayoutInvalid):
""" When downloading a file, the length of the URI Extension Block was
given as >= 2**32. This means the share data must have been corrupted, or
else the original uploader of the file wrote a ridiculous value into the
URI Extension Block length."""
pass
class ShareVersionIncompatible(LayoutInvalid):
""" When downloading a share, its format was not one of the formats we
know how to parse."""
pass
"""
Share data is written in a file. At the start of the file, there is a series
of four-byte big-endian offset values, which indicate where each section
starts. Each offset is measured from the beginning of the share data.
0x00: version number (=00 00 00 01)
0x04: block size # See Footnote 1 below.
0x08: share data size # See Footnote 1 below.
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree UNUSED
0x14: offset of crypttext_hash_tree
0x18: offset of block_hashes
0x1c: offset of share_hashes
0x20: offset of uri_extension_length + uri_extension
0x24: start of data
? : start of plaintext_hash_tree UNUSED
? : start of crypttext_hash_tree
? : start of block_hashes
? : start of share_hashes
each share_hash is written as a two-byte (big-endian) hashnum
followed by the 32-byte SHA-256 hash. We store only the hashes
necessary to validate the share hash root
? : start of uri_extension_length (four-byte big-endian value)
? : start of uri_extension
"""
"""
v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size
limitations described in #346.
0x00: version number (=00 00 00 02)
0x04: block size # See Footnote 1 below.
0x0c: share data size # See Footnote 1 below.
0x14: offset of data (=00 00 00 00 00 00 00 44)
0x1c: offset of plaintext_hash_tree UNUSED
0x24: offset of crypttext_hash_tree
0x2c: offset of block_hashes
0x34: offset of share_hashes
0x3c: offset of uri_extension_length + uri_extension
0x44: start of data
: rest of share is the same as v1, above
... ...
? : start of uri_extension_length (eight-byte big-endian value)
? : start of uri_extension
"""
# Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but
# they are still provided when writing so that older versions of Tahoe can
# read them.
FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares
def make_write_bucket_proxy(rref, server,
data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max):
# Use layout v1 for small files, so they'll be readable by older versions
# (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
# by tahoe-1.3.0 or later.
try:
if FORCE_V2:
raise FileTooLargeError
wbp = WriteBucketProxy(rref, server,
data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max)
except FileTooLargeError:
wbp = WriteBucketProxy_v2(rref, server,
data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max)
return wbp
class WriteBucketProxy:
implements(IStorageBucketWriter)
fieldsize = 4
fieldstruct = ">L"
def __init__(self, rref, server, data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max, pipeline_size=50000):
self._rref = rref
self._server = server
self._data_size = data_size
self._block_size = block_size
self._num_segments = num_segments
effective_segments = mathutil.next_power_of_k(num_segments,2)
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
# how many share hashes are included in each share? This will be
# about ln2(num_shares).
self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
# we commit to not sending a uri extension larger than this
self._uri_extension_size_max = uri_extension_size_max
self._create_offsets(block_size, data_size)
# k=3, max_segment_size=128KiB gives us a typical segment of 43691
# bytes. Setting the default pipeline_size to 50KB lets us get two
# segments onto the wire but not a third, which would keep the pipe
# filled.
self._pipeline = pipeline.Pipeline(pipeline_size)
def get_allocated_size(self):
return (self._offsets['uri_extension'] + self.fieldsize +
self._uri_extension_size_max)
def _create_offsets(self, block_size, data_size):
if block_size >= 2**32 or data_size >= 2**32:
raise FileTooLargeError("This file is too large to be uploaded (data_size).")
offsets = self._offsets = {}
x = 0x24
offsets['data'] = x
x += data_size
offsets['plaintext_hash_tree'] = x # UNUSED
x += self._segment_hash_size
offsets['crypttext_hash_tree'] = x
x += self._segment_hash_size
offsets['block_hashes'] = x
x += self._segment_hash_size
offsets['share_hashes'] = x
x += self._share_hashtree_size
offsets['uri_extension'] = x
if x >= 2**32:
raise FileTooLargeError("This file is too large to be uploaded (offsets).")
offset_data = struct.pack(">LLLLLLLLL",
1, # version number
block_size,
data_size,
offsets['data'],
offsets['plaintext_hash_tree'], # UNUSED
offsets['crypttext_hash_tree'],
offsets['block_hashes'],
offsets['share_hashes'],
offsets['uri_extension'],
)
assert len(offset_data) == 0x24
self._offset_data = offset_data
def __repr__(self):
return "<WriteBucketProxy for node %s>" % self._server.get_name()
def put_header(self):
return self._write(0, self._offset_data)
def put_block(self, segmentnum, data):
offset = self._offsets['data'] + segmentnum * self._block_size
assert offset + len(data) <= self._offsets['uri_extension']
assert isinstance(data, str)
if segmentnum < self._num_segments-1:
precondition(len(data) == self._block_size,
len(data), self._block_size)
else:
precondition(len(data) == (self._data_size -
(self._block_size *
(self._num_segments - 1))),
len(data), self._block_size)
return self._write(offset, data)
def put_crypttext_hashes(self, hashes):
offset = self._offsets['crypttext_hash_tree']
assert isinstance(hashes, list)
data = "".join(hashes)
precondition(len(data) == self._segment_hash_size,
len(data), self._segment_hash_size)
precondition(offset + len(data) <= self._offsets['block_hashes'],
offset, len(data), offset+len(data),
self._offsets['block_hashes'])
return self._write(offset, data)
def put_block_hashes(self, blockhashes):
offset = self._offsets['block_hashes']
assert isinstance(blockhashes, list)
data = "".join(blockhashes)
precondition(len(data) == self._segment_hash_size,
len(data), self._segment_hash_size)
precondition(offset + len(data) <= self._offsets['share_hashes'],
offset, len(data), offset+len(data),
self._offsets['share_hashes'])
return self._write(offset, data)
def put_share_hashes(self, sharehashes):
# sharehashes is a list of (index, hash) tuples, so they get stored
# as 2+32=34 bytes each
offset = self._offsets['share_hashes']
assert isinstance(sharehashes, list)
data = "".join([struct.pack(">H", hashnum) + hashvalue
for hashnum,hashvalue in sharehashes])
precondition(len(data) == self._share_hashtree_size,
len(data), self._share_hashtree_size)
precondition(offset + len(data) <= self._offsets['uri_extension'],
offset, len(data), offset+len(data),
self._offsets['uri_extension'])
return self._write(offset, data)
def put_uri_extension(self, data):
offset = self._offsets['uri_extension']
assert isinstance(data, str)
precondition(len(data) <= self._uri_extension_size_max,
len(data), self._uri_extension_size_max)
length = struct.pack(self.fieldstruct, len(data))
return self._write(offset, length+data)
def _write(self, offset, data):
# use a Pipeline to pipeline several writes together. TODO: another
# speedup would be to coalesce small writes into a single call: this
# would reduce the foolscap CPU overhead per share, but wouldn't
# reduce the number of round trips, so it might not be worth the
# effort.
return self._pipeline.add(len(data),
self._rref.callRemote, "write", offset, data)
def close(self):
d = self._pipeline.add(0, self._rref.callRemote, "close")
d.addCallback(lambda ign: self._pipeline.flush())
return d
def abort(self):
return self._rref.callRemoteOnly("abort")
def get_servername(self):
return self._server.get_name()
def get_peerid(self):
return self._server.get_serverid()
class WriteBucketProxy_v2(WriteBucketProxy):
fieldsize = 8
fieldstruct = ">Q"
def _create_offsets(self, block_size, data_size):
if block_size >= 2**64 or data_size >= 2**64:
raise FileTooLargeError("This file is too large to be uploaded (data_size).")
offsets = self._offsets = {}
x = 0x44
offsets['data'] = x
x += data_size
offsets['plaintext_hash_tree'] = x # UNUSED
x += self._segment_hash_size
offsets['crypttext_hash_tree'] = x
x += self._segment_hash_size
offsets['block_hashes'] = x
x += self._segment_hash_size
offsets['share_hashes'] = x
x += self._share_hashtree_size
offsets['uri_extension'] = x
if x >= 2**64:
raise FileTooLargeError("This file is too large to be uploaded (offsets).")
offset_data = struct.pack(">LQQQQQQQQ",
2, # version number
block_size,
data_size,
offsets['data'],
offsets['plaintext_hash_tree'], # UNUSED
offsets['crypttext_hash_tree'],
offsets['block_hashes'],
offsets['share_hashes'],
offsets['uri_extension'],
)
assert len(offset_data) == 0x44, len(offset_data)
self._offset_data = offset_data
class ReadBucketProxy:
implements(IStorageBucketReader)
MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes
def __init__(self, rref, server, storage_index):
self._rref = rref
self._server = server
self._storage_index = storage_index
self._started = False # sent request to server
self._ready = observer.OneShotObserverList() # got response from server
def get_peerid(self):
return self._server.get_serverid()
def __repr__(self):
return "<ReadBucketProxy %s to peer [%s] SI %s>" % \
(id(self), self._server.get_name(), si_b2a(self._storage_index))
def _start_if_needed(self):
""" Returns a deferred that will be fired when I'm ready to return
data, or errbacks if the starting (header reading and parsing)
process fails."""
if not self._started:
self._start()
return self._ready.when_fired()
def _start(self):
self._started = True
# TODO: for small shares, read the whole bucket in _start()
d = self._fetch_header()
d.addCallback(self._parse_offsets)
# XXX The following two callbacks implement a slightly faster/nicer
# way to get the ueb and sharehashtree, but it requires that the
# storage server be >= v1.3.0.
# d.addCallback(self._fetch_sharehashtree_and_ueb)
# d.addCallback(self._parse_sharehashtree_and_ueb)
def _fail_waiters(f):
self._ready.fire(f)
def _notify_waiters(result):
self._ready.fire(result)
d.addCallbacks(_notify_waiters, _fail_waiters)
return d
def _fetch_header(self):
return self._read(0, 0x44)
def _parse_offsets(self, data):
precondition(len(data) >= 0x4)
self._offsets = {}
(version,) = struct.unpack(">L", data[0:4])
if version != 1 and version != 2:
raise ShareVersionIncompatible(version)
if version == 1:
precondition(len(data) >= 0x24)
x = 0x0c
fieldsize = 0x4
fieldstruct = ">L"
else:
precondition(len(data) >= 0x44)
x = 0x14
fieldsize = 0x8
fieldstruct = ">Q"
self._version = version
self._fieldsize = fieldsize
self._fieldstruct = fieldstruct
for field in ( 'data',
'plaintext_hash_tree', # UNUSED
'crypttext_hash_tree',
'block_hashes',
'share_hashes',
'uri_extension',
):
offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
x += fieldsize
self._offsets[field] = offset
return self._offsets
def _fetch_sharehashtree_and_ueb(self, offsets):
sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes']
return self._read(offsets['share_hashes'],
self.MAX_UEB_SIZE+sharehashtree_size)
def _parse_sharehashtree_and_ueb(self, data):
sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes']
if len(data) < sharehashtree_size:
raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data)))
if sharehashtree_size % (2+HASH_SIZE) != 0:
raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size))
self._share_hashes = []
for i in range(0, sharehashtree_size, 2+HASH_SIZE):
hashnum = struct.unpack(">H", data[i:i+2])[0]
hashvalue = data[i+2:i+2+HASH_SIZE]
self._share_hashes.append( (hashnum, hashvalue) )
i = self._offsets['uri_extension']-self._offsets['share_hashes']
if len(data) < i+self._fieldsize:
raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),))
length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0]
self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length]
def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
offset = self._offsets['data'] + blocknum * blocksize
return self._read(offset, thisblocksize)
def get_block_data(self, blocknum, blocksize, thisblocksize):
d = self._start_if_needed()
d.addCallback(self._get_block_data, blocknum, blocksize, thisblocksize)
return d
def _str2l(self, s):
""" split string (pulled from storage) into a list of blockids """
return [ s[i:i+HASH_SIZE]
for i in range(0, len(s), HASH_SIZE) ]
def _get_crypttext_hashes(self, unused=None):
offset = self._offsets['crypttext_hash_tree']
size = self._offsets['block_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_crypttext_hashes(self):
d = self._start_if_needed()
d.addCallback(self._get_crypttext_hashes)
return d
def _get_block_hashes(self, unused=None, at_least_these=()):
# TODO: fetch only at_least_these instead of all of them.
offset = self._offsets['block_hashes']
size = self._offsets['share_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_block_hashes(self, at_least_these=()):
if at_least_these:
d = self._start_if_needed()
d.addCallback(self._get_block_hashes, at_least_these)
return d
else:
return defer.succeed([])
def _get_share_hashes(self, unused=None):
if hasattr(self, '_share_hashes'):
return self._share_hashes
else:
return self._get_share_hashes_the_old_way()
return self._share_hashes
def get_share_hashes(self):
d = self._start_if_needed()
d.addCallback(self._get_share_hashes)
return d
def _get_share_hashes_the_old_way(self):
""" Tahoe storage servers < v1.3.0 would return an error if you tried
to read past the end of the share, so we need to use the offset and
read just that much."""
offset = self._offsets['share_hashes']
size = self._offsets['uri_extension'] - offset
if size % (2+HASH_SIZE) != 0:
raise LayoutInvalid("share hash tree corrupted -- should occupy a multiple of %d bytes, not %d bytes" % ((2+HASH_SIZE), size))
d = self._read(offset, size)
def _unpack_share_hashes(data):
if len(data) != size:
raise LayoutInvalid("share hash tree corrupted -- got a short read of the share data -- should have gotten %d, not %d bytes" % (size, len(data)))
hashes = []
for i in range(0, size, 2+HASH_SIZE):
hashnum = struct.unpack(">H", data[i:i+2])[0]
hashvalue = data[i+2:i+2+HASH_SIZE]
hashes.append( (hashnum, hashvalue) )
return hashes
d.addCallback(_unpack_share_hashes)
return d
def _get_uri_extension_the_old_way(self, unused=None):
""" Tahoe storage servers < v1.3.0 would return an error if you tried
to read past the end of the share, so we need to fetch the UEB size
and then read just that much."""
offset = self._offsets['uri_extension']
d = self._read(offset, self._fieldsize)
def _got_length(data):
if len(data) != self._fieldsize:
raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
length = struct.unpack(self._fieldstruct, data)[0]
if length >= 2**31:
# URI extension blocks are around 419 bytes long, so this
# must be corrupted. Anyway, the foolscap interface schema
# for "read" will not allow >= 2**31 bytes length.
raise RidiculouslyLargeURIExtensionBlock(length)
return self._read(offset+self._fieldsize, length)
d.addCallback(_got_length)
return d
def _get_uri_extension(self, unused=None):
if hasattr(self, '_ueb_data'):
return self._ueb_data
else:
return self._get_uri_extension_the_old_way()
def get_uri_extension(self):
d = self._start_if_needed()
d.addCallback(self._get_uri_extension)
return d
def _read(self, offset, length):
return self._rref.callRemote("read", offset, length)