mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-19 03:06:33 +00:00
finish storage server and write new download
This commit is contained in:
parent
6cb8361382
commit
f4a718c5b6
@ -1,149 +0,0 @@
|
||||
import os
|
||||
|
||||
from foolscap import Referenceable
|
||||
from twisted.application import service
|
||||
#from twisted.python import log
|
||||
from allmydata.util import idlib
|
||||
from zope.interface import implements
|
||||
from allmydata.interfaces import RIBucketWriter, RIBucketReader
|
||||
|
||||
from allmydata.util.assertutil import precondition, _assert
|
||||
|
||||
class BucketStore(service.MultiService, Referenceable):
    """Service that keeps one bucket directory per verifierid under a store dir.

    The directory for a given verifierid is
    ``os.path.join(store_dir, idlib.b2a(verifierid))``.
    """

    def __init__(self, store_dir):
        """Remember ``store_dir``; it must already exist as a directory."""
        precondition(os.path.isdir(store_dir))
        service.MultiService.__init__(self)
        # Every Lease returned by allocate_bucket() is retained in this set.
        self._leases = set() # should do weakref dances.
        self._store_dir = store_dir

    def _get_bucket_dir(self, verifierid):
        """Return the path of the directory used for ``verifierid``."""
        return os.path.join(self._store_dir, idlib.b2a(verifierid))

    def has_bucket(self, verifierid):
        """Return True if the bucket directory for ``verifierid`` exists."""
        where = self._get_bucket_dir(verifierid)
        return os.path.exists(where)

    def allocate_bucket(self, verifierid, bucket_num, size,
                        leaser_credentials, canary):
        """Create a new WriteBucket for ``verifierid`` and return its Lease.

        The bucket directory must not exist yet and ``bucket_num`` must be an
        int; both are enforced with precondition(). The returned Lease is
        also recorded in ``self._leases``.
        """
        target = self._get_bucket_dir(verifierid)
        precondition(not os.path.exists(target))
        precondition(isinstance(bucket_num, int))
        writer = WriteBucket(target, verifierid, bucket_num, size)
        writer.set_leaser(leaser_credentials)
        new_lease = Lease(verifierid, leaser_credentials, writer, canary)
        self._leases.add(new_lease)
        return new_lease

    def get_buckets(self, verifierid):
        """Return ``[(bucket_num, ReadBucket)]`` for ``verifierid``, or ``[]``.

        At most one entry is returned: the single bucket directory that
        exists on disk for this verifierid.
        """
        where = self._get_bucket_dir(verifierid)
        if not os.path.exists(where):
            return []
        reader = ReadBucket(where, verifierid)
        return [(reader.get_bucket_num(), reader)]
|
||||
|
||||
class Lease(Referenceable):
    """Remotely callable wrapper that forwards writer calls to one bucket.

    Declares RIBucketWriter; each ``remote_*`` method delegates to the
    wrapped bucket object and returns nothing.
    """
    implements(RIBucketWriter)

    def __init__(self, verifierid, leaser, bucket, canary):
        """Store the lease details and subscribe to the canary's disconnect."""
        self._bucket = bucket
        self._verifierid = verifierid
        self._leaser = leaser
        # _lost_canary is invoked when the canary reports a disconnect.
        canary.notifyOnDisconnect(self._lost_canary)

    def get_bucket(self):
        """Return the wrapped bucket object."""
        return self._bucket

    def remote_write(self, data):
        """Forward ``data`` to the bucket's write()."""
        target = self._bucket
        target.write(data)

    def remote_set_metadata(self, metadata):
        """Forward ``metadata`` to the bucket's set_metadata()."""
        target = self._bucket
        target.set_metadata(metadata)

    def remote_close(self):
        """Close the wrapped bucket."""
        target = self._bucket
        target.close()

    def _lost_canary(self):
        """Disconnect hook; currently does nothing."""
        pass
|
||||
|
||||
class Bucket:
    """Base class for an on-disk bucket: a directory of small attribute files.

    Each attribute is stored as one file named after the attribute inside
    ``bucket_dir``. The bucket counts as complete once a file named
    ``closed`` exists there.
    """

    def __init__(self, bucket_dir, verifierid):
        """Remember the bucket directory and the verifierid it belongs to."""
        self._bucket_dir = bucket_dir
        self._verifierid = verifierid

    def _write_attr(self, name, val):
        """Write ``val`` to the file ``name`` in the bucket dir (binary mode)."""
        # open() instead of the deprecated file() constructor; try/finally so
        # the handle is released even when write() raises.
        f = open(os.path.join(self._bucket_dir, name), 'wb')
        try:
            f.write(val)
        finally:
            f.close()

    def _read_attr(self, name):
        """Return the full contents of the file ``name`` in the bucket dir."""
        f = open(os.path.join(self._bucket_dir, name), 'rb')
        try:
            return f.read()
        finally:
            f.close()

    def is_complete(self):
        """Return True if the ``closed`` marker file exists in the bucket dir."""
        return os.path.exists(os.path.join(self._bucket_dir, 'closed'))
|
||||
|
||||
class WriteBucket(Bucket):
    """A bucket that is being filled: creates its directory and a ``data`` file.

    Exactly ``size`` bytes must be written before close() is accepted.
    """

    def __init__(self, bucket_dir, verifierid, bucket_num, size):
        """Create ``bucket_dir`` (which must not exist) and open it for writing.

        ``bucket_num`` is recorded as text in the ``bucket_num`` attribute
        file; ``size`` is the exact number of data bytes expected.
        """
        Bucket.__init__(self, bucket_dir, verifierid)
        precondition(not os.path.exists(bucket_dir))
        os.mkdir(bucket_dir)

        self._open = True
        self._size = size
        # open() instead of the deprecated file() constructor.
        self._data = open(os.path.join(self._bucket_dir, 'data'), 'wb')
        self._bytes_written = 0

        self._write_attr('bucket_num', str(bucket_num))

    def set_leaser(self, leaser):
        """Record ``leaser`` in the ``leases`` attribute file."""
        self._write_attr('leases', leaser)

    def write(self, data):
        """Append ``data`` to the data file and flush it.

        The bucket must still be open and the write must not take the total
        past ``size``; both are enforced with precondition().
        """
        precondition(self._open)
        precondition(len(data) + self._bytes_written <= self._size)
        self._data.write(data)
        self._data.flush()
        self._bytes_written += len(data)

    def set_metadata(self, metadata):
        """Store ``metadata`` in the ``metadata`` attribute file (while open)."""
        precondition(self._open)
        self._write_attr('metadata', metadata)

    def close(self):
        """Finish the bucket: close the data file and write the ``closed`` marker.

        Requires that exactly ``size`` bytes have been written.
        """
        precondition(self._bytes_written == self._size)
        self._data.close()
        self._write_attr('closed', '')
        self._open = False

    def is_complete(self):
        """Return whether the bucket is closed, checking the data file size.

        When the ``closed`` marker exists, the on-disk size of ``data`` is
        asserted to equal the expected size.
        """
        complete = Bucket.is_complete(self)
        if complete:
            _assert(os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size)
        return complete
|
||||
|
||||
class ReadBucket(Bucket, Referenceable):
    """Read-side view of a completed bucket; declares RIBucketReader.

    ``remote_read`` and ``remote_get_metadata`` are class-level aliases of
    ``read`` and ``get_metadata``.
    """
    implements(RIBucketReader)

    def __init__(self, bucket_dir, verifierid):
        """Attach to ``bucket_dir``, which must already hold a closed bucket."""
        Bucket.__init__(self, bucket_dir, verifierid)
        # A complete bucket has a 'closed' file, so the directory must exist.
        precondition(self.is_complete()) # implicitly asserts bucket_dir exists

    def get_bucket_num(self):
        """Return the integer stored in the ``bucket_num`` attribute file."""
        raw = self._read_attr('bucket_num')
        return int(raw)

    def read(self):
        """Return the entire contents of the ``data`` file."""
        contents = self._read_attr('data')
        return contents
    remote_read = read

    def get_metadata(self):
        """Return the contents of the ``metadata`` attribute file."""
        contents = self._read_attr('metadata')
        return contents
    remote_get_metadata = get_metadata
|
@ -42,6 +42,9 @@ class ReplicatingEncoder(object):
|
||||
def get_share_size(self):
|
||||
return self.data_size
|
||||
|
||||
def get_block_size(self):
|
||||
return self.data_size
|
||||
|
||||
def encode(self, inshares, desired_shareids=None):
|
||||
assert isinstance(inshares, list)
|
||||
for inshare in inshares:
|
||||
@ -59,7 +62,7 @@ class ReplicatingDecoder(object):
|
||||
def set_serialized_params(self, params):
|
||||
self.required_shares = int(params)
|
||||
|
||||
def get_required_shares(self):
|
||||
def get_needed_shares(self):
|
||||
return self.required_shares
|
||||
|
||||
def decode(self, some_shares, their_shareids):
|
||||
@ -97,6 +100,9 @@ class CRSEncoder(object):
|
||||
def get_share_size(self):
|
||||
return self.share_size
|
||||
|
||||
def get_block_size(self):
|
||||
return self.share_size
|
||||
|
||||
def encode(self, inshares, desired_share_ids=None):
|
||||
precondition(desired_share_ids is None or len(desired_share_ids) <= self.max_shares, desired_share_ids, self.max_shares)
|
||||
|
||||
@ -129,7 +135,7 @@ class CRSDecoder(object):
|
||||
print "max_shares: %d" % self.max_shares
|
||||
print "required_shares: %d" % self.required_shares
|
||||
|
||||
def get_required_shares(self):
|
||||
def get_needed_shares(self):
|
||||
return self.required_shares
|
||||
|
||||
def decode(self, some_shares, their_shareids):
|
||||
|
@ -1,21 +1,5 @@
|
||||
import os
|
||||
|
||||
# 'app' is overwritten by manhole when the connection is established. We set
|
||||
# it to None now to keep pyflakes from complaining.
|
||||
app = None
|
||||
|
||||
def get_random_bucket_on(nodeid, size=200):
    """Ask the 'storageserver' service on ``nodeid`` to allocate a bucket.

    Returns whatever ``app.get_remote_service()`` returns, after adding a
    callback that issues the remote ``allocate_bucket`` call with a random
    20-byte verifierid, bucket number 26 and the requested ``size``.
    """
    def _allocate(storage):
        # Arguments are built when the callback runs, not when it is added.
        request = {
            'verifierid': os.urandom(20),
            'bucket_num': 26,
            'size': size,
            'leaser': app.tub.tubID,
        }
        return storage.callRemote('allocate_bucket', **request)

    pending = app.get_remote_service(nodeid, 'storageserver')
    pending.addCallback(_allocate)
    return pending
|
||||
|
||||
def write_to_bucket(bucket, bytes=100):
    """Send ``bytes`` random bytes to ``bucket`` through its remote ``write``.

    Returns the result of ``bucket.callRemote('write', data=...)``.
    """
    payload = os.urandom(bytes)
    return bucket.callRemote('write', data=payload)
|
||||
|
||||
|
@ -1,13 +1,14 @@
|
||||
|
||||
import os, sha
|
||||
import os, random, sha
|
||||
from zope.interface import implements
|
||||
from twisted.python import failure, log
|
||||
from twisted.python import log
|
||||
from twisted.internet import defer
|
||||
from twisted.application import service
|
||||
|
||||
from allmydata.util import idlib, bencode
|
||||
from allmydata.util import idlib, bencode, mathutil
|
||||
from allmydata.util.deferredutil import DeferredListShouldSucceed
|
||||
from allmydata import codec
|
||||
from allmydata.Crypto.Cipher import AES
|
||||
from allmydata.uri import unpack_uri
|
||||
from allmydata.interfaces import IDownloadTarget, IDownloader
|
||||
|
||||
@ -18,92 +19,187 @@ class HaveAllPeersError(Exception):
|
||||
# we use this to jump out of the loop
|
||||
pass
|
||||
|
||||
|
||||
class Output:
    """Decrypting sink that sits between the downloader and a downloadable.

    Incoming crypttext is fed to one SHA hasher, decrypted with AES in CTR
    mode, and the plaintext is fed to a second hasher and written to the
    wrapped downloadable.
    """

    def __init__(self, downloadable, key):
        """Set up the AES-CTR decryptor for ``key`` and the two hashers."""
        self.downloadable = downloadable
        zero_counter = "\x00"*16
        self._decryptor = AES.new(key=key, mode=AES.MODE_CTR,
                                  counterstart=zero_counter)
        verifierid_tag = netstring("allmydata_v1_verifierid")
        self._verifierid_hasher = sha.new(verifierid_tag)
        fileid_tag = netstring("allmydata_v1_fileid")
        self._fileid_hasher = sha.new(fileid_tag)

    def write(self, crypttext):
        """Hash and decrypt ``crypttext``, then hash and emit the plaintext."""
        self._verifierid_hasher.update(crypttext)
        cleartext = self._decryptor.decrypt(crypttext)
        self._fileid_hasher.update(cleartext)
        self.downloadable.write(cleartext)

    def finish(self):
        """Close the downloadable and return the result of its finish()."""
        sink = self.downloadable
        sink.close()
        return sink.finish()
|
||||
|
||||
class BlockDownloader:
    """Fetch one block from one remote bucket and report the outcome to a parent.

    On success the parent's hold_block() receives the data; on failure the
    parent's bucket_failed() is told which bucket misbehaved. The errback
    does not re-raise, so the returned deferred fires successfully either
    way.
    """

    def __init__(self, bucket, blocknum, parent):
        """Remember the remote bucket, the block number and the parent."""
        self.parent = parent
        self.blocknum = blocknum
        self.bucket = bucket

    def start(self, segnum):
        """Issue the remote ``get_block`` call for ``segnum``; return its deferred."""
        request = self.bucket.callRemote('get_block', segnum)
        request.addCallbacks(self._hold_block, self._got_block_error)
        return request

    def _hold_block(self, data):
        """Success callback: hand the block data to the parent."""
        owner = self.parent
        owner.hold_block(self.blocknum, data)

    def _got_block_error(self, f):
        """Failure callback: tell the parent this bucket failed."""
        owner = self.parent
        owner.bucket_failed(self.blocknum, self.bucket)
|
||||
|
||||
class SegmentDownloader:
    """Collect enough blocks of one segment, retrying with other buckets.

    The downloader works against a ``parent`` object that provides two
    dicts: ``active_buckets`` (blocknum -> bucket currently in use) and
    ``_share_buckets`` (blocknum -> set of candidate buckets).
    """

    def __init__(self, segmentnumber, needed_shares, parent=None):
        """Prepare to download segment ``segmentnumber``.

        ``needed_shares`` is the number of distinct blocks required.
        ``parent`` is the object whose bucket dicts are used; it defaults to
        None only so that existing two-argument callers still construct, and
        must be supplied before start() is called.
        """
        self.segmentnumber = segmentnumber
        self.needed_blocks = needed_shares
        # Previously never assigned although every method reads it.
        self.parent = parent
        self.blocks = {} # k: blocknum, v: data

    def start(self):
        """Begin downloading; the deferred fires with the ``blocks`` dict."""
        return self._download()

    def _download(self):
        """Run one attempt and repeat until enough blocks have been held."""
        d = self._try()
        def _done(res):
            if len(self.blocks) >= self.needed_blocks:
                return self.blocks
            else:
                return self._download()
        d.addCallback(_done)
        return d

    def _try(self):
        """Top up the active buckets, then request this segment from each.

        Raises NotEnoughPeersError when no unused block numbers remain.
        """
        while len(self.parent.active_buckets) < self.needed_blocks:
            # need some more
            otherblocknums = list(set(self.parent._share_buckets.keys()) - set(self.parent.active_buckets.keys()))
            if not otherblocknums:
                raise NotEnoughPeersError
            blocknum = random.choice(otherblocknums)
            # The candidates are held in a set, which random.choice() cannot
            # index, so choose from a list copy.
            candidates = list(self.parent._share_buckets[blocknum])
            self.parent.active_buckets[blocknum] = random.choice(candidates)

        # Now we have enough buckets, in self.parent.active_buckets.
        l = []
        # Iterate over a snapshot: bucket_failed() deletes entries from
        # active_buckets and may run before this loop has finished.
        for blocknum, bucket in list(self.parent.active_buckets.items()):
            bd = BlockDownloader(bucket, blocknum, self)
            d = bd.start(self.segmentnumber)
            l.append(d)
        return defer.DeferredList(l)

    def hold_block(self, blocknum, data):
        """Record ``data`` as the block received for ``blocknum``."""
        self.blocks[blocknum] = data

    def bucket_failed(self, shnum, bucket):
        """Stop using ``bucket`` for ``shnum`` and forget it as a candidate.

        The share number is dropped from ``_share_buckets`` entirely once
        its last candidate bucket is gone.
        """
        del self.parent.active_buckets[shnum]
        s = self.parent._share_buckets[shnum]
        s.remove(bucket)
        if not s:
            del self.parent._share_buckets[shnum]
|
||||
|
||||
class FileDownloader:
|
||||
debug = False
|
||||
|
||||
def __init__(self, peer, uri):
|
||||
self._peer = peer
|
||||
(codec_name, codec_params, verifierid) = unpack_uri(uri)
|
||||
def __init__(self, client, uri, downloadable):
|
||||
self._client = client
|
||||
self._downloadable = downloadable
|
||||
(codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size) = unpack_uri(uri)
|
||||
assert isinstance(verifierid, str)
|
||||
assert len(verifierid) == 20
|
||||
self._verifierid = verifierid
|
||||
self._roothash = roothash
|
||||
self._decoder = codec.get_decoder_by_name(codec_name)
|
||||
self._decoder.set_serialized_params(codec_params)
|
||||
self.needed_shares = self._decoder.get_required_shares()
|
||||
self._total_segments = mathutil.div_ceil(size, segment_size)
|
||||
self._current_segnum = 0
|
||||
self._segment_size = segment_size
|
||||
self._needed_shares = self._decoder.get_needed_shares()
|
||||
|
||||
def set_download_target(self, target):
|
||||
self._target = target
|
||||
self._target.register_canceller(self._cancel)
|
||||
|
||||
def _cancel(self):
|
||||
pass
|
||||
# future:
|
||||
# self._share_hash_tree = ??
|
||||
# self._subshare_hash_trees = {} # k:shnum, v: hashtree
|
||||
# each time we start using a new shnum, we must acquire a share hash
|
||||
# from one of the buckets that provides that shnum, then validate it against
|
||||
# the rest of the share hash tree that they provide. Then, each time we
|
||||
# get a block in that share, we must validate the block against the rest
|
||||
# of the subshare hash tree that that bucket will provide.
|
||||
|
||||
def start(self):
|
||||
log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))
|
||||
if self.debug:
|
||||
print "starting download"
|
||||
# first step: who should we download from?
|
||||
self.active_buckets = {} # k: shnum, v: bucket
|
||||
self._share_buckets = {} # k: shnum, v: set of buckets
|
||||
|
||||
# maybe limit max_peers to 2*len(self.shares), to reduce memory
|
||||
# footprint
|
||||
max_peers = None
|
||||
key = "\x00" * 16
|
||||
self._output = Output(self._downloadable, key)
|
||||
|
||||
self.permuted = self._peer.get_permuted_connections(self._verifierid, max_peers)
|
||||
for p in self.permuted:
|
||||
assert isinstance(p, str)
|
||||
self.landlords = [] # list of (peerid, bucket_num, remotebucket)
|
||||
|
||||
d = defer.maybeDeferred(self._check_next_peer)
|
||||
d.addCallback(self._got_all_peers)
|
||||
d = defer.maybeDeferred(self._get_all_shareholders)
|
||||
d.addCallback(self._got_all_shareholders)
|
||||
d.addCallback(self._download_all_segments)
|
||||
d.addCallback(self._done)
|
||||
return d
|
||||
|
||||
def _check_next_peer(self):
|
||||
if len(self.permuted) == 0:
|
||||
# there are no more to check
|
||||
def _get_all_shareholders(self):
|
||||
dl = []
|
||||
for (permutedpeerid, peerid, connection) in self._client.get_permuted_peers(self._verifierid):
|
||||
d = connection.callRemote("get_buckets", self._verifierid)
|
||||
d.addCallbacks(self._got_response, self._got_error,
|
||||
callbackArgs=(connection,))
|
||||
dl.append(d)
|
||||
return defer.DeferredList(dl)
|
||||
|
||||
def _got_response(self, buckets, connection):
|
||||
for sharenum, bucket in buckets:
|
||||
self._share_buckets.setdefault(sharenum, set()).add(bucket)
|
||||
|
||||
def _got_error(self, f):
|
||||
self._client.log("Somebody failed. -- %s" % (f,))
|
||||
|
||||
def _got_all_shareholders(self, res):
|
||||
if len(self._share_buckets) < self._needed_shares:
|
||||
raise NotEnoughPeersError
|
||||
peerid = self.permuted.pop(0)
|
||||
|
||||
d = self._peer.get_remote_service(peerid, "storageserver")
|
||||
def _got_peer(service):
|
||||
bucket_num = len(self.landlords)
|
||||
if self.debug: print "asking %s" % idlib.b2a(peerid)
|
||||
d2 = service.callRemote("get_buckets", verifierid=self._verifierid)
|
||||
def _got_response(buckets):
|
||||
if buckets:
|
||||
bucket_nums = [num for (num,bucket) in buckets]
|
||||
if self.debug:
|
||||
print " peerid %s has buckets %s" % (idlib.b2a(peerid),
|
||||
bucket_nums)
|
||||
self.active_buckets = {}
|
||||
|
||||
self.landlords.append( (peerid, buckets) )
|
||||
if len(self.landlords) >= self.needed_shares:
|
||||
if self.debug: print " we're done!"
|
||||
raise HaveAllPeersError
|
||||
# otherwise we fall through to search more peers
|
||||
d2.addCallback(_got_response)
|
||||
return d2
|
||||
d.addCallback(_got_peer)
|
||||
|
||||
def _done_with_peer(res):
|
||||
if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
|
||||
if isinstance(res, failure.Failure):
|
||||
if res.check(HaveAllPeersError):
|
||||
if self.debug: print " all done"
|
||||
# we're done!
|
||||
return
|
||||
if res.check(IndexError):
|
||||
if self.debug: print " no connection"
|
||||
else:
|
||||
if self.debug: print " other error:", res
|
||||
else:
|
||||
if self.debug: print " they had data for us"
|
||||
# we get here for either good peers (when we still need more), or
|
||||
# after checking a bad peer (and thus still need more). So now we
|
||||
# need to grab a new peer.
|
||||
return self._check_next_peer()
|
||||
d.addBoth(_done_with_peer)
|
||||
def _download_all_segments(self):
|
||||
d = self._download_segment(self._current_segnum)
|
||||
def _done(res):
|
||||
if self._current_segnum == self._total_segments:
|
||||
return None
|
||||
return self._download_segment(self._current_segnum)
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
def _download_segment(self, segnum):
|
||||
segmentdler = SegmentDownloader(segnum, self._needed_shares)
|
||||
d = segmentdler.start()
|
||||
d.addCallback(self._decoder.decode)
|
||||
def _done(res):
|
||||
self._current_segnum += 1
|
||||
if self._current_segnum == self._total_segments:
|
||||
data = ''.join(res)
|
||||
padsize = mathutil.pad_size(self._size, self._segment_size)
|
||||
data = data[:-padsize]
|
||||
self.output.write(data)
|
||||
else:
|
||||
for buf in res:
|
||||
self.output.write(buf)
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
def _done(self, res):
|
||||
return self._output.finish()
|
||||
|
||||
def _write_data(self, data):
|
||||
self._verifierid_hasher.update(data)
|
||||
|
||||
|
||||
|
||||
# old stuff
|
||||
def _got_all_peers(self, res):
|
||||
all_buckets = []
|
||||
for peerid, buckets in self.landlords:
|
||||
|
@ -57,7 +57,11 @@ class RIBucketWriter(RemoteInterface):
|
||||
class RIStorageServer(RemoteInterface):
|
||||
def allocate_buckets(verifierid=Verifierid, sharenums=SetOf(int),
|
||||
sharesize=int, blocksize=int, canary=Referenceable_):
|
||||
# if the canary is lost before close(), the bucket is deleted
|
||||
"""
|
||||
@param canary: If the canary is lost before close(), the bucket is deleted.
|
||||
@return: tuple of (alreadygot, allocated), where alreadygot is what we
|
||||
already have and allocated is what we hereby agree to accept
|
||||
"""
|
||||
return TupleOf(SetOf(int), DictOf(int, RIBucketWriter))
|
||||
def get_buckets(verifierid=Verifierid):
|
||||
return DictOf(int, RIBucketReader_)
|
||||
@ -66,7 +70,7 @@ class RIBucketReader(RemoteInterface):
|
||||
def get_block(blocknum=int):
|
||||
return ShareData
|
||||
def get_block_hashes():
|
||||
return ListOf(Hash))
|
||||
return ListOf(Hash)
|
||||
def get_share_hashes():
|
||||
return ListOf(TupleOf(int, Hash))
|
||||
|
||||
@ -157,6 +161,10 @@ class ICodecEncoder(Interface):
|
||||
compatible decoder.
|
||||
"""
|
||||
|
||||
def get_block_size():
|
||||
"""Return the length of the shares that encode() will produce.
|
||||
"""
|
||||
|
||||
def get_share_size():
|
||||
"""Return the length of the shares that encode() will produce.
|
||||
"""
|
||||
@ -271,7 +279,7 @@ class ICodecDecoder(Interface):
|
||||
"""Set up the parameters of this encoder, from a string returned by
|
||||
encoder.get_serialized_params()."""
|
||||
|
||||
def get_required_shares():
|
||||
def get_needed_shares():
|
||||
"""Return the number of shares needed to reconstruct the data.
|
||||
set_serialized_params() is required to be called before this."""
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
import os
|
||||
import os, re
|
||||
|
||||
from foolscap import Referenceable
|
||||
from twisted.application import service
|
||||
@ -10,7 +10,7 @@ from allmydata.util import bencode, fileutil, idlib
|
||||
from allmydata.util.assertutil import _assert, precondition
|
||||
|
||||
# store/
|
||||
# store/tmp # temp dirs named $VERIFIERID/$SHARENUM that will be moved to store/ on success
|
||||
# store/incoming # temp dirs named $VERIFIERID/$SHARENUM that will be moved to store/ on success
|
||||
# store/$VERIFIERID
|
||||
# store/$VERIFIERID/$SHARENUM
|
||||
# store/$VERIFIERID/$SHARENUM/blocksize
|
||||
@ -24,20 +24,20 @@ NUM_RE=re.compile("[1-9][0-9]*")
|
||||
class BucketWriter(Referenceable):
|
||||
implements(RIBucketWriter)
|
||||
|
||||
def __init__(self, tmphome, finalhome, blocksize):
|
||||
self.tmphome = tmphome
|
||||
def __init__(self, incominghome, finalhome, blocksize):
|
||||
self.incominghome = incominghome
|
||||
self.finalhome = finalhome
|
||||
self.blocksize = blocksize
|
||||
self.closed = False
|
||||
self._write_file('blocksize', str(blocksize))
|
||||
|
||||
def _write_file(self, fname, data):
|
||||
open(os.path.join(tmphome, fname), 'wb').write(data)
|
||||
open(os.path.join(self.incominghome, fname), 'wb').write(data)
|
||||
|
||||
def remote_put_block(self, segmentnum, data):
|
||||
precondition(not self.closed)
|
||||
assert len(data) == self.blocksize
|
||||
f = open(os.path.join(self.tmphome, 'data'), 'wb')
|
||||
f = open(os.path.join(self.incominghome, 'data'), 'wb')
|
||||
f.seek(self.blocksize*segmentnum)
|
||||
f.write(data)
|
||||
|
||||
@ -54,7 +54,7 @@ class BucketWriter(Referenceable):
|
||||
def close(self):
|
||||
precondition(not self.closed)
|
||||
# TODO assert or check the completeness and consistency of the data that has been written
|
||||
fileutil.rename(self.tmphome, self.finalhome)
|
||||
fileutil.rename(self.incominghome, self.finalhome)
|
||||
self.closed = True
|
||||
|
||||
def str2l(s):
|
||||
@ -87,24 +87,28 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
def __init__(self, storedir):
|
||||
fileutil.make_dirs(storedir)
|
||||
self.storedir = storedir
|
||||
self.tmpdir = os.path.join(storedir, 'tmp')
|
||||
self._clean_trash()
|
||||
fileutil.make_dirs(self.tmpdir)
|
||||
self.incomingdir = os.path.join(storedir, 'incoming')
|
||||
self._clean_incomplete()
|
||||
fileutil.make_dirs(self.incomingdir)
|
||||
|
||||
service.MultiService.__init__(self)
|
||||
|
||||
def _clean_trash(self):
|
||||
fileutil.rm_dir(self.tmpdir)
|
||||
def _clean_incomplete(self):
|
||||
fileutil.rm_dir(self.incomingdir)
|
||||
|
||||
def remote_allocate_buckets(self, verifierid, sharenums, sharesize,
|
||||
blocksize, canary):
|
||||
bucketwriters = {} # k: sharenum, v: BucketWriter
|
||||
for sharenum in sharenums:
|
||||
tmphome = os.path.join(self.tmpdir, idlib.a2b(verifierid), "%d"%sharenum)
|
||||
finalhome = os.path.join(self.storedir, idlib.a2b(verifierid), "%d"%sharenum)
|
||||
bucketwriters[sharenum] = BucketWriter(tmphome, finalhome, blocksize)
|
||||
alreadygot = set()
|
||||
bucketwriters = {} # k: shnum, v: BucketWriter
|
||||
for shnum in sharenums:
|
||||
incominghome = os.path.join(self.incomingdir, idlib.a2b(verifierid), "%d"%shnum)
|
||||
finalhome = os.path.join(self.storedir, idlib.a2b(verifierid), "%d"%shnum)
|
||||
if os.path.exists(incominghome) or os.path.exists(finalhome):
|
||||
alreadygot.add(shnum)
|
||||
else:
|
||||
bucketwriters[shnum] = BucketWriter(incominghome, finalhome, blocksize)
|
||||
|
||||
return bucketwriters
|
||||
return alreadygot, bucketwriters
|
||||
|
||||
def remote_get_buckets(self, verifierid):
|
||||
bucketreaders = {} # k: sharenum, v: BucketReader
|
||||
|
@ -6,221 +6,19 @@ from cStringIO import StringIO
|
||||
from allmydata import upload
|
||||
from allmydata.uri import unpack_uri
|
||||
|
||||
class StringBucketProxy:
|
||||
# This is for unit tests: make a StringIO look like a RIBucketWriter.
|
||||
|
||||
def __init__(self):
|
||||
self.data = StringIO()
|
||||
self.size = None
|
||||
self.done = False
|
||||
|
||||
def callRemote(self, methname, **kwargs):
|
||||
if methname == "write":
|
||||
return defer.maybeDeferred(self.write, **kwargs)
|
||||
elif methname == "close":
|
||||
return defer.maybeDeferred(self.close, **kwargs)
|
||||
else:
|
||||
return defer.fail(NameError("no such method named %s" % methname))
|
||||
|
||||
def write(self, data):
|
||||
self.data.write(data)
|
||||
def close(self):
|
||||
self.done = True
|
||||
|
||||
|
||||
class FakePeer:
|
||||
def __init__(self, peerid, response):
|
||||
self.peerid = peerid
|
||||
self.response = response
|
||||
|
||||
def callRemote(self, methname, *args, **kwargs):
|
||||
assert not args
|
||||
return defer.maybeDeferred(self._callRemote, methname, **kwargs)
|
||||
|
||||
def _callRemote(self, methname, **kwargs):
|
||||
assert methname == "allocate_bucket"
|
||||
#assert kwargs["size"] == 100
|
||||
assert kwargs["leaser"] == "fakeclient"
|
||||
if self.response == "good":
|
||||
return self
|
||||
raise upload.TooFullError()
|
||||
class FakeStorageServer:
|
||||
pass
|
||||
|
||||
class FakeClient:
|
||||
nodeid = "fakeclient"
|
||||
def __init__(self, responses):
|
||||
self.peers = []
|
||||
for peerid,r in enumerate(responses):
|
||||
if r == "disconnected":
|
||||
self.peers.append(None)
|
||||
else:
|
||||
self.peers.append(FakePeer(str(peerid), r))
|
||||
|
||||
def get_permuted_connections(self, key):
|
||||
return [str(i) for i in range(len(self.peers))]
|
||||
|
||||
def get_remote_service(self, peerid, name):
|
||||
peer = self.peers[int(peerid)]
|
||||
if not peer:
|
||||
return defer.fail(IndexError("no connection to that peer"))
|
||||
return defer.succeed(peer)
|
||||
|
||||
|
||||
class NextPeerUploader(upload.FileUploader):
|
||||
_size = 100
|
||||
def _got_enough_peers(self, res):
|
||||
return res
|
||||
|
||||
class NextPeer(unittest.TestCase):
|
||||
responses = ["good", # 0
|
||||
"full", # 1
|
||||
"full", # 2
|
||||
"disconnected", # 3
|
||||
"good", # 4
|
||||
]
|
||||
|
||||
def compare_landlords(self, u, c, expected):
|
||||
exp = [(str(peerid), bucketnum, c.peers[peerid])
|
||||
for peerid, bucketnum in expected]
|
||||
self.failUnlessEqual(u.landlords, exp)
|
||||
|
||||
VERIFIERID = "\x00" * 20
|
||||
def test_0(self):
|
||||
c = FakeClient([])
|
||||
u = NextPeerUploader(c)
|
||||
u.set_verifierid(self.VERIFIERID)
|
||||
u.set_params(2, 2, 2)
|
||||
d = u.start()
|
||||
def _check(f):
|
||||
f.trap(upload.NotEnoughPeersError)
|
||||
d.addCallbacks(lambda res: self.fail("this was supposed to fail"),
|
||||
_check)
|
||||
return d
|
||||
|
||||
def test_1(self):
|
||||
c = FakeClient(self.responses)
|
||||
u = NextPeerUploader(c)
|
||||
u.set_verifierid(self.VERIFIERID)
|
||||
u.set_params(2, 2, 2)
|
||||
d = u.start()
|
||||
def _check(res):
|
||||
self.failUnlessEqual(u.goodness_points, 2)
|
||||
self.compare_landlords(u, c, [(0, 0),
|
||||
(4, 1),
|
||||
])
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
|
||||
def test_2(self):
|
||||
c = FakeClient(self.responses)
|
||||
u = NextPeerUploader(c)
|
||||
u.set_verifierid(self.VERIFIERID)
|
||||
u.set_params(3, 3, 3)
|
||||
d = u.start()
|
||||
def _check(res):
|
||||
self.failUnlessEqual(u.goodness_points, 3)
|
||||
self.compare_landlords(u, c, [(0, 0),
|
||||
(4, 1),
|
||||
(0, 2),
|
||||
])
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
|
||||
responses2 = ["good", # 0
|
||||
"full", # 1
|
||||
"full", # 2
|
||||
"good", # 3
|
||||
"full", # 4
|
||||
]
|
||||
|
||||
def test_3(self):
|
||||
c = FakeClient(self.responses2)
|
||||
u = NextPeerUploader(c)
|
||||
u.set_verifierid(self.VERIFIERID)
|
||||
u.set_params(3, 3, 3)
|
||||
d = u.start()
|
||||
def _check(res):
|
||||
self.failUnlessEqual(u.goodness_points, 3)
|
||||
self.compare_landlords(u, c, [(0, 0),
|
||||
(3, 1),
|
||||
(0, 2),
|
||||
])
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
|
||||
responses3 = ["good", # 0
|
||||
"good", # 1
|
||||
"good", # 2
|
||||
"good", # 3
|
||||
"good", # 4
|
||||
]
|
||||
|
||||
def test_4(self):
|
||||
c = FakeClient(self.responses3)
|
||||
u = NextPeerUploader(c)
|
||||
u.set_verifierid(self.VERIFIERID)
|
||||
u.set_params(4, 4, 4)
|
||||
d = u.start()
|
||||
def _check(res):
|
||||
self.failUnlessEqual(u.goodness_points, 4)
|
||||
self.compare_landlords(u, c, [(0, 0),
|
||||
(1, 1),
|
||||
(2, 2),
|
||||
(3, 3),
|
||||
])
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
|
||||
|
||||
class FakePeer2:
|
||||
def __init__(self, peerid):
|
||||
self.peerid = peerid
|
||||
self.data = ""
|
||||
|
||||
def callRemote(self, methname, *args, **kwargs):
|
||||
if methname == "allocate_bucket":
|
||||
return defer.maybeDeferred(self._allocate_bucket, *args, **kwargs)
|
||||
if methname == "write":
|
||||
return defer.maybeDeferred(self._write, *args, **kwargs)
|
||||
if methname == "set_metadata":
|
||||
return defer.maybeDeferred(self._set_metadata, *args, **kwargs)
|
||||
if methname == "close":
|
||||
return defer.maybeDeferred(self._close, *args, **kwargs)
|
||||
return defer.maybeDeferred(self._bad_name, methname)
|
||||
|
||||
def _allocate_bucket(self, verifierid, bucket_num, size, leaser, canary):
|
||||
self.allocated_size = size
|
||||
return self
|
||||
def _write(self, data):
|
||||
self.data = self.data + data
|
||||
def _set_metadata(self, metadata):
|
||||
self.metadata = metadata
|
||||
def _close(self):
|
||||
pass
|
||||
def _bad_name(self, methname):
|
||||
raise NameError("FakePeer2 has no such method named '%s'" % methname)
|
||||
|
||||
class FakeClient2:
|
||||
nodeid = "fakeclient"
|
||||
def __init__(self, num_peers):
|
||||
self.peers = []
|
||||
for peerid in range(num_peers):
|
||||
self.peers.append(FakePeer2(str(peerid)))
|
||||
|
||||
def get_permuted_connections(self, key):
|
||||
return [str(i) for i in range(len(self.peers))]
|
||||
|
||||
def get_remote_service(self, peerid, name):
|
||||
peer = self.peers[int(peerid)]
|
||||
if not peer:
|
||||
return defer.fail(IndexError("no connection to that peer"))
|
||||
return defer.succeed(peer)
|
||||
def get_permuted_peers(self, verifierid):
|
||||
return [ ("%20d"%fakeid, "%20d"%fakeid, FakeStorageServer(),) for fakeid in range(50) ]
|
||||
|
||||
class Uploader(unittest.TestCase):
|
||||
def setUp(self):
|
||||
node = self.node = FakeClient2(10)
|
||||
u = self.u = upload.Uploader()
|
||||
u.running = 1
|
||||
u.parent = node
|
||||
self.node = FakeClient()
|
||||
self.u = upload.Uploader()
|
||||
self.u.running = True
|
||||
self.u.parent = self.node
|
||||
|
||||
def _check(self, uri):
|
||||
self.failUnless(isinstance(uri, str))
|
||||
|
@ -84,15 +84,19 @@ class FileUploader:
|
||||
|
||||
# create the encoder, so we can know how large the shares will be
|
||||
self._encoder = self.ENCODERCLASS()
|
||||
self._last_seg_encoder = self.ENCODERCLASS() # This one is for encoding the final segment, which might be shorter than the others.
|
||||
self._codec_name = self._encoder.get_encoder_type()
|
||||
self._encoder.set_params(self.segment_size, self.needed_shares, self.total_shares)
|
||||
xyz
|
||||
|
||||
paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
|
||||
self._encoder.set_params(paddedsize, self.needed_shares, self.total_shares)
|
||||
self._share_size = self._encoder.get_share_size()
|
||||
|
||||
self._block_size = self._encoder.get_block_size()
|
||||
|
||||
# first step: who should we upload to?
|
||||
peers = self._client.get_permuted_peers(self._verifierid)
|
||||
assert peers
|
||||
trackers = [ (permutedid, PeerTracker(peerid, conn),)
|
||||
trackers = [ (permutedid, PeerTracker(peerid, conn, self._share_size, self._block_size, self._verifierid),)
|
||||
for permutedid, peerid, conn in peers ]
|
||||
ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance
|
||||
ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ])
|
||||
@ -192,7 +196,7 @@ class FileUploader:
|
||||
|
||||
def _compute_uri(self, roothash):
|
||||
params = self._encoder.get_serialized_params()
|
||||
return pack_uri(self._codec_name, params, self._verifierid, roothash)
|
||||
return pack_uri(self._codec_name, params, self._verifierid, roothash, self.needed_shares, self.total_shares, self._size, self._encoder.segment_size)
|
||||
|
||||
|
||||
def netstring(s):
|
||||
|
@ -5,7 +5,7 @@ from allmydata.util import idlib
|
||||
# enough information to retrieve and validate the contents. It shall be
|
||||
# expressed in a limited character set (namely [TODO]).
|
||||
|
||||
def pack_uri(codec_name, codec_params, verifierid):
|
||||
def pack_uri(codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size):
|
||||
assert isinstance(codec_name, str)
|
||||
assert len(codec_name) < 10
|
||||
assert ":" not in codec_name
|
||||
@ -13,13 +13,18 @@ def pack_uri(codec_name, codec_params, verifierid):
|
||||
assert ":" not in codec_params
|
||||
assert isinstance(verifierid, str)
|
||||
assert len(verifierid) == 20 # sha1 hash
|
||||
return "URI:%s:%s:%s" % (codec_name, codec_params, idlib.b2a(verifierid))
|
||||
return "URI:%s:%s:%s:%s:%s:%s:%s:%s" % (codec_name, codec_params, idlib.b2a(verifierid), idlib.b2a(roothash), needed_shares, total_shares, size, segment_size)
|
||||
|
||||
|
||||
def unpack_uri(uri):
    """Split a URI produced by pack_uri() back into its eight components.

    The URI has nine colon-separated fields: the literal ``URI`` header
    followed by the codec name, codec parameters, verifierid, roothash,
    needed_shares, total_shares, size and segment_size.

    Returns the tuple ``(codec_name, codec_params, verifierid, roothash,
    needed_shares, total_shares, size, segment_size)``. The two hashes are
    decoded with idlib.a2b; the four numeric fields are returned as ints.
    """
    assert uri.startswith("URI:")
    header, codec_name, codec_params, verifierid_s, roothash_s, needed_shares_s, total_shares_s, size_s, segment_size_s = uri.split(":")
    verifierid = idlib.a2b(verifierid_s)
    roothash = idlib.a2b(roothash_s)
    # pack_uri() formats the share counts with %s, i.e. as decimal text, so
    # they are parsed with int() rather than idlib.a2b().
    needed_shares = int(needed_shares_s)
    total_shares = int(total_shares_s)
    size = int(size_s)
    segment_size = int(segment_size_s)
    return codec_name, codec_params, verifierid, roothash, needed_shares, total_shares, size, segment_size
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user