deletion phase3: add a sqlite database to track renew/cancel-lease secrets, implement renew/cancel_lease (but nobody calls them yet). Also, move the shares from BASEDIR/storage/* down to BASEDIR/storage/shares/*

This commit is contained in:
Brian Warner 2007-08-27 23:41:40 -07:00
parent 850bc9da02
commit 2a63fe8b01
8 changed files with 274 additions and 37 deletions

View File

@ -1,2 +1,3 @@
include allmydata/web/*.xhtml allmydata/web/*.html allmydata/web/*.css
include allmydata/*.sql

2
README
View File

@ -109,6 +109,8 @@ gcc make python-dev python-twisted python-nevow python-pyopenssl".
libraries with the cygwin package management tool, then get the pyOpenSSL
source code, cd into it, and run "python ./setup.py install".
+ pysqlite (database library; provides the pysqlite2 module that the code imports)
+ the pywin32 package: only required on Windows
http://sourceforge.net/projects/pywin32/

View File

@ -90,7 +90,8 @@ setup(name='allmydata-tahoe',
],
package_dir={ "allmydata": "src/allmydata",},
scripts = ["bin/allmydata-tahoe"],
package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css'] },
package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css',
'owner.sql'] },
classifiers=trove_classifiers,
test_suite="allmydata.test",
ext_modules=[

View File

@ -105,6 +105,20 @@ class RIStorageServer(RemoteInterface):
"""
return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
def renew_lease(storage_index=StorageIndex, renew_secret=LeaseRenewSecret):
    """
    Renew the lease on a given bucket, resetting its expiration time.
    Some networks will use this, some will not. The server raises
    IndexError if no lease on this bucket matches renew_secret.
    """
def cancel_lease(storage_index=StorageIndex,
                 cancel_secret=LeaseCancelSecret):
    """
    Cancel the lease on a given bucket. If this was the last lease on the
    bucket, the bucket will be deleted. The server raises IndexError if
    no lease on this bucket matches cancel_secret.
    """
def get_buckets(storage_index=StorageIndex):
    # Returns a dict mapping share number to a reader for each share
    # held for this storage index (an empty dict if none are held).
    return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)

20
src/allmydata/owner.sql Normal file
View File

@ -0,0 +1,20 @@
-- Lease-tracking schema for the storage server (BASEDIR/storage/owners.db).
-- One row per known storage bucket.
-- NOTE(review): storage_index is declared char(32); presumably the textual
-- (z-base-32) form of the index -- confirm against what the caller inserts.
CREATE TABLE buckets
(
 bucket_id integer PRIMARY KEY AUTOINCREMENT,
 storage_index char(32)
);

-- Placeholder for per-owner accounting: only the synthetic key exists so far.
CREATE TABLE owners
(
 owner_id integer PRIMARY KEY AUTOINCREMENT
);

-- A lease ties a bucket to an owner and carries the two shared secrets:
-- renew_secret extends expire_time, cancel_secret removes the lease (and
-- the bucket itself once no leases remain).
CREATE TABLE leases
(
 lease_id integer PRIMARY KEY AUTOINCREMENT,
 bucket_id integer,
 owner_id integer,
 renew_secret char(32),
 cancel_secret char(32),
 expire_time timestamp
);

View File

@ -1,8 +1,9 @@
import os, re, weakref, stat, struct
import os, re, weakref, stat, struct, time
from foolscap import Referenceable
from twisted.application import service
from twisted.internet import defer
from twisted.python import util
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
@ -10,14 +11,17 @@ from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition
from pysqlite2 import dbapi2 as sqlite
# store/
# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success
# store/$STORAGEINDEX
# store/$STORAGEINDEX/$SHARENUM
# store/$STORAGEINDEX/$SHARENUM/blocksize
# store/$STORAGEINDEX/$SHARENUM/data
# store/$STORAGEINDEX/$SHARENUM/blockhashes
# store/$STORAGEINDEX/$SHARENUM/sharehashtree
# store/owners.db
# store/shares/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/shares/$STORAGEINDEX/$SHARENUM on success
# store/shares/$STORAGEINDEX
# store/shares/$STORAGEINDEX/$SHARENUM
# store/shares/$STORAGEINDEX/$SHARENUM/blocksize
# store/shares/$STORAGEINDEX/$SHARENUM/data
# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes
# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree
# $SHARENUM matches this regex:
NUM_RE=re.compile("[0-9]*")
@ -75,22 +79,40 @@ class StorageServer(service.MultiService, Referenceable):
def __init__(self, storedir, sizelimit=None, no_storage=False):
    """Create a StorageServer rooted at 'storedir'.

    storedir: base directory; shares live under storedir/shares, and the
              lease database (owners.db) lives directly in storedir.
    sizelimit: optional cap, in bytes, on total space consumed by shares.
    no_storage: presumably discards incoming share data instead of
                writing it -- not visible from this block, confirm.
    """
    service.MultiService.__init__(self)
    fileutil.make_dirs(storedir)
    self.storedir = storedir
    # shares moved down into storage/shares/ so that bookkeeping files
    # (like owners.db) can safely live in storedir itself
    sharedir = os.path.join(storedir, "shares")
    fileutil.make_dirs(sharedir)
    self.sharedir = sharedir
    self.sizelimit = sizelimit
    self.no_storage = no_storage
    self.incomingdir = os.path.join(storedir, 'incoming')
    # NOTE(review): the line above is immediately overwritten by the one
    # below; it looks like a leftover from before the storage/shares/
    # move -- confirm and delete it.
    self.incomingdir = os.path.join(sharedir, 'incoming')
    self._clean_incomplete()
    fileutil.make_dirs(self.incomingdir)
    # writers currently holding open buckets; weak keys so abandoned
    # writers do not pin their entries forever
    self._active_writers = weakref.WeakKeyDictionary()
    self.init_db()
    self.measure_size()
def _clean_incomplete(self):
    # Discard any partially-uploaded shares left over in the incoming
    # directory (temp dirs are only moved out of it on success).
    fileutil.rm_dir(self.incomingdir)
def init_db(self):
    """Open (creating on first use) the owners.db lease database.

    Files in storedir with non-zbase32 characters in their names (like
    ".") are safe, in that they cannot be accessed or overwritten by
    clients (whose binary storage_index values are always converted
    into a filename with idlib.b2a).
    """
    db_file = os.path.join(self.storedir, "owners.db")
    need_to_init_db = not os.path.exists(db_file)
    self._owner_db_con = sqlite.connect(db_file)
    self._owner_db_cur = self._owner_db_con.cursor()
    if need_to_init_db:
        # first run: load the schema from owner.sql, which ships next
        # to this module. Close the handle explicitly instead of
        # leaking it until garbage collection.
        setup_file = util.sibpath(__file__, "owner.sql")
        f = open(setup_file, "r")
        try:
            setup = f.read()
        finally:
            f.close()
        self._owner_db_cur.executescript(setup)
def measure_size(self):
    """Initialize self.consumed with the bytes used by stored shares.

    Only the shares directory counts against the size limit; the lease
    database and other bookkeeping files in storedir do not. (The
    previous extra du() of storedir was a dead store -- its result was
    immediately overwritten -- so it is removed.)
    """
    self.consumed = fileutil.du(self.sharedir)
def allocated_size(self):
space = self.consumed
@ -112,7 +134,7 @@ class StorageServer(service.MultiService, Referenceable):
remaining_space = self.sizelimit - self.allocated_size()
for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
finalhome = os.path.join(self.storedir, si_s, "%d" % shnum)
finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
if os.path.exists(incominghome) or os.path.exists(finalhome):
alreadygot.add(shnum)
elif no_limits or remaining_space >= space_per_bucket:
@ -130,17 +152,127 @@ class StorageServer(service.MultiService, Referenceable):
pass
if bucketwriters:
fileutil.make_dirs(os.path.join(self.storedir, si_s))
fileutil.make_dirs(os.path.join(self.sharedir, si_s))
# now store the secrets somewhere. This requires a
# variable-length-list of (renew,cancel) secret tuples per bucket.
# Note that this does not need to be kept inside the share itself, if
# packing efficiency is a concern. For this implementation, we use a
# sqlite database, which puts everything in a single file.
self.add_lease(storage_index, renew_secret, cancel_secret)
return alreadygot, bucketwriters
def add_lease(self, storage_index, renew_secret, cancel_secret):
    """Record (or refresh) a lease on 'storage_index'.

    Leases are identified by their (renew_secret, cancel_secret) pair,
    since we do not have real owner accounts yet. A lease lasts one
    month; re-adding an existing lease just pushes its expiry forward.
    Commits the database transaction before returning.
    """
    # is the bucket already in our database?
    cur = self._owner_db_cur
    cur.execute("SELECT bucket_id FROM buckets"
                " WHERE storage_index = ?",
                (storage_index,))
    res = cur.fetchone()
    if res:
        bucket_id = res[0]
    else:
        cur.execute("INSERT INTO buckets (storage_index)"
                    " values(?)", (storage_index,))
        # DB-API 'lastrowid' hands us the AUTOINCREMENT key directly,
        # avoiding the redundant SELECT round-trip the old code made
        bucket_id = cur.lastrowid

    # what time will this lease expire? One month from now.
    expire_time = time.time() + 31*24*60*60

    # now, is this lease already in our database? Since we don't have
    # owners yet, look for a match by renew_secret/cancel_secret
    cur.execute("SELECT lease_id FROM leases"
                " WHERE renew_secret = ? AND cancel_secret = ?",
                (renew_secret, cancel_secret))
    res = cur.fetchone()
    if res:
        # yes, so just update the timestamp
        lease_id = res[0]
        cur.execute("UPDATE leases"
                    " SET expire_time = ?"
                    " WHERE lease_id = ?",
                    (expire_time, lease_id))
    else:
        # no, we need to add the lease
        cur.execute("INSERT INTO leases "
                    "(bucket_id, renew_secret, cancel_secret, expire_time)"
                    " values(?,?,?,?)",
                    (bucket_id, renew_secret, cancel_secret, expire_time))
    self._owner_db_con.commit()
def remote_renew_lease(self, storage_index, renew_secret):
    """Extend by one month the lease matching this renew secret.

    Raises IndexError (without touching the database) when no lease on
    the given bucket carries that renew_secret.
    """
    cur = self._owner_db_cur
    cur.execute("SELECT leases.lease_id FROM buckets, leases"
                " WHERE buckets.storage_index = ?"
                " AND buckets.bucket_id = leases.bucket_id"
                " AND leases.renew_secret = ?",
                (storage_index, renew_secret))
    row = cur.fetchone()
    if not row:
        # no such lease: nothing was modified, so no commit needed
        raise IndexError("No such lease")
    # found it: push the expiry out to one month from now
    new_expire_time = time.time() + 31*24*60*60
    cur.execute("UPDATE leases"
                " SET expire_time = ?"
                " WHERE lease_id = ?",
                (new_expire_time, row[0]))
    self._owner_db_con.commit()
def remote_cancel_lease(self, storage_index, cancel_secret):
    """Cancel the lease matching this cancel secret.

    If that was the bucket's last remaining lease, the bucket row and
    its on-disk shares are deleted too. Raises IndexError when no lease
    matches.
    """
    cur = self._owner_db_cur
    cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id"
                " FROM buckets b, leases l"
                " WHERE b.storage_index = ?"
                " AND b.bucket_id = l.bucket_id"
                " AND l.cancel_secret = ?",
                (storage_index, cancel_secret))
    row = cur.fetchone()
    if not row:
        # nothing matched: nothing was modified, so no commit needed
        raise IndexError("No such lease")
    lease_id, storage_index, bucket_id = row
    cur.execute("DELETE FROM leases WHERE lease_id = ?",
                (lease_id,))
    # if no leases remain on this bucket, the bucket goes away too
    cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?",
                (bucket_id,))
    if not cur.fetchone()[0]:
        cur.execute("DELETE FROM buckets WHERE bucket_id = ?",
                    (bucket_id,))
        self.delete_bucket(storage_index)
    self._owner_db_con.commit()
def delete_bucket(self, storage_index):
    """Remove all shares for this bucket and credit the freed space."""
    storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
    # measure before removing, so self.consumed stays an accurate
    # running total of the space used by stored shares
    freed = fileutil.du(storagedir)
    fileutil.rm_dir(storagedir)
    self.consumed -= freed
def bucket_writer_closed(self, bw, consumed_size):
    """A BucketWriter finished: stop tracking it, charge its final size."""
    del self._active_writers[bw]
    self.consumed += consumed_size
def remote_get_buckets(self, storage_index):
bucketreaders = {} # k: sharenum, v: BucketReader
storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
try:
for f in os.listdir(storagedir):
if NUM_RE.match(f):

View File

@ -5,15 +5,12 @@ from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
import os.path
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
WriteBucketProxy, ReadBucketProxy, StorageServer
RS = hashutil.tagged_hash("blah", "foo")
CS = RS
class Bucket(unittest.TestCase):
def make_workdir(self, name):
basedir = os.path.join("storage", "Bucket", name)
@ -167,6 +164,7 @@ class Server(unittest.TestCase):
def setUp(self):
    # parent service for any storage servers the test creates
    self.sparent = service.MultiService()
    # monotonically increasing counter used to derive a unique
    # renew/cancel secret for each allocation
    self._secret = itertools.count()
def tearDown(self):
    # stopping the parent service also stops any child storage servers
    return self.sparent.stopService()
@ -183,14 +181,20 @@ class Server(unittest.TestCase):
def test_create(self):
    # just exercise StorageServer construction/registration via create()
    ss = self.create("test_create")
def allocate(self, ss, storage_index, sharenums, size):
    """Call remote_allocate_buckets with fresh, unique lease secrets."""
    def _fresh_secret():
        # each call consumes one value from the shared counter
        return hashutil.tagged_hash("blah", "%d" % self._secret.next())
    renew_secret = _fresh_secret()
    cancel_secret = _fresh_secret()
    canary = Referenceable()
    return ss.remote_allocate_buckets(storage_index,
                                      renew_secret, cancel_secret,
                                      sharenums, size, canary)
def test_allocate(self):
ss = self.create("test_allocate")
self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
canary = Referenceable()
already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2],
75, canary)
already,writers = self.allocate(ss, "vid", [0,1,2], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
@ -208,8 +212,7 @@ class Server(unittest.TestCase):
# now if we about writing again, the server should offer those three
# buckets as already present
already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2,3,4],
75, canary)
already,writers = self.allocate(ss, "vid", [0,1,2,3,4], 75)
self.failUnlessEqual(already, set([0,1,2]))
self.failUnlessEqual(set(writers.keys()), set([3,4]))
@ -217,8 +220,7 @@ class Server(unittest.TestCase):
# tell new uploaders that they already exist (so that we don't try to
# upload into them a second time)
already,writers = ss.remote_allocate_buckets("vid", RS, CS, [2,3,4,5],
75, canary)
already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
self.failUnlessEqual(already, set([2,3,4]))
self.failUnlessEqual(set(writers.keys()), set([5]))
@ -226,15 +228,13 @@ class Server(unittest.TestCase):
ss = self.create("test_sizelimits", 100)
canary = Referenceable()
already,writers = ss.remote_allocate_buckets("vid1", RS, CS, [0,1,2],
25, canary)
already,writers = self.allocate(ss, "vid1", [0,1,2], 25)
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 75 bytes provisionally allocated,
# allowing only 25 more to be claimed
self.failUnlessEqual(len(ss._active_writers), 3)
already2,writers2 = ss.remote_allocate_buckets("vid2", RS, CS, [0,1,2],
25, canary)
already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 25)
self.failUnlessEqual(len(writers2), 1)
self.failUnlessEqual(len(ss._active_writers), 4)
@ -255,9 +255,7 @@ class Server(unittest.TestCase):
self.failUnlessEqual(len(ss._active_writers), 0)
# now there should be 25 bytes allocated, and 75 free
already3,writers3 = ss.remote_allocate_buckets("vid3", RS, CS,
[0,1,2,3],
25, canary)
already3,writers3 = self.allocate(ss,"vid3", [0,1,2,3], 25)
self.failUnlessEqual(len(writers3), 3)
self.failUnlessEqual(len(ss._active_writers), 3)
@ -272,8 +270,77 @@ class Server(unittest.TestCase):
# during runtime, so if we were creating any metadata, the allocation
# would be more than 25 bytes and this test would need to be changed.
ss = self.create("test_sizelimits", 100)
already4,writers4 = ss.remote_allocate_buckets("vid4",
RS, CS, [0,1,2,3],
25, canary)
already4,writers4 = self.allocate(ss, "vid4", [0,1,2,3], 25)
self.failUnlessEqual(len(writers4), 3)
self.failUnlessEqual(len(ss._active_writers), 3)
def test_leases(self):
    """Exercise renew_lease/cancel_lease end to end: renewal requires
    the exact renew secret, and cancelling a bucket's last lease makes
    its shares unreadable."""
    ss = self.create("test_leases")
    canary = Referenceable()
    sharenums = range(5)
    size = 100

    # lease #0 on bucket si0
    rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
               hashutil.tagged_hash("blah", "%d" % self._secret.next()))
    already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                 sharenums, size, canary)
    self.failUnlessEqual(len(already), 0)
    self.failUnlessEqual(len(writers), 5)
    for wb in writers.values():
        wb.remote_close()

    # lease #1 on a second bucket, si1
    rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
               hashutil.tagged_hash("blah", "%d" % self._secret.next()))
    already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                 sharenums, size, canary)
    for wb in writers.values():
        wb.remote_close()

    # take out a second lease on si1
    rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
               hashutil.tagged_hash("blah", "%d" % self._secret.next()))
    already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                 sharenums, size, canary)
    # all five shares exist already, so no new writers are handed out
    self.failUnlessEqual(len(already), 5)
    self.failUnlessEqual(len(writers), 0)

    # check that si0 is readable
    readers = ss.remote_get_buckets("si0")
    self.failUnlessEqual(len(readers), 5)

    # renew the first lease. Only the proper renew_secret should work
    ss.remote_renew_lease("si0", rs0)
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

    # check that si0 is still readable
    readers = ss.remote_get_buckets("si0")
    self.failUnlessEqual(len(readers), 5)

    # now cancel it: neither its own renew secret nor another bucket's
    # cancel secret may work, only cs0
    self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
    self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
    ss.remote_cancel_lease("si0", cs0)

    # si0 should now be gone
    readers = ss.remote_get_buckets("si0")
    self.failUnlessEqual(len(readers), 0)
    # and the renew should no longer work
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)

    # cancel the first lease on si1, leaving the second in place
    ss.remote_cancel_lease("si1", cs1)
    readers = ss.remote_get_buckets("si1")
    self.failUnlessEqual(len(readers), 5)
    # the corresponding renew should no longer work
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
    ss.remote_renew_lease("si1", rs2)

    # cancelling the second should make it go away
    ss.remote_cancel_lease("si1", cs2)
    readers = ss.remote_get_buckets("si1")
    self.failUnlessEqual(len(readers), 0)
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

View File

@ -616,8 +616,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
if not filenames:
continue
pieces = dirpath.split(os.sep)
if pieces[-2] == "storage":
# we're sitting in .../storage/$SINDEX , and there are
if pieces[-3] == "storage" and pieces[-2] == "shares":
# we're sitting in .../storage/shares/$SINDEX , and there are
# sharefiles here
filename = os.path.join(dirpath, filenames[0])
break