start work on new encoder, with merkle trees and subshares and stuff

Brian Warner 2006-12-13 20:32:35 -07:00
parent d9dcf5f098
commit 133e4a4394
4 changed files with 1383 additions and 0 deletions

allmydata/chunk.py (new file, +727 lines)

@@ -0,0 +1,727 @@
"""
Read and write chunks from files.
Version 1.0.0.
A file is divided into blocks, each of which has size L{BLOCK_SIZE}
(except for the last block, which may be smaller). Blocks are encoded
into chunks. One publishes the hash of the entire file. Clients
who want to download the file first obtain this hash, after which
they can receive chunks in any order. Cryptographic hashing is used
to verify each received chunk before it is written to disk, so it
is impossible to download corrupt data if one has the correct file
hash.
One obtains the hash of a complete file via
L{CompleteChunkFile.file_hash}. One can read chunks from a complete
file by the sequence operations of C{len()} and subscripting on a
L{CompleteChunkFile} object. One can open an empty or partially
downloaded file with L{PartialChunkFile}, and read and write chunks
to this file. A chunk will fail to write if its contents and index
are not consistent with the overall file hash passed to
L{PartialChunkFile} when the partial chunk file was first created.
The chunks have an overhead of less than 4% for files of size
less than C{10**20} bytes: a chunk carries two 20-byte hashes per
tree level, and even a C{10**20}-byte file has only about 51 tree
levels, i.e. roughly 2 KB of hash overhead per 64 KB block.
Benchmarks:
- On a 3 GHz Pentium 4, it took 3.4 minutes to first make a
L{CompleteChunkFile} object for a 4 GB file. Up to 10 MB of
memory was used as the constructor ran. A metafile filename
was passed to the constructor, and so the hash information was
written to the metafile. The object used a negligible amount
of memory after the constructor was finished.
- Creation of L{CompleteChunkFile} objects in future runs of the
program took negligible time, since the hash information was
already stored in the metafile.
@var BLOCK_SIZE: Size of a block. See L{BlockFile}.
@var MAX_CHUNK_SIZE: Upper bound on the size of a chunk.
See L{CompleteChunkFile}.
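Example (a sketch; the file names here are hypothetical):

  >>> src = CompleteChunkFile('original.dat', 'original.meta')
  >>> dst = PartialChunkFile('copy.dat', 'copy.meta',
  ...                        src.file_hash, src.file_size)
  >>> for i in range(len(src)):
  ...     dst[i] = src[i]    # each chunk is verified before writing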
free (adj.): unencumbered; not under the control of others
Written by Connelly Barnes in 2005 and released into the
public domain with no warranty of any kind, either expressed
or implied. It probably won't make your computer catch on fire,
or eat your children, but it might. Use at your own risk.
"""
import sha
import os
from allmydata.util import bencode
__all__ = ['CompleteChunkFile', 'PartialChunkFile']
__version__ = '1.0.0'
BLOCK_SIZE = 65536
MAX_CHUNK_SIZE = BLOCK_SIZE + 4096
def hash(s):
"""
Cryptographic hash function used by this module.
"""
return sha.new(s).digest()
def roundup_pow2(x):
"""
Round integer C{x} up to the nearest power of 2.
"""
ans = 1
while ans < x:
ans *= 2
return ans
class CompleteBinaryTreeMixin:
"""
Adds convenience methods to a complete binary tree.
Assumes the total number of elements in the binary tree may be
accessed via C{__len__}, and that each element can be retrieved
using list subscripting.
Tree is indexed like so::
0
/ \
1 2
/ \ / \
3 4 5 6
/ \ / \ / \ / \
7 8 9 10 11 12 13 14
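    For example, in the 15-node tree above, C{parent(9) == 4},
    C{sibling(9) == 10}, and C{needed(9) == [10, 3, 2]}: the sibling
    of node 9, then the sibling of each ancestor on the path to the
    root.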
"""
def parent(self, i):
"""
Index of the parent of C{i}.
"""
if i < 1 or (hasattr(self, '__len__') and i >= len(self)):
raise IndexError('index out of range: ' + repr(i))
return (i - 1) // 2
def lchild(self, i):
"""
Index of the left child of C{i}.
"""
ans = 2 * i + 1
if i < 0 or (hasattr(self, '__len__') and ans >= len(self)):
raise IndexError('index out of range: ' + repr(i))
return ans
def rchild(self, i):
"""
Index of right child of C{i}.
"""
ans = 2 * i + 2
if i < 0 or (hasattr(self, '__len__') and ans >= len(self)):
raise IndexError('index out of range: ' + repr(i))
return ans
def sibling(self, i):
"""
Index of sibling of C{i}.
"""
parent = self.parent(i)
if self.lchild(parent) == i:
return self.rchild(parent)
else:
return self.lchild(parent)
def needed(self, i):
"""
        Return the indices of the nodes whose hashes are needed to
        validate node C{i} against the root: the sibling of C{i} and
        the sibling of each of its ancestors.
"""
if i < 0 or i >= len(self):
raise IndexError('index out of range: ' + repr(i))
needed = []
here = i
while here != 0:
needed.append(self.sibling(here))
here = self.parent(here)
return needed
class HashTree(CompleteBinaryTreeMixin, list):
"""
Compute Merkle hashes at any node in a complete binary tree.
Tree is indexed like so::
0
/ \
1 2
/ \ / \
3 4 5 6
/ \ / \ / \ / \
7 8 9 10 11 12 13 14 <- List passed to constructor.
"""
def __init__(self, L):
"""
Create complete binary tree from list of hash strings.
The list is augmented by hashes so its length is a power of 2, and
then this is used as the bottom row of the hash tree.
The augmenting is done so that if the augmented element is at
index C{i}, then its value is C{hash(bencode.bencode((i, '')))}.
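        For example, C{HashTree([hash('a'), hash('b'), hash('c')])} pads
        the leaf row to four entries and yields a 7-element list whose
        root satisfies C{self[0] == hash(self[1] + self[2])}.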
"""
# Augment the list.
start = len(L)
end = roundup_pow2(len(L))
L = L + [None] * (end - start)
for i in range(start, end):
L[i] = hash(bencode.bencode((i, '')))
# Form each row of the tree.
rows = [L]
while len(rows[-1]) != 1:
last = rows[-1]
rows += [[hash(last[2*i] + last[2*i+1]) for i in xrange(len(last)//2)]]
# Flatten the list of rows into a single list.
rows.reverse()
self[:] = sum(rows, [])
class BlockFile:
"""
Reads and writes blocks of data to a binary file.
It is assumed that the binary file does not change in size.
@ivar file_name: Full path to file.
@ivar file_size: Size of file in bytes.
@ivar block_size: Size of each block.
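    For example, a 150000-byte file with a 65536-byte block size has
    C{len(f) == 3} blocks, the last of which is 18928 bytes long.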
"""
def __init__(self, file_name, mode, block_size, file_size=None):
"""
Initialize block reader or writer on given file name.
If mode is 'r', the file must already exist and it is opened for
reading only. If mode is 'w', the file will be created with size
C{file_size} if it does not exist, and it is opened for reading
and writing.
Note that C{file_size} is ignored if the file already exists.
"""
self.mode = mode
self.file_name = os.path.abspath(file_name)
assert self.mode in ['r', 'w']
if mode == 'r':
f = open(self.file_name, 'rb')
f.close()
# Create file if it doesn't exist.
created = False
if mode == 'w' and not os.path.exists(self.file_name):
created = True
buf = ' ' * 1024
f = open(self.file_name, 'wb')
for i in xrange(file_size // len(buf)):
f.write(buf)
f.write(' ' * (file_size % len(buf)))
f.close()
self.file_size = os.stat(self.file_name).st_size
if created:
assert self.file_size == file_size
self.block_size = block_size
self.__block_count = self.file_size // self.block_size
if self.file_size % self.block_size == 0:
self.last_block_size = self.block_size
else:
self.last_block_size = self.file_size % self.block_size
self.__block_count += 1
def __getitem__(self, i):
"""
Get block i.
"""
if i < 0 or i >= len(self):
raise IndexError('block index out of range: ' + repr(i))
f = open(self.file_name, 'rb')
try:
f.seek(i * self.block_size)
ans = f.read(self.block_size)
finally:
f.close()
return ans
def __setitem__(self, i, s):
"""
Set block i.
"""
if self.mode != 'w':
raise ValueError('file opened for reading only')
if i < 0 or i >= len(self):
raise IndexError('block index out of range: ' + repr(i))
if i < len(self) - 1:
if len(s) != self.block_size:
raise ValueError('length of value must equal block_size')
else:
if len(s) != self.last_block_size:
raise ValueError('length of value must equal last_block_size')
f = open(self.file_name, 'rb+')
try:
f.seek(i * self.block_size)
f.write(s)
finally:
f.close()
def __len__(self):
"""
Get number of blocks.
"""
return int(self.__block_count)
class MetaFile(CompleteBinaryTreeMixin):
"""
A L{HashTree} stored on disk, with a timestamp.
The list of hashes can be accessed using subscripting and
C{__len__}, in the same manner as for L{HashTree}.
Note that the constructor takes the entire list associated with
the L{HashTree}, not just the bottom row of the tree.
@ivar meta_name: Full path to metafile.
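    For example, the metafile for a 3-chunk file stores 8 twenty-byte
    blocks: the timestamp hash at block 0, then the 7 nodes of the
    (padded) hash tree.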
"""
def __init__(self, meta_name, mode, L=None):
"""
Open an existing meta-file for reading or writing.
If C{mode} is 'r', the meta-file must already exist and it is
opened for reading only, and the list C{L} is ignored. If C{mode}
is 'w', the file will be created if it does not exist (from the
list of hashes given in C{L}), and it is opened for reading and
writing.
"""
self.meta_name = os.path.abspath(meta_name)
self.mode = mode
assert self.mode in ['r', 'w']
# A timestamp is stored at index 0. The MetaFile instance
# offsets all indices passed to __getitem__, __setitem__ by
# this offset, and pretends it has length equal to
# self.sublength.
self.offset = 1
if self.mode == 'w':
suggested_length = len(hash('')) * (len(L)+self.offset)
else:
suggested_length = None
created = False
if self.mode == 'w' and not os.path.exists(self.meta_name):
created = True
self.block_file = BlockFile(self.meta_name, self.mode,
len(hash('')),
suggested_length)
self.sublength = len(self.block_file) - self.offset
if created:
for i in xrange(len(L)):
self.block_file[i + self.offset] = L[i]
def __getitem__(self, i):
if i < 0 or i >= self.sublength:
raise IndexError('bad meta-file block index')
return self.block_file[i + self.offset]
def __setitem__(self, i, value):
if i < 0 or i >= self.sublength:
raise IndexError('bad meta-file block index')
self.block_file[i + self.offset] = value
def __len__(self):
return self.sublength
def set_timestamp(self, file_name):
"""
Set meta file's timestamp equal to the timestamp for C{file_name}.
"""
st = os.stat(file_name)
timestamp = bencode.bencode((st.st_size, st.st_mtime))
self.block_file[0] = sha.new(timestamp).digest()
def check_timestamp(self, file_name):
"""
True if meta file's timestamp equals timestamp for C{file_name}.
"""
st = os.stat(file_name)
timestamp = bencode.bencode((st.st_size, st.st_mtime))
return self.block_file[0] == sha.new(timestamp).digest()
class CompleteChunkFile(BlockFile):
"""
Reads chunks from a fully-downloaded file.
A chunk C{i} is created from block C{i}. Block C{i} is unencoded
data read from the file by the L{BlockFile}. Chunk C{i} is
an encoded string created from block C{i}.
Chunks can be read using list subscripting. The total number of
chunks (equals the total number of blocks) is given by L{__len__}.
@ivar file_name: Full path to file.
@ivar file_size: Size of file in bytes.
@ivar file_hash: Hash of file.
@ivar meta_name: Full path to metafile, or C{None}.
@ivar tree: L{HashTree} or L{MetaFile} instance for the file.
One can extract a hash from any node in the hash
tree.
"""
def __init__(self, file_name, meta_name=None, callback=None):
"""
Initialize reader on the given file name.
The entire file will be read and the hash will be computed from
the file. This may take a long time, so C{callback()} is called
frequently during this process. This allows you to reduce CPU
usage if you wish.
The C{meta_name} argument is optional. If it is specified, then the
hashes for C{file_name} will be stored under the file
C{meta_name}. If a C{CompleteChunkFile} is created on the same
file and metafile in the future, then the hashes will not need to
be recomputed and the constructor will return instantly. The
metafile contains a file and date stamp, so that if the file stored
in C{file_name} is modified, then the hashes will be recomputed.
"""
        BlockFile.__init__(self, file_name, 'r', block_size=BLOCK_SIZE)
# Whether we need to compute the hash tree
compute_tree = False
self.meta_name = meta_name
if self.meta_name != None:
self.meta_name = os.path.abspath(self.meta_name)
self.meta = None
if self.meta_name == None:
compute_tree = True
        else:
            try:
                meta = MetaFile(self.meta_name, 'r')
                assert meta.check_timestamp(self.file_name)
            except (IOError, AssertionError):
                compute_tree = True
                # The metafile is missing or stale: remove any stale copy
                # so that the MetaFile(..., 'w') below re-creates it from
                # the freshly computed hashes instead of keeping old ones.
                if os.path.exists(self.meta_name):
                    os.remove(self.meta_name)
# Compute the hash tree if needed.
if compute_tree:
chunk_hashes = [None] * len(self)
for i in xrange(len(self)):
triple = (self.file_size, i, BlockFile.__getitem__(self, i))
chunk_hashes[i] = hash(bencode.bencode(triple))
if callback:
callback()
self.tree = HashTree(chunk_hashes)
del chunk_hashes
# If a meta-file was given, make self.tree be a MetaFile instance.
if self.meta_name != None:
if compute_tree:
# Did we compute the hash tree? Then store it to disk.
self.tree = MetaFile(self.meta_name, 'w', self.tree)
# Update its timestamp to be consistent with the file we
# just hashed.
self.tree.set_timestamp(self.file_name)
else:
# Read existing file from disk.
self.tree = MetaFile(self.meta_name, 'r')
self.file_hash = self.tree[0]
def __getitem__(self, i):
"""
Get chunk C{i}.
        Raises C{ValueError} if the file's contents changed since the
        L{CompleteChunkFile} was instantiated.
"""
return encode_chunk(BlockFile.__getitem__(self, i), i,
self.file_size, self.tree)
def encode_chunk(block, index, file_size, tree):
"""
Encode a chunk.
Given a block at index C{index} in a file with size C{file_size},
and a L{HashTree} or L{MetaFile} instance C{tree}, computes and
returns a chunk string for the given block.
The C{tree} argument needs to have correct hashes only at certain
indices. Check out the code for details. In any case, if a hash
is wrong an exception will be raised.
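    For example, with a 4-block file (leaves at tree indices 3-6), the
    chunk for block 2 carries C{prefix == tree[5] + tree[6] + tree[2] +
    tree[1]}: at each level, the current node's hash and its sibling's,
    from leaf to root.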
"""
block_count = (len(tree) + 1) // 2
if index < 0 or index >= block_count:
raise IndexError('block index out of range: ' + repr(index))
suffix = bencode.bencode((file_size, index, block))
current = len(tree) - block_count + index
prefix = []
while current > 0:
sibling = tree.sibling(current)
prefix += [tree[current], tree[sibling]]
current = tree.parent(current)
prefix = ''.join(prefix)
# Encode the chunk
chunk = bencode.bencode((prefix, suffix))
# Check to make sure it decodes properly.
decode_chunk(chunk, file_size, tree)
return chunk
def decode_chunk(chunk, file_size, tree):
"""
Decode a chunk.
    Given a file with size C{file_size} and a L{HashTree} or L{MetaFile}
instance C{tree}, return C{(index, block, tree_items)}. Here
C{index} is the block index where string C{block} should be placed
in the file. Also C{tree_items} is a dict mapping indices within
the L{HashTree} or L{MetaFile} tree object associated with the
given file to the corresponding hashes at those indices. These
have been verified against the file's hash, so it is known that
they are correct.
Raises C{ValueError} if chunk verification fails.
"""
file_hash = tree[0]
block_count = (len(tree) + 1) // 2
try:
# Decode the chunk
try:
(prefix, suffix) = bencode.bdecode(chunk)
            except ValueError:
                raise AssertionError()
assert isinstance(prefix, str)
assert isinstance(suffix, str)
# Verify the suffix against the hashes in the prefix.
hash_len = len(hash(''))
L = [prefix[hash_len*i:hash_len*(i+1)] for i in range(len(prefix)//hash_len)]
L += [file_hash]
assert L[0] == hash(suffix)
branches = []
for i in range(0, len(L)-1, 2):
if hash(L[i] + L[i+1]) == L[i+2]:
branches += [0]
elif hash(L[i+1] + L[i]) == L[i+2]:
branches += [1]
else:
raise AssertionError()
# Decode the suffix
try:
(claim_file_size, claim_index, block) = bencode.bdecode(suffix)
            except ValueError:
                raise AssertionError()
assert isinstance(claim_file_size, int) or isinstance(claim_file_size, long)
assert isinstance(claim_index, int) or isinstance(claim_index, long)
assert isinstance(block, str)
assert file_size == claim_file_size
# Compute the index of the block, and check it.
found_index = sum([branches[i]*2**i for i in range(len(branches))])
assert found_index == claim_index
# Now fill in the tree_items dict.
tree_items = {}
current = (len(tree) - block_count) + found_index
i = 0
while current > 0 and i + 1 < len(L):
tree_items[current] = L[i]
# Next item is our sibling.
tree_items[tree.sibling(current)] = L[i+1]
i += 2
current = tree.parent(current)
return (found_index, block, tree_items)
except AssertionError:
raise ValueError('corrupt chunk')
class PartialChunkFile(BlockFile):
"""
Reads and writes chunks to a partially downloaded file.
@ivar file_name: Full path to file.
@ivar file_size: Size of file in bytes.
@ivar file_hash: Hash of file.
@ivar meta_name: Full path to metafile.
@ivar tree: L{MetaFile} instance for the file.
The hashes in this hash tree are valid only for
nodes that we have been sent hashes for.
"""
def __init__(self, file_name, meta_name, file_hash=None, file_size=None):
"""
Initialize reader/writer for the given file name and metafile name.
        If neither C{file_name} nor C{meta_name} exists, then both are
created. The C{file_hash} and C{file_size} arguments are used to
initialize the two files.
        If both C{file_name} and C{meta_name} exist, then the hash and
file size arguments are ignored, and those values are instead read
from the files.
If one file exists and the other does not, an C{IOError} is raised.
"""
self.meta_name = os.path.abspath(meta_name)
meta_exists = os.path.exists(self.meta_name)
file_exists = os.path.exists(os.path.abspath(file_name))
BlockFile.__init__(self, os.path.abspath(file_name), 'w',
BLOCK_SIZE, file_size)
if file_exists and not meta_exists:
raise IOError('metafile ' + repr(self.meta_name) +
' missing for file ' + repr(self.file_name))
if meta_exists and not file_exists:
raise IOError('file ' + repr(self.file_name) +
' missing for metafile ' + repr(self.meta_name))
tree_count = 2 * roundup_pow2(len(self)) - 1
self.tree = MetaFile(self.meta_name, 'w', [hash('')] * tree_count)
if not meta_exists and not file_exists:
self.tree[0] = file_hash
self.file_hash = self.tree[0]
def __getitem__(self, i):
"""
Get chunk C{i}.
Raises C{ValueError} if chunk has not yet been downloaded or is
corrupted.
"""
return encode_chunk(BlockFile.__getitem__(self, i), i,
self.file_size, self.tree)
def __setitem__(self, i, chunk):
"""
Set chunk C{i}.
Raises C{ValueError} if the chunk is invalid.
"""
(index, block, tree_items) = decode_chunk(chunk,
self.file_size, self.tree)
if index != i:
raise ValueError('incorrect index for chunk')
BlockFile.__setitem__(self, index, block)
for (tree_index, tree_value) in tree_items.items():
self.tree[tree_index] = tree_value
def test(filename1='temp-out', metaname1='temp-out.meta',
filename2='temp-out2', metaname2='temp-out2.meta'):
"""
Unit tests.
"""
print 'Testing:'
import random
ntests = 100
max_file_size = 200000
# Test CompleteChunkFile.
if os.path.exists(metaname1):
os.remove(metaname1)
for i in range(ntests):
fsize = random.randrange(max_file_size)
# Make some random string of size 'fsize' to go in the file.
s = ''.join([sha.new(str(j)).digest() for j in range(fsize//20+1)])
assert len(s) >= fsize
s = s[:fsize]
f = open(filename1, 'wb')
f.write(s)
f.close()
C = CompleteChunkFile(filename1)
for j in range(len(C)):
C[j]
C = CompleteChunkFile(filename1, metaname1)
for j in range(len(C)):
C[j]
C = CompleteChunkFile(filename1, metaname1)
for j in range(len(C)):
C[j]
os.remove(metaname1)
os.remove(filename1)
print ' CompleteChunkFile: OK'
# Test PartialChunkFile
for i in range(ntests):
fsize = random.randrange(max_file_size)
# Make some random string of size 'fsize' to go in the file.
s = ''.join([sha.new(str(j)).digest() for j in range(fsize//20+1)])
assert len(s) >= fsize
s = s[:fsize]
f = open(filename1, 'wb')
f.write(s)
f.close()
C1 = CompleteChunkFile(filename1)
if os.path.exists(filename2):
os.remove(filename2)
if os.path.exists(metaname2):
os.remove(metaname2)
C2 = PartialChunkFile(filename2, metaname2, C1.file_hash, C1.file_size)
assert len(C1) == len(C2)
assert C2.tree[0] == C1.tree[0]
for j in range(len(C2)):
try:
C2[j]
ok = False
except ValueError:
ok = True
if not ok:
raise AssertionError()
for j in range(len(C2)//2):
k = random.randrange(len(C2))
if len(C1) > 1:
assert C1[k] != C1[(k+1)%len(C1)]
try:
C2[k] = C1[(k+1)%len(C1)]
ok = False
except ValueError:
ok = True
if not ok:
raise AssertionError()
C2[k] = C1[k]
assert C2[k] == C1[k]
for j in range(len(C2)):
C2[j] = C1[j]
assert C2[j] == C1[j]
os.remove(filename1)
os.remove(filename2)
os.remove(metaname2)
print ' PartialChunkFile: OK'
if __name__ == '__main__':
test()

allmydata/encode_new.py (new file, +204 lines)

@@ -0,0 +1,204 @@
#! /usr/bin/python
import math
from twisted.internet import defer
from allmydata.chunk import HashTree
from Crypto.Cipher import AES
import sha
def hash(data):
return sha.new(data).digest()
"""
The goal of the encoder is to turn the original file into a series of
'shares'. Each share is going to a 'shareholder' (nominally each shareholder
is a different host, but for small meshes there may be overlap). The number
of shares is chosen to hit our reliability goals (more shares on more
machines means more reliability), and is limited by overhead (proportional to
numshares or log(numshares)) and the encoding technology in use (Reed-Solomon
only permits 256 shares total). It is also constrained by the amount of data
we want to send to each host. For estimating purposes, think of 100 shares
out of which we need 25 to reconstruct the file.
The encoder starts by cutting the original file into segments. All segments
except the last are of equal size. The segment size is chosen to constrain
the memory footprint (which will probably vary between 1x and 4x segment
size) and to constrain the overhead (which will be proportional to either the
number of segments or log(number of segments)).
Each segment (A,B,C) is read into memory, encrypted, and encoded into
subshares. The 'share' (say, share #1) that makes it out to a host is a
collection of these subshares (subshare A1, B1, C1), plus some hash-tree
information necessary to validate the data upon retrieval. Only one segment
is handled at a time: all subshares for segment A are delivered before any
work is begun on segment B.
As subshares are created, we retain the hash of each one. The list of
subshare hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is
used to form the base of a Merkle hash tree for that share (hashtrees[1]).
This hash tree has one terminal leaf per subshare. The complete subshare hash
tree is sent to the shareholder after all the data has been sent. At
retrieval time, the decoder will ask for specific pieces of this tree before
asking for subshares, whichever it needs to validate those subshares.
[TODO: we don't really need to generate this whole subshare hash tree
ourselves. It would be sufficient to have the shareholder generate it and
just tell us the root. This gives us an extra level of validation on the
transfer, though, and it is relatively cheap to compute.]
Each of these subshare hash trees has a root hash. The collection of these
root hashes for all shares are collected into the 'share hash tree', which
has one terminal leaf per share. After sending the subshares and the complete
subshare hash tree to each shareholder, we send them the portion of the share
hash tree that is necessary to validate their share. The root of the share
hash tree is put into the URI.
"""
class Encoder(object):
def setup(self, infile):
self.infile = infile
infile.seek(0, 2)
self.file_size = infile.tell()
infile.seek(0, 0)
fsize = 1.0 * self.file_size
self.segment_size = 1024
self.num_segments = int(math.ceil(fsize / self.segment_size))
self.num_shares = 100
self.share_size = self.file_size / 25
def get_reservation_size(self):
self.num_shares = 100
self.share_size = self.file_size / 25
overhead = self.compute_overhead()
return self.share_size + overhead
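    def compute_overhead(self):
        # placeholder (an assumption) so that get_reservation_size()
        # runs: the real per-share overhead (hash trees, framing) is
        # not yet accounted for
        return 0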
    def setup_encryption(self):
        self.key = "\x00"*16  # XXX placeholder key: all zeros for now
        self.cryptor = AES.new(key=self.key, mode=AES.MODE_CTR,
                               counterstart="\x00"*16)
self.segment_num = 0
        self.subshare_hashes = [[] for i in range(self.num_shares)]
        # subshare_hashes[i] is a list that will be accumulated and then
        # sent to landlord[i]. This list contains a hash of each
        # segment_share that we sent to that landlord. Note that
        # [[]] * num_shares would not work: it would alias one list
        # num_shares times.
self.share_root_hashes = [None] * self.num_shares
def start(self):
self.setup_encryption()
d = defer.succeed(None)
        for i in range(self.num_segments):
            # bind i at definition time; a plain "lambda res:
            # self.do_segment(i)" would capture the loop variable late
            # and run every callback with the final value of i
            d.addCallback(lambda res, i=i: self.do_segment(i))
d.addCallback(lambda res: self.send_all_subshare_hash_trees())
d.addCallback(lambda res: self.send_all_share_hash_trees())
d.addCallback(lambda res: self.close_all_shareholders())
d.addCallback(lambda res: self.done())
return d
    def encode_segment(self, crypttext):
        # placeholder encoding: simple replication. Real erasure coding
        # (e.g. 25-out-of-100 Reed-Solomon) will replace this.
        shares = [crypttext] * self.num_shares
        return shares
def do_segment(self, segnum):
segment_plaintext = self.infile.read(self.segment_size)
segment_crypttext = self.cryptor.encrypt(segment_plaintext)
del segment_plaintext
subshares_for_this_segment = self.encode_segment(segment_crypttext)
del segment_crypttext
dl = []
for share_num,subshare in enumerate(subshares_for_this_segment):
d = self.send_subshare(share_num, self.segment_num, subshare)
dl.append(d)
self.subshare_hashes[share_num].append(hash(subshare))
self.segment_num += 1
return defer.DeferredList(dl)
def send_subshare(self, share_num, segment_num, subshare):
#if False:
# offset = hash_size + segment_num * segment_size
# return self.send(share_num, "write", subshare, offset)
return self.send(share_num, "put_subshare", segment_num, subshare)
def send(self, share_num, methname, *args, **kwargs):
ll = self.landlords[share_num]
return ll.callRemote(methname, *args, **kwargs)
def send_all_subshare_hash_trees(self):
dl = []
for share_num,hashes in enumerate(self.subshare_hashes):
# hashes is a list of the hashes of all subshares that were sent
# to shareholder[share_num].
dl.append(self.send_one_subshare_hash_tree(share_num, hashes))
return defer.DeferredList(dl)
def send_one_subshare_hash_tree(self, share_num, subshare_hashes):
t = HashTree(subshare_hashes)
all_hashes = list(t)
# all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
# all_hashes[1] is the left child, == hash(ah[3]+ah[4])
# all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
self.share_root_hashes[share_num] = t[0]
ll = self.landlords[share_num]
if False:
block = "".join(all_hashes)
return ll.callRemote("write", block, offset=0)
return ll.callRemote("put_subshare_hashes", all_hashes)
def send_all_share_hash_trees(self):
dl = []
for h in self.share_root_hashes:
assert h
# create the share hash tree
t = HashTree(self.share_root_hashes)
# the root of this hash tree goes into our URI
self.root_hash = t[0]
        # now send just the necessary pieces out to each shareholder
        for i in range(self.num_shares):
            # the mixin method is named needed(), and put_share_hashes
            # expects (index, hash) pairs
            needed_hashes = [(idx, t[idx]) for idx in t.needed(i)]
            dl.append(self.send_one_share_hash_tree(i, needed_hashes))
return defer.DeferredList(dl)
def send_one_share_hash_tree(self, share_num, needed_hashes):
ll = self.landlords[share_num]
return ll.callRemote("put_share_hashes", needed_hashes)
def close_all_shareholders(self):
dl = []
for ll in self.landlords:
dl.append(ll.callRemote("close"))
return defer.DeferredList(dl)
def done(self):
return self.root_hash
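# Hypothetical driver (an editorial sketch; FakeLandlord stands in for
# a remote reference honoring RIStorageBucketWriter):
#
#   class FakeLandlord:
#       def callRemote(self, methname, *args, **kwargs):
#           return defer.succeed(None)
#
#   e = Encoder()
#   e.setup(open("some.file", "rb"))
#   e.landlords = [FakeLandlord() for i in range(e.num_shares)]
#   d = e.start()   # eventually fires with the root hash for the URI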
from foolscap import RemoteInterface
from foolscap.schema import ListOf, TupleOf, Nothing
_None = Nothing()
class RIStorageBucketWriter(RemoteInterface):
def put_subshare(segment_number=int, subshare=str):
return _None
    def put_subshare_hashes(all_hashes=ListOf(str)):
        return _None
def put_share_hashes(needed_hashes=ListOf(TupleOf(int,str))):
return _None
#def write(data=str, offset=int):
# return _None
class RIStorageBucketReader(RemoteInterface):
def get_share_hashes():
return ListOf(TupleOf(int,str))
    def get_subshare_hashes(which=ListOf(int)):
        return ListOf(str)
def get_subshare(segment_number=int):
return str
#def read(size=int, offset=int):
# return str

(new file, +19 lines)

@@ -0,0 +1,19 @@
#! /usr/bin/python
from twisted.trial import unittest
from twisted.internet import defer
from allmydata import encode_new
from cStringIO import StringIO
class MyEncoder(encode_new.Encoder):
def send(self, share_num, methname, *args, **kwargs):
return defer.succeed(None)
class Encode(unittest.TestCase):
    def OFFtest_1(self):  # the OFF prefix keeps trial from running this yet
e = MyEncoder()
data = StringIO("some data to encode\n")
e.setup(data)
d = e.start()
return d

allmydata/util/bencode.py (new file, +433 lines)

@@ -0,0 +1,433 @@
#!/usr/bin/env python
# -*- coding: MacRoman -*-
"""
A library for streaming and unstreaming of simple objects, designed
for speed, compactness, and ease of implementation.
The basic functions are bencode and bdecode. bencode takes an object
and returns a string, bdecode takes a string and returns an object.
bdecode raises a ValueError if you give it an invalid string.
The objects passed in may be nested dicts, lists, ints, floats, strings,
and Python boolean and None types. For example, all of the following
may be bencoded -
{'a': [0, 1], 'b': None}
[None, ['a', 2, ['c', None]]]
{'spam': (2,3,4)}
{'name': 'Cronus', 'spouse': 'Rhea', 'children': ['Hades', 'Poseidon']}
In general bdecode(bencode(spam)) == spam, but tuples and lists are
encoded the same, so bdecode(bencode((0, 1))) is [0, 1] rather
than (0, 1). Longs and ints are also encoded the same way, so
bdecode(bencode(4)) is a long.
Dict keys are required to be basestrings (byte strings or unicode objects),
to avoid a mess of potential implementation incompatibilities. bencode is
intended to be used for protocols which are going to be re-implemented many
times, so it's very conservative in that regard.
Which type is encoded is determined by the first character, 'i', 'n', 'f',
'd', 'l', 'b', 'u', and any digit. They indicate integer, null, float,
dict, list, boolean, unicode string, and string, respectively.
Strings are length-prefixed in base 10, followed by a colon.
bencode('spam') == '4:spam'
Unicode string objects are indicated with an initial u, a base 10
length-prefix, and the remaining bytes in utf-8 encoding.
bencode(u'\u00bfHabla espa\u00f1ol?') == 'u17:\xc2\xbfHabla espa\xc3\xb1ol?'
Nulls are indicated by a single 'n'.
bencode(None) == 'n'
Integers are encoded base 10 and terminated with an 'e' -
bencode(3) == 'i3e'
bencode(-20) == 'i-20e'
Floats are encoded in base 10 and terminated with an 'e' -
bencode(3.2) == 'f3.2e'
bencode(-23.4532) == 'f-23.4532e'
Lists are encoded in list order, terminated by an 'e' -
bencode(['abc', 'd']) == 'l3:abc1:de'
bencode([2, 'f']) == 'li2e1:fe'
Dicts are encoded by containing alternating keys and values.
The keys are encoded in sorted order, but sort order is not
enforced on the decode. Dicts are terminated by an 'e'. Dict
keys can be either bytestrings or unicode strings. For example -
bencode({'spam': 'eggs'}) == 'd4:spam4:eggse'
bencode({'ab': 2, 'a': None}) == 'd1:an2:abi2ee'
bencode({'a': 1, u'\xa8': 2}) == 'd1:ai1eu2:\xc2\xa8i2ee'
Truncated strings come first, so in sort order 'a' comes before 'abc'.
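Round trips preserve values, modulo the flattenings noted above
(ints come back as longs):
bdecode(bencode({'a': [1, 2.5, None, u'b']})) == {'a': [1L, 2.5, None, u'b']}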
"""
# This file is licensed under the GNU Lesser General Public License v2.1.
#
# Originally written by Mojo Nation.
# Rewritten by Bram Cohen.
# Further enhanced by Allmydata to support additional Python types
# (Boolean, None, Float, and Unicode strings).
from types import IntType, LongType, FloatType, ListType, TupleType, DictType, StringType, UnicodeType, BooleanType, NoneType
from cStringIO import StringIO
import string
def bencode(data):
"""
encodes objects as strings, see module documentation for more info
"""
result = StringIO()
bwrite(data, result)
return result.getvalue()
def bwrite(data, result):
    # a generic function using pje's type dispatch would be faster here
try:
encoder = encoders[type(data)]
except KeyError:
encoder = None
# Catch subclasses of built-in types
for t,coder in encoders.items():
if isinstance(data, t):
encoder = coder
break
if not encoder:
raise ValueError("unsupported data type: %s" % type(data))
encoder(data, result)
encoders = {}
def encode_int(data, result):
result.write('i' + str(data) + 'e')
encoders[IntType] = encode_int
encoders[LongType] = encode_int
def encode_float(data, result):
result.write('f' + str(data) + 'e')
encoders[FloatType] = encode_float
def encode_bool(data, result):
if data:
result.write('b1')
else:
result.write('b0')
encoders[BooleanType] = encode_bool
def encode_list(data, result):
result.write('l')
_bwrite = bwrite
for item in data:
_bwrite(item, result)
result.write('e')
encoders[TupleType] = encode_list
encoders[ListType] = encode_list
encoders[set] = encode_list
def encode_string(data, result):
result.write(str(len(data)) + ':' + data)
encoders[StringType] = encode_string
def encode_unicode(data, result):
payload = data.encode('utf-8')
result.write('u' + str(len(payload)) + ':' + payload)
encoders[UnicodeType] = encode_unicode
def encode_dict(data, result):
result.write('d')
_bwrite = bwrite
keylist = data.keys()
keylist.sort()
for key in keylist:
_bwrite(key, result)
_bwrite(data[key], result)
result.write('e')
encoders[DictType] = encode_dict
encoders[NoneType] = lambda data, result: result.write('n')
def bdecode(s):
"""
Does the opposite of bencode. Raises a ValueError if there's a problem.
"""
try:
result, index = bread(s, 0)
if index != len(s):
raise ValueError('left over stuff at end: %s' % s[index:])
return result
except IndexError, e:
raise ValueError(str(e))
except KeyError, e:
raise ValueError(str(e))
def bread(s, index):
return decoders[s[index]](s, index)
decoders = {}
def decode_raw_string(s, index):
ci = s.index(":", index)
ei = ci + int(s[index:ci]) + 1
if ei > len(s):
raise ValueError('length encoding indicates premature end of string')
return (s[ci+1:ei], ei)
for c in string.digits:
decoders[c] = decode_raw_string
def decode_unicode_string(s, index):
ci = s.index(":", index)
ei = ci + int(s[index+1:ci]) + 1
if ei > len(s):
raise ValueError('length encoding indicates premature end of string')
return (unicode(s[ci+1:ei], 'utf-8'), ei)
decoders['u'] = decode_unicode_string
def decode_int(s, index):
ei = s.index('e', index)
return (long(s[index+1:ei]), ei+1)
decoders['i'] = decode_int
def decode_float(s, index):
ei = s.index('e', index)
return (float(s[index+1:ei]), ei+1)
decoders['f'] = decode_float
def decode_bool(s, index):
val = s[index+1]
if val == '1':
return True, index+2
elif val == '0':
return False, index+2
else:
raise ValueError('invalid boolean encoding: %s' % s[index:index+2])
decoders['b'] = decode_bool
# decoders['n'] = lambda s, index: decoders_n.inc('n') or (None, index + 1)
decoders['n'] = lambda s, index: (None, index + 1)
def decode_list(s, index):
# decoders_n.inc('l')
result = []
index += 1
_bread = bread
while s[index] != 'e':
next, index = _bread(s, index)
result.append(next)
return result, index + 1
decoders['l'] = decode_list
def decode_dict(s, index):
# decoders_n.inc('d')
result = {}
index += 1
_decode_string = decode_raw_string
_decode_unicode = decode_unicode_string
_bread = bread
while s[index] != 'e':
if s[index] in string.digits:
key, index = _decode_string(s, index)
elif s[index] == "u":
key, index = _decode_unicode(s, index)
else:
raise ValueError("dict key must be basestring")
if key in result:
raise ValueError("dict key was repeated")
value, index = _bread(s, index)
result[key] = value
return result, index + 1
decoders['d'] = decode_dict
def test_decode_raw_string():
assert decode_raw_string('1:a', 0) == ('a', 3)
assert decode_raw_string('0:', 0) == ('', 2)
assert decode_raw_string('10:aaaaaaaaaaaaaaaaaaaaaaaaa', 0) == ('aaaaaaaaaa', 13)
assert decode_raw_string('10:', 1) == ('', 3)
    # the non-regexp version does not check for this case
# try:
# decode_raw_string('01:a', 0)
# assert 0, 'failed'
# except ValueError:
# pass
try:
decode_raw_string('--1:a', 0)
assert 0, 'failed'
except ValueError:
pass
try:
decode_raw_string('h', 0)
assert 0, 'failed'
except ValueError:
pass
try:
decode_raw_string('h:', 0)
assert 0, 'failed'
except ValueError:
pass
try:
decode_raw_string('1', 0)
assert 0, 'failed'
except ValueError:
pass
try:
decode_raw_string('', 0)
assert 0, 'failed'
except ValueError:
pass
try:
decode_raw_string('5:a', 0)
assert 0, 'failed'
except ValueError:
pass
def test_encode_and_decode_unicode_results_in_unicode_type():
assert bdecode(bencode(u'\u00bfHabla espa\u00f1ol?')) == u'\u00bfHabla espa\u00f1ol?'
def test_encode_and_decode_unicode_at_least_preserves_the_content_even_if_it_flattens_the_type():
test_string = bdecode(bencode(u'\u00bfHabla espa\u00f1ol?'))
if isinstance(test_string, unicode):
assert test_string == u'\u00bfHabla espa\u00f1ol?'
elif isinstance(test_string, str):
assert test_string.decode('utf-8') == u'\u00bfHabla espa\u00f1ol?'
else:
assert 0, 'flunked'
def test_dict_forbids_non_string_key():
try:
bdecode('di3ene')
assert 0, 'failed'
except ValueError:
pass
def test_dict_forbids_key_repeat():
try:
bdecode('d1:an1:ane')
assert 0, 'failed'
except ValueError:
pass
def test_empty_dict():
assert bdecode('de') == {}
def test_dict_allows_unicode_keys():
assert bdecode(bencode({'a': 1, u'\xa8': 2})) == {'a': 1L, u'\xa8': 2L}
def test_ValueError_in_decode_unknown():
try:
bdecode('x')
assert 0, 'flunked'
except ValueError:
pass
def test_encode_and_decode_none():
assert bdecode(bencode(None)) == None
def test_encode_and_decode_long():
assert bdecode(bencode(-23452422452342L)) == -23452422452342L
def test_encode_and_decode_int():
assert bdecode(bencode(2)) == 2
def test_encode_and_decode_float():
assert bdecode(bencode(3.4)) == 3.4
assert bdecode(bencode(0.0)) == 0.0
assert bdecode(bencode(-4.56)) == -4.56
assert bdecode(bencode(-0.0)) == -0.0
def test_encode_and_decode_bool():
assert bdecode(bencode(True)) == True
assert bdecode(bencode(False)) == False
# the non-regexp methods no longer check for canonical ints, but we
# don't parse input we did not generate using bencode, so I will leave
# these commented out for now
#def test_decode_noncanonical_int():
# try:
# bdecode('i03e')
# assert 0
# except ValueError:
# pass
# try:
# bdecode('i3 e')
# assert 0
# except ValueError:
# pass
# try:
# bdecode('i 3e')
# assert 0
# except ValueError:
# pass
# try:
# bdecode('i-0e')
# assert 0
# except ValueError:
# pass
def test_encode_and_decode_dict():
x = {'42': 3}
assert bdecode(bencode(x)) == x
def test_encode_and_decode_list():
assert bdecode(bencode([])) == []
def test_encode_and_decode_tuple():
assert bdecode(bencode(())) == []
def test_encode_and_decode_empty_dict():
assert bdecode(bencode({})) == {}
def test_encode_and_decode_complex_object():
spam = [[], 0, -3, -345234523543245234523L, {}, 'spam', None, {'a': [3]}, {}, {'a': 1L, u'\xa8': 2L}]
assert bencode(bdecode(bencode(spam))) == bencode(spam)
assert bdecode(bencode(spam)) == spam
def test_unfinished_list():
try:
bdecode('ln')
assert 0
except ValueError:
pass
def test_unfinished_dict():
try:
bdecode('d')
assert 0
except ValueError:
pass
try:
bdecode('d1:a')
assert 0
except ValueError:
pass
def test_unsupported_type():
try:
bencode(lambda: None)
assert 0
except ValueError:
pass