establish IEncoder/IDecoder, create suitable interfaces for both the simple replicating encoder and the py_ecc one, add a (failing) unit test for it

This commit is contained in:
Brian Warner 2007-01-04 21:52:51 -07:00
parent 6d1ba16ef3
commit f31fc06d89
3 changed files with 304 additions and 5 deletions

View File

@ -1,10 +1,49 @@
# -*- test-case-name: allmydata.test.test_encode_share -*-
from zope.interface import implements
from twisted.internet import defer
import sha
from allmydata.util import idlib
from allmydata.util import idlib, mathutil
from allmydata.interfaces import IEncoder, IDecoder
from allmydata.py_ecc import rs_code
def netstring(s):
return "%d:%s," % (len(s), s)
class ReplicatingEncoder(object):
implements(IEncoder)
ENCODER_TYPE = 0
def set_params(self, data_size, required_shares, total_shares):
self.data_size = data_size
self.required_shares = required_shares
self.total_shares = total_shares
def get_encoder_type(self):
return self.ENCODER_TYPE
def get_serialized_params(self):
return "%d" % self.required_shares
def get_share_size(self):
return self.data_size
def encode(self, data):
shares = [(i,data) for i in range(self.total_shares)]
return defer.succeed(shares)
class ReplicatingDecoder(object):
implements(IDecoder)
def set_serialized_params(self, params):
self.required_shares = int(params)
def decode(self, some_shares):
assert len(some_shares) >= self.required_shares
data = some_shares[0][1]
return defer.succeed(data)
class Encoder(object):
def __init__(self, infile, m):
self.infile = infile
@ -48,3 +87,99 @@ class Decoder(object):
if self._verifierid:
assert self._verifierid == vid, "%s != %s" % (idlib.b2a(self._verifierid), idlib.b2a(vid))
class PyRSEncoder(object):
ENCODER_TYPE = 1
# we will break the data into vectors in which each element is a single
# byte (i.e. a single number from 0 to 255), and the length of the vector
# is equal to the number of required_shares. We use padding to make the
# last chunk of data long enough to match, and we record the data_size in
# the serialized parameters to strip this padding out on the receiving
# end.
def set_params(self, data_size, required_shares, total_shares):
self.data_size = data_size
self.required_shares = required_shares
self.total_shares = total_shares
self.chunk_size = required_shares
self.num_chunks = mathutil.div_ceil(data_size, self.chunk_size)
self.last_chunk_padding = mathutil.pad_size(data_size, required_shares)
self.share_size = self.num_chunks
self.encoder = rs_code.RSCode(total_shares, required_shares)
def get_encoder_type(self):
return self.ENCODER_TYPE
def get_serialized_params(self):
return "%d:%d:%d" % (self.data_size, self.required_shares,
self.total_shares)
def get_share_size(self):
return self.share_size
def encode(self, data):
share_data = [ [] for i in range(self.total_shares)]
for i in range(self.num_chunks):
offset = i*self.chunk_size
chunk = data[offset:offset+self.chunk_size]
if i == self.num_chunks-1:
chunk = chunk + "\x00"*self.last_chunk_padding
assert len(chunk) == self.chunk_size
input_vector = [ord(x) for x in chunk]
output_vector = self.encoder.Encode(input_vector)
assert len(output_vector) == self.total_shares
for i2,out in enumerate(output_vector):
out_chars = [chr(x) for x in out]
out_string = "".join(out_chars)
share_data[i2].append(out_string)
shares = [ (i, "".join(share_data[i]))
for i in range(self.total_shares) ]
return defer.succeed(shares)
class PyRSDecoder(object):
def set_serialized_params(self, params):
pieces = params.split(":")
self.data_size = int(pieces[0])
self.required_shares = int(pieces[1])
self.total_shares = int(pieces[2])
self.chunk_size = self.required_shares
self.num_chunks = mathutil.div_ceil(self.data_size, self.chunk_size)
self.last_chunk_padding = mathutil.pad_size(self.data_size,
self.required_shares)
self.share_size = self.num_chunks
self.encoder = rs_code.RSCode(self.total_shares, self.required_shares)
def decode(self, some_shares):
chunk_size = self.chunk_size
assert len(some_shares) >= self.required_shares
chunks = [ [] for i in range(self.num_chunks) ]
have_shares = {}
for share_num, share_data in some_shares:
have_shares[share_num] = share_data
for i in range(self.num_chunks):
offset = i*chunk_size
received_vector = []
for j in range(self.total_shares):
share = have_shares.get(j)
if share is not None:
v1 = [ord(x) for x in share[offset:offset+chunk_size]]
received_vector.append(v1)
else:
received_vector.append(None)
decoded_vector = self.encoder.DecodeImmediate(received_vector)
if i == self.num_chunks-1:
decoded_vector = decoded_vector[:-self.last_chunk_padding]
chunk = "".join([chr(x) for x in decoded_vector])
chunks.append(chunk)
data = "".join(chunks)
return defer.succeed(data)
all_encoders = {
ReplicatingEncoder.ENCODER_TYPE: (ReplicatingEncoder, ReplicatingDecoder),
PyRSEncoder.ENCODER_TYPE: (PyRSEncoder, PyRSDecoder),
}

View File

@ -1,4 +1,5 @@
from zope.interface import Interface
from foolscap.schema import StringConstraint, ListOf, TupleOf, Any, Nothing
from foolscap import RemoteInterface
@ -69,7 +70,78 @@ class RIMutableDirectoryNode(RemoteInterface):
# need more to move directories
# TODO: figleaf gets confused when the last line of a file is a comment. I
# suspect an off-by-one error in the code that decides which lines are code
# and which are not.
pass
class IEncoder(Interface):
def set_params(data_size, required_shares, total_shares):
"""Set up the parameters of this encoder.
See encode() for a description of how these parameters are used.
"""
def get_encoder_type():
"""Return an integer that describes the type of this encoder.
There must be a global table of encoder classes. This method returns
an index into this table; the value at this index is an encoder
class, and this encoder is an instance of that class.
"""
def get_serialized_params(): # TODO: maybe, maybe not
"""Return a string that describes the parameters of this encoder.
This string can be passed to the decoder to prepare it for handling
the encoded shares we create. It might contain more information than
was presented to set_params(), if there is some flexibility of
parameter choice.
This string is intended to be embedded in the URI, so there are
several restrictions on its contents. At the moment I'm thinking that
this means it may contain hex digits and colons, and nothing else.
The idea is that the URI contains '%d:%s.' %
(encoder.get_encoder_type(), encoder.get_serialized_params()), and
this is enough information to construct a compatible decoder.
"""
def get_share_size():
"""Return the length of the shares that encode() will produce.
"""
def encode(data):
"""Encode a chunk of data. This may be called multiple times. Each
call is independent.
The data must be a string with a length that exactly matches the
data_size promised by set_params().
For each call, encode() will return a Deferred that fires with a list
of 'total_shares' tuples. Each tuple is of the form (sharenum,
share), where sharenum is an int (from 0 total_shares-1), and share
is a string. The get_share_size() method can be used to determine the
length of the 'share' strings returned by encode().
The memory usage of this function is expected to be on the order of
total_shares * get_share_size().
"""
class IDecoder(Interface):
def set_serialized_params(params):
"""Set up the parameters of this encoder, from a string returned by
encoder.get_serialized_params()."""
def decode(some_shares):
"""Decode a partial list of shares into data.
'some_shares' must be a list of (sharenum, share) tuples, a subset of
the shares returned by IEncoder.encode(). Each share must be of the
same length. The share tuples may appear in any order, but of course
each tuple must have a sharenum that correctly matches the associated
share data string.
This returns a Deferred which fires with a string. This string will
always have a length equal to the 'data_size' value passed into the
original IEncoder.set_params() call.
The length of 'some_shares' must be equal or greater than the value
of 'required_shares' passed into the original IEncoder.set_params()
call.
"""

View File

@ -0,0 +1,92 @@
import os
from twisted.trial import unittest
from twisted.internet import defer
from allmydata.encode import PyRSEncoder, PyRSDecoder, ReplicatingEncoder, ReplicatingDecoder
import random
class Tester:
#enc_class = PyRSEncoder
#dec_class = PyRSDecoder
def do_test(self, size, required_shares, total_shares):
data0 = os.urandom(size)
enc = self.enc_class()
enc.set_params(size, required_shares, total_shares)
serialized_params = enc.get_serialized_params()
d = enc.encode(data0)
def _done(shares):
self.failUnlessEqual(len(shares), total_shares)
self.shares = shares
d.addCallback(_done)
def _decode(shares):
dec = self.dec_class()
dec.set_serialized_params(serialized_params)
d1 = dec.decode(shares)
return d1
def _check_data(data1):
self.failUnlessEqual(len(data1), len(data0))
self.failUnless(data1 == data0)
def _decode_all_ordered(res):
# can we decode using all of the shares?
return _decode(self.shares)
d.addCallback(_decode_all_ordered)
d.addCallback(_check_data)
def _decode_all_shuffled(res):
# can we decode, using all the shares, but in random order?
shuffled_shares = self.shares[:]
random.shuffle(shuffled_shares)
return _decode(shuffled_shares)
d.addCallback(_decode_all_shuffled)
d.addCallback(_check_data)
def _decode_some(res):
# decode with a minimal subset of the shares
some_shares = self.shares[:required_shares]
return _decode(some_shares)
d.addCallback(_decode_some)
d.addCallback(_check_data)
def _decode_some_random(res):
# use a randomly-selected minimal subset
some_shares = random.sample(self.shares, required_shares)
return _decode(some_shares)
d.addCallback(_decode_some_random)
d.addCallback(_check_data)
def _decode_multiple(res):
# make sure we can re-use the decoder object
shares1 = random.sample(self.shares, required_shares)
shares2 = random.sample(self.shares, required_shares)
dec = self.dec_class()
dec.set_serialized_params(serialized_params)
d1 = dec.decode(shares1)
d1.addCallback(_check_data)
d1.addCallback(lambda res: dec.decode(shares2))
d1.addCallback(_check_data)
return d1
d.addCallback(_decode_multiple)
return d
def test_encode(self):
return self.do_test(1000, 25, 100)
def test_sizes(self):
d = defer.succeed(None)
for i in range(1, 100):
d.addCallback(lambda res,size: self.do_test(size, 4, 10), i)
return d
class PyRS(unittest.TestCase, Tester):
enc_class = PyRSEncoder
dec_class = PyRSDecoder
class Replicating(unittest.TestCase, Tester):
enc_class = ReplicatingEncoder
dec_class = ReplicatingDecoder