storage: handle simultanous uploads: add a lease for the pre-empted client

This commit is contained in:
Brian Warner 2007-09-02 14:57:49 -07:00
parent dbcabc1142
commit 85f3107b12
2 changed files with 27 additions and 6 deletions

View File

@ -157,7 +157,6 @@ class BucketWriter(Referenceable):
self.incominghome = incominghome
self.finalhome = finalhome
self._size = size
self._lease_info = lease_info
self.closed = False
self.throw_out_all_data = False
# touch the file, so later callers will see that we're working on it.
@ -167,6 +166,9 @@ class BucketWriter(Referenceable):
f.write(struct.pack(">LLL", 1, size, 0))
f.close()
self._sharefile = ShareFile(self.incominghome)
# also, add our lease to the file now, so that other ones can be
# added by simultaneous uploaders
self._sharefile.add_lease(lease_info)
def allocated_size(self):
return self._size
@ -179,7 +181,6 @@ class BucketWriter(Referenceable):
def remote_close(self):
precondition(not self.closed)
self._sharefile.add_lease(self._lease_info)
fileutil.rename(self.incominghome, self.finalhome)
self._sharefile = None
self.closed = True
@ -256,13 +257,13 @@ class StorageServer(service.MultiService, Referenceable):
finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
if os.path.exists(incominghome) or os.path.exists(finalhome):
alreadygot.add(shnum)
# add a lease
# add a lease for the client whose upload was pre-empted
if os.path.exists(incominghome):
# TODO: add a lease to the still-in-construction share
pass
# the lease gets added to the still-in-construction share
sf = ShareFile(incominghome)
else:
sf = ShareFile(finalhome)
sf.add_lease(lease_info)
sf.add_lease(lease_info)
elif no_limits or remaining_space >= space_per_bucket:
fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
bw = BucketWriter(self, incominghome, finalhome,

View File

@ -406,3 +406,23 @@ class Server(unittest.TestCase):
leases = list(ss.get_leases("si1"))
self.failUnlessEqual(len(leases), 0)
# test overlapping uploads
rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
hashutil.tagged_hash("blah", "%d" % self._secret.next()))
rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
hashutil.tagged_hash("blah", "%d" % self._secret.next()))
already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
sharenums, size, canary)
self.failUnlessEqual(len(already), 0)
self.failUnlessEqual(len(writers), 5)
already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
sharenums, size, canary)
self.failUnlessEqual(len(already2), 5)
self.failUnlessEqual(len(writers2), 0)
for wb in writers.values():
wb.remote_close()
leases = list(ss.get_leases("si3"))
self.failUnlessEqual(len(leases), 2)