checkpointing mutable-file work. Storage layer is 80% in place.

This commit is contained in:
Brian Warner 2007-10-30 19:47:36 -07:00
parent 75b7df7e29
commit b24c2925e8
4 changed files with 502 additions and 5 deletions

View File

@ -6,15 +6,13 @@ from twisted.internet import defer
from foolscap import Referenceable
from allmydata import uri
from allmydata.interfaces import RIVirtualDriveServer, \
IDirectoryNode, IFileNode, IFileURI, IDirnodeURI, IURI
IDirectoryNode, IFileNode, IFileURI, IDirnodeURI, IURI, \
BadWriteEnablerError
from allmydata.util import bencode, idlib, hashutil, fileutil
from allmydata.Crypto.Cipher import AES
# VirtualDriveServer is the side that hosts directory nodes
class BadWriteEnablerError(Exception):
pass
class NoPublicRootError(Exception):
pass

View File

@ -82,6 +82,49 @@ class RIBucketReader(RemoteInterface):
def read(offset=int, length=int):
return ShareData
TestVector = ListOf(TupleOf(int, int, str, str))
# elements are (offset, length, operator, specimen)
# operator is one of "lt, le, eq, ne, ge, gt, nop"
# nop always passes and is used to fetch data while writing.
# you should use length==len(specimen) for everything except nop
DataVector = ListOf(TupleOf(int, ShareData))
# (offset, data). This limits us to 30 writes of 1MiB each per call
TestResults = ListOf(str)
# returns data[offset:offset+length] for each element of TestVector
class RIMutableSlot(RemoteInterface):
def testv_and_writev(write_enabler=Hash,
testv=TestVector,
datav=DataVector,
new_length=ChoiceOf(None, int)):
"""General-purpose test-and-set operation for mutable slots. Perform
the given comparisons. If they all pass, then apply the write vector.
If new_length is not None, use it to set the size of the container.
This can be used to pre-allocate space for a series of upcoming
writes, or truncate existing data. If the container is growing,
new_length will be applied before datav. If the container is
shrinking, it will be applied afterwards.
Return the old data that was used for the comparisons.
The boolean return value is True if the write vector was applied,
false if not.
If the write_enabler is wrong, this will raise BadWriterEnablerError.
To enable share migration, the exception will have the nodeid used
for the old write enabler embedded in it, in the following string::
The write enabler was recorded by nodeid '%s'.
"""
return TupleOf(bool, TestResults)
def read(offset=int, length=int):
return ShareData
def get_length():
return int
class RIStorageServer(RemoteInterface):
def allocate_buckets(storage_index=StorageIndex,
@ -126,6 +169,47 @@ class RIStorageServer(RemoteInterface):
return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
def allocate_mutable_slot(storage_index=StorageIndex,
write_enabler=Hash,
renew_secret=LeaseRenewSecret,
cancel_secret=LeaseCancelSecret,
sharenums=SetOf(int, maxLength=MAX_BUCKETS),
allocated_size=int):
"""
@param storage_index: the index of the bucket to be created or
increfed.
@param write_enabler: a secret that is stored along with the slot.
Writes are accepted from any caller who can
present the matching secret. A different secret
should be used for each slot*server pair.
@param renew_secret: This is the secret used to protect bucket refresh
This secret is generated by the client and
stored for later comparison by the server. Each
server is given a different secret.
@param cancel_secret: Like renew_secret, but protects bucket decref.
@param sharenums: these are the share numbers (probably between 0 and
99) that the sender is proposing to store on this
server.
@param allocated_size: all shares will pre-allocate this many bytes.
Use this to a) confirm that you can claim as
much space as you want before you actually
send the data, and b) reduce the disk-IO cost
of doing incremental writes.
@return: dict mapping sharenum to slot. The return value may include
more sharenums than asked, if some shares already existed.
New leases are added for all
shares.
"""
return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
def get_mutable_slot(storage_index=StorageIndex):
"""This returns an empty dictionary if the server has no shares
of the slot mentioned."""
return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
class IStorageBucketWriter(Interface):
def put_block(segmentnum=int, data=ShareData):
"""@param data: For most segments, this data will be 'blocksize'
@ -1117,6 +1201,10 @@ class IVirtualDrive(Interface):
class NotCapableError(Exception):
"""You have tried to write to a read-only node."""
class BadWriteEnablerError(Exception):
pass
class RIControlClient(RemoteInterface):
def wait_for_client_connections(num_clients=int):

View File

@ -7,10 +7,14 @@ from twisted.internet import defer
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE
RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
BadWriterEnablerError, RIMutableSlot
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition, _assert
class DataTooLargeError(Exception):
pass
# storage/
# storage/shares/incoming
# incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be
@ -213,6 +217,347 @@ class BucketReader(Referenceable):
return self._share_file.read_share_data(offset, length)
# the MutableShareFile is like the ShareFile, but used for mutable data. It
# has a different layout. See docs/mutable.txt for more details.
# # offset size name
# 1 0 32 magic verstr "tahoe mutable container v1" plus binary
# 2 32 32 write enabler's nodeid
# 3 64 32 write enabler
# 4 72 8 data size (actual share data present) (a)
# 5 80 8 offset of (8) count of extra leases (after data)
# 6 88 416 four leases, 104 bytes each
# 0 4 ownerid (0 means "no lease here")
# 4 4 expiration timestamp
# 8 32 renewal token
# 40 32 cancel token
# 72 32 nodeid which accepted the tokens
# 7 504 (a) data
# 8 ?? 4 count of extra leases
# 9 ?? n*104 extra leases
assert struct.calcsize("L"), 4
assert struct.calcsize("Q"), 8
class MutableShareFile(Referenceable):
# note: at any given time, there should only be a single instance of this
# class per filename. More than one is likely to corrupt the container,
# because of state we cache in instance variables. This requires the
# StorageServer to use a WeakValueDictionary, indexed by filename. This
# could be improved by cacheing less and doing more IO.
implements(RIMutableSlot)
DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s")
HEADER_SIZE = struct.calcsize(">32s32s32sQQ") # doesn't include leases
LEASE_SIZE = struct.calcsize(">LL32s32s32s")
DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
# our sharefiles share with a recognizable string, plus some random
# binary data to reduce the chance that a regular text file will look
# like a sharefile.
MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
assert len(MAGIC) == 32
MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
# TODO: decide upon a policy for max share size
def __init__(self, filename):
self.home = filename
self._base_lease_offset = self.HEADER_SIZE
if os.path.exists(self.home):
f = open(self.home, 'rb')
data = f.read(88)
(magic,
self._write_enabler_nodeid, self._write_enabler,
self._data_length, offset) = struct.unpack(">32s32s32sQQ", data)
assert magic == self.MAGIC
self._extra_lease_offset = offset # points at (8)
f.seek(self._extra_lease_offset)
data = f.read(4)
self._num_extra_leases = struct.unpack(">L", data)
f.close()
def create(self, my_nodeid, write_enabler):
assert not os.path.exists(self.home)
self._write_enabler = write_enabler
self._data_length = 0
self._extra_lease_offset = (self.HEADER_SIZE
+ 4 * self.LEASE_SIZE
+ self._data_length)
assert self._extra_lease_offset == self.DATA_OFFSET # true at creation
self._num_extra_leases = 0
f = open(self.home, 'wb')
header = struct.pack(">32s32s32sQQ",
self.MAGIC, my_nodeid, write_enabler,
self._data_length, self._extra_lease_offset,
)
f.write(header)
# data goes here, empty after creation
f.write(struct.pack(">L", self._num_extra_leases))
# extra leases go here, none at creation
f.close()
def read_share_data(self, offset, length):
precondition(offset >= 0)
if offset+length > self._data_length:
# reads beyond the end of the data are truncated. Reads that
# start beyond the end of the data return an empty string.
length = max(0, self._data_length-offset)
if length == 0:
return ""
precondition(offset+length <= self._data_length)
f = open(self.home, 'rb')
f.seek(self.DATA_OFFSET+offset)
return f.read(length)
def change_container_size(self, new_container_size):
if new_container_size > self.MAX_SIZE:
raise DataTooLargeError()
new_extra_lease_offset = self.DATA_OFFSET + new_container_size
if new_extra_lease_offset < self._extra_lease_offset:
# TODO: allow containers to shrink
return
f = open(self.home, 'rb+')
f.seek(self._extra_lease_offset)
extra_lease_data = f.read(4 + self._num_extra_leases * self.LEASE_SIZE)
f.seek(new_extra_lease_offset)
f.write(extra_lease_data)
self._extra_lease_offset = new_extra_lease_offset
# an interrupt here will corrupt the leases, iff the move caused the
# extra leases to overlap.
f.seek(self.DATA_LENGTH_OFFSET+8)
f.write(struct.pack(">Q", new_extra_lease_offset))
f.close()
def write_share_data(self, offset, data):
length = len(data)
precondition(offset >= 0)
if offset+length < self._data_length:
# they are not expanding their data size
f = open(self.home, 'rb+')
f.seek(self.DATA_OFFSET+offset)
f.write(data)
f.close()
return
if self.DATA_OFFSET+offset+length <= self._extra_lease_offset:
# they are expanding their data size, but not the container size
f = open(self.home, 'rb+')
self._data_length = offset+length
f.seek(self.DATA_LENGTH_OFFSET)
f.write(struct.pack(">Q", self._data_length))
# an interrupt here will result in a corrupted share
f.seek(self.DATA_OFFSET+offset)
f.write(data)
f.close()
return
# they are expanding the container, so we have to move the leases.
# With luck, they're expanding it more than the size of the extra
# lease block, which will minimize the corrupt-the-share window
self.change_container_size(offset+length)
# an interrupt here is ok.. the container has been enlarged but the
# data remains untouched
self._data_length = offset+length
f = open(self.home, 'rb+')
f.seek(self.DATA_OFFSET+offset)
f.write(data)
# an interrupt here will result in a corrupted share
f.seek(self.DATA_LENGTH_OFFSET)
f.write(struct.pack(">Q", self._data_length))
f.close()
return
def _write_lease_record(self, f, lease_number, lease_info):
(ownerid, expiration_time,
renew_secret, cancel_secret, nodeid) = lease_info
if lease_number < 4:
offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
elif (lease_number-4) < self._num_extra_leases:
offset = (self._extra_lease_offset
+ 4
+ (lease_number-4)*self.LEASE_NUMBER)
else:
f.seek(self._extra_lease_offset)
f.write(struct.pack(">L", self._num_extra_leases+1))
self._num_extra_leases += 1
offset = (self._extra_lease_offset
+ 4
+ (lease_number-4)*self.LEASE_NUMBER)
f.seek(offset)
assert f.tell() == offset
f.write(struct.pack(">LL32s32s32s",
ownerid, int(expiration_time),
renew_secret, cancel_secret, nodeid))
def _read_lease_record(self, f, lease_number):
# returns a 5-tuple of lease info, or None
if lease_number < 4:
offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
elif (lease_number-4) < self._num_extra_leases:
offset = (self._extra_lease_offset
+ 4
+ (lease_number-4)*self.LEASE_NUMBER)
else:
raise IndexError("No such lease number %d" % lease_number)
f.seek(offset)
assert f.tell() == offset
data = f.read(self.LEASE_SIZE)
lease_info = struct.unpack(">LL32s32s32s", data)
(ownerid, expiration_time,
renew_secret, cancel_secret, nodeid) = lease_info
if ownerid == 0:
return None
return lease_info
def _read_num_leases(self, f):
f.seek(self.HEADER_SIZE)
leasedata = f.read(4*self.LEASE_SIZE)
num_leases = 0
for i in range(4):
base = i*self.LEASE_SIZE
(ownerid,) = struct.unpack(">L", leasedata[base:base+4])
if ownerid != 0:
num_leases += 1
return num_leases + self._num_extra_leases
def pack_leases(self):
pass
def _truncate_leases(self, f, num_leases):
f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
def enumerate_leases(self, f):
"""Yields (leasenum, (ownerid, expiration_time, renew_secret,
cancel_secret, accepting_nodeid)) for all leases."""
for i in range(self._read_num_leases(f)):
try:
data = self._read_lease_record(f, i)
if data is not None:
yield (i,data)
except IndexError:
return
def add_lease(self, lease_info):
f = open(self.home, 'rb+')
num_leases = self._read_num_leases(f)
self._write_lease_record(f, num_leases, lease_info)
self._write_num_leases(f, num_leases+1)
f.close()
def renew_lease(self, renew_secret, new_expire_time):
accepting_nodeids = set()
f = open(self.home, 'rb+')
for (leasenum,(oid,et,rs,cs,anid)) in self.enumerate_leases(f):
if rs == renew_secret:
# yup. See if we need to update the owner time.
if new_expire_time > et:
# yes
new_lease = (oid,new_expire_time,rs,cs,anid)
self._write_lease_record(f, leasenum, new_lease)
f.close()
return
accepting_nodeids.add(anid)
f.close()
# TODO: return the accepting_nodeids set, to give the client a chance
# to update the leases on a share which has been migrated from its
# original server to a new one.
raise IndexError("unable to renew non-existent lease")
def add_or_renew_lease(self, lease_info):
ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info
try:
self.renew_lease(renew_secret, expire_time)
except IndexError:
self.add_lease(lease_info)
def cancel_lease(self, cancel_secret):
"""Remove any leases with the given cancel_secret. Return
(num_remaining_leases, space_freed). Raise IndexError if there was no
lease with the given cancel_secret."""
modified = 0
remaining = 0
blank = "\x00"*32
blank_lease = (0, 0, blank, blank, blank)
f = open(self.home, 'rb+')
for (leasenum,(oid,et,rs,cs,anid)) in self.enumerate_leases(f):
if cs == cancel_secret:
self._write_lease_record(f, leasenum, blank_lease)
modified += 1
else:
remaining += 1
if modified:
freed_space = self._pack_leases(f)
f.close()
return (freed_space, remaining)
def _pack_leases(self, f):
# TODO: reclaim space from cancelled leases
return 0
def remote_read(self, offset, length):
return self.read_share_data(offset, length)
def remote_get_length(self):
return self._data_length
def remote_testv_and_writev(self, write_enabler, testv, datav, new_length):
if write_enabler != self._write_enabler:
# accomodate share migration by reporting the nodeid used for the
# old write enabler.
msg = "The write enabler was recorded by nodeid '%s'." % \
(idlib.b2a(self._write_enabler_nodeid),)
raise BadWriterEnablerError(msg)
# check testv
test_results_v = []
test_failed = False
for (offset, length, operator, specimen) in testv:
data = self.read_share_data(offset, length)
test_results_v.append(data)
if not self.compare(data, operator, specimen):
test_failed = False
if test_failed:
return (False, test_results_v)
# now apply the write vector
for (offset, data) in datav:
self.write_share_data(offset, data)
if new_length is not None:
self.change_container_size(new_length)
self._data_length = new_length
f = open(self.home, "rb+")
f.seek(self.DATA_LENGTH_OFFSET)
f.write(struct.pack(">Q", self._data_length))
f.close()
return (True, test_results_v)
def compare(self, a, op, b):
assert op in ("nop", "lt", "le", "eq", "ne", "ge", "gt")
if op == "nop":
return True
if op == "lt":
return a <= b
if op == "le":
return a < b
if op == "eq":
return a == b
if op == "ne":
return a != b
if op == "ge":
return a >= b
if op == "gt":
return a > b
# never reached
def create_mutable_sharefile(filename, my_nodeid, write_enabler):
ms = MutableShareFile(filename)
ms.create(my_nodeid, write_enabler)
del ms
return MutableShareFile(filename)
class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer)
name = 'storageserver'
@ -396,6 +741,9 @@ class StorageServer(service.MultiService, Referenceable):
return iter([])
# the code before here runs on the storage server, not the client
# the code beyond here runs on the client, not the storage server
"""
Share data is written into a single file. At the start of the file, there is
a series of four-byte big-endian offset values, which indicate where each

View File

@ -438,3 +438,66 @@ class Server(unittest.TestCase):
leases = list(ss.get_leases("si3"))
self.failUnlessEqual(len(leases), 2)
class MutableServer(unittest.TestCase):
def setUp(self):
self.sparent = service.MultiService()
self._secret = itertools.count()
def tearDown(self):
return self.sparent.stopService()
def workdir(self, name):
basedir = os.path.join("storage", "MutableServer", name)
return basedir
def create(self, name, sizelimit=None):
workdir = self.workdir(name)
ss = StorageServer(workdir, sizelimit)
ss.setServiceParent(self.sparent)
return ss
def test_create(self):
ss = self.create("test_create")
def write_enabler(self, we_tag):
return hashutil.tagged_hash("we_blah", we_tag)
def allocate(self, ss, storage_index, we_tag, sharenums, size):
write_enabler = self.write_enabler(we_tag)
renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
return ss.remote_allocate_mutable_slot(storage_index,
write_enabler,
renew_secret, cancel_secret,
sharenums, size)
def test_allocate(self):
ss = self.create("test_allocate")
shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100)
self.failUnlessEqual(len(shares), 3)
self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
shares2 = ss.get_mutable_slot("si1")
self.failUnlessEqual(len(shares2), 3)
self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))
# the actual RIMutableSlot objects are required to be singtons (one
# per SI+shnum), so each get_mutable_slot() call should return the
# same RemoteReferences
self.failUnlessEqual(set(shares.values()), set(shares2.values()))
s0 = shares[0]
self.failUnlessEqual(s0.remote_read(0, 10), "")
self.failUnlessEqual(s0.remote_read(100, 10), "")
# try writing to one
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
answer = s0.remote_testv_and_writev(self.write_enabler("we1"),
[],
[(0, data),],
new_length=None)
self.failUnlessEqual(answer, [])
self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111")
self.failUnlessEqual(s0.remote_read(95, 10), "99999")
self.failUnlessEqual(s0.remote_get_length(), 100)