rearrange encode/upload, add URIs, switch to ReplicatingEncoder

Added metadata to the bucket store; it is used to hold the share number
(but the bucket doesn't know that: it just stores an opaque string).
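
For reference, a minimal sketch of how the two sides use the new metadata
slot ('bucket' and 'sharenum' are stand-ins for a remote bucket reference
and an integer share number; the bencode helpers are the ones used in the
diff below):

    from allmydata.util import bencode

    # upload side: stash the share number in the bucket's metadata slot
    metadata = bencode.bencode(sharenum)   # e.g. 3 -> "i3e"
    d = bucket.callRemote("set_metadata", metadata)

    # download side: the bucket returns an opaque string; only the
    # downloader knows it is a bencoded share number
    d = bucket.callRemote("get_metadata")
    d.addCallback(lambda sharenum_s: bencode.bdecode(sharenum_s))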

Modified the codec interfaces a bit.

Start passing URIs to/from download/upload instead of bare verifierids.
The URI format is still in flux.
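
Concretely, the URI is currently just a "URI:" prefix on a bencoded
(verifierid, serialized_params) tuple. unpack_uri below is lifted from the
new download code in the diff; pack_uri is a hypothetical standalone
rendering of what FileUploader._compute_uri does:

    from allmydata.util import bencode

    def pack_uri(verifierid, params):
        # verifierid: 20-byte binary string; params: the codec's
        # serialized parameters (a short string)
        return "URI:%s" % bencode.bencode((verifierid, params))

    def unpack_uri(uri):
        assert uri.startswith("URI:")
        return bencode.bdecode(uri[4:])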

Change the current (primitive) file encoder to use a ReplicatingEncoder,
because it provides ICodecEncoder. We will eventually move to the (less
primitive) file encoder currently in allmydata.encode_new, but for now this
change lets us test PyRS or Zooko's upcoming C-based RS codec in something
larger than a single unit test. This primitive file encoder uses only a
single segment and has no Merkle trees.
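
A minimal sketch of the resulting ICodecEncoder/ICodecDecoder round trip
(the parameter values are illustrative, and get_serialized_params() on
ReplicatingEncoder is implied by the uploader code in the diff below):

    from allmydata.codec import ReplicatingEncoder, ReplicatingDecoder

    data = "x" * 1000
    enc = ReplicatingEncoder()
    enc.set_params(len(data), 2, 4)        # required_shares=2, max_shares=4
    params = enc.get_serialized_params()   # travels inside the URI

    # encode() can now be asked for fewer than max_shares shares, e.g.
    # if only 3 peers accepted our lease requests
    d = enc.encode(data, 3)

    def _decode(shares):
        dec = ReplicatingDecoder()
        dec.set_serialized_params(params)
        # any required_shares of the (sharenum, sharedata) tuples will do
        return dec.decode(shares[:dec.get_required_shares()])
    d.addCallback(_decode)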

Also added allmydata.util.deferredutil, with a DeferredList wrapper that
errbacks if any component Deferred failed (but only once all of them have
fired); unfortunately the standard DeferredList offers no such behavior.
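
A usage sketch: unlike defer.DeferredList(fireOnOneErrback=True), which
fires as soon as the first failure arrives, this wrapper waits for every
component Deferred and then either errbacks with the first recorded
Failure or callbacks with the plain list of results:

    from twisted.internet import defer
    from allmydata.util.deferredutil import DeferredListShouldSucceed

    d1 = defer.succeed(1)
    d2 = defer.fail(ValueError("share was corrupt"))

    d = DeferredListShouldSucceed([d1, d2])
    def _ok(results):
        # reached only if every component succeeded; results is the list
        # of values, without the (success, value) booleans
        return results
    def _err(f):
        # reached here: d2 failed, so once both Deferreds have fired we
        # get d2's Failure
        return f.trap(ValueError)
    d.addCallbacks(_ok, _err)
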
Brian Warner 2007-01-15 21:22:22 -07:00
parent 78a9e815c5
commit 3209fd5e09
15 changed files with 392 additions and 103 deletions

View File

@ -2,16 +2,13 @@ import os
from foolscap import Referenceable
from twisted.application import service
from twisted.python.failure import Failure
from twisted.python import log
from allmydata.util import idlib
from zope.interface import implements
from allmydata.interfaces import RIBucketWriter, RIBucketReader
from allmydata.util.assertutil import precondition, _assert
class NoSuchBucketError(Failure):
pass
class BucketStore(service.MultiService, Referenceable):
def __init__(self, store_dir):
precondition(os.path.isdir(store_dir))
@ -94,6 +91,8 @@ class WriteBucket(Bucket):
def __init__(self, bucket_dir, verifierid, bucket_num, size):
Bucket.__init__(self, bucket_dir, verifierid)
precondition(not os.path.exists(bucket_dir))
#log.msg("WriteBucket [%s]: creating bucket %s"
# % (idlib.b2a(verifierid), bucket_dir))
os.mkdir(bucket_dir)
self._open = True
@ -119,6 +118,8 @@ class WriteBucket(Bucket):
def close(self):
precondition(self._bytes_written == self._size)
#log.msg("WriteBucket.close [%s] (%s)"
# % (idlib.b2a(self._verifierid), self._bucket_dir))
self._data.close()
self._write_attr('closed', '')
self._open = False

View File

@ -140,6 +140,7 @@ class Client(node.Node, Referenceable):
# sort of at most max_count elements
results = []
for nodeid in self.all_peers:
assert isinstance(nodeid, str)
permuted = sha.new(key + nodeid).digest()
results.append((permuted, nodeid))
results.sort()

View File

@ -14,10 +14,10 @@ class ReplicatingEncoder(object):
implements(ICodecEncoder)
ENCODER_TYPE = 0
def set_params(self, data_size, required_shares, total_shares):
def set_params(self, data_size, required_shares, max_shares):
self.data_size = data_size
self.required_shares = required_shares
self.total_shares = total_shares
self.max_shares = max_shares
def get_encoder_type(self):
return self.ENCODER_TYPE
@ -28,8 +28,11 @@ class ReplicatingEncoder(object):
def get_share_size(self):
return self.data_size
def encode(self, data):
shares = [(i,data) for i in range(self.total_shares)]
def encode(self, data, num_shares=None):
if num_shares is None:
num_shares = self.max_shares
assert num_shares <= self.max_shares
shares = [(i,data) for i in range(num_shares)]
return defer.succeed(shares)
class ReplicatingDecoder(object):
@ -38,6 +41,9 @@ class ReplicatingDecoder(object):
def set_serialized_params(self, params):
self.required_shares = int(params)
def get_required_shares(self):
return self.required_shares
def decode(self, some_shares):
assert len(some_shares) >= self.required_shares
data = some_shares[0][1]
@ -117,32 +123,38 @@ class PyRSEncoder(object):
# than 20 minutes to run the test_encode_share tests, so I disabled most
# of them. (uh, hello, it's running figleaf)
def set_params(self, data_size, required_shares, total_shares):
assert required_shares <= total_shares
def set_params(self, data_size, required_shares, max_shares):
assert required_shares <= max_shares
self.data_size = data_size
self.required_shares = required_shares
self.total_shares = total_shares
self.max_shares = max_shares
self.chunk_size = required_shares
self.num_chunks = mathutil.div_ceil(data_size, self.chunk_size)
self.last_chunk_padding = mathutil.pad_size(data_size, required_shares)
self.share_size = self.num_chunks
self.encoder = rs_code.RSCode(total_shares, required_shares, 8)
self.encoder = rs_code.RSCode(max_shares, required_shares, 8)
def get_encoder_type(self):
return self.ENCODER_TYPE
def get_serialized_params(self):
return "%d:%d:%d" % (self.data_size, self.required_shares,
self.total_shares)
self.max_shares)
def get_share_size(self):
return self.share_size
def encode(self, data):
share_data = [ [] for i in range(self.total_shares)]
def encode(self, data, num_shares=None):
if num_shares is None:
num_shares = self.max_shares
assert num_shares <= self.max_shares
# we create self.max_shares shares, then throw out any extra ones
# so that we always return exactly num_shares shares.
share_data = [ [] for i in range(self.max_shares)]
for i in range(self.num_chunks):
# we take self.chunk_size bytes from the input string, and
# turn it into self.total_shares bytes.
# turn it into self.max_shares bytes.
offset = i*self.chunk_size
# Note string slices aren't an efficient way to use memory, so
# when we upgrade from the unusably slow py_ecc prototype to a
@ -155,12 +167,12 @@ class PyRSEncoder(object):
input_vector = [ord(x) for x in chunk]
assert len(input_vector) == self.required_shares
output_vector = self.encoder.Encode(input_vector)
assert len(output_vector) == self.total_shares
assert len(output_vector) == self.max_shares
for i2,out in enumerate(output_vector):
share_data[i2].append(chr(out))
shares = [ (i, "".join(share_data[i]))
for i in range(self.total_shares) ]
for i in range(num_shares) ]
return defer.succeed(shares)
class PyRSDecoder(object):
@ -170,23 +182,26 @@ class PyRSDecoder(object):
pieces = params.split(":")
self.data_size = int(pieces[0])
self.required_shares = int(pieces[1])
self.total_shares = int(pieces[2])
self.max_shares = int(pieces[2])
self.chunk_size = self.required_shares
self.num_chunks = mathutil.div_ceil(self.data_size, self.chunk_size)
self.last_chunk_padding = mathutil.pad_size(self.data_size,
self.required_shares)
self.share_size = self.num_chunks
self.encoder = rs_code.RSCode(self.total_shares, self.required_shares,
self.encoder = rs_code.RSCode(self.max_shares, self.required_shares,
8)
if False:
print "chunk_size: %d" % self.chunk_size
print "num_chunks: %d" % self.num_chunks
print "last_chunk_padding: %d" % self.last_chunk_padding
print "share_size: %d" % self.share_size
print "total_shares: %d" % self.total_shares
print "max_shares: %d" % self.max_shares
print "required_shares: %d" % self.required_shares
def get_required_shares(self):
return self.required_shares
def decode(self, some_shares):
chunk_size = self.chunk_size
assert len(some_shares) >= self.required_shares
@ -198,7 +213,7 @@ class PyRSDecoder(object):
# this takes one byte from each share, and turns the combination
# into a single chunk
received_vector = []
for j in range(self.total_shares):
for j in range(self.max_shares):
share = have_shares.get(j)
if share is not None:
received_vector.append(ord(share[i]))

View File

@ -1,11 +1,12 @@
import os
import os, sha
from zope.interface import Interface, implements
from twisted.python import failure, log
from twisted.internet import defer
from twisted.application import service
from allmydata.util import idlib
from allmydata.util import idlib, bencode
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata import codec
class NotEnoughPeersError(Exception):
@ -15,13 +16,21 @@ class HaveAllPeersError(Exception):
# we use this to jump out of the loop
pass
def unpack_uri(uri):
assert uri.startswith("URI:")
return bencode.bdecode(uri[4:])
class FileDownloader:
debug = False
def __init__(self, peer, verifierid):
def __init__(self, peer, verifierid, encoding_params):
self._peer = peer
assert isinstance(verifierid, str)
assert len(verifierid) == 20
self._verifierid = verifierid
self._decoder = codec.ReplicatingDecoder()
self._decoder.set_serialized_params(encoding_params)
self.needed_shares = self._decoder.get_required_shares()
def set_download_target(self, target):
self._target = target
@ -30,15 +39,8 @@ class FileDownloader:
def _cancel(self):
pass
def make_decoder(self):
n = self._shares = 4
k = self._desired_shares = 2
self._target.open()
self._decoder = codec.Decoder(self._target, k, n,
self._verifierid)
def start(self):
log.msg("starting download")
log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))
if self.debug:
print "starting download"
# first step: who should we download from?
@ -75,7 +77,7 @@ class FileDownloader:
bucket_nums)
self.landlords.append( (peerid, buckets) )
if len(self.landlords) >= self._desired_shares:
if len(self.landlords) >= self.needed_shares:
if self.debug: print " we're done!"
raise HaveAllPeersError
# otherwise we fall through to search more peers
@ -107,7 +109,34 @@ class FileDownloader:
all_buckets = []
for peerid, buckets in self.landlords:
all_buckets.extend(buckets)
d = self._decoder.start(all_buckets)
# TODO: try to avoid pulling multiple shares from the same peer
all_buckets = all_buckets[:self.needed_shares]
# retrieve all shares
dl = []
shares = []
for (bucket_num, bucket) in all_buckets:
d0 = bucket.callRemote("get_metadata")
d1 = bucket.callRemote("read")
d2 = DeferredListShouldSucceed([d0, d1])
def _got(res):
sharenum_s, sharedata = res
sharenum = bencode.bdecode(sharenum_s)
shares.append((sharenum, sharedata))
d2.addCallback(_got)
dl.append(d2)
d = DeferredListShouldSucceed(dl)
d.addCallback(lambda res: self._decoder.decode(shares))
def _write(data):
self._target.open()
hasher = sha.new(netstring("allmydata_v1_verifierid"))
hasher.update(data)
vid = hasher.digest()
assert self._verifierid == vid, "%s != %s" % (idlib.b2a(self._verifierid), idlib.b2a(vid))
self._target.write(data)
d.addCallback(_write)
def _done(res):
self._target.close()
return self._target.finish()
@ -204,26 +233,29 @@ class Downloader(service.MultiService):
"""
implements(IDownloader)
name = "downloader"
debug = False
def download(self, verifierid, t):
def download(self, uri, t):
(verifierid, params) = unpack_uri(uri)
assert self.parent
assert self.running
assert isinstance(verifierid, str)
t = IDownloadTarget(t)
assert t.write
assert t.close
dl = FileDownloader(self.parent, verifierid)
dl = FileDownloader(self.parent, verifierid, params)
dl.set_download_target(t)
dl.make_decoder()
if self.debug:
dl.debug = True
d = dl.start()
return d
# utility functions
def download_to_data(self, verifierid):
return self.download(verifierid, Data())
def download_to_filename(self, verifierid, filename):
return self.download(verifierid, FileName(filename))
def download_to_filehandle(self, verifierid, filehandle):
return self.download(verifierid, FileHandle(filehandle))
def download_to_data(self, uri):
return self.download(uri, Data())
def download_to_filename(self, uri, filename):
return self.download(uri, FileName(filename))
def download_to_filehandle(self, uri, filehandle):
return self.download(uri, FileHandle(filehandle))

View File

@ -129,6 +129,7 @@ class Encoder(object):
segment_plaintext = self.infile.read(self.segment_size)
segment_crypttext = self.cryptor.encrypt(segment_plaintext)
del segment_plaintext
assert self.encoder.max_shares == self.num_shares
d = self.encoder.encode(segment_crypttext)
d.addCallback(self._encoded_segment)
return d

View File

@ -74,10 +74,10 @@ class MutableDirectoryNode(Referenceable):
return self.make_subnode(absname)
remote_add_directory = add_directory
def add_file(self, name, data):
def add_file(self, name, uri):
self.validate_name(name)
f = open(os.path.join(self._basedir, name), "wb")
f.write(data)
f.write(uri)
f.close()
remote_add_file = add_file

View File

@ -6,6 +6,7 @@ from foolscap import RemoteInterface
Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash
PBURL = StringConstraint(150)
Verifierid = StringConstraint(20)
URI = StringConstraint(100) # kind of arbitrary
ShareData = StringConstraint(100000)
# these four are here because Foolscap does not yet support the kind of
# restriction I really want to apply to these.
@ -65,7 +66,7 @@ class RIMutableDirectoryNode(RemoteInterface):
def add_directory(name=str):
return RIMutableDirectoryNode_
def add_file(name=str, data=Verifierid):
def add_file(name=str, uri=URI):
return None
def remove(name=str):
@ -75,7 +76,7 @@ class RIMutableDirectoryNode(RemoteInterface):
class ICodecEncoder(Interface):
def set_params(data_size, required_shares, total_shares):
def set_params(data_size, required_shares, max_shares):
"""Set up the parameters of this encoder.
See encode() for a description of how these parameters are used.
@ -109,28 +110,58 @@ class ICodecEncoder(Interface):
"""Return the length of the shares that encode() will produce.
"""
def encode(data):
def encode(data, num_shares=None):
"""Encode a chunk of data. This may be called multiple times. Each
call is independent.
The data must be a string with a length that exactly matches the
data_size promised by set_params().
'num_shares', if provided, must be equal to or less than the
'max_shares' set in set_params. If 'num_shares' is left at None, this
method will produce 'max_shares' shares. This can be used to minimize
the work that the encoder needs to do if we initially thought that we
would need, say, 100 shares, but now that it is time to actually
encode the data we only have 75 peers to send data to.
For each call, encode() will return a Deferred that fires with a list
of 'total_shares' tuples. Each tuple is of the form (sharenum,
share), where sharenum is an int (from 0 to total_shares-1), and share
is a string. The get_share_size() method can be used to determine the
length of the 'share' strings returned by encode().
sharedata), where sharenum is an int (from 0 to total_shares-1), and
sharedata is a string. The get_share_size() method can be used to
determine the length of the 'sharedata' strings returned by encode().
The (sharenum, sharedata) tuple must be kept together during storage
and retrieval. Specifically, the share data is useless by itself: the
decoder needs to be told which share is which by providing it with
both the share number and the actual share data.
The memory usage of this function is expected to be on the order of
total_shares * get_share_size().
"""
# design note: we could embed the share number in the sharedata by
# returning bencode((sharenum,sharedata)). The advantage would be
# making it easier to keep these two pieces together, and probably
# avoiding a round trip when reading the remote bucket (although this
# could be achieved by changing RIBucketReader.read to
# read_data_and_metadata). The disadvantage is that the share number
# wants to be exposed to the storage/bucket layer (specifically to
# handle the next stage of peer-selection algorithm in which we
# propose to keep share#A on a given peer and they are allowed to
# tell us that they already have share#B). Also doing this would make
# the share size somewhat variable (one-digit sharenumbers will be a
# byte shorter than two-digit sharenumbers), unless we zero-pad the
# sharenumbers based upon the max_shares declared in
# set_params.
class ICodecDecoder(Interface):
def set_serialized_params(params):
"""Set up the parameters of this encoder, from a string returned by
encoder.get_serialized_params()."""
def get_required_shares():
"""Return the number of shares needed to reconstruct the data.
set_serialized_params() must be called before this."""
def decode(some_shares):
"""Decode a partial list of shares into data.

View File

@ -17,3 +17,8 @@ class Basic(unittest.TestCase):
self.failUnlessEqual(c.permute_peerids("two"), ['0','4','2','1','3'])
c.all_peers = []
self.failUnlessEqual(c.permute_peerids("one"), [])
c2 = client.Client("")
c2.all_peers = ["%d" % i for i in range(5)]
self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2'])

View File

@ -10,17 +10,23 @@ class Tester:
#enc_class = PyRSEncoder
#dec_class = PyRSDecoder
def do_test(self, size, required_shares, total_shares):
def do_test(self, size, required_shares, max_shares, fewer_shares=None):
data0 = os.urandom(size)
enc = self.enc_class()
enc.set_params(size, required_shares, total_shares)
enc.set_params(size, required_shares, max_shares)
serialized_params = enc.get_serialized_params()
log.msg("serialized_params: %s" % serialized_params)
d = enc.encode(data0)
def _done(shares):
self.failUnlessEqual(len(shares), total_shares)
def _done_encoding_all(shares):
self.failUnlessEqual(len(shares), max_shares)
self.shares = shares
d.addCallback(_done)
d.addCallback(_done_encoding_all)
if fewer_shares is not None:
# also validate that the num_shares= parameter works
d.addCallback(lambda res: enc.encode(data0, fewer_shares))
def _check_fewer_shares(some_shares):
self.failUnlessEqual(len(some_shares), fewer_shares)
d.addCallback(_check_fewer_shares)
def _decode(shares):
dec = self.dec_class()
@ -91,7 +97,7 @@ class Tester:
def test_encode2(self):
if os.uname()[1] == "slave3" and self.enc_class == PyRSEncoder:
raise unittest.SkipTest("slave3 is really slow")
return self.do_test(123, 25, 100)
return self.do_test(123, 25, 100, 90)
def test_sizes(self):
raise unittest.SkipTest("omg this would take forever")
@ -114,13 +120,13 @@ class BenchPyRS(unittest.TestCase):
def test_big(self):
size = 10000
required_shares = 25
total_shares = 100
max_shares = 100
# this lets us use a persistent lookup table, stored outside the
# _trial_temp directory (which is deleted each time trial is run)
os.symlink("../ffield.lut.8", "ffield.lut.8")
enc = self.enc_class()
self.start()
enc.set_params(size, required_shares, total_shares)
enc.set_params(size, required_shares, max_shares)
serialized_params = enc.get_serialized_params()
print "encoder ready", self.stop()
self.start()
@ -132,7 +138,7 @@ class BenchPyRS(unittest.TestCase):
now_shares = time.time()
print "shares ready", self.stop()
self.start()
self.failUnlessEqual(len(shares), total_shares)
self.failUnlessEqual(len(shares), max_shares)
d.addCallback(_done)
d.addCallback(lambda res: enc.encode(data0))
d.addCallback(_done)

View File

@ -116,16 +116,20 @@ class SystemTest(unittest.TestCase):
d1 = u.upload_data(DATA)
return d1
d.addCallback(_do_upload)
def _upload_done(verifierid):
log.msg("upload finished: verifierid=%s" % idlib.b2a(verifierid))
def _upload_done(uri):
log.msg("upload finished: uri is %s" % (uri,))
dl = self.clients[1].getServiceNamed("downloader")
d1 = dl.download_to_data(verifierid)
d1 = dl.download_to_data(uri)
return d1
d.addCallback(_upload_done)
def _download_done(data):
log.msg("download finished")
self.failUnlessEqual(data, DATA)
d.addCallback(_download_done)
def _oops(res):
log.msg("oops, an error orccurred, finishing: %s" % res)
return res
d.addErrback(_oops)
return d
test_upload_and_download.timeout = 20

View File

@ -1,9 +1,10 @@
from twisted.trial import unittest
from twisted.internet import defer
from twisted.application import service
from cStringIO import StringIO
from allmydata import upload
from allmydata import upload, download
class StringBucketProxy:
# This is for unit tests: make a StringIO look like a RIBucketWriter.
@ -64,8 +65,10 @@ class FakeClient:
return defer.fail(IndexError("no connection to that peer"))
return defer.succeed(peer)
class NextPeerUploader(upload.FileUploader):
def _got_all_peers(self, res):
_size = 100
def _got_enough_peers(self, res):
return res
class NextPeer(unittest.TestCase):
@ -81,12 +84,12 @@ class NextPeer(unittest.TestCase):
for peerid, bucketnum in expected]
self.failUnlessEqual(u.landlords, exp)
VERIFIERID = "\x00" * 20
def test_0(self):
c = FakeClient([])
u = NextPeerUploader(c)
u._verifierid = "verifierid"
u._shares = 2
u._share_size = 100
u.set_verifierid(self.VERIFIERID)
u.set_params(2, 2, 2)
d = u.start()
def _check(f):
f.trap(upload.NotEnoughPeersError)
@ -97,9 +100,8 @@ class NextPeer(unittest.TestCase):
def test_1(self):
c = FakeClient(self.responses)
u = NextPeerUploader(c)
u._verifierid = "verifierid"
u._shares = 2
u._share_size = 100
u.set_verifierid(self.VERIFIERID)
u.set_params(2, 2, 2)
d = u.start()
def _check(res):
self.failUnlessEqual(u.goodness_points, 2)
@ -112,9 +114,8 @@ class NextPeer(unittest.TestCase):
def test_2(self):
c = FakeClient(self.responses)
u = NextPeerUploader(c)
u._verifierid = "verifierid"
u._shares = 3
u._share_size = 100
u.set_verifierid(self.VERIFIERID)
u.set_params(3, 3, 3)
d = u.start()
def _check(res):
self.failUnlessEqual(u.goodness_points, 3)
@ -135,9 +136,8 @@ class NextPeer(unittest.TestCase):
def test_3(self):
c = FakeClient(self.responses2)
u = NextPeerUploader(c)
u._verifierid = "verifierid"
u._shares = 3
u._share_size = 100
u.set_verifierid(self.VERIFIERID)
u.set_params(3, 3, 3)
d = u.start()
def _check(res):
self.failUnlessEqual(u.goodness_points, 3)
@ -158,9 +158,8 @@ class NextPeer(unittest.TestCase):
def test_4(self):
c = FakeClient(self.responses3)
u = NextPeerUploader(c)
u._verifierid = "verifierid"
u._shares = 4
u._share_size = 100
u.set_verifierid(self.VERIFIERID)
u.set_params(4, 4, 4)
d = u.start()
def _check(res):
self.failUnlessEqual(u.goodness_points, 4)
@ -171,3 +170,88 @@ class NextPeer(unittest.TestCase):
])
d.addCallback(_check)
return d
class FakePeer2:
def __init__(self, peerid):
self.peerid = peerid
self.data = ""
def callRemote(self, methname, *args, **kwargs):
if methname == "allocate_bucket":
return defer.maybeDeferred(self._allocate_bucket, *args, **kwargs)
if methname == "write":
return defer.maybeDeferred(self._write, *args, **kwargs)
if methname == "set_metadata":
return defer.maybeDeferred(self._set_metadata, *args, **kwargs)
if methname == "close":
return defer.maybeDeferred(self._close, *args, **kwargs)
return defer.maybeDeferred(self._bad_name, methname)
def _allocate_bucket(self, verifierid, bucket_num, size, leaser, canary):
self.allocated_size = size
return self
def _write(self, data):
self.data = self.data + data
def _set_metadata(self, metadata):
self.metadata = metadata
def _close(self):
pass
def _bad_name(self, methname):
raise NameError("FakePeer2 has no such method named '%s'" % methname)
class FakeClient2:
nodeid = "fakeclient"
def __init__(self, max_peers):
self.peers = []
for peerid in range(max_peers):
self.peers.append(FakePeer2(str(peerid)))
def permute_peerids(self, key, max_peers):
assert max_peers == None
return [str(i) for i in range(len(self.peers))]
def get_remote_service(self, peerid, name):
peer = self.peers[int(peerid)]
if not peer:
return defer.fail(IndexError("no connection to that peer"))
return defer.succeed(peer)
class Uploader(unittest.TestCase):
def setUp(self):
node = self.node = FakeClient2(10)
u = self.u = upload.Uploader()
u.running = 1
u.parent = node
def _check(self, uri):
self.failUnless(isinstance(uri, str))
self.failUnless(uri.startswith("URI:"))
verifierid, params = download.unpack_uri(uri)
self.failUnless(isinstance(verifierid, str))
self.failUnlessEqual(len(verifierid), 20)
self.failUnless(isinstance(params, str))
peers = self.node.peers
self.failUnlessEqual(peers[0].allocated_size,
len(peers[0].data))
def testData(self):
data = "This is some data to upload"
d = self.u.upload_data(data)
d.addCallback(self._check)
return d
def testFileHandle(self):
data = "This is some data to upload"
d = self.u.upload_filehandle(StringIO(data))
d.addCallback(self._check)
return d
def testFilename(self):
fn = "Uploader-testFilename.data"
f = open(fn, "w")
data = "This is some data to upload"
f.write(data)
f.close()
d = self.u.upload_filename(fn)
d.addCallback(self._check)
return d

View File

@ -5,7 +5,8 @@ from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable
from allmydata.util import idlib
from allmydata.util import idlib, bencode
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata import codec
from cStringIO import StringIO
@ -22,54 +23,84 @@ class HaveAllPeersError(Exception):
class TooFullError(Exception):
pass
class FileUploader:
debug = False
def __init__(self, peer):
self._peer = peer
def set_params(self, min_shares, target_goodness, max_shares):
self.min_shares = min_shares
self.target_goodness = target_goodness
self.max_shares = max_shares
def set_filehandle(self, filehandle):
self._filehandle = filehandle
filehandle.seek(0, 2)
self._size = filehandle.tell()
filehandle.seek(0)
def make_encoder(self):
self._needed_shares = 4
self._shares = 4
self._encoder = codec.Encoder(self._filehandle, self._shares)
self._share_size = self._size
def set_verifierid(self, vid):
assert isinstance(vid, str)
assert len(vid) == 20
self._verifierid = vid
def start(self):
log.msg("starting upload")
"""Start uploading the file.
The source of the data to be uploaded must have been set before this
point by calling set_filehandle().
This method returns a Deferred that will fire with the URI (a
string)."""
log.msg("starting upload [%s]" % (idlib.b2a(self._verifierid),))
if self.debug:
print "starting upload"
assert self.min_shares
assert self.target_goodness
# create the encoder, so we can know how large the shares will be
total_shares = self.max_shares
needed_shares = self.min_shares
self._encoder = codec.ReplicatingEncoder()
self._encoder.set_params(self._size, needed_shares, total_shares)
self._share_size = self._encoder.get_share_size()
# first step: who should we upload to?
# maybe limit max_peers to 2*len(self.shares), to reduce memory
# footprint
# We will talk to at most max_peers (which can be None to mean no
# limit). Maybe limit max_peers to 2*len(self.shares), to reduce
# memory footprint. For now, make it unlimited.
max_peers = None
self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
self._total_peers = len(self.permuted)
for p in self.permuted:
assert isinstance(p, str)
# we will shrink self.permuted as we give up on peers
self.peer_index = 0
self.goodness_points = 0
self.target_goodness = self._shares
self.landlords = [] # list of (peerid, bucket_num, remotebucket)
d = defer.maybeDeferred(self._check_next_peer)
d.addCallback(self._got_all_peers)
d.addCallback(self._got_enough_peers)
d.addCallback(self._compute_uri)
return d
def _compute_uri(self, params):
return "URI:%s" % bencode.bencode((self._verifierid, params))
def _check_next_peer(self):
if self.debug:
log.msg("FileUploader._check_next_peer: %d permuted, %d goodness"
" (want %d), have %d landlords, %d total peers" %
(len(self.permuted), self.goodness_points,
self.target_goodness, len(self.landlords),
self._total_peers))
if len(self.permuted) == 0:
# there are no more to check
raise NotEnoughPeersError("%s goodness, want %s, have %d "
@ -98,7 +129,8 @@ class FileUploader:
print " peerid %s will grant us a lease" % idlib.b2a(peerid)
self.landlords.append( (peerid, bucket_num, bucket) )
self.goodness_points += 1
if self.goodness_points >= self.target_goodness:
if (self.goodness_points >= self.target_goodness and
len(self.landlords) >= self.min_shares):
if self.debug: print " we're done!"
raise HaveAllPeersError()
# otherwise we fall through to allocate more peers
@ -129,11 +161,52 @@ class FileUploader:
d.addBoth(_done_with_peer)
return d
def _got_all_peers(self, res):
d = self._encoder.do_upload(self.landlords)
d.addCallback(lambda res: self._verifierid)
def _got_enough_peers(self, res):
landlords = self.landlords
if self.debug:
log.msg("FileUploader._got_enough_peers")
log.msg(" %d landlords" % len(landlords))
if len(landlords) < 20:
log.msg(" peerids: %s" % " ".join([idlib.b2a(l[0])
for l in landlords]))
log.msg(" buckets: %s" % " ".join([str(l[1])
for l in landlords]))
# assign shares to landlords
self.sharemap = {}
for peerid, bucket_num, bucket in landlords:
self.sharemap[bucket_num] = bucket
# the sharemap should have exactly len(landlords) shares, with
# no holes
assert sorted(self.sharemap.keys()) == range(len(landlords))
# encode all the data at once: this class does not use segmentation
data = self._filehandle.read()
d = self._encoder.encode(data, len(landlords))
d.addCallback(self._send_all_shares)
d.addCallback(lambda res: self._encoder.get_serialized_params())
return d
def _send_one_share(self, bucket, sharedata, metadata):
d = bucket.callRemote("write", sharedata)
d.addCallback(lambda res:
bucket.callRemote("set_metadata", metadata))
d.addCallback(lambda res:
bucket.callRemote("close"))
return d
def _send_all_shares(self, shares):
dl = []
for share in shares:
(sharenum,sharedata) = share
if self.debug:
log.msg(" writing share %d" % sharenum)
metadata = bencode.bencode(sharenum)
assert len(sharedata) == self._share_size
assert isinstance(sharedata, str)
bucket = self.sharemap[sharenum]
d = self._send_one_share(bucket, sharedata, metadata)
dl.append(d)
return DeferredListShouldSucceed(dl)
def netstring(s):
return "%d:%s," % (len(s), s)
@ -175,24 +248,35 @@ class Uploader(service.MultiService):
"""I am a service that allows file uploading.
"""
name = "uploader"
uploader_class = FileUploader
debug = False
def _compute_verifierid(self, f):
hasher = sha.new(netstring("allmydata_v1_verifierid"))
f.seek(0)
hasher.update(f.read())
data = f.read()
hasher.update(data)
f.seek(0)
# note: this is only of the plaintext data, no encryption yet
return hasher.digest()
def upload(self, f):
# this returns (verifierid, encoding_params)
assert self.parent
assert self.running
f = IUploadable(f)
fh = f.get_filehandle()
u = FileUploader(self.parent)
u = self.uploader_class(self.parent)
if self.debug:
u.debug = True
u.set_filehandle(fh)
# TODO: change this to (2,2,4) once Foolscap is fixed to allow
# connect-to-self and Client is fixed to include ourselves in the
# peerlist. Otherwise this usually fails because we give a share to
# the eventual downloader, and they won't try to get a share from
# themselves.
u.set_params(2, 3, 4)
u.set_verifierid(self._compute_verifierid(fh))
u.make_encoder()
d = u.start()
def _done(res):
f.close_filehandle(fh)

View File

@ -0,0 +1,17 @@
from twisted.internet import defer
# utility wrapper for DeferredList
def _check_deferred_list(results):
# if any of the component Deferreds failed, return the first failure such
# that an addErrback() would fire. If all were ok, return a list of the
# results (without the success/failure booleans)
for success,f in results:
if not success:
return f
return [r[1] for r in results]
def DeferredListShouldSucceed(dl):
d = defer.DeferredList(dl)
d.addCallback(_check_deferred_list)
return d

View File

@ -1,7 +1,14 @@
from base64 import b32encode, b32decode
def b2a(i):
assert isinstance(i, str), "tried to idlib.b2a non-string '%s'" % (i,)
return b32encode(i).lower()
def a2b(i):
return b32decode(i.upper())
assert isinstance(i, str), "tried to idlib.a2b non-string '%s'" % (i,)
try:
return b32decode(i.upper())
except TypeError:
print "b32decode failed on a %s byte string '%s'" % (len(i), i)
raise

View File

@ -84,8 +84,9 @@ class VDrive(service.MultiService):
d = self.dirpath(dir_or_path)
def _got_dir(dirnode):
d1 = ul.upload(uploadable)
d1.addCallback(lambda vid:
dirnode.callRemote("add_file", name, vid))
def _add(uri):
return dirnode.callRemote("add_file", name, uri)
d1.addCallback(_add)
return d1
d.addCallback(_got_dir)
def _done(res):