storage: fill alreadygot= with all known shares for the given storageindex, not just the ones they asked about

This commit is contained in:
Brian Warner 2007-09-17 00:48:40 -07:00
parent b3b94d24df
commit 8451b485a4
2 changed files with 31 additions and 12 deletions

View File

@ -1,4 +1,5 @@
import os, re, weakref, stat, struct, time import os, re, weakref, stat, struct, time
from itertools import chain
from foolscap import Referenceable from foolscap import Referenceable
from twisted.application import service from twisted.application import service
@ -211,6 +212,7 @@ class BucketReader(Referenceable):
def remote_read(self, offset, length): def remote_read(self, offset, length):
return self._share_file.read_share_data(offset, length) return self._share_file.read_share_data(offset, length)
class StorageServer(service.MultiService, Referenceable): class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer) implements(RIStorageServer)
name = 'storageserver' name = 'storageserver'
@ -265,19 +267,25 @@ class StorageServer(service.MultiService, Referenceable):
yes_limits = not no_limits yes_limits = not no_limits
if yes_limits: if yes_limits:
remaining_space = self.sizelimit - self.allocated_size() remaining_space = self.sizelimit - self.allocated_size()
# fill alreadygot with all shares that we have, not just the ones
# they asked about: this will save them a lot of work. Add or update
# leases for all of them: if they want us to hold shares for this
# file, they'll want us to hold leases for this file.
for (shnum, fn) in chain(self._get_bucket_shares(storage_index),
self._get_incoming_shares(storage_index)):
alreadygot.add(shnum)
sf = ShareFile(fn)
sf.add_or_renew_lease(lease_info)
for shnum in sharenums: for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum) incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum) finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
if os.path.exists(incominghome) or os.path.exists(finalhome): if os.path.exists(incominghome) or os.path.exists(finalhome):
alreadygot.add(shnum) # great! we already have it. easy.
# add a lease for the client whose upload was pre-empted pass
if os.path.exists(incominghome):
# the lease gets added to the still-in-construction share
sf = ShareFile(incominghome)
else:
sf = ShareFile(finalhome)
sf.add_or_renew_lease(lease_info)
elif no_limits or remaining_space >= space_per_bucket: elif no_limits or remaining_space >= space_per_bucket:
# ok! we need to create the new share file.
fileutil.make_dirs(os.path.join(self.incomingdir, si_s)) fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
bw = BucketWriter(self, incominghome, finalhome, bw = BucketWriter(self, incominghome, finalhome,
space_per_bucket, lease_info) space_per_bucket, lease_info)
@ -288,7 +296,7 @@ class StorageServer(service.MultiService, Referenceable):
if yes_limits: if yes_limits:
remaining_space -= space_per_bucket remaining_space -= space_per_bucket
else: else:
# not enough space to accept this bucket # bummer! not enough space to accept this bucket
pass pass
if bucketwriters: if bucketwriters:
@ -354,6 +362,16 @@ class StorageServer(service.MultiService, Referenceable):
# Commonly caused by there being no buckets at all. # Commonly caused by there being no buckets at all.
pass pass
def _get_incoming_shares(self, storage_index):
    """Yield (shnum, filename) pairs for partially-uploaded shares.

    Scans this storage index's subdirectory of the incoming/ area and
    yields one pair per entry whose name looks like a share number.
    Yields nothing if the directory does not exist (or cannot be
    listed).
    """
    si_dir = os.path.join(self.incomingdir, idlib.b2a(storage_index))
    try:
        entries = os.listdir(si_dir)
    except OSError:
        # No incoming directory for this storage index: no shares
        # are currently being uploaded, so there is nothing to yield.
        return
    for entry in entries:
        if NUM_RE.match(entry):
            yield (int(entry), os.path.join(si_dir, entry))
def remote_get_buckets(self, storage_index): def remote_get_buckets(self, storage_index):
bucketreaders = {} # k: sharenum, v: BucketReader bucketreaders = {} # k: sharenum, v: BucketReader
for shnum, filename in self._get_bucket_shares(storage_index): for shnum, filename in self._get_bucket_shares(storage_index):

View File

@ -236,8 +236,9 @@ class Server(unittest.TestCase):
self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0) self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
# now if we ask about writing again, the server should offer those three # now if we ask about writing again, the server should offer those three
# buckets as already present # buckets as already present. It should offer them even if we don't
already,writers = self.allocate(ss, "vid", [0,1,2,3,4], 75) # ask about those specific ones.
already,writers = self.allocate(ss, "vid", [2,3,4], 75)
self.failUnlessEqual(already, set([0,1,2])) self.failUnlessEqual(already, set([0,1,2]))
self.failUnlessEqual(set(writers.keys()), set([3,4])) self.failUnlessEqual(set(writers.keys()), set([3,4]))
@ -246,7 +247,7 @@ class Server(unittest.TestCase):
# upload into them a second time) # upload into them a second time)
already,writers = self.allocate(ss, "vid", [2,3,4,5], 75) already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
self.failUnlessEqual(already, set([2,3,4])) self.failUnlessEqual(already, set([0,1,2,3,4]))
self.failUnlessEqual(set(writers.keys()), set([5])) self.failUnlessEqual(set(writers.keys()), set([5]))
def test_sizelimits(self): def test_sizelimits(self):