tahoe-lafs/src/allmydata/immutable/layout.py

302 lines
11 KiB
Python

import struct
from zope.interface import implements
from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
FileTooLargeError, HASH_SIZE
from allmydata.util import mathutil, idlib
from allmydata.util.assertutil import _assert, precondition
"""
Share data is written into a single file. The file begins with a header of
nine four-byte big-endian values: a version number, the segment size, the
data size, and then six offsets that indicate where each section starts.
Each offset is measured from the beginning of the file.
0x00: version number (=00 00 00 01)
0x04: segment size
0x08: data size
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree
0x14: offset of crypttext_hash_tree
0x18: offset of block_hashes
0x1c: offset of share_hashes
0x20: offset of uri_extension_length + uri_extension
0x24: start of data
? : start of plaintext_hash_tree
? : start of crypttext_hash_tree
? : start of block_hashes
? : start of share_hashes
      Each share_hash is written as a two-byte (big-endian) hashnum
      followed by the 32-byte SHA-256 hash. We only store the hashes
      necessary to validate the share hash root.
? : start of uri_extension_length (four-byte big-endian value)
? : start of uri_extension
"""
def allocated_size(data_size, num_segments, num_share_hashes,
                   uri_extension_size):
    """Return the number of bytes a share with these parameters occupies.

    A throwaway WriteBucketProxy (no remote reference, no node id, and a
    segment size of 0) is built purely to reuse its offset computation. The
    total is the offset at which the URI-extension section starts, plus that
    section's four-byte length prefix, plus the URI extension itself.

    Any FileTooLargeError raised by the WriteBucketProxy constructor
    propagates to the caller.
    """
    layout = WriteBucketProxy(None, data_size, 0, num_segments,
                              num_share_hashes, uri_extension_size, None)
    # put_uri_extension() writes a ">L" length field ahead of the payload.
    length_prefix_size = 4
    return (layout._offsets['uri_extension']
            + length_prefix_size
            + uri_extension_size)
class WriteBucketProxy:
    """Write the pieces of one share into a remote bucket at fixed offsets.

    The constructor precomputes where each section of the share file lives
    (block data, three hash trees, share hashes, URI extension) and packs the
    0x24-byte header that describes that layout. Each put_* method validates
    the size of what it is given and then sends it to the remote bucket via
    ``rref.callRemote("write", offset, data)``.
    """
    implements(IStorageBucketWriter)

    def __init__(self, rref, data_size, segment_size, num_segments,
                 num_share_hashes, uri_extension_size, nodeid):
        """Compute the share layout and pre-pack the header.

        rref -- remote reference; used only through callRemote() and
            callRemoteOnly().
        data_size -- total number of bytes of block data in this share.
        segment_size -- size of every block except possibly the last one.
        num_segments -- number of blocks that put_block() will be given.
        num_share_hashes -- number of (hashnum, hash) pairs that
            put_share_hashes() will be given.
        uri_extension_size -- upper bound on the length of the data later
            passed to put_uri_extension().
        nodeid -- identifier of the remote node; only used by __repr__ and
            may be None.

        Raises FileTooLargeError when segment_size, data_size, or the offset
        of the URI-extension section does not fit in a four-byte field.
        """
        self._rref = rref
        self._data_size = data_size
        self._segment_size = segment_size
        self._num_segments = num_segments
        self._nodeid = nodeid

        # Every header field is packed as an unsigned 32-bit integer.
        if segment_size >= 2**32 or data_size >= 2**32:
            raise FileTooLargeError("This file is too large to be uploaded (data_size).")

        # Each hash tree is stored with (2*n - 1) nodes, where n is
        # num_segments rounded by next_power_of_k(..., 2) -- presumably up to
        # the next power of two.
        effective_segments = mathutil.next_power_of_k(num_segments,2)
        self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
        # how many share hashes are included in each share? This will be
        # about log2(num_shares). Each one is stored as a two-byte hash
        # number followed by the hash itself.
        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
        # we commit to not sending a uri extension larger than this
        self._uri_extension_size = uri_extension_size

        # Sections are laid out back to back, starting right after the
        # 0x24-byte header.
        offsets = self._offsets = {}
        x = 0x24
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hash_size
        offsets['uri_extension'] = x

        # x is the largest offset, so checking it covers all of them.
        if x >= 2**32:
            raise FileTooLargeError("This file is too large to be uploaded (offsets).")

        offset_data = struct.pack(">LLLLLLLLL",
                                  1, # version number
                                  segment_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'],
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 0x24
        self._offset_data = offset_data

    def __repr__(self):
        """Return a short description that names the target node, if known."""
        if self._nodeid:
            nodeid_s = idlib.nodeid_b2a(self._nodeid)
        else:
            nodeid_s = "[None]"
        return "<allmydata.storage.WriteBucketProxy for node %s>" % nodeid_s

    def start(self):
        """Write the pre-packed header at offset 0 of the share."""
        return self._write(0, self._offset_data)

    def put_block(self, segmentnum, data):
        """Write the block for segment number `segmentnum`.

        Every block except the last must be exactly segment_size bytes long;
        the last one must hold whatever remains of data_size. The
        precondition fails otherwise, reporting the actual and the expected
        length.
        """
        offset = self._offsets['data'] + segmentnum * self._segment_size
        assert offset + len(data) <= self._offsets['uri_extension']
        assert isinstance(data, str)
        if segmentnum < self._num_segments-1:
            expected_size = self._segment_size
        else:
            # The tail block carries the bytes left over after all of the
            # full-sized blocks.
            expected_size = (self._data_size -
                             (self._segment_size * (self._num_segments - 1)))
        # Report the size that was actually required, so that a failure on
        # the tail block is not described in terms of the full segment size.
        precondition(len(data) == expected_size, len(data), expected_size)
        return self._write(offset, data)

    def put_plaintext_hashes(self, hashes):
        """Write the plaintext hash tree, given as a list of hash strings.

        The concatenated hashes must be exactly as long as the space reserved
        for one hash tree.
        """
        offset = self._offsets['plaintext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        # Must not run into the section that follows.
        precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'],
                     offset, len(data), offset+len(data),
                     self._offsets['crypttext_hash_tree'])
        return self._write(offset, data)

    def put_crypttext_hashes(self, hashes):
        """Write the crypttext hash tree, given as a list of hash strings.

        The concatenated hashes must be exactly as long as the space reserved
        for one hash tree.
        """
        offset = self._offsets['crypttext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        # Must not run into the section that follows.
        precondition(offset + len(data) <= self._offsets['block_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['block_hashes'])
        return self._write(offset, data)

    def put_block_hashes(self, blockhashes):
        """Write the block hash tree, given as a list of hash strings.

        The concatenated hashes must be exactly as long as the space reserved
        for one hash tree.
        """
        offset = self._offsets['block_hashes']
        assert isinstance(blockhashes, list)
        data = "".join(blockhashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        # Must not run into the section that follows.
        precondition(offset + len(data) <= self._offsets['share_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['share_hashes'])
        return self._write(offset, data)

    def put_share_hashes(self, sharehashes):
        """Write the share hashes, given as a list of (hashnum, hash) tuples.

        The number of tuples must match the num_share_hashes value that was
        passed to the constructor.
        """
        # sharehashes is a list of (index, hash) tuples, so they get stored
        # as 2+32=34 bytes each
        offset = self._offsets['share_hashes']
        assert isinstance(sharehashes, list)
        data = "".join([struct.pack(">H", hashnum) + hashvalue
                        for hashnum,hashvalue in sharehashes])
        precondition(len(data) == self._share_hash_size,
                     len(data), self._share_hash_size)
        # Must not run into the section that follows.
        precondition(offset + len(data) <= self._offsets['uri_extension'],
                     offset, len(data), offset+len(data),
                     self._offsets['uri_extension'])
        return self._write(offset, data)

    def put_uri_extension(self, data):
        """Write the URI extension, prefixed with its four-byte length.

        `data` may be shorter than the uri_extension_size given to the
        constructor, but not longer.
        """
        offset = self._offsets['uri_extension']
        assert isinstance(data, str)
        precondition(len(data) <= self._uri_extension_size,
                     len(data), self._uri_extension_size)
        length = struct.pack(">L", len(data))
        return self._write(offset, length+data)

    def _write(self, offset, data):
        """Send `data` to the remote bucket, to be stored at `offset`.

        Returns whatever callRemote() returns (presumably a Deferred).
        """
        # TODO: for small shares, buffer the writes and do just a single call
        return self._rref.callRemote("write", offset, data)

    def close(self):
        """Ask the remote bucket to close."""
        return self._rref.callRemote("close")

    def abort(self):
        """Ask the remote bucket to abort, via callRemoteOnly()."""
        return self._rref.callRemoteOnly("abort")
class ReadBucketProxy:
    """Read the pieces of one share out of a remote bucket.

    start() (or startIfNecessary()) must have fired before any get_* method
    is used: it fetches the 0x24-byte header and records the segment size,
    the data size and the section offsets that the getters rely on. All
    reads go through ``rref.callRemote("read", offset, length)``.
    """
    implements(IStorageBucketReader)

    def __init__(self, rref, peerid=None, storage_index_s=None):
        """Remember the remote reference and the labels used by __repr__.

        rref -- remote reference; used only through callRemote().
        peerid -- identifier of the remote peer, or None if unknown.
        storage_index_s -- printable form of the storage index, or None.
        """
        self._rref = rref
        self._peerid = peerid
        self._si_s = storage_index_s
        self._started = False

    def get_peerid(self):
        """Return the peerid given to the constructor."""
        return self._peerid

    def __repr__(self):
        """Return a short description naming the peer and the storage index."""
        # peerid defaults to None in __init__; do not hand None to idlib, so
        # that repr() stays usable on a proxy created without a peerid.
        if self._peerid is not None:
            peerid_s = idlib.shortnodeid_b2a(self._peerid)
        else:
            peerid_s = "None"
        return "<ReadBucketProxy to peer [%s] SI %s>" % (peerid_s,
                                                         self._si_s)

    def startIfNecessary(self):
        """Return a Deferred that fires with self once the header is parsed.

        If start() has already completed, the Deferred has already fired.
        """
        if self._started:
            return defer.succeed(self)
        d = self.start()
        d.addCallback(lambda res: self)
        return d

    def start(self):
        """Fetch and parse the share header.

        Returns a Deferred that fires with the dict of section offsets.
        """
        # TODO: for small shares, read the whole bucket in start()
        d = self._read(0, 0x24)
        d.addCallback(self._parse_offsets)
        def _mark_started(res):
            # Only flag the proxy as started once the header has been parsed
            # successfully; pass the offsets through unchanged.
            self._started = True
            return res
        d.addCallback(_mark_started)
        return d

    def _parse_offsets(self, data):
        """Decode the 0x24-byte header and return the dict of offsets.

        Sets self._segment_size, self._data_size and self._offsets. Only
        version 1 headers are accepted.
        """
        precondition(len(data) == 0x24)
        self._offsets = {}
        (version, self._segment_size, self._data_size) = \
                  struct.unpack(">LLL", data[0:0xc])
        _assert(version == 1)
        # The six section offsets follow the three fixed fields, four bytes
        # each, in this order.
        x = 0x0c
        for field in ( 'data',
                       'plaintext_hash_tree',
                       'crypttext_hash_tree',
                       'block_hashes',
                       'share_hashes',
                       'uri_extension',
                       ):
            offset = struct.unpack(">L", data[x:x+4])[0]
            x += 4
            self._offsets[field] = offset
        return self._offsets

    def get_block(self, blocknum):
        """Read block number `blocknum`; returns the Deferred from _read().

        Every block but the last is segment_size bytes long. The last one
        holds the remainder of data_size, or a full segment when data_size
        is an exact multiple of segment_size.
        """
        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
        if blocknum < num_segments-1:
            size = self._segment_size
        else:
            size = self._data_size % self._segment_size
            if size == 0:
                size = self._segment_size
        offset = self._offsets['data'] + blocknum * self._segment_size
        return self._read(offset, size)

    def _str2l(self, s):
        """Split string `s` (pulled from storage) into HASH_SIZE-byte pieces.

        Returns a list of strings; the last one is shorter if len(s) is not
        a multiple of HASH_SIZE.
        """
        return [ s[i:i+HASH_SIZE]
                 for i in range(0, len(s), HASH_SIZE) ]

    def get_plaintext_hashes(self):
        """Read the plaintext hash tree.

        Returns a Deferred that fires with a list of hash strings.
        """
        offset = self._offsets['plaintext_hash_tree']
        # The section extends up to the start of the next one.
        size = self._offsets['crypttext_hash_tree'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_crypttext_hashes(self):
        """Read the crypttext hash tree.

        Returns a Deferred that fires with a list of hash strings.
        """
        offset = self._offsets['crypttext_hash_tree']
        # The section extends up to the start of the next one.
        size = self._offsets['block_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_block_hashes(self):
        """Read the block hash tree.

        Returns a Deferred that fires with a list of hash strings.
        """
        offset = self._offsets['block_hashes']
        # The section extends up to the start of the next one.
        size = self._offsets['share_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_share_hashes(self):
        """Read the share hashes.

        Returns a Deferred that fires with a list of (hashnum, hash) tuples.
        """
        offset = self._offsets['share_hashes']
        size = self._offsets['uri_extension'] - offset
        # Each entry is a two-byte hash number followed by one hash.
        assert size % (2+HASH_SIZE) == 0
        d = self._read(offset, size)
        def _unpack_share_hashes(data):
            assert len(data) == size
            hashes = []
            for i in range(0, size, 2+HASH_SIZE):
                hashnum = struct.unpack(">H", data[i:i+2])[0]
                hashvalue = data[i+2:i+2+HASH_SIZE]
                hashes.append( (hashnum, hashvalue) )
            return hashes
        d.addCallback(_unpack_share_hashes)
        return d

    def get_uri_extension(self):
        """Read the URI extension.

        Two reads are issued: first the four-byte length field, then that
        many bytes immediately after it. Returns a Deferred that fires with
        the result of the second read.
        """
        offset = self._offsets['uri_extension']
        d = self._read(offset, 4)
        def _got_length(data):
            length = struct.unpack(">L", data)[0]
            return self._read(offset+4, length)
        d.addCallback(_got_length)
        return d

    def _read(self, offset, length):
        """Ask the remote bucket for `length` bytes starting at `offset`."""
        return self._rref.callRemote("read", offset, length)