switch upload to use encode_new, fix a few things (but not nearly all of them)

Brian Warner 2007-03-30 11:53:03 -07:00
parent 7d7ce7a059
commit 9a2e0cf28e
5 changed files with 45 additions and 40 deletions

View File

@@ -39,9 +39,6 @@ class ReplicatingEncoder(object):
def get_serialized_params(self):
return "%d" % self.required_shares
def get_share_size(self):
return self.data_size
def get_block_size(self):
return self.data_size
@@ -97,9 +94,6 @@ class CRSEncoder(object):
return "%d-%d-%d" % (self.data_size, self.required_shares,
self.max_shares)
def get_share_size(self):
return self.share_size
def get_block_size(self):
return self.share_size
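For orientation, the two codecs above answer get_share_size()/get_block_size() very differently: the replicating codec stores a complete copy of the input in every share, while the CRS (Reed-Solomon) codec stores roughly a 1/required_shares slice of it, rounded up so the last piece can be zero-padded. A standalone sketch of that arithmetic (not the allmydata classes themselves):

def div_ceil(n, d):
    # round-up integer division, like allmydata.util.mathutil.div_ceil
    return (n + d - 1) // d

def replicating_share_size(data_size, required_shares):
    # replication: every share is a full copy of the data
    return data_size

def crs_share_size(data_size, required_shares):
    # CRS/Reed-Solomon: each share holds one required_shares'th, rounded up
    return div_ceil(data_size, required_shares)

# 2 MiB of data, 25-of-100 encoding
print(replicating_share_size(2*1024*1024, 25))   # 2097152
print(crs_share_size(2*1024*1024, 25))           # 83887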

View File

@@ -75,6 +75,8 @@ PiB=1024*TiB
class Encoder(object):
implements(IEncoder)
NEEDED_SHARES = 25
TOTAL_SHARES = 100
def setup(self, infile):
self.infile = infile
@@ -82,28 +84,37 @@ class Encoder(object):
self.file_size = infile.tell()
infile.seek(0, 0)
self.num_shares = 100
self.required_shares = 25
self.num_shares = self.TOTAL_SHARES
self.required_shares = self.NEEDED_SHARES
self.segment_size = min(2*MiB, self.file_size)
self.setup_codec()
def get_reservation_size(self):
def setup_codec(self):
self._codec = CRSEncoder()
self._codec.set_params(self.segment_size, self.required_shares,
self.num_shares)
def get_share_size(self):
share_size = mathutil.div_ceil(self.file_size, self.required_shares)
overhead = self.compute_overhead()
return share_size + overhead
def compute_overhead(self):
return 0
def get_block_size(self):
return self._codec.get_block_size()
def set_shareholders(self, landlords):
self.landlords = landlords.copy()
def start(self):
#paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
self.num_segments = mathutil.div_ceil(self.file_size,
self.segment_size)
self.share_size = mathutil.div_ceil(self.file_size,
self.required_shares)
self.setup_encryption()
self.setup_encoder()
self.setup_codec()
d = defer.succeed(None)
for i in range(self.num_segments):
d.addCallback(lambda res: self.do_segment(i))
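To make the sizes in setup()/start() concrete: with the defaults above (25-of-100 encoding, segments capped at 2 MiB), a hypothetical 5 MiB file works out as follows. The numbers are illustrative only, and the block-size line assumes the CRSEncoder behaviour shown in the first file (block size = segment size divided by required_shares, rounded up):

MiB = 1024*1024

def div_ceil(n, d):                      # same rounding as mathutil.div_ceil
    return (n + d - 1) // d

file_size       = 5 * MiB                # hypothetical input file
required_shares = 25                     # NEEDED_SHARES
num_shares      = 100                    # TOTAL_SHARES

segment_size = min(2*MiB, file_size)                    # 2 MiB
num_segments = div_ceil(file_size, segment_size)        # 3 (2 + 2 + 1 MiB)
share_size   = div_ceil(file_size, required_shares)     # 209716 bytes per share
block_size   = div_ceil(segment_size, required_shares)  # 83887 bytes per block

print(num_segments, share_size, block_size)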
@@ -124,11 +135,6 @@ class Encoder(object):
# that we sent to that landlord.
self.share_root_hashes = [None] * self.num_shares
def setup_encoder(self):
self.encoder = CRSEncoder()
self.encoder.set_params(self.segment_size, self.required_shares,
self.num_shares)
def do_segment(self, segnum):
chunks = []
# the ICodecEncoder API wants to receive a total of self.segment_size
@@ -137,7 +143,7 @@
# these pieces need to be the same size as the share which the codec
# will generate. Therefore we must feed it with input_piece_size that
# equals the output share size.
input_piece_size = self.encoder.get_share_size()
input_piece_size = self._codec.get_block_size()
# as a result, the number of input pieces per encode() call will be
# equal to the number of required shares with which the codec was
@@ -154,7 +160,7 @@
input_piece += ('\x00' * (input_piece_size - len(input_piece)))
encrypted_piece = self.cryptor.encrypt(input_piece)
chunks.append(encrypted_piece)
d = self.encoder.encode(chunks)
d = self._codec.encode(chunks)
d.addCallback(self._encoded_segment)
return d
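The comments in do_segment() describe the codec's input contract: each encode() call must receive exactly required_shares input pieces, each exactly one block long, so the tail of the final piece is zero-padded. A standalone sketch of that chunking step with encryption left out (the function name is illustrative, not the module's API):

def chunk_segment(segment, num_pieces, piece_size):
    # split one segment into num_pieces pieces of piece_size bytes each,
    # zero-padding any short tail, as the loop above does before encrypting
    pieces = []
    for i in range(num_pieces):
        piece = segment[i*piece_size : (i+1)*piece_size]
        piece += b'\x00' * (piece_size - len(piece))
        pieces.append(piece)
    return pieces

# a 10-byte segment cut into 3 pieces of 4 bytes: the last piece gets 2 pad bytes
print(chunk_segment(b'0123456789', 3, 4))
# [b'0123', b'4567', b'89\x00\x00']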

View File

@@ -165,10 +165,6 @@ class ICodecEncoder(Interface):
"""Return the length of the shares that encode() will produce.
"""
def get_share_size():
"""Return the length of the shares that encode() will produce.
"""
def encode_proposal(data, desired_share_ids=None):
"""Encode some data.
@@ -332,11 +328,25 @@ class IEncoder(Interface):
before calling get_reservation_size().
"""
def get_reservation_size():
def get_share_size():
"""I return the size of the data that will be stored on each
shareholder. It is useful to determine this size before asking
potential shareholders whether they will grant a lease or not, since
their answers will depend upon how much space we need.
shareholder. This is the aggregate amount of data that will be sent to
the shareholder, summed over all the put_block() calls I will ever
make.
TODO: this might also include some amount of overhead, like the size
of all the hashes. We need to decide whether this is useful or not.
It is useful to determine this size before asking potential
shareholders whether they will grant a lease or not, since their
answers will depend upon how much space we need.
"""
def get_block_size(): # TODO: can we avoid exposing this?
"""I return the size of the individual blocks that will be delivered
to a shareholder's put_block() method. By knowing this, the
shareholder will be able to keep all blocks in a single file and
still provide random access when reading them.
"""
def set_shareholders(shareholders):
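A rough illustration of why a caller wants both numbers from IEncoder: the aggregate share size is what gets quoted to a peer before asking for a lease, and a fixed block size lets a shareholder append blocks to one file and still seek straight to block N. The FakeEncoder below is a made-up stand-in, not part of the interface:

class FakeEncoder:
    # stand-in exposing just the two queries from IEncoder above
    def get_share_size(self):
        return 209716        # total bytes one shareholder will store
    def get_block_size(self):
        return 83887         # size of every put_block() payload

def block_offset(encoder, blocknum):
    # fixed-size blocks make random access trivial for the shareholder
    return blocknum * encoder.get_block_size()

enc = FakeEncoder()
print(enc.get_share_size())     # how much space to ask a peer to reserve
print(block_offset(enc, 2))     # 167774: byte offset of block 2 in the stored file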

View File

@@ -62,6 +62,7 @@ class UpDown(unittest.TestCase):
NUM_SHARES = 100
assert e.num_shares == NUM_SHARES # else we'll be completely confused
e.segment_size = 25 # force use of multiple segments
e.setup_codec() # need to rebuild the codec for that change
NUM_SEGMENTS = 4
assert (NUM_SEGMENTS-1)*e.segment_size < len(data) <= NUM_SEGMENTS*e.segment_size
shareholders = {}
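The forced segment_size of 25 pins the segment count: NUM_SEGMENTS is just the ceiling division of the data length by 25, which is what the assertion checks. A quick standalone check, assuming 100 bytes of test data (the diff does not show the actual length):

segment_size = 25
data_len = 100                                  # assumed test-data length
NUM_SEGMENTS = -(-data_len // segment_size)     # ceiling division -> 4
assert (NUM_SEGMENTS-1)*segment_size < data_len <= NUM_SEGMENTS*segment_size
print(NUM_SEGMENTS)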

View File

@@ -5,7 +5,7 @@ from twisted.application import service
from foolscap import Referenceable
from allmydata.util import idlib, mathutil
from allmydata import codec
from allmydata import encode_new
from allmydata.uri import pack_uri
from allmydata.interfaces import IUploadable, IUploader
@@ -45,8 +45,6 @@ class PeerTracker:
class FileUploader:
debug = False
ENCODERCLASS = codec.CRSEncoder
def __init__(self, client):
self._client = client
@@ -83,20 +81,16 @@ class FileUploader:
assert self.needed_shares
# create the encoder, so we can know how large the shares will be
self._encoder = self.ENCODERCLASS()
self._last_seg_encoder = self.ENCODERCLASS() # This one is for encoding the final segment, which might be shorter than the others.
self._codec_name = self._encoder.get_encoder_type()
self._encoder.set_params(self.segment_size, self.needed_shares, self.total_shares)
xyz
self._encoder = encode_new.Encoder()
self._encoder.setup(infile)
share_size = self._encoder.get_share_size()
block_size = self._encoder.get_block_size()
paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
self._block_size = self._encoder.get_block_size()
# first step: who should we upload to?
# we are responsible for locating the shareholders. self._encoder is
# responsible for handling the data and sending out the shares.
peers = self._client.get_permuted_peers(self._verifierid)
assert peers
trackers = [ (permutedid, PeerTracker(peerid, conn, self._share_size, self._block_size, self._verifierid),)
trackers = [ (permutedid, PeerTracker(peerid, conn, share_size, block_size, self._verifierid),)
for permutedid, peerid, conn in peers ]
ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance
ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ])
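The ring_things comment above defines the mixed list this step sorts through: share numbers and PeerTracker instances, each keyed by a position on the permuted ring. A toy version of just that data structure (positions and entries are invented; the real ones come from get_permuted_peers()):

# (position_in_ring, whatami, x): whatami 0 means x is a sharenum,
# whatami 1 means x is a peer; sorting by position interleaves both kinds
ring_things = [
    (40, 0, 7),         # share number 7
    (10, 0, 3),         # share number 3
    (55, 1, "peerB"),   # a peer (a string stands in for a PeerTracker here)
    (25, 1, "peerA"),
]
ring_things.sort()
for position, whatami, x in ring_things:
    print(position, "share" if whatami == 0 else "peer", x)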
@@ -196,7 +190,7 @@ xyz
def _compute_uri(self, roothash):
params = self._encoder.get_serialized_params()
return pack_uri(self._codec_name, params, self._verifierid, roothash, self.needed_shares, self.total_shares, self._size, self._encoder.segment_size)
return pack_uri(self._encoder.get_encoder_type(), params, self._verifierid, roothash, self.needed_shares, self.total_shares, self._size, self._encoder.segment_size)
def netstring(s):
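In outline, the change makes FileUploader delegate all sizing to encode_new.Encoder: build it, let it measure the file, then hand its share and block sizes to each PeerTracker when asking peers for space. A simplified mock of that sequence (StubEncoder is invented for the example and only mimics the two queries used above):

import io

class StubEncoder:
    # mimics encode_new.Encoder.setup()/get_share_size()/get_block_size()
    REQUIRED, TOTAL = 25, 100
    def setup(self, infile):
        infile.seek(0, 2)
        self.file_size = infile.tell()
        infile.seek(0, 0)
    def get_share_size(self):
        return -(-self.file_size // self.REQUIRED)   # ceiling division
    def get_block_size(self):
        return self.get_share_size()                 # single-segment toy case

infile = io.BytesIO(b'x' * 1000)
enc = StubEncoder()
enc.setup(infile)
share_size, block_size = enc.get_share_size(), enc.get_block_size()
# these two numbers are what each PeerTracker is now constructed with,
# so peers can be asked up front whether they will reserve the space
print(share_size, block_size)                        # 40 40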