more #85 work, system test still fails

Brian Warner 2007-07-13 15:09:01 -07:00
parent cd8648d39b
commit 1f8e407d9c
7 changed files with 152 additions and 80 deletions

View File

@@ -162,10 +162,7 @@ class Encoder(object):
     def set_shareholders(self, landlords):
         assert isinstance(landlords, dict)
         for k in landlords:
-            # it would be nice to:
-            #assert RIBucketWriter.providedBy(landlords[k])
-            assert IStorageBucketWriter(landlords[k])
-            pass
+            assert IStorageBucketWriter.providedBy(landlords[k])
         self.landlords = landlords.copy()

     def start(self):
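The fix above is easy to misread, so here is a standalone illustration (not part of the commit) of why the old assert was useless: calling an Interface performs adaptation, so IFoo(obj) either returns the object or raises TypeError, and can never return a false value. IFoo.providedBy(obj) is the boolean test the assert was meant to make. IFoo, Provider, and NonProvider are hypothetical names.

# Illustration only, not from this commit.
from zope.interface import Interface, implements

class IFoo(Interface):
    pass

class Provider:
    implements(IFoo)

class NonProvider:
    pass

assert IFoo.providedBy(Provider())          # True: the assert can actually pass or fail
assert not IFoo.providedBy(NonProvider())   # False for non-providers
try:
    IFoo(NonProvider())   # adaptation failure raises TypeError, never returns False
except TypeError:
    pass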

View File

@@ -80,7 +80,7 @@ class RIBucketReader(RemoteInterface):
 class RIStorageServer(RemoteInterface):
     def allocate_buckets(storage_index=StorageIndex,
                          sharenums=SetOf(int, maxLength=MAX_BUCKETS),
-                         sharesize=int, blocksize=int, canary=Referenceable):
+                         allocated_size=int, canary=Referenceable):
         """
         @param canary: If the canary is lost before close(), the bucket is deleted.
         @return: tuple of (alreadygot, allocated), where alreadygot is what we

View File

@@ -95,12 +95,12 @@ class StorageServer(service.MultiService, Referenceable):
             space += bw.allocated_size()
         return space

-    def remote_allocate_buckets(self, storage_index, sharenums, sharesize,
+    def remote_allocate_buckets(self, storage_index, sharenums, allocated_size,
                                 canary):
         alreadygot = set()
         bucketwriters = {} # k: shnum, v: BucketWriter
         si_s = idlib.b2a(storage_index)
-        space_per_bucket = sharesize
+        space_per_bucket = allocated_size
         no_limits = self.sizelimit is None
         yes_limits = not no_limits
         if yes_limits:
@@ -169,18 +169,28 @@ section starts. Each offset is measured from the beginning of the file.
 start of uri_extension
 """

+def allocated_size(data_size, num_segments, num_share_hashes,
+                   uri_extension_size):
+    wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
+                           uri_extension_size)
+    uri_extension_starts_at = wbp._offsets['uri_extension']
+    return uri_extension_starts_at + 4 + uri_extension_size
+
 class WriteBucketProxy:
     implements(IStorageBucketWriter)
     def __init__(self, rref, data_size, segment_size, num_segments,
-                 num_share_hashes):
+                 num_share_hashes, uri_extension_size):
         self._rref = rref
         self._segment_size = segment_size
         self._num_segments = num_segments

         HASH_SIZE = interfaces.HASH_SIZE
         self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE
+        # how many share hashes are included in each share? This will be
+        # about ln2(num_shares).
         self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
+        # we commit to not sending a uri extension larger than this
+        self._uri_extension_size = uri_extension_size

         offsets = self._offsets = {}
         x = 0x1c
@@ -215,10 +225,12 @@ class WriteBucketProxy:
         offset = self._offsets['data'] + segmentnum * self._segment_size
         assert offset + len(data) <= self._offsets['uri_extension']
         assert isinstance(data, str)
-        if segmentnum < self._segment_size-1:
-            assert len(data) == self._segment_size
+        if segmentnum < self._num_segments-1:
+            precondition(len(data) == self._segment_size,
+                         len(data), self._segment_size)
         else:
-            assert len(data) <= self._segment_size
+            precondition(len(data) <= self._segment_size,
+                         len(data), self._segment_size)
         return self._write(offset, data)

     def put_plaintext_hashes(self, hashes):
@@ -252,13 +264,15 @@ class WriteBucketProxy:
         assert isinstance(sharehashes, list)
         data = "".join([struct.pack(">H", hashnum) + hashvalue
                         for hashnum,hashvalue in sharehashes])
-        assert len(data) == self._share_hash_size
+        precondition(len(data) == self._share_hash_size,
+                     len(data), self._share_hash_size)
         assert offset + len(data) <= self._offsets['uri_extension']
         return self._write(offset, data)

     def put_uri_extension(self, data):
         offset = self._offsets['uri_extension']
         assert isinstance(data, str)
+        assert len(data) <= self._uri_extension_size
         length = struct.pack(">L", len(data))
         return self._write(offset, length+data)
@@ -273,6 +287,7 @@ class ReadBucketProxy:
     implements(IStorageBucketReader)
     def __init__(self, rref):
         self._rref = rref
+        self._started = False

     def startIfNecessary(self):
         if self._started:
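Taken together, these storageserver.py changes let a client predict the full on-disk share size before asking the server for space: allocated_size() builds a throwaway WriteBucketProxy just to read its offset table, then adds the 4-byte length prefix and the extension body. A minimal sketch of the new allocation flow, mirroring what PeerTracker in upload.py (below) does; it assumes a foolscap remote reference storage_server_rref to an RIStorageServer and a precomputed storage_index, and the parameter values are illustrative only.

# Sketch, not the project's code.
from foolscap import Referenceable
from allmydata import storageserver

share_size, block_size = 1000, 1024       # illustrative values
num_segments, num_share_hashes = 4, 3
uri_extension_size = 1000                 # cf. EXTENSION_SIZE in upload.py below

# ask for the worst-case share size, including hash trees and uri extension
needed = storageserver.allocated_size(share_size, num_segments,
                                      num_share_hashes, uri_extension_size)

d = storage_server_rref.callRemote("allocate_buckets", storage_index,
                                   sharenums=set([0]),
                                   allocated_size=needed,
                                   canary=Referenceable())
def _got(res):
    (alreadygot, buckets) = res
    # wrap each remote bucket in a WriteBucketProxy built with the same
    # parameters, including the new uri_extension_size commitment
    return dict([(shnum,
                  storageserver.WriteBucketProxy(rref, share_size, block_size,
                                                 num_segments, num_share_hashes,
                                                 uri_extension_size))
                 for (shnum, rref) in buckets.items()])
d.addCallback(_got)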

View File

@@ -3,7 +3,6 @@ from zope.interface import implements
 from twisted.trial import unittest
 from twisted.internet import defer
 from twisted.python.failure import Failure
-from foolscap import eventual
 from allmydata import encode, download, hashtree
 from allmydata.util import hashutil
 from allmydata.uri import pack_uri
@@ -11,45 +10,13 @@ from allmydata.Crypto.Cipher import AES
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader
 from cStringIO import StringIO

-class FakePeer:
-    def __init__(self, mode="good"):
-        self.ss = FakeStorageServer(mode)
-
-    def callRemote(self, methname, *args, **kwargs):
-        def _call():
-            meth = getattr(self, methname)
-            return meth(*args, **kwargs)
-        return defer.maybeDeferred(_call)
-
-    def get_service(self, sname):
-        assert sname == "storageserver"
-        return self.ss
-
-class FakeStorageServer:
-    def __init__(self, mode):
-        self.mode = mode
-    def callRemote(self, methname, *args, **kwargs):
-        def _call():
-            meth = getattr(self, methname)
-            return meth(*args, **kwargs)
-        d = eventual.fireEventually()
-        d.addCallback(lambda res: _call())
-        return d
-    def allocate_buckets(self, crypttext_hash, sharenums, shareize, blocksize, canary):
-        if self.mode == "full":
-            return (set(), {},)
-        elif self.mode == "already got them":
-            return (set(sharenums), {},)
-        else:
-            return (set(), dict([(shnum, FakeBucketWriter(),) for shnum in sharenums]),)
-
 class LostPeerError(Exception):
     pass

 def flip_bit(good): # flips the last bit
     return good[:-1] + chr(ord(good[-1]) ^ 0x01)

-class FakeBucketWriter:
+class FakeBucketWriterProxy:
     implements(IStorageBucketWriter, IStorageBucketReader)
     # these are used for both reading and writing
     def __init__(self, mode="good"):
@@ -195,7 +162,7 @@ class Encode(unittest.TestCase):
         shareholders = {}
         all_shareholders = []
         for shnum in range(NUM_SHARES):
-            peer = FakeBucketWriter()
+            peer = FakeBucketWriterProxy()
             shareholders[shnum] = peer
             all_shareholders.append(peer)
         e.set_shareholders(shareholders)
@@ -322,7 +289,7 @@ class Roundtrip(unittest.TestCase):
         all_peers = []
         for shnum in range(NUM_SHARES):
             mode = bucket_modes.get(shnum, "good")
-            peer = FakeBucketWriter(mode)
+            peer = FakeBucketWriterProxy(mode)
             shareholders[shnum] = peer
         e.set_shareholders(shareholders)
         plaintext_hasher = hashutil.plaintext_hasher()

View File

@@ -5,8 +5,10 @@ from twisted.application import service
 from twisted.internet import defer
 from foolscap import Referenceable
 import os.path
-from allmydata import storageserver, interfaces
+from allmydata import interfaces
 from allmydata.util import fileutil, hashutil
+from allmydata.storageserver import BucketWriter, BucketReader, \
+     WriteBucketProxy, ReadBucketProxy, StorageServer

 class Bucket(unittest.TestCase):
@@ -23,7 +25,7 @@ class Bucket(unittest.TestCase):
     def test_create(self):
         incoming, final = self.make_workdir("test_create")
-        bw = storageserver.BucketWriter(self, incoming, final, 200)
+        bw = BucketWriter(self, incoming, final, 200)
         bw.remote_write(0, "a"*25)
         bw.remote_write(25, "b"*25)
         bw.remote_write(50, "c"*25)
@@ -32,14 +34,14 @@ class Bucket(unittest.TestCase):
     def test_readwrite(self):
         incoming, final = self.make_workdir("test_readwrite")
-        bw = storageserver.BucketWriter(self, incoming, final, 200)
+        bw = BucketWriter(self, incoming, final, 200)
         bw.remote_write(0, "a"*25)
         bw.remote_write(25, "b"*25)
         bw.remote_write(50, "c"*7) # last block may be short
         bw.remote_close()

         # now read from it
-        br = storageserver.BucketReader(final)
+        br = BucketReader(final)
         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
@@ -59,7 +61,7 @@ class BucketProxy(unittest.TestCase):
         final = os.path.join(basedir, "bucket")
         fileutil.make_dirs(basedir)
         fileutil.make_dirs(os.path.join(basedir, "tmp"))
-        bw = storageserver.BucketWriter(self, incoming, final, size)
+        bw = BucketWriter(self, incoming, final, size)
         rb = RemoteBucket()
         rb.target = bw
         return bw, rb, final
@@ -69,11 +71,12 @@ class BucketProxy(unittest.TestCase):
     def test_create(self):
         bw, rb, final = self.make_bucket("test_create", 500)
-        bp = storageserver.WriteBucketProxy(rb,
-                                            data_size=300,
-                                            segment_size=10,
-                                            num_segments=5,
-                                            num_share_hashes=3)
+        bp = WriteBucketProxy(rb,
+                              data_size=300,
+                              segment_size=10,
+                              num_segments=5,
+                              num_share_hashes=3,
+                              uri_extension_size=500)
         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

     def test_readwrite(self):
@@ -98,11 +101,12 @@ class BucketProxy(unittest.TestCase):
         uri_extension = "s" + "E"*498 + "e"
         bw, rb, final = self.make_bucket("test_readwrite", 1406)
-        bp = storageserver.WriteBucketProxy(rb,
-                                            data_size=100,
-                                            segment_size=25,
-                                            num_segments=4,
-                                            num_share_hashes=3)
+        bp = WriteBucketProxy(rb,
+                              data_size=100,
+                              segment_size=25,
+                              num_segments=4,
+                              num_share_hashes=3,
+                              uri_extension_size=len(uri_extension))

         d = bp.start()
         d.addCallback(lambda res: bp.put_block(0, "a"*25))
@@ -118,13 +122,13 @@ class BucketProxy(unittest.TestCase):
         # now read everything back
         def _start_reading(res):
-            br = storageserver.BucketReader(final)
+            br = BucketReader(final)
             rb = RemoteBucket()
             rb.target = br
-            rbp = storageserver.ReadBucketProxy(rb)
+            rbp = ReadBucketProxy(rb)
             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

-            d1 = rbp.start()
+            d1 = rbp.startIfNecessary()
             d1.addCallback(lambda res: rbp.get_block(0))
             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
             d1.addCallback(lambda res: rbp.get_block(1))
@@ -169,7 +173,7 @@ class Server(unittest.TestCase):
     def create(self, name, sizelimit=None):
         workdir = self.workdir(name)
-        ss = storageserver.StorageServer(workdir, sizelimit)
+        ss = StorageServer(workdir, sizelimit)
         ss.setServiceParent(self.sparent)
         return ss
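For reference, the read path these tests now exercise looks like the following sketch (names taken from the test above; final is the closed share file from make_bucket(), and RemoteBucket is the test helper that forwards callRemote to its target). startIfNecessary() is the idempotent form of start() enabled by the _started flag this commit adds to ReadBucketProxy.

# Sketch, not new test code.
from allmydata.storageserver import BucketReader, ReadBucketProxy

br = BucketReader(final)      # `final` comes from make_bucket() in the test
rb = RemoteBucket()           # test helper: proxies callRemote to rb.target
rb.target = br
rbp = ReadBucketProxy(rb)
d = rbp.startIfNecessary()    # fetches the offset table at most once
d.addCallback(lambda res: rbp.get_block(0))
d.addCallback(lambda res: rbp.startIfNecessary())  # second call does not refetch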

View File

@@ -1,12 +1,79 @@
 from twisted.trial import unittest
 from twisted.python.failure import Failure
+from twisted.internet import defer
+from cStringIO import StringIO

-from allmydata import upload, encode
+from allmydata import upload, encode, storageserver
 from allmydata.uri import unpack_uri, unpack_lit
+from allmydata.util.assertutil import precondition
+from foolscap import eventual

-from test_encode import FakePeer
+class FakePeer:
+    def __init__(self, mode="good"):
+        self.ss = FakeStorageServer(mode)
+
+    def callRemote(self, methname, *args, **kwargs):
+        def _call():
+            meth = getattr(self, methname)
+            return meth(*args, **kwargs)
+        return defer.maybeDeferred(_call)
+
+    def get_service(self, sname):
+        assert sname == "storageserver"
+        return self.ss
+
+class FakeStorageServer:
+    def __init__(self, mode):
+        self.mode = mode
+    def callRemote(self, methname, *args, **kwargs):
+        def _call():
+            meth = getattr(self, methname)
+            return meth(*args, **kwargs)
+        d = eventual.fireEventually()
+        d.addCallback(lambda res: _call())
+        return d
+    def allocate_buckets(self, crypttext_hash, sharenums,
+                         share_size, blocksize, canary):
+        #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
+        if self.mode == "full":
+            return (set(), {},)
+        elif self.mode == "already got them":
+            return (set(sharenums), {},)
+        else:
+            return (set(),
+                    dict([( shnum, FakeBucketWriter(share_size) )
+                          for shnum in sharenums]),
+                    )
+
+class FakeBucketWriter:
+    # a diagnostic version of storageserver.BucketWriter
+    def __init__(self, size):
+        self.data = StringIO()
+        self.closed = False
+        self._size = size
+
+    def callRemote(self, methname, *args, **kwargs):
+        def _call():
+            meth = getattr(self, "remote_" + methname)
+            return meth(*args, **kwargs)
+        d = eventual.fireEventually()
+        d.addCallback(lambda res: _call())
+        return d
+
+    def remote_write(self, offset, data):
+        precondition(not self.closed)
+        precondition(offset >= 0)
+        precondition(offset+len(data) <= self._size,
+                     "offset=%d + data=%d > size=%d" %
+                     (offset, len(data), self._size))
+        self.data.seek(offset)
+        self.data.write(data)
+
+    def remote_close(self):
+        precondition(not self.closed)
+        self.closed = True

 class FakeClient:
     def __init__(self, mode="good"):
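The point of the new diagnostic FakeBucketWriter is that it enforces the size negotiated at allocation time. A hypothetical failure case, not part of the commit: a write past the allocated size trips the precondition (which raises AssertionError), which is exactly the failure the allocated_size() accounting in upload.py is designed to prevent.

# Hypothetical usage of the test double defined above.
bw = FakeBucketWriter(size=100)
bw.remote_write(0, "a" * 100)        # exactly fills the bucket: allowed
try:
    bw.remote_write(100, "x")        # offset=100 + data=1 > size=100
except AssertionError, e:
    print "over-allocation rejected:", e
bw.remote_close()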

View File

@@ -5,7 +5,7 @@ from twisted.application import service
 from foolscap import Referenceable

 from allmydata.util import idlib, hashutil
-from allmydata import encode, storageserver
+from allmydata import encode, storageserver, hashtree
 from allmydata.uri import pack_uri, pack_lit
 from allmydata.interfaces import IUploadable, IUploader
 from allmydata.Crypto.Cipher import AES
@@ -22,6 +22,13 @@ class HaveAllPeersError(Exception):
 class TooFullError(Exception):
     pass

+# our current uri_extension is 846 bytes for small files, a few bytes
+# more for larger ones (since the filesize is encoded in decimal in a
+# few places). Ask for a little bit more just in case we need it. If
+# the extension changes size, we can change EXTENSION_SIZE to
+# allocate a more accurate amount of space.
+EXTENSION_SIZE = 1000
+
 class PeerTracker:
     def __init__(self, peerid, permutedid, connection,
                  sharesize, blocksize, num_segments, num_share_hashes,
@@ -31,6 +38,13 @@ class PeerTracker:
         self.connection = connection # to an RIClient
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
+        #print "PeerTracker", peerid, permutedid, sharesize
+        as = storageserver.allocated_size(sharesize,
+                                          num_segments,
+                                          num_share_hashes,
+                                          EXTENSION_SIZE)
+        self.allocated_size = as
+
         self.blocksize = blocksize
         self.num_segments = num_segments
         self.num_share_hashes = num_share_hashes
@@ -47,10 +61,11 @@ class PeerTracker:
     def _got_storageserver(self, storageserver):
         self._storageserver = storageserver
     def _query(self, sharenums):
+        #print " query", self.peerid, len(sharenums)
        d = self._storageserver.callRemote("allocate_buckets",
                                            self.crypttext_hash,
-                                           sharenums, self.sharesize,
-                                           self.blocksize,
+                                           sharenums,
+                                           self.allocated_size,
                                            canary=Referenceable())
         d.addCallback(self._got_reply)
         return d
@@ -62,7 +77,8 @@ class PeerTracker:
             bp = storageserver.WriteBucketProxy(rref, self.sharesize,
                                                 self.blocksize,
                                                 self.num_segments,
-                                                self.num_share_hashes)
+                                                self.num_share_hashes,
+                                                EXTENSION_SIZE)
             b[sharenum] = bp
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
@@ -137,11 +153,16 @@ class FileUploader:
         # responsible for handling the data and sending out the shares.
         peers = self._client.get_permuted_peers(self._crypttext_hash)
         assert peers

+        # TODO: eek, don't pull this from here, find a better way. gross.
         num_segments = self._encoder.uri_extension_data['num_segments']
-        from allmydata.util.mathutil import next_power_of_k
-        import math
-        num_share_hashes = max(int(math.log(next_power_of_k(self.total_shares,2),2)),1)
+        ht = hashtree.IncompleteHashTree(self.total_shares)
+        # this needed_hashes computation should mirror
+        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
+        # (instead of a HashTree) because we don't require actual hashing
+        # just to count the levels.
+        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
+
         trackers = [ PeerTracker(peerid, permutedid, conn,
                                  share_size, block_size,
                                  num_segments, num_share_hashes,
@@ -217,10 +238,11 @@ class FileUploader:
                 if ring[0][1] == SHARE:
                     sharenums_to_query.add(ring[0][2])
                 else:
-                    d = peer.query(sharenums_to_query)
-                    d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
-                    outstanding_queries.append(d)
-                    d.addErrback(log.err)
+                    if True or sharenums_to_query:
+                        d = peer.query(sharenums_to_query)
+                        d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
+                        outstanding_queries.append(d)
+                        d.addErrback(log.err)
                     peer = ring[0][2]
                     sharenums_to_query = set()
                 ring.rotate(-1)
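The replacement for the old log2 formula can be sanity-checked directly. A small illustrative script, not part of the commit, relying only on the hashtree API the new code itself uses: IncompleteHashTree counts the levels of the Merkle tree over total_shares leaves without doing any hashing, so needed_hashes(0, include_leaf=True) yields the number of (hashnum, hash) pairs a single share must store.

# Illustrative check of the new num_share_hashes computation.
from allmydata import hashtree

for total_shares in (2, 3, 10, 100):
    ht = hashtree.IncompleteHashTree(total_shares)
    n = len(ht.needed_hashes(0, include_leaf=True))
    print "total_shares=%3d -> num_share_hashes=%d" % (total_shares, n)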