storage: ignore shares in incoming/, to make clients use other servers during simultaneous uploads

This commit is contained in:
Brian Warner 2008-06-10 11:53:10 -07:00
parent 1efcf2ee3c
commit 814922a9a1
2 changed files with 20 additions and 7 deletions

View File

@ -772,8 +772,7 @@ class StorageServer(service.MultiService, Referenceable):
# they asked about: this will save them a lot of work. Add or update
# leases for all of them: if they want us to hold shares for this
# file, they'll want us to hold leases for this file.
for (shnum, fn) in chain(self._get_bucket_shares(storage_index),
self._get_incoming_shares(storage_index)):
for (shnum, fn) in self._get_bucket_shares(storage_index):
alreadygot.add(shnum)
sf = ShareFile(fn)
sf.add_or_renew_lease(lease_info)
@ -785,9 +784,15 @@ class StorageServer(service.MultiService, Referenceable):
for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
if os.path.exists(incominghome) or os.path.exists(finalhome):
if os.path.exists(finalhome):
# great! we already have it. easy.
pass
elif os.path.exists(incominghome):
# Note that we don't create BucketWriters for shnums that
# have a partial share (in incoming/), so if a second upload
# occurs while the first is still in progress, the second
# uploader will use different storage servers.
pass
elif no_limits or remaining_space >= space_per_bucket:
# ok! we need to create the new share file.
bw = BucketWriter(self, incominghome, finalhome,

View File

@ -261,6 +261,7 @@ class Server(unittest.TestCase):
# while the buckets are open, they should not count as readable
self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
# close the buckets
for i,wb in writers.items():
wb.remote_write(0, "%25d" % i)
wb.remote_close()
@ -278,11 +279,10 @@ class Server(unittest.TestCase):
self.failUnlessEqual(set(writers.keys()), set([3,4]))
# while those two buckets are open for writing, the server should
# tell new uploaders that they already exist (so that we don't try to
# upload into them a second time)
# refuse to offer them to uploaders
already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
self.failUnlessEqual(already, set([0,1,2,3,4]))
self.failUnlessEqual(already, set([0,1,2]))
self.failUnlessEqual(set(writers.keys()), set([5]))
def test_sizelimits(self):
@ -465,11 +465,19 @@ class Server(unittest.TestCase):
self.failUnlessEqual(len(writers), 5)
already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
sharenums, size, canary)
self.failUnlessEqual(len(already2), 5)
self.failUnlessEqual(len(already2), 0)
self.failUnlessEqual(len(writers2), 0)
for wb in writers.values():
wb.remote_close()
leases = list(ss.get_leases("si3"))
self.failUnlessEqual(len(leases), 1)
already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
sharenums, size, canary)
self.failUnlessEqual(len(already3), 5)
self.failUnlessEqual(len(writers3), 0)
leases = list(ss.get_leases("si3"))
self.failUnlessEqual(len(leases), 2)