mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-03-21 19:25:16 +00:00
storage: replace sqlite with in-share lease records
This commit is contained in:
parent
c7288286ae
commit
0fe1205789
@ -1,3 +1,2 @@
|
||||
|
||||
include allmydata/web/*.xhtml allmydata/web/*.html allmydata/web/*.css
|
||||
include allmydata/*.sql
|
||||
|
2
README
2
README
@ -109,8 +109,6 @@ gcc make python-dev python-twisted python-nevow python-pyopenssl".
|
||||
libraries with the cygwin package management tool, then get the pyOpenSSL
|
||||
source code, cd into it, and run "python ./setup.py install".
|
||||
|
||||
+ pysqlite3 (database library)
|
||||
|
||||
+ the pywin32 package: only required on Windows
|
||||
|
||||
http://sourceforge.net/projects/pywin32/
|
||||
|
3
setup.py
3
setup.py
@ -90,8 +90,7 @@ setup(name='allmydata-tahoe',
|
||||
],
|
||||
package_dir={ "allmydata": "src/allmydata",},
|
||||
scripts = ["bin/allmydata-tahoe"],
|
||||
package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css',
|
||||
'owner.sql'] },
|
||||
package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css'] },
|
||||
classifiers=trove_classifiers,
|
||||
test_suite="allmydata.test",
|
||||
ext_modules=[
|
||||
|
@ -3,7 +3,6 @@ import os, re, weakref, stat, struct, time
|
||||
from foolscap import Referenceable
|
||||
from twisted.application import service
|
||||
from twisted.internet import defer
|
||||
from twisted.python import util
|
||||
|
||||
from zope.interface import implements
|
||||
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
|
||||
@ -11,60 +10,180 @@ from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
|
||||
from allmydata.util import fileutil, idlib, mathutil
|
||||
from allmydata.util.assertutil import precondition
|
||||
|
||||
try:
|
||||
# python2.5 ships with sqlite builtin
|
||||
import sqlite3.dbapi2
|
||||
sqlite = sqlite3.dbapi2
|
||||
except ImportError:
|
||||
# for python2.4, it's installed under a different name
|
||||
import pysqlite2.dbapi2
|
||||
sqlite = pysqlite2.dbapi2
|
||||
|
||||
# store/
|
||||
# store/owners.db
|
||||
# store/shares/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/shares/$STORAGEINDEX/$SHARENUM on success
|
||||
# store/shares/$STORAGEINDEX
|
||||
# store/shares/$STORAGEINDEX/$SHARENUM
|
||||
# store/shares/$STORAGEINDEX/$SHARENUM/blocksize
|
||||
# store/shares/$STORAGEINDEX/$SHARENUM/data
|
||||
# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes
|
||||
# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree
|
||||
# storage/
|
||||
# storage/shares/incoming
|
||||
# incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be
|
||||
# moved to storage/shares/$STORAGEINDEX/$SHARENUM upon success
|
||||
# storage/shares/$STORAGEINDEX
|
||||
# storage/shares/$STORAGEINDEX/$SHARENUM
|
||||
|
||||
# $SHARENUM matches this regex:
|
||||
NUM_RE=re.compile("[0-9]*")
|
||||
|
||||
# each share file (in storage/shares/$SI/$SHNUM) contains lease information
|
||||
# and share data. The share data is accessed by RIBucketWriter.write and
|
||||
# RIBucketReader.read . The lease information is not accessible through these
|
||||
# interfaces.
|
||||
|
||||
# The share file has the following layout:
|
||||
# 0x00: share file version number, four bytes, current version is 1
|
||||
# 0x04: share data length, four bytes big-endian = A
|
||||
# 0x08: number of leases, four bytes big-endian
|
||||
# 0x0c: beginning of share data (described below, at WriteBucketProxy)
|
||||
# A+0x0c = B: first lease. Lease format is:
|
||||
# B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
|
||||
# B+0x04: renew secret, 32 bytes (SHA256)
|
||||
# B+0x24: cancel secret, 32 bytes (SHA256)
|
||||
# B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
|
||||
# B+0x48: next lease, or end of record
|
||||
|
||||
class ShareFile:
|
||||
LEASE_SIZE = struct.calcsize(">L32s32sL")
|
||||
|
||||
def __init__(self, filename):
|
||||
self.home = filename
|
||||
f = open(self.home, 'rb')
|
||||
(version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
|
||||
assert version == 1
|
||||
self._size = size
|
||||
self._num_leases = num_leases
|
||||
self._data_offset = 0xc
|
||||
self._lease_offset = 0xc + self._size
|
||||
|
||||
def read_share_data(self, offset, length):
|
||||
precondition(offset >= 0)
|
||||
precondition(offset+length <= self._size)
|
||||
f = open(self.home, 'rb')
|
||||
f.seek(self._data_offset+offset)
|
||||
return f.read(length)
|
||||
|
||||
def write_share_data(self, offset, data):
|
||||
length = len(data)
|
||||
precondition(offset >= 0)
|
||||
precondition(offset+length <= self._size)
|
||||
f = open(self.home, 'rb+')
|
||||
real_offset = self._data_offset+offset
|
||||
f.seek(real_offset)
|
||||
assert f.tell() == real_offset
|
||||
f.write(data)
|
||||
f.close()
|
||||
|
||||
def _write_lease_record(self, f, lease_number, lease_info):
|
||||
(owner_num, renew_secret, cancel_secret, expiration_time) = lease_info
|
||||
offset = self._lease_offset + lease_number * self.LEASE_SIZE
|
||||
f.seek(offset)
|
||||
assert f.tell() == offset
|
||||
f.write(struct.pack(">L32s32sL",
|
||||
owner_num, renew_secret, cancel_secret,
|
||||
expiration_time))
|
||||
|
||||
def _read_num_leases(self, f):
|
||||
f.seek(0x08)
|
||||
(num_leases,) = struct.unpack(">L", f.read(4))
|
||||
return num_leases
|
||||
|
||||
def _write_num_leases(self, f, num_leases):
|
||||
f.seek(0x08)
|
||||
f.write(struct.pack(">L", num_leases))
|
||||
|
||||
def _truncate_leases(self, f, num_leases):
|
||||
f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
|
||||
|
||||
def iter_leases(self):
|
||||
"""Yields (ownernum, renew_secret, cancel_secret, expiration_time)
|
||||
for all leases."""
|
||||
f = open(self.home, 'rb')
|
||||
(version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
|
||||
f.seek(self._lease_offset)
|
||||
for i in range(num_leases):
|
||||
data = f.read(self.LEASE_SIZE)
|
||||
if data:
|
||||
yield struct.unpack(">L32s32sL", data)
|
||||
|
||||
def add_lease(self, lease_info):
|
||||
f = open(self.home, 'rb+')
|
||||
num_leases = self._read_num_leases(f)
|
||||
self._write_lease_record(f, num_leases, lease_info)
|
||||
self._write_num_leases(f, num_leases+1)
|
||||
f.close()
|
||||
|
||||
def renew_lease(self, renew_secret, new_expire_time):
|
||||
for i,(on,rs,cs,et) in enumerate(self.iter_leases()):
|
||||
if rs == renew_secret:
|
||||
# yup. See if we need to update the owner time.
|
||||
if new_expire_time > et:
|
||||
# yes
|
||||
new_lease = (on,rs,cs,new_expire_time)
|
||||
f = open(self.home, 'rb+')
|
||||
self._write_lease_record(f, i, new_lease)
|
||||
f.close()
|
||||
return
|
||||
raise IndexError("unable to renew non-existent lease")
|
||||
|
||||
def cancel_lease(self, cancel_secret):
|
||||
"""Remove a lease with the given cancel_secret. Return
|
||||
(num_remaining_leases, space_freed). Raise IndexError if there was no
|
||||
lease with the given cancel_secret."""
|
||||
|
||||
leases = list(self.iter_leases())
|
||||
num_leases = len(leases)
|
||||
num_leases_removed = 0
|
||||
for i,lease_info in enumerate(leases[:]):
|
||||
(on,rs,cs,et) = lease_info
|
||||
if cs == cancel_secret:
|
||||
leases[i] = None
|
||||
num_leases_removed += 1
|
||||
if not num_leases_removed:
|
||||
raise IndexError("unable to find matching lease to cancel")
|
||||
if num_leases_removed:
|
||||
# pack and write out the remaining leases. We write these out in
|
||||
# the same order as they were added, so that if we crash while
|
||||
# doing this, we won't lose any non-cancelled leases.
|
||||
leases = [l for l in leases if l] # remove the cancelled leases
|
||||
f = open(self.home, 'rb+')
|
||||
for i,lease in enumerate(leases):
|
||||
self._write_lease_record(f, i, lease)
|
||||
self._write_num_leases(f, len(leases))
|
||||
self._truncate_leases(f, len(leases))
|
||||
f.close()
|
||||
return len(leases), self.LEASE_SIZE * num_leases_removed
|
||||
|
||||
|
||||
class BucketWriter(Referenceable):
|
||||
implements(RIBucketWriter)
|
||||
|
||||
def __init__(self, ss, incominghome, finalhome, size):
|
||||
def __init__(self, ss, incominghome, finalhome, size, lease_info):
|
||||
self.ss = ss
|
||||
self.incominghome = incominghome
|
||||
self.finalhome = finalhome
|
||||
self._size = size
|
||||
self._lease_info = lease_info
|
||||
self.closed = False
|
||||
self.throw_out_all_data = False
|
||||
# touch the file, so later callers will see that we're working on it
|
||||
f = open(self.incominghome, 'ab')
|
||||
# touch the file, so later callers will see that we're working on it.
|
||||
# Also construct the metadata.
|
||||
assert not os.path.exists(self.incominghome)
|
||||
f = open(self.incominghome, 'wb')
|
||||
f.write(struct.pack(">LLL", 1, size, 0))
|
||||
f.close()
|
||||
self._sharefile = ShareFile(self.incominghome)
|
||||
|
||||
def allocated_size(self):
|
||||
return self._size
|
||||
|
||||
def remote_write(self, offset, data):
|
||||
precondition(not self.closed)
|
||||
precondition(offset >= 0)
|
||||
precondition(offset+len(data) <= self._size)
|
||||
if self.throw_out_all_data:
|
||||
return
|
||||
f = open(self.incominghome, 'ab')
|
||||
f.seek(offset)
|
||||
f.write(data)
|
||||
f.close()
|
||||
self._sharefile.write_share_data(offset, data)
|
||||
|
||||
def remote_close(self):
|
||||
precondition(not self.closed)
|
||||
self._sharefile.add_lease(self._lease_info)
|
||||
fileutil.rename(self.incominghome, self.finalhome)
|
||||
self._sharefile = None
|
||||
self.closed = True
|
||||
|
||||
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
|
||||
self.ss.bucket_writer_closed(self, filelen)
|
||||
|
||||
@ -73,12 +192,10 @@ class BucketReader(Referenceable):
|
||||
implements(RIBucketReader)
|
||||
|
||||
def __init__(self, home):
|
||||
self.home = home
|
||||
self._share_file = ShareFile(home)
|
||||
|
||||
def remote_read(self, offset, length):
|
||||
f = open(self.home, 'rb')
|
||||
f.seek(offset)
|
||||
return f.read(length)
|
||||
return self._share_file.read_share_data(offset, length)
|
||||
|
||||
class StorageServer(service.MultiService, Referenceable):
|
||||
implements(RIStorageServer)
|
||||
@ -97,27 +214,11 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
fileutil.make_dirs(self.incomingdir)
|
||||
self._active_writers = weakref.WeakKeyDictionary()
|
||||
|
||||
self.init_db()
|
||||
|
||||
self.measure_size()
|
||||
|
||||
def _clean_incomplete(self):
|
||||
fileutil.rm_dir(self.incomingdir)
|
||||
|
||||
def init_db(self):
|
||||
# files in storedir with non-zbase32 characters in it (like ".") are
|
||||
# safe, in that they cannot be accessed or overwritten by clients
|
||||
# (whose binary storage_index values are always converted into a
|
||||
# filename with idlib.b2a)
|
||||
db_file = os.path.join(self.storedir, "owners.db")
|
||||
need_to_init_db = not os.path.exists(db_file)
|
||||
self._owner_db_con = sqlite.connect(db_file)
|
||||
self._owner_db_cur = self._owner_db_con.cursor()
|
||||
if need_to_init_db:
|
||||
setup_file = util.sibpath(__file__, "owner.sql")
|
||||
setup = open(setup_file, "r").read()
|
||||
self._owner_db_cur.executescript(setup)
|
||||
|
||||
def measure_size(self):
|
||||
self.consumed = fileutil.du(self.sharedir)
|
||||
|
||||
@ -130,10 +231,21 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
def remote_allocate_buckets(self, storage_index,
|
||||
renew_secret, cancel_secret,
|
||||
sharenums, allocated_size,
|
||||
canary):
|
||||
canary, owner_num=0):
|
||||
# owner_num is not for clients to set, but rather it should be
|
||||
# curried into the PersonalStorageServer instance that is dedicated
|
||||
# to a particular owner.
|
||||
alreadygot = set()
|
||||
bucketwriters = {} # k: shnum, v: BucketWriter
|
||||
si_s = idlib.b2a(storage_index)
|
||||
|
||||
# in this implementation, the lease information (including secrets)
|
||||
# goes into the share files themselves. It could also be put into a
|
||||
# separate database. Note that the lease should not be added until
|
||||
# the BucketWrite has been closed.
|
||||
expire_time = time.time() + 31*24*60*60
|
||||
lease_info = (owner_num, renew_secret, cancel_secret, expire_time)
|
||||
|
||||
space_per_bucket = allocated_size
|
||||
no_limits = self.sizelimit is None
|
||||
yes_limits = not no_limits
|
||||
@ -144,10 +256,17 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
|
||||
if os.path.exists(incominghome) or os.path.exists(finalhome):
|
||||
alreadygot.add(shnum)
|
||||
# add a lease
|
||||
if os.path.exists(incominghome):
|
||||
# TODO: add a lease to the still-in-construction share
|
||||
pass
|
||||
else:
|
||||
sf = ShareFile(finalhome)
|
||||
sf.add_lease(lease_info)
|
||||
elif no_limits or remaining_space >= space_per_bucket:
|
||||
fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
|
||||
bw = BucketWriter(self, incominghome, finalhome,
|
||||
space_per_bucket)
|
||||
space_per_bucket, lease_info)
|
||||
if self.no_storage:
|
||||
bw.throw_out_all_data = True
|
||||
bucketwriters[shnum] = bw
|
||||
@ -161,109 +280,56 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
if bucketwriters:
|
||||
fileutil.make_dirs(os.path.join(self.sharedir, si_s))
|
||||
|
||||
# now store the secrets somewhere. This requires a
|
||||
# variable-length-list of (renew,cancel) secret tuples per bucket.
|
||||
# Note that this does not need to be kept inside the share itself, if
|
||||
# packing efficiency is a concern. For this implementation, we use a
|
||||
# sqlite database, which puts everything in a single file.
|
||||
self.add_lease(storage_index, renew_secret, cancel_secret)
|
||||
|
||||
return alreadygot, bucketwriters
|
||||
|
||||
def add_lease(self, storage_index, renew_secret, cancel_secret):
|
||||
# is the bucket already in our database?
|
||||
cur = self._owner_db_cur
|
||||
cur.execute("SELECT bucket_id FROM buckets"
|
||||
" WHERE storage_index = ?",
|
||||
(storage_index,))
|
||||
res = cur.fetchone()
|
||||
if res:
|
||||
bucket_id = res[0]
|
||||
else:
|
||||
cur.execute("INSERT INTO buckets (storage_index)"
|
||||
" values(?)", (storage_index,))
|
||||
cur.execute("SELECT bucket_id FROM buckets"
|
||||
" WHERE storage_index = ?",
|
||||
(storage_index,))
|
||||
res = cur.fetchone()
|
||||
bucket_id = res[0]
|
||||
|
||||
# what time will this lease expire? One month from now.
|
||||
expire_time = time.time() + 31*24*60*60
|
||||
|
||||
# now, is this lease already in our database? Since we don't have
|
||||
# owners yet, look for a match by renew_secret/cancel_secret
|
||||
cur.execute("SELECT lease_id FROM leases"
|
||||
" WHERE renew_secret = ? AND cancel_secret = ?",
|
||||
(renew_secret, cancel_secret))
|
||||
res = cur.fetchone()
|
||||
if res:
|
||||
# yes, so just update the timestamp
|
||||
lease_id = res[0]
|
||||
cur.execute("UPDATE leases"
|
||||
" SET expire_time = ?"
|
||||
" WHERE lease_id = ?",
|
||||
(expire_time, lease_id))
|
||||
else:
|
||||
# no, we need to add the lease
|
||||
cur.execute("INSERT INTO leases "
|
||||
"(bucket_id, renew_secret, cancel_secret, expire_time)"
|
||||
" values(?,?,?,?)",
|
||||
(bucket_id, renew_secret, cancel_secret, expire_time))
|
||||
self._owner_db_con.commit()
|
||||
def get_or_add_owner(self, owner):
|
||||
# this will be more fully implemented when we get the Introduction
|
||||
# protocol built. At that point, it should take the 'owner' argument
|
||||
# (either a FURL or a Sealer/public-key) and look it up in a
|
||||
# persistent table, returning a short integer. If the owner doesn't
|
||||
# yet exist in the table, create a new entry for it and return the
|
||||
# new index.
|
||||
return 0
|
||||
|
||||
def remote_renew_lease(self, storage_index, renew_secret):
|
||||
# find the lease
|
||||
cur = self._owner_db_cur
|
||||
cur.execute("SELECT leases.lease_id FROM buckets, leases"
|
||||
" WHERE buckets.storage_index = ?"
|
||||
" AND buckets.bucket_id = leases.bucket_id"
|
||||
" AND leases.renew_secret = ?",
|
||||
(storage_index, renew_secret))
|
||||
res = cur.fetchone()
|
||||
if res:
|
||||
# found it, now update it. The new leases will expire one month
|
||||
# from now.
|
||||
expire_time = time.time() + 31*24*60*60
|
||||
lease_id = res[0]
|
||||
cur.execute("UPDATE leases"
|
||||
" SET expire_time = ?"
|
||||
" WHERE lease_id = ?",
|
||||
(expire_time, lease_id))
|
||||
else:
|
||||
# no such lease
|
||||
raise IndexError("No such lease")
|
||||
self._owner_db_con.commit()
|
||||
new_expire_time = time.time() + 31*24*60*60
|
||||
found_buckets = False
|
||||
for shnum, filename in self._get_bucket_shares(storage_index):
|
||||
found_buckets = True
|
||||
sf = ShareFile(filename)
|
||||
sf.renew_lease(renew_secret, new_expire_time)
|
||||
if not found_buckets:
|
||||
raise IndexError("no such lease to renew")
|
||||
|
||||
def remote_cancel_lease(self, storage_index, cancel_secret):
|
||||
# find the lease
|
||||
cur = self._owner_db_cur
|
||||
cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id"
|
||||
" FROM buckets b, leases l"
|
||||
" WHERE b.storage_index = ?"
|
||||
" AND b.bucket_id = l.bucket_id"
|
||||
" AND l.cancel_secret = ?",
|
||||
(storage_index, cancel_secret))
|
||||
res = cur.fetchone()
|
||||
if res:
|
||||
# found it
|
||||
lease_id, storage_index, bucket_id = res
|
||||
cur.execute("DELETE FROM leases WHERE lease_id = ?",
|
||||
(lease_id,))
|
||||
# was that the last one?
|
||||
cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?",
|
||||
(bucket_id,))
|
||||
res = cur.fetchone()
|
||||
remaining_leases = res[0]
|
||||
if not remaining_leases:
|
||||
# delete the share
|
||||
cur.execute("DELETE FROM buckets WHERE bucket_id = ?",
|
||||
(bucket_id,))
|
||||
self.delete_bucket(storage_index)
|
||||
else:
|
||||
# no such lease
|
||||
raise IndexError("No such lease")
|
||||
self._owner_db_con.commit()
|
||||
storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
|
||||
|
||||
remaining_files = 0
|
||||
total_space_freed = 0
|
||||
found_buckets = False
|
||||
for shnum, filename in self._get_bucket_shares(storage_index):
|
||||
# note: if we can't find a lease on one share, we won't bother
|
||||
# looking in the others. Unless something broke internally
|
||||
# (perhaps we ran out of disk space while adding a lease), the
|
||||
# leases on all shares will be identical.
|
||||
found_buckets = True
|
||||
sf = ShareFile(filename)
|
||||
# this raises IndexError if the lease wasn't present
|
||||
remaining_leases, space_freed = sf.cancel_lease(cancel_secret)
|
||||
total_space_freed += space_freed
|
||||
if remaining_leases:
|
||||
remaining_files += 1
|
||||
else:
|
||||
# now remove the sharefile. We'll almost certainly be
|
||||
# removing the entire directory soon.
|
||||
filelen = os.stat(filename)[stat.ST_SIZE]
|
||||
os.unlink(filename)
|
||||
total_space_freed += filelen
|
||||
if not remaining_files:
|
||||
os.rmdir(storagedir)
|
||||
self.consumed -= total_space_freed
|
||||
if not found_buckets:
|
||||
raise IndexError("no such lease to cancel")
|
||||
|
||||
def delete_bucket(self, storage_index):
|
||||
storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
|
||||
@ -277,20 +343,44 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
self.consumed += consumed_size
|
||||
del self._active_writers[bw]
|
||||
|
||||
def remote_get_buckets(self, storage_index):
|
||||
bucketreaders = {} # k: sharenum, v: BucketReader
|
||||
def _get_bucket_shares(self, storage_index):
|
||||
"""Return a list of (shnum, pathname) tuples for files that hold
|
||||
shares for this storage_index. In each tuple, 'shnum' will always be
|
||||
the integer form of the last component of 'pathname'."""
|
||||
storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
|
||||
try:
|
||||
for f in os.listdir(storagedir):
|
||||
if NUM_RE.match(f):
|
||||
br = BucketReader(os.path.join(storagedir, f))
|
||||
bucketreaders[int(f)] = br
|
||||
filename = os.path.join(storagedir, f)
|
||||
yield (int(f), filename)
|
||||
except OSError:
|
||||
# Commonly caused by there being no buckets at all.
|
||||
pass
|
||||
|
||||
def remote_get_buckets(self, storage_index):
|
||||
bucketreaders = {} # k: sharenum, v: BucketReader
|
||||
for shnum, filename in self._get_bucket_shares(storage_index):
|
||||
bucketreaders[shnum] = BucketReader(filename)
|
||||
return bucketreaders
|
||||
|
||||
def get_leases(self, storage_index):
|
||||
"""Provide an iterator that yields all of the leases attached to this
|
||||
bucket. Each lease is returned as a tuple of (owner_num,
|
||||
renew_secret, cancel_secret, expiration_time).
|
||||
|
||||
This method is not for client use.
|
||||
"""
|
||||
|
||||
# since all shares get the same lease data, we just grab the leases
|
||||
# from the first share
|
||||
try:
|
||||
shnum, filename = self._get_bucket_shares(storage_index).next()
|
||||
sf = ShareFile(filename)
|
||||
return sf.iter_leases()
|
||||
except StopIteration:
|
||||
return iter([])
|
||||
|
||||
|
||||
"""
|
||||
Share data is written into a single file. At the start of the file, there is
|
||||
a series of four-byte big-endian offset values, which indicate where each
|
||||
|
@ -4,7 +4,7 @@ from twisted.trial import unittest
|
||||
from twisted.application import service
|
||||
from twisted.internet import defer
|
||||
from foolscap import Referenceable
|
||||
import os.path
|
||||
import time, os.path, stat
|
||||
import itertools
|
||||
from allmydata import interfaces
|
||||
from allmydata.util import fileutil, hashutil
|
||||
@ -23,9 +23,16 @@ class Bucket(unittest.TestCase):
|
||||
def bucket_writer_closed(self, bw, consumed):
|
||||
pass
|
||||
|
||||
def make_lease(self):
|
||||
owner_num = 0
|
||||
renew_secret = os.urandom(32)
|
||||
cancel_secret = os.urandom(32)
|
||||
expiration_time = time.time() + 5000
|
||||
return (owner_num, renew_secret, cancel_secret, expiration_time)
|
||||
|
||||
def test_create(self):
|
||||
incoming, final = self.make_workdir("test_create")
|
||||
bw = BucketWriter(self, incoming, final, 200)
|
||||
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
|
||||
bw.remote_write(0, "a"*25)
|
||||
bw.remote_write(25, "b"*25)
|
||||
bw.remote_write(50, "c"*25)
|
||||
@ -34,7 +41,7 @@ class Bucket(unittest.TestCase):
|
||||
|
||||
def test_readwrite(self):
|
||||
incoming, final = self.make_workdir("test_readwrite")
|
||||
bw = BucketWriter(self, incoming, final, 200)
|
||||
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
|
||||
bw.remote_write(0, "a"*25)
|
||||
bw.remote_write(25, "b"*25)
|
||||
bw.remote_write(50, "c"*7) # last block may be short
|
||||
@ -61,11 +68,18 @@ class BucketProxy(unittest.TestCase):
|
||||
final = os.path.join(basedir, "bucket")
|
||||
fileutil.make_dirs(basedir)
|
||||
fileutil.make_dirs(os.path.join(basedir, "tmp"))
|
||||
bw = BucketWriter(self, incoming, final, size)
|
||||
bw = BucketWriter(self, incoming, final, size, self.make_lease())
|
||||
rb = RemoteBucket()
|
||||
rb.target = bw
|
||||
return bw, rb, final
|
||||
|
||||
def make_lease(self):
|
||||
owner_num = 0
|
||||
renew_secret = os.urandom(32)
|
||||
cancel_secret = os.urandom(32)
|
||||
expiration_time = time.time() + 5000
|
||||
return (owner_num, renew_secret, cancel_secret, expiration_time)
|
||||
|
||||
def bucket_writer_closed(self, bw, consumed):
|
||||
pass
|
||||
|
||||
@ -225,16 +239,21 @@ class Server(unittest.TestCase):
|
||||
self.failUnlessEqual(set(writers.keys()), set([5]))
|
||||
|
||||
def test_sizelimits(self):
|
||||
ss = self.create("test_sizelimits", 100)
|
||||
ss = self.create("test_sizelimits", 5000)
|
||||
canary = Referenceable()
|
||||
|
||||
already,writers = self.allocate(ss, "vid1", [0,1,2], 25)
|
||||
# a newly created and filled share incurs this much overhead, beyond
|
||||
# the size we request.
|
||||
OVERHEAD = 3*4
|
||||
LEASE_SIZE = 4+32+32+4
|
||||
|
||||
already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
|
||||
self.failUnlessEqual(len(writers), 3)
|
||||
# now the StorageServer should have 75 bytes provisionally allocated,
|
||||
# allowing only 25 more to be claimed
|
||||
# now the StorageServer should have 3000 bytes provisionally
|
||||
# allocated, allowing only 2000 more to be claimed
|
||||
self.failUnlessEqual(len(ss._active_writers), 3)
|
||||
|
||||
already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 25)
|
||||
# allocating 1001-byte shares only leaves room for one
|
||||
already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
|
||||
self.failUnlessEqual(len(writers2), 1)
|
||||
self.failUnlessEqual(len(ss._active_writers), 4)
|
||||
|
||||
@ -243,9 +262,11 @@ class Server(unittest.TestCase):
|
||||
del already
|
||||
del writers
|
||||
self.failUnlessEqual(len(ss._active_writers), 1)
|
||||
# now we have a provisional allocation of 1001 bytes
|
||||
|
||||
# and we close the second set, so their provisional allocation should
|
||||
# become real, long-term allocation
|
||||
# become real, long-term allocation, and grows to include the
|
||||
# overhead.
|
||||
for bw in writers2.values():
|
||||
bw.remote_write(0, "a"*25)
|
||||
bw.remote_close()
|
||||
@ -254,10 +275,12 @@ class Server(unittest.TestCase):
|
||||
del bw
|
||||
self.failUnlessEqual(len(ss._active_writers), 0)
|
||||
|
||||
# now there should be 25 bytes allocated, and 75 free
|
||||
already3,writers3 = self.allocate(ss,"vid3", [0,1,2,3], 25)
|
||||
self.failUnlessEqual(len(writers3), 3)
|
||||
self.failUnlessEqual(len(ss._active_writers), 3)
|
||||
allocated = 1001 + OVERHEAD + LEASE_SIZE
|
||||
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
|
||||
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
|
||||
already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
|
||||
self.failUnlessEqual(len(writers3), 39)
|
||||
self.failUnlessEqual(len(ss._active_writers), 39)
|
||||
|
||||
del already3
|
||||
del writers3
|
||||
@ -266,13 +289,37 @@ class Server(unittest.TestCase):
|
||||
del ss
|
||||
|
||||
# creating a new StorageServer in the same directory should see the
|
||||
# same usage. note that metadata will be counted at startup but not
|
||||
# during runtime, so if we were creating any metadata, the allocation
|
||||
# would be more than 25 bytes and this test would need to be changed.
|
||||
ss = self.create("test_sizelimits", 100)
|
||||
already4,writers4 = self.allocate(ss, "vid4", [0,1,2,3], 25)
|
||||
self.failUnlessEqual(len(writers4), 3)
|
||||
self.failUnlessEqual(len(ss._active_writers), 3)
|
||||
# same usage.
|
||||
|
||||
# metadata that goes into the share file is counted upon share close,
|
||||
# as well as at startup. metadata that goes into other files will not
|
||||
# be counted until the next startup, so if we were creating any
|
||||
# extra-file metadata, the allocation would be more than 'allocated'
|
||||
# and this test would need to be changed.
|
||||
ss = self.create("test_sizelimits", 5000)
|
||||
already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
|
||||
self.failUnlessEqual(len(writers4), 39)
|
||||
self.failUnlessEqual(len(ss._active_writers), 39)
|
||||
|
||||
def test_seek(self):
|
||||
basedir = self.workdir("test_seek_behavior")
|
||||
fileutil.make_dirs(basedir)
|
||||
filename = os.path.join(basedir, "testfile")
|
||||
f = open(filename, "wb")
|
||||
f.write("start")
|
||||
f.close()
|
||||
# mode="w" allows seeking-to-create-holes, but truncates pre-existing
|
||||
# files. mode="a" preserves previous contents but does not allow
|
||||
# seeking-to-create-holes. mode="r+" allows both.
|
||||
f = open(filename, "rb+")
|
||||
f.seek(100)
|
||||
f.write("100")
|
||||
f.close()
|
||||
filelen = os.stat(filename)[stat.ST_SIZE]
|
||||
self.failUnlessEqual(filelen, 100+3)
|
||||
f2 = open(filename, "rb")
|
||||
self.failUnlessEqual(f2.read(5), "start")
|
||||
|
||||
|
||||
def test_leases(self):
|
||||
ss = self.create("test_leases")
|
||||
@ -289,6 +336,10 @@ class Server(unittest.TestCase):
|
||||
for wb in writers.values():
|
||||
wb.remote_close()
|
||||
|
||||
leases = list(ss.get_leases("si0"))
|
||||
self.failUnlessEqual(len(leases), 1)
|
||||
self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
|
||||
|
||||
rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
|
||||
hashutil.tagged_hash("blah", "%d" % self._secret.next()))
|
||||
already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
|
||||
@ -304,6 +355,10 @@ class Server(unittest.TestCase):
|
||||
self.failUnlessEqual(len(already), 5)
|
||||
self.failUnlessEqual(len(writers), 0)
|
||||
|
||||
leases = list(ss.get_leases("si1"))
|
||||
self.failUnlessEqual(len(leases), 2)
|
||||
self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
|
||||
|
||||
# check that si0 is readable
|
||||
readers = ss.remote_get_buckets("si0")
|
||||
self.failUnlessEqual(len(readers), 5)
|
||||
@ -336,6 +391,10 @@ class Server(unittest.TestCase):
|
||||
# the corresponding renew should no longer work
|
||||
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
|
||||
|
||||
leases = list(ss.get_leases("si1"))
|
||||
self.failUnlessEqual(len(leases), 1)
|
||||
self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
|
||||
|
||||
ss.remote_renew_lease("si1", rs2)
|
||||
# cancelling the second should make it go away
|
||||
ss.remote_cancel_lease("si1", cs2)
|
||||
@ -344,3 +403,6 @@ class Server(unittest.TestCase):
|
||||
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
|
||||
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
|
||||
|
||||
leases = list(ss.get_leases("si1"))
|
||||
self.failUnlessEqual(len(leases), 0)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user