storage: use one file per share instead of 7 (#85). work-in-progress, tests still fail

Brian Warner 2007-07-13 14:04:49 -07:00
parent 6ff94541a8
commit cd8648d39b
7 changed files with 433 additions and 205 deletions


@@ -102,8 +102,17 @@ class ValidatedBucket:
self.share_hash_tree = share_hash_tree
self._roothash = roothash
self.block_hash_tree = hashtree.IncompleteHashTree(num_blocks)
self.started = False
def get_block(self, blocknum):
if not self.started:
d = self.bucket.start()
def _started(res):
self.started = True
return self.get_block(blocknum)
d.addCallback(_started)
return d
# the first time we use this bucket, we need to fetch enough elements
# of the share hash tree to validate it from our share hash up to the
# hashroot.
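The comment above captures the key idea: to validate one share, the downloader only needs the sibling hashes along the path from that share's leaf up to the Merkle root, not the whole tree. A minimal, self-contained sketch of that check (illustrative only; this is not the allmydata.hashtree API, and the real code uses tagged SHA-256 hashes):

import hashlib

def merkle_path_ok(leaf_hash, path, roothash):
    # path is a list of (sibling_is_left, sibling_hash) pairs, ordered from
    # the leaf up to (but not including) the root
    h = leaf_hash
    for sibling_is_left, sibling in path:
        if sibling_is_left:
            h = hashlib.sha256(sibling + h).digest()
        else:
            h = hashlib.sha256(h + sibling).digest()
    return h == roothash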
@@ -380,7 +389,8 @@ class FileDownloader:
bucket = sources[0]
sources = sources[1:]
#d = bucket.callRemote(methname, *args)
d = getattr(bucket, methname)(*args)
d = bucket.startIfNecessary()
d.addCallback(lambda res: getattr(bucket, methname)(*args))
d.addCallback(validatorfunc, bucket)
def _bad(f):
log.msg("%s from vbucket %s failed: %s" % (name, bucket, f)) # WEIRD


@@ -180,6 +180,9 @@ class Encoder(object):
self.setup_codec() # TODO: duplicate call?
d = defer.succeed(None)
for l in self.landlords.values():
d.addCallback(lambda res, l=l: l.start())
for i in range(self.num_segments-1):
# note to self: this form doesn't work, because lambda only
# captures the slot, not the value
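The l=l default argument in the landlord loop above exists precisely because of the pitfall this note describes: a lambda closes over the variable, not over its value at definition time. A quick illustration (not part of this commit):

callbacks = [lambda res: i for i in range(3)]
print [cb(None) for cb in callbacks]         # [2, 2, 2]: every lambda sees the final value of i

callbacks = [lambda res, i=i: i for i in range(3)]
print [cb(None) for cb in callbacks]         # [0, 1, 2]: the default argument freezes each value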


@@ -61,6 +61,38 @@ class RIClient(RemoteInterface):
return Nodeid
class RIBucketWriter(RemoteInterface):
def write(offset=int, data=ShareData):
return None
def close():
"""
If the data that has been written is incomplete or inconsistent then
the server will throw the data away, else it will store it for future
retrieval.
"""
return None
class RIBucketReader(RemoteInterface):
def read(offset=int, length=int):
return ShareData
class RIStorageServer(RemoteInterface):
def allocate_buckets(storage_index=StorageIndex,
sharenums=SetOf(int, maxLength=MAX_BUCKETS),
sharesize=int, blocksize=int, canary=Referenceable):
"""
@param canary: If the canary is lost before close(), the bucket is deleted.
@return: tuple of (alreadygot, allocated), where alreadygot is what we
already have and allocated is what we hereby agree to accept
"""
return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
def get_buckets(storage_index=StorageIndex):
return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
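A minimal client-side sketch of this interface (not part of this commit): it assumes `server` is a Foolscap RemoteReference to an RIStorageServer, follows the argument list of the schema declared just above, and uses arbitrary sizes; `allocate_three` is a hypothetical helper.

from foolscap import Referenceable

def allocate_three(server, storage_index):
    # if this canary reference is lost before close(), the server deletes the buckets
    canary = Referenceable()
    d = server.callRemote("allocate_buckets", storage_index,
                          set([0, 1, 2]),   # sharenums
                          1406,             # sharesize
                          25,               # blocksize
                          canary)
    def _got((alreadygot, bucketwriters)):
        # alreadygot: share numbers this server already holds
        # bucketwriters: dict mapping each accepted shnum to an RIBucketWriter
        return alreadygot, bucketwriters
    d.addCallback(_got)
    return d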
class IStorageBucketWriter(Interface):
def put_block(segmentnum=int, data=ShareData):
"""@param data: For most segments, this data will be 'blocksize'
bytes in length. The last segment might be shorter.
@@ -92,16 +124,11 @@ class RIBucketWriter(RemoteInterface):
write(k + ':' + netstring(dict[k]))
"""
return None
def close():
"""
If the data that has been written is incomplete or inconsistent then
the server will throw the data away, else it will store it for future
retrieval.
"""
return None
pass
class IStorageBucketReader(Interface):
class RIBucketReader(RemoteInterface):
def get_block(blocknum=int):
"""Most blocks will be the same size. The last block might be shorter
than the others.
@@ -121,55 +148,6 @@ class RIBucketReader(RemoteInterface):
return URIExtensionData
class RIStorageServer(RemoteInterface):
def allocate_buckets(storage_index=StorageIndex,
sharenums=SetOf(int, maxLength=MAX_BUCKETS),
sharesize=int, blocksize=int, canary=Referenceable):
"""
@param canary: If the canary is lost before close(), the bucket is deleted.
@return: tuple of (alreadygot, allocated), where alreadygot is what we
already have and allocated is what we hereby agree to accept
"""
return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
def get_buckets(storage_index=StorageIndex):
return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
class IStorageBucketWriter(Interface):
def put_block(segmentnum, data):
pass
def put_plaintext_hashes(hashes):
pass
def put_crypttext_hashes(hashes):
pass
def put_block_hashes(blockhashes):
pass
def put_share_hashes(sharehashes):
pass
def put_uri_extension(data):
pass
def close():
pass
class IStorageBucketReader(Interface):
def get_block(blocknum):
pass
def get_plaintext_hashes():
pass
def get_crypttext_hashes():
pass
def get_block_hashes():
pass
def get_share_hashes():
pass
def get_uri_extension():
pass
# hm, we need a solution for forward references in schemas
from foolscap.schema import Any


@@ -1,13 +1,14 @@
import os, re, weakref
import os, re, weakref, stat, struct
from foolscap import Referenceable
from twisted.application import service
from twisted.internet import defer
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
RIBucketReader, IStorageBucketWriter, IStorageBucketReader
from allmydata import interfaces
from allmydata.util import bencode, fileutil, idlib
from allmydata.util import fileutil, idlib
from allmydata.util.assertutil import precondition
# store/
@@ -25,110 +26,46 @@ NUM_RE=re.compile("[0-9]*")
class BucketWriter(Referenceable):
implements(RIBucketWriter)
def __init__(self, ss, incominghome, finalhome, blocksize, sharesize):
def __init__(self, ss, incominghome, finalhome, size):
self.ss = ss
self.incominghome = incominghome
self.finalhome = finalhome
self.blocksize = blocksize
self.sharesize = sharesize
self._size = size
self.closed = False
self._next_segnum = 0
fileutil.make_dirs(incominghome)
self._write_file('blocksize', str(blocksize))
# touch the file, so later callers will see that we're working on it
f = open(self.incominghome, 'ab')
f.close()
def allocated_size(self):
return self.sharesize
return self._size
def _write_file(self, fname, data):
open(os.path.join(self.incominghome, fname), 'wb').write(data)
def remote_put_block(self, segmentnum, data):
def remote_write(self, offset, data):
precondition(not self.closed)
# all blocks but the last will be of size self.blocksize, however the
# last one may be short, and we don't know the total number of
# segments so we can't tell which is which.
assert len(data) <= self.blocksize
assert segmentnum == self._next_segnum # must write in sequence
self._next_segnum = segmentnum + 1
f = fileutil.open_or_create(os.path.join(self.incominghome, 'data'))
f.seek(self.blocksize*segmentnum)
precondition(offset >= 0)
precondition(offset+len(data) <= self._size)
f = open(self.incominghome, 'ab')
f.seek(offset)
f.write(data)
def remote_put_plaintext_hashes(self, hashes):
precondition(not self.closed)
# TODO: verify the length of blockhashes.
# TODO: tighten foolscap schema to require exactly 32 bytes.
self._write_file('plaintext_hashes', ''.join(hashes))
def remote_put_crypttext_hashes(self, hashes):
precondition(not self.closed)
# TODO: verify the length of blockhashes.
# TODO: tighten foolscap schema to require exactly 32 bytes.
self._write_file('crypttext_hashes', ''.join(hashes))
def remote_put_block_hashes(self, blockhashes):
precondition(not self.closed)
# TODO: verify the length of blockhashes.
# TODO: tighten foolscap schema to require exactly 32 bytes.
self._write_file('blockhashes', ''.join(blockhashes))
def remote_put_share_hashes(self, sharehashes):
precondition(not self.closed)
self._write_file('sharehashes', bencode.bencode(sharehashes))
def remote_put_uri_extension(self, data):
precondition(not self.closed)
self._write_file('uri_extension', data)
f.close()
def remote_close(self):
precondition(not self.closed)
# TODO assert or check the completeness and consistency of the data that has been written
fileutil.make_dirs(os.path.dirname(self.finalhome))
fileutil.rename(self.incominghome, self.finalhome)
try:
os.rmdir(os.path.dirname(self.incominghome))
except OSError:
# Perhaps the directory wasn't empty. In any case, ignore the error.
pass
self.closed = True
self.ss.bucket_writer_closed(self, fileutil.du(self.finalhome))
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
self.ss.bucket_writer_closed(self, filelen)
def str2l(s):
""" split string (pulled from storage) into a list of blockids """
return [ s[i:i+interfaces.HASH_SIZE] for i in range(0, len(s), interfaces.HASH_SIZE) ]
class BucketReader(Referenceable):
implements(RIBucketReader)
def __init__(self, home):
self.home = home
self.blocksize = int(self._read_file('blocksize'))
def _read_file(self, fname):
return open(os.path.join(self.home, fname), 'rb').read()
def remote_get_block(self, blocknum):
f = open(os.path.join(self.home, 'data'), 'rb')
f.seek(self.blocksize * blocknum)
return f.read(self.blocksize) # this might be short for the last block
def remote_get_plaintext_hashes(self):
return str2l(self._read_file('plaintext_hashes'))
def remote_get_crypttext_hashes(self):
return str2l(self._read_file('crypttext_hashes'))
def remote_get_block_hashes(self):
return str2l(self._read_file('blockhashes'))
def remote_get_share_hashes(self):
hashes = bencode.bdecode(self._read_file('sharehashes'))
# tuples come through bdecode(bencode()) as lists, which violates the
# schema
return [tuple(i) for i in hashes]
def remote_get_uri_extension(self):
return self._read_file('uri_extension')
def remote_read(self, offset, length):
f = open(self.home, 'rb')
f.seek(offset)
return f.read(length)
class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer)
@@ -159,7 +96,7 @@ class StorageServer(service.MultiService, Referenceable):
return space
def remote_allocate_buckets(self, storage_index, sharenums, sharesize,
blocksize, canary):
canary):
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
si_s = idlib.b2a(storage_index)
@@ -174,8 +111,9 @@ class StorageServer(service.MultiService, Referenceable):
if os.path.exists(incominghome) or os.path.exists(finalhome):
alreadygot.add(shnum)
elif no_limits or remaining_space >= space_per_bucket:
fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
bw = BucketWriter(self, incominghome, finalhome,
blocksize, space_per_bucket)
space_per_bucket)
bucketwriters[shnum] = bw
self._active_writers[bw] = 1
if yes_limits:
@@ -184,6 +122,9 @@ class StorageServer(service.MultiService, Referenceable):
# not enough space to accept this bucket
pass
if bucketwriters:
fileutil.make_dirs(os.path.join(self.storedir, si_s))
return alreadygot, bucketwriters
def bucket_writer_closed(self, bw, consumed_size):
@@ -204,24 +145,127 @@ class StorageServer(service.MultiService, Referenceable):
return bucketreaders
"""
Share data is written into a single file. At the start of the file, there is
a series of four-byte big-endian offset values, which indicate where each
section starts. Each offset is measured from the beginning of the file.
0x00: segment size
0x04: offset of data (=00 00 00 1c)
0x08: offset of plaintext_hash_tree
0x0c: offset of crypttext_hash_tree
0x10: offset of block_hashes
0x14: offset of share_hashes
0x18: offset of uri_extension_length + uri_extension
0x1c: start of data
start of plaintext_hash_tree
start of crypttext_hash_tree
start of block_hashes
start of share_hashes
each share_hash is written as a two-byte (big-endian) hashnum
followed by the 32-byte SHA-256 hash. We only store the hashes
necessary to validate the share hash root
start of uri_extension_length (four-byte big-endian value)
start of uri_extension
"""
class WriteBucketProxy:
implements(IStorageBucketWriter)
def __init__(self, rref):
def __init__(self, rref, data_size, segment_size, num_segments,
num_share_hashes):
self._rref = rref
self._segment_size = segment_size
HASH_SIZE = interfaces.HASH_SIZE
self._segment_hash_size = (2*num_segments - 1) * HASH_SIZE
# how many share hashes are included in each share? This will be
# about ln2(num_shares).
self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
offsets = self._offsets = {}
x = 0x1c
offsets['data'] = x
x += data_size
offsets['plaintext_hash_tree'] = x
x += self._segment_hash_size
offsets['crypttext_hash_tree'] = x
x += self._segment_hash_size
offsets['block_hashes'] = x
x += self._segment_hash_size
offsets['share_hashes'] = x
x += self._share_hash_size
offsets['uri_extension'] = x
offset_data = struct.pack(">LLLLLLL",
segment_size,
offsets['data'],
offsets['plaintext_hash_tree'],
offsets['crypttext_hash_tree'],
offsets['block_hashes'],
offsets['share_hashes'],
offsets['uri_extension']
)
assert len(offset_data) == 7*4
self._offset_data = offset_data
def start(self):
return self._write(0, self._offset_data)
def put_block(self, segmentnum, data):
return self._rref.callRemote("put_block", segmentnum, data)
offset = self._offsets['data'] + segmentnum * self._segment_size
assert offset + len(data) <= self._offsets['uri_extension']
assert isinstance(data, str)
if segmentnum < self._segment_size-1:
assert len(data) == self._segment_size
else:
assert len(data) <= self._segment_size
return self._write(offset, data)
def put_plaintext_hashes(self, hashes):
return self._rref.callRemote("put_plaintext_hashes", hashes)
offset = self._offsets['plaintext_hash_tree']
assert isinstance(hashes, list)
data = "".join(hashes)
assert len(data) == self._segment_hash_size
assert offset + len(data) <= self._offsets['crypttext_hash_tree']
return self._write(offset, data)
def put_crypttext_hashes(self, hashes):
return self._rref.callRemote("put_crypttext_hashes", hashes)
offset = self._offsets['crypttext_hash_tree']
assert isinstance(hashes, list)
data = "".join(hashes)
assert len(data) == self._segment_hash_size
assert offset + len(data) <= self._offsets['block_hashes']
return self._write(offset, data)
def put_block_hashes(self, blockhashes):
return self._rref.callRemote("put_block_hashes", blockhashes)
offset = self._offsets['block_hashes']
assert isinstance(blockhashes, list)
data = "".join(blockhashes)
assert len(data) == self._segment_hash_size
assert offset + len(data) <= self._offsets['share_hashes']
return self._write(offset, data)
def put_share_hashes(self, sharehashes):
return self._rref.callRemote("put_share_hashes", sharehashes)
# sharehashes is a list of (index, hash) tuples, so they get stored
# as 2+32=34 bytes each
offset = self._offsets['share_hashes']
assert isinstance(sharehashes, list)
data = "".join([struct.pack(">H", hashnum) + hashvalue
for hashnum,hashvalue in sharehashes])
assert len(data) == self._share_hash_size
assert offset + len(data) <= self._offsets['uri_extension']
return self._write(offset, data)
def put_uri_extension(self, data):
return self._rref.callRemote("put_uri_extension", data)
offset = self._offsets['uri_extension']
assert isinstance(data, str)
length = struct.pack(">L", len(data))
return self._write(offset, length+data)
def _write(self, offset, data):
# TODO: for small shares, buffer the writes and do just a single call
return self._rref.callRemote("write", offset, data)
def close(self):
return self._rref.callRemote("close")
@@ -230,17 +274,87 @@ class ReadBucketProxy:
def __init__(self, rref):
self._rref = rref
def startIfNecessary(self):
if self._started:
return defer.succeed(self)
d = self.start()
d.addCallback(lambda res: self)
return d
def start(self):
# TODO: for small shares, read the whole bucket in start()
d = self._read(0, 7*4)
self._offsets = {}
def _got_offsets(data):
self._segment_size = struct.unpack(">L", data[0:4])[0]
x = 4
for field in ( 'data',
'plaintext_hash_tree',
'crypttext_hash_tree',
'block_hashes',
'share_hashes',
'uri_extension' ):
offset = struct.unpack(">L", data[x:x+4])[0]
x += 4
self._offsets[field] = offset
d.addCallback(_got_offsets)
return d
def get_block(self, blocknum):
return self._rref.callRemote("get_block", blocknum)
offset = self._offsets['data'] + blocknum * self._segment_size
return self._read(offset, self._segment_size)
def _str2l(self, s):
""" split string (pulled from storage) into a list of blockids """
return [ s[i:i+interfaces.HASH_SIZE]
for i in range(0, len(s), interfaces.HASH_SIZE) ]
def get_plaintext_hashes(self):
return self._rref.callRemote("get_plaintext_hashes")
def get_crypttext_hashes(self):
return self._rref.callRemote("get_crypttext_hashes")
def get_block_hashes(self):
return self._rref.callRemote("get_block_hashes")
def get_share_hashes(self):
return self._rref.callRemote("get_share_hashes")
def get_uri_extension(self):
return self._rref.callRemote("get_uri_extension")
offset = self._offsets['plaintext_hash_tree']
size = self._offsets['crypttext_hash_tree'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_crypttext_hashes(self):
offset = self._offsets['crypttext_hash_tree']
size = self._offsets['block_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_block_hashes(self):
offset = self._offsets['block_hashes']
size = self._offsets['share_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_share_hashes(self):
offset = self._offsets['share_hashes']
size = self._offsets['uri_extension'] - offset
HASH_SIZE = interfaces.HASH_SIZE
assert size % (2+HASH_SIZE) == 0
d = self._read(offset, size)
def _unpack_share_hashes(data):
assert len(data) == size
hashes = []
for i in range(0, size, 2+HASH_SIZE):
hashnum = struct.unpack(">H", data[i:i+2])[0]
hashvalue = data[i+2:i+2+HASH_SIZE]
hashes.append( (hashnum, hashvalue) )
return hashes
d.addCallback(_unpack_share_hashes)
return d
def get_uri_extension(self):
offset = self._offsets['uri_extension']
d = self._read(offset, 4)
def _got_length(data):
length = struct.unpack(">L", data)[0]
return self._read(offset+4, length)
d.addCallback(_got_length)
return d
def _read(self, offset, length):
return self._rref.callRemote("read", offset, length)


@@ -61,17 +61,10 @@ class FakeBucketWriter:
self.share_hashes = None
self.closed = False
def callRemote(self, methname, *args, **kwargs):
# this allows FakeBucketWriter to be used either as an
# IStorageBucketWriter or as the remote reference that it wraps. This
# should be cleaned up eventually when we change RIBucketWriter to
# have just write(offset, data) and close()
def _call():
meth = getattr(self, methname)
return meth(*args, **kwargs)
d = eventual.fireEventually()
d.addCallback(lambda res: _call())
return d
def startIfNecessary(self):
return defer.succeed(self)
def start(self):
return defer.succeed(self)
def put_block(self, segmentnum, data):
def _try():


@@ -2,18 +2,20 @@
from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
import os.path
from allmydata import storageserver
from allmydata.util import fileutil
from allmydata import storageserver, interfaces
from allmydata.util import fileutil, hashutil
class Bucket(unittest.TestCase):
def make_workdir(self, name):
basedir = os.path.join("test_storage", "Bucket", name)
basedir = os.path.join("storage", "Bucket", name)
incoming = os.path.join(basedir, "tmp", "bucket")
final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp"))
return incoming, final
def bucket_writer_closed(self, bw, consumed):
@@ -21,31 +23,138 @@ class Bucket(unittest.TestCase):
def test_create(self):
incoming, final = self.make_workdir("test_create")
bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
bw.remote_put_block(0, "a"*25)
bw.remote_put_block(1, "b"*25)
bw.remote_put_block(2, "c"*7) # last block may be short
bw = storageserver.BucketWriter(self, incoming, final, 200)
bw.remote_write(0, "a"*25)
bw.remote_write(25, "b"*25)
bw.remote_write(50, "c"*25)
bw.remote_write(75, "d"*7)
bw.remote_close()
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
bw.remote_put_block(0, "a"*25)
bw.remote_put_block(1, "b"*25)
bw.remote_put_block(2, "c"*7) # last block may be short
bw.remote_put_block_hashes(["1"*32, "2"*32, "3"*32, "4"*32])
bw.remote_put_share_hashes([(5, "5"*32), (6, "6"*32)])
bw = storageserver.BucketWriter(self, incoming, final, 200)
bw.remote_write(0, "a"*25)
bw.remote_write(25, "b"*25)
bw.remote_write(50, "c"*7) # last block may be short
bw.remote_close()
# now read from it
br = storageserver.BucketReader(final)
self.failUnlessEqual(br.remote_get_block(0), "a"*25)
self.failUnlessEqual(br.remote_get_block(1), "b"*25)
self.failUnlessEqual(br.remote_get_block(2), "c"*7)
self.failUnlessEqual(br.remote_get_block_hashes(),
["1"*32, "2"*32, "3"*32, "4"*32])
self.failUnlessEqual(br.remote_get_share_hashes(),
[(5, "5"*32), (6, "6"*32)])
self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
class RemoteBucket:
def callRemote(self, methname, *args, **kwargs):
def _call():
meth = getattr(self.target, "remote_" + methname)
return meth(*args, **kwargs)
return defer.maybeDeferred(_call)
class BucketProxy(unittest.TestCase):
def make_bucket(self, name, size):
basedir = os.path.join("storage", "BucketProxy", name)
incoming = os.path.join(basedir, "tmp", "bucket")
final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp"))
bw = storageserver.BucketWriter(self, incoming, final, size)
rb = RemoteBucket()
rb.target = bw
return bw, rb, final
def bucket_writer_closed(self, bw, consumed):
pass
def test_create(self):
bw, rb, final = self.make_bucket("test_create", 500)
bp = storageserver.WriteBucketProxy(rb,
data_size=300,
segment_size=10,
num_segments=5,
num_share_hashes=3)
self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
def test_readwrite(self):
# Let's pretend each share has 100 bytes of data, and that there are
# 4 segments (25 bytes each), and 8 shares total. So the three
# per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
# block_hashes) will have 4 leaves and 7 nodes each. The per-share
# merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
# nodes. Furthermore, let's assume the uri_extension is 500 bytes
# long. That should make the whole share:
#
# 0x1c + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1406 bytes long
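# quick check of that arithmetic (editorial note, not part of this test):
# 28 + 100 + 672 + 102 + 504 = 1406
assert 0x1c + 100 + 3*(7*32) + 3*(2+32) + (4+500) == 1406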
plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
for i in range(7)]
crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
for i in range(7)]
block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
for i in range(7)]
share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
for i in (1,9,13)]
uri_extension = "s" + "E"*498 + "e"
bw, rb, final = self.make_bucket("test_readwrite", 1406)
bp = storageserver.WriteBucketProxy(rb,
data_size=100,
segment_size=25,
num_segments=4,
num_share_hashes=3)
d = bp.start()
d.addCallback(lambda res: bp.put_block(0, "a"*25))
d.addCallback(lambda res: bp.put_block(1, "b"*25))
d.addCallback(lambda res: bp.put_block(2, "c"*25))
d.addCallback(lambda res: bp.put_block(3, "d"*25))
d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
d.addCallback(lambda res: bp.close())
# now read everything back
def _start_reading(res):
br = storageserver.BucketReader(final)
rb = RemoteBucket()
rb.target = br
rbp = storageserver.ReadBucketProxy(rb)
self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
d1 = rbp.start()
d1.addCallback(lambda res: rbp.get_block(0))
d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
d1.addCallback(lambda res: rbp.get_block(1))
d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
d1.addCallback(lambda res: rbp.get_block(2))
d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
d1.addCallback(lambda res: rbp.get_block(3))
d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*25))
d1.addCallback(lambda res: rbp.get_plaintext_hashes())
d1.addCallback(lambda res:
self.failUnlessEqual(res, plaintext_hashes))
d1.addCallback(lambda res: rbp.get_crypttext_hashes())
d1.addCallback(lambda res:
self.failUnlessEqual(res, crypttext_hashes))
d1.addCallback(lambda res: rbp.get_block_hashes())
d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
d1.addCallback(lambda res: rbp.get_share_hashes())
d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
d1.addCallback(lambda res: rbp.get_uri_extension())
d1.addCallback(lambda res:
self.failUnlessEqual(res, uri_extension))
return d1
d.addCallback(_start_reading)
return d
class Server(unittest.TestCase):
@@ -74,7 +183,7 @@ class Server(unittest.TestCase):
canary = Referenceable()
already,writers = ss.remote_allocate_buckets("vid", [0,1,2],
75, 25, canary)
75, canary)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
@@ -82,19 +191,18 @@ class Server(unittest.TestCase):
self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
for i,wb in writers.items():
wb.remote_put_block(0, "%25d" % i)
wb.remote_write(0, "%25d" % i)
wb.remote_close()
# now they should be readable
b = ss.remote_get_buckets("vid")
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
self.failUnlessEqual(b[0].remote_get_block(0),
"%25d" % 0)
self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
# now if we ask about writing again, the server should offer those three
# buckets as already present
already,writers = ss.remote_allocate_buckets("vid", [0,1,2,3,4],
75, 25, canary)
75, canary)
self.failUnlessEqual(already, set([0,1,2]))
self.failUnlessEqual(set(writers.keys()), set([3,4]))
@@ -103,7 +211,7 @@ class Server(unittest.TestCase):
# upload into them a second time)
already,writers = ss.remote_allocate_buckets("vid", [2,3,4,5],
75, 25, canary)
75, canary)
self.failUnlessEqual(already, set([2,3,4]))
self.failUnlessEqual(set(writers.keys()), set([5]))
@@ -112,35 +220,42 @@ class Server(unittest.TestCase):
canary = Referenceable()
already,writers = ss.remote_allocate_buckets("vid1", [0,1,2],
25, 5, canary)
25, canary)
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 75 bytes provisionally allocated,
# allowing only 25 more to be claimed
self.failUnlessEqual(len(ss._active_writers), 3)
already2,writers2 = ss.remote_allocate_buckets("vid2", [0,1,2],
25, 5, canary)
25, canary)
self.failUnlessEqual(len(writers2), 1)
self.failUnlessEqual(len(ss._active_writers), 4)
# we abandon the first set, so their provisional allocation should be
# returned
del already
del writers
self.failUnlessEqual(len(ss._active_writers), 1)
# and we close the second set, so their provisional allocation should
# become real, long-term allocation
for bw in writers2.values():
bw.remote_write(0, "a"*25)
bw.remote_close()
del already2
del writers2
del bw
self.failUnlessEqual(len(ss._active_writers), 0)
# now there should be 25 bytes allocated, and 75 free
already3,writers3 = ss.remote_allocate_buckets("vid3", [0,1,2,3],
25, 5, canary)
25, canary)
self.failUnlessEqual(len(writers3), 3)
self.failUnlessEqual(len(ss._active_writers), 3)
del already3
del writers3
self.failUnlessEqual(len(ss._active_writers), 0)
ss.disownServiceParent()
del ss
@@ -150,5 +265,6 @@ class Server(unittest.TestCase):
# would be more than 25 bytes and this test would need to be changed.
ss = self.create("test_sizelimits", 100)
already4,writers4 = ss.remote_allocate_buckets("vid4", [0,1,2,3],
25, 5, canary)
25, canary)
self.failUnlessEqual(len(writers4), 3)
self.failUnlessEqual(len(ss._active_writers), 3)


@@ -24,13 +24,16 @@ class TooFullError(Exception):
class PeerTracker:
def __init__(self, peerid, permutedid, connection,
sharesize, blocksize, crypttext_hash):
sharesize, blocksize, num_segments, num_share_hashes,
crypttext_hash):
self.peerid = peerid
self.permutedid = permutedid
self.connection = connection # to an RIClient
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
self.blocksize = blocksize
self.num_segments = num_segments
self.num_share_hashes = num_share_hashes
self.crypttext_hash = crypttext_hash
self._storageserver = None
@@ -54,8 +57,13 @@ class PeerTracker:
def _got_reply(self, (alreadygot, buckets)):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
b = dict( [ (sharenum, storageserver.WriteBucketProxy(rref))
for sharenum, rref in buckets.iteritems() ] )
b = {}
for sharenum, rref in buckets.iteritems():
bp = storageserver.WriteBucketProxy(rref, self.sharesize,
self.blocksize,
self.num_segments,
self.num_share_hashes)
b[sharenum] = bp
self.buckets.update(b)
return (alreadygot, set(b.keys()))
@@ -129,8 +137,14 @@ class FileUploader:
# responsible for handling the data and sending out the shares.
peers = self._client.get_permuted_peers(self._crypttext_hash)
assert peers
# TODO: eek, don't pull this from here, find a better way. gross.
num_segments = self._encoder.uri_extension_data['num_segments']
from allmydata.util.mathutil import next_power_of_k
import math
num_share_hashes = max(int(math.log(next_power_of_k(self.total_shares,2),2)),1)
trackers = [ PeerTracker(peerid, permutedid, conn,
share_size, block_size,
num_segments, num_share_hashes,
self._crypttext_hash)
for permutedid, peerid, conn in peers ]
self.usable_peers = set(trackers) # this set shrinks over time
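The num_share_hashes computed above is the number of share-hash-tree nodes one share must carry to reach the root: roughly log2 of the share count after padding it up to a power of two. A small standalone illustration (share_hashes_needed is a hypothetical helper, not the allmydata.util.mathutil code used above):

def share_hashes_needed(total_shares):
    # pad the leaf count up to a power of two, then count the levels between
    # a leaf and the root; each level contributes one sibling hash
    depth = 0
    leaves = 1
    while leaves < total_shares:
        leaves *= 2
        depth += 1
    return max(depth, 1)

assert share_hashes_needed(8) == 3    # 8 shares: 15-node tree, 3 hashes per share (as in the tests)
assert share_hashes_needed(10) == 4   # 10 shares: padded to 16 leaves, 4 hashes per share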