mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-19 19:26:25 +00:00
#620: storage: allow mutable shares to be deleted, with a writev where new_length=0
This commit is contained in:
parent
6a5f28f47d
commit
13a3ef5ec1
@ -207,7 +207,7 @@ class RIStorageServer(RemoteInterface):
|
||||
can be used to pre-allocate space for a series of upcoming writes, or
|
||||
truncate existing data. If the container is growing, new_length will
|
||||
be applied before datav. If the container is shrinking, it will be
|
||||
applied afterwards.
|
||||
applied afterwards. If new_length==0, the share will be deleted.
|
||||
|
||||
The read vector is used to extract data from all known shares,
|
||||
*before* any writes have been applied. The same vector is used for
|
||||
|
@ -34,6 +34,7 @@ class RemoteServiceConnector:
|
||||
"storage": { "http://allmydata.org/tahoe/protocols/storage/v1" :
|
||||
{ "maximum-immutable-share-size": 2**32,
|
||||
"tolerates-immutable-read-overrun": False,
|
||||
"delete-mutable-shares-with-zero-length-writev": False,
|
||||
},
|
||||
"application-version": "unknown: no get_version()",
|
||||
},
|
||||
|
@ -963,6 +963,7 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
|
||||
{ "maximum-immutable-share-size": remaining_space,
|
||||
"tolerates-immutable-read-overrun": True,
|
||||
"delete-mutable-shares-with-zero-length-writev": True,
|
||||
},
|
||||
"application-version": str(allmydata.__version__),
|
||||
}
|
||||
@ -1218,27 +1219,37 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
for sharenum, share in shares.items():
|
||||
read_data[sharenum] = share.readv(read_vector)
|
||||
|
||||
ownerid = 1 # TODO
|
||||
expire_time = time.time() + 31*24*60*60 # one month
|
||||
lease_info = LeaseInfo(ownerid,
|
||||
renew_secret, cancel_secret,
|
||||
expire_time, self.my_nodeid)
|
||||
|
||||
if testv_is_good:
|
||||
# now apply the write vectors
|
||||
for sharenum in test_and_write_vectors:
|
||||
(testv, datav, new_length) = test_and_write_vectors[sharenum]
|
||||
if sharenum not in shares:
|
||||
# allocate a new share
|
||||
allocated_size = 2000 # arbitrary, really
|
||||
share = self._allocate_slot_share(bucketdir, secrets,
|
||||
sharenum,
|
||||
allocated_size,
|
||||
owner_num=0)
|
||||
shares[sharenum] = share
|
||||
shares[sharenum].writev(datav, new_length)
|
||||
# and update the leases on all shares
|
||||
ownerid = 1 # TODO
|
||||
expire_time = time.time() + 31*24*60*60 # one month
|
||||
lease_info = LeaseInfo(ownerid,
|
||||
renew_secret, cancel_secret,
|
||||
expire_time, self.my_nodeid)
|
||||
for share in shares.values():
|
||||
share.add_or_renew_lease(lease_info)
|
||||
if new_length == 0:
|
||||
if sharenum in shares:
|
||||
shares[sharenum].unlink()
|
||||
else:
|
||||
if sharenum not in shares:
|
||||
# allocate a new share
|
||||
allocated_size = 2000 # arbitrary, really
|
||||
share = self._allocate_slot_share(bucketdir, secrets,
|
||||
sharenum,
|
||||
allocated_size,
|
||||
owner_num=0)
|
||||
shares[sharenum] = share
|
||||
shares[sharenum].writev(datav, new_length)
|
||||
# and update the lease
|
||||
shares[sharenum].add_or_renew_lease(lease_info)
|
||||
|
||||
if new_length == 0:
|
||||
# delete empty bucket directories
|
||||
if not os.listdir(bucketdir):
|
||||
os.rmdir(bucketdir)
|
||||
|
||||
|
||||
# all done
|
||||
self.add_latency("writev", time.time() - start)
|
||||
|
@ -1150,6 +1150,50 @@ class MutableServer(unittest.TestCase):
|
||||
self.failUnlessRaises(IndexError,
|
||||
ss.remote_cancel_lease, "si2", "nonsecret")
|
||||
|
||||
def test_remove(self):
|
||||
ss = self.create("test_remove")
|
||||
self.allocate(ss, "si1", "we1", self._lease_secret.next(),
|
||||
set([0,1,2]), 100)
|
||||
readv = ss.remote_slot_readv
|
||||
writev = ss.remote_slot_testv_and_readv_and_writev
|
||||
secrets = ( self.write_enabler("we1"),
|
||||
self.renew_secret("we1"),
|
||||
self.cancel_secret("we1") )
|
||||
# delete sh0 by setting its size to zero
|
||||
answer = writev("si1", secrets,
|
||||
{0: ([], [], 0)},
|
||||
[])
|
||||
# the answer should mention all the shares that existed before the
|
||||
# write
|
||||
self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
|
||||
# but a new read should show only sh1 and sh2
|
||||
self.failUnlessEqual(readv("si1", [], [(0,10)]),
|
||||
{1: [""], 2: [""]})
|
||||
|
||||
# delete sh1 by setting its size to zero
|
||||
answer = writev("si1", secrets,
|
||||
{1: ([], [], 0)},
|
||||
[])
|
||||
self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
|
||||
self.failUnlessEqual(readv("si1", [], [(0,10)]),
|
||||
{2: [""]})
|
||||
|
||||
# delete sh2 by setting its size to zero
|
||||
answer = writev("si1", secrets,
|
||||
{2: ([], [], 0)},
|
||||
[])
|
||||
self.failUnlessEqual(answer, (True, {2:[]}) )
|
||||
self.failUnlessEqual(readv("si1", [], [(0,10)]),
|
||||
{})
|
||||
# and the bucket directory should now be gone
|
||||
si = base32.b2a("si1")
|
||||
# note: this is a detail of the storage server implementation, and
|
||||
# may change in the future
|
||||
prefix = si[:2]
|
||||
prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
|
||||
bucketdir = os.path.join(prefixdir, si)
|
||||
self.failUnless(os.path.exists(prefixdir))
|
||||
self.failIf(os.path.exists(bucketdir))
|
||||
|
||||
class Stats(unittest.TestCase):
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user