diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index b0be6b1be..a89f05c5e 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -17,10 +17,14 @@ class DataTooLargeError(Exception): # storage/ # storage/shares/incoming -# incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be -# moved to storage/shares/$STORAGEINDEX/$SHARENUM upon success -# storage/shares/$STORAGEINDEX -# storage/shares/$STORAGEINDEX/$SHARENUM +# incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will +# be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success +# storage/shares/$START/$STORAGEINDEX +# storage/shares/$START/$STORAGEINDEX/$SHARENUM + +# Where "$START" denotes the first 14 bits worth of $STORAGEINDEX (that's 3 +# base-32 chars, but the last one has only 4 bits in it -- i.e. only 16 possible +# chars in the last position). # $SHARENUM matches this regex: NUM_RE=re.compile("^[0-9]+$") @@ -42,6 +46,9 @@ NUM_RE=re.compile("^[0-9]+$") # B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch # B+0x48: next lease, or end of record +def storage_index_to_dir(storageindex): + return os.path.join(idlib.b2a_l(storageindex[:2], 14), idlib.b2a(storageindex)) + class ShareFile: LEASE_SIZE = struct.calcsize(">L32s32sL") @@ -174,12 +181,13 @@ class BucketWriter(Referenceable): self.closed = False self.throw_out_all_data = False # touch the file, so later callers will see that we're working on it. + assert not os.path.exists(incominghome) + fileutil.make_dirs(os.path.dirname(incominghome)) # Also construct the metadata. 
- assert not os.path.exists(self.incominghome) - f = open(self.incominghome, 'wb') + f = open(incominghome, 'wb') f.write(struct.pack(">LLL", 1, size, 0)) f.close() - self._sharefile = ShareFile(self.incominghome) + self._sharefile = ShareFile(incominghome) # also, add our lease to the file now, so that other ones can be # added by simultaneous uploaders self._sharefile.add_lease(lease_info) @@ -195,7 +203,15 @@ class BucketWriter(Referenceable): def remote_close(self): precondition(not self.closed) + + fileutil.make_dirs(os.path.dirname(self.finalhome)) fileutil.rename(self.incominghome, self.finalhome) + try: + os.rmdir(os.path.dirname(self.incominghome)) + os.rmdir(os.path.dirname(os.path.dirname(self.incominghome))) + os.rmdir(os.path.dirname(os.path.dirname(os.path.dirname(self.incominghome)))) + except EnvironmentError: + pass self._sharefile = None self.closed = True self._canary.dontNotifyOnDisconnect(self._disconnect_marker) @@ -203,12 +219,6 @@ class BucketWriter(Referenceable): filelen = os.stat(self.finalhome)[stat.ST_SIZE] self.ss.bucket_writer_closed(self, filelen) - # if we were the last share to be moved, remove the incoming/ - # directory that was our parent - parentdir = os.path.split(self.incominghome)[0] - if not os.listdir(parentdir): - os.rmdir(parentdir) - def _disconnected(self): if not self.closed: self._abort() @@ -235,8 +245,8 @@ class BucketWriter(Referenceable): class BucketReader(Referenceable): implements(RIBucketReader) - def __init__(self, home): - self._share_file = ShareFile(home) + def __init__(self, sharefname): + self._share_file = ShareFile(sharefname) def remote_read(self, offset, length): return self._share_file.read_share_data(offset, length) @@ -719,7 +729,7 @@ class StorageServer(service.MultiService, Referenceable): # to a particular owner. 
alreadygot = set() bucketwriters = {} # k: shnum, v: BucketWriter - si_s = idlib.b2a(storage_index) + si_s = storage_index_to_dir(storage_index) # in this implementation, the lease information (including secrets) # goes into the share files themselves. It could also be put into a @@ -752,7 +762,6 @@ class StorageServer(service.MultiService, Referenceable): pass elif no_limits or remaining_space >= space_per_bucket: # ok! we need to create the new share file. - fileutil.make_dirs(os.path.join(self.incomingdir, si_s)) bw = BucketWriter(self, incominghome, finalhome, space_per_bucket, lease_info, canary) if self.no_storage: @@ -792,7 +801,7 @@ class StorageServer(service.MultiService, Referenceable): raise IndexError("no such lease to renew") def remote_cancel_lease(self, storage_index, cancel_secret): - storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index)) + storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index)) remaining_files = 0 total_space_freed = 0 @@ -844,7 +853,7 @@ class StorageServer(service.MultiService, Referenceable): """Return a list of (shnum, pathname) tuples for files that hold shares for this storage_index. 
In each tuple, 'shnum' will always be the integer form of the last component of 'pathname'.""" - storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index)) + storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index)) try: for f in os.listdir(storagedir): if NUM_RE.match(f): @@ -855,7 +864,7 @@ class StorageServer(service.MultiService, Referenceable): pass def _get_incoming_shares(self, storage_index): - incomingdir = os.path.join(self.incomingdir, idlib.b2a(storage_index)) + incomingdir = os.path.join(self.incomingdir, storage_index_to_dir(storage_index)) try: for f in os.listdir(incomingdir): if NUM_RE.match(f): @@ -891,7 +900,7 @@ class StorageServer(service.MultiService, Referenceable): secrets, test_and_write_vectors, read_vector): - si_s = idlib.b2a(storage_index) + si_s = storage_index_to_dir(storage_index) (write_enabler, renew_secret, cancel_secret) = secrets # shares exist if there is a file for them bucketdir = os.path.join(self.sharedir, si_s) @@ -968,7 +977,7 @@ class StorageServer(service.MultiService, Referenceable): return share def remote_slot_readv(self, storage_index, shares, readv): - si_s = idlib.b2a(storage_index) + si_s = storage_index_to_dir(storage_index) # shares exist if there is a file for them bucketdir = os.path.join(self.sharedir, si_s) if not os.path.isdir(bucketdir): diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 5d09e14dc..550891e4f 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -7,7 +7,8 @@ import itertools from allmydata import interfaces from allmydata.util import fileutil, hashutil, idlib from allmydata.storage import BucketWriter, BucketReader, \ - WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile + WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \ + storage_index_to_dir from allmydata.interfaces import BadWriteEnablerError from allmydata.test.common import LoggingServiceParent 
@@ -56,7 +57,7 @@ class Bucket(unittest.TestCase): bw.remote_close() # now read from it - br = BucketReader(final) + br = BucketReader(bw.finalhome) self.failUnlessEqual(br.remote_read(0, 25), "a"*25) self.failUnlessEqual(br.remote_read(25, 25), "b"*25) self.failUnlessEqual(br.remote_read(50, 7), "c"*7) @@ -93,7 +94,7 @@ class BucketProxy(unittest.TestCase): pass def test_create(self): - bw, rb, final = self.make_bucket("test_create", 500) + bw, rb, sharefname = self.make_bucket("test_create", 500) bp = WriteBucketProxy(rb, data_size=300, segment_size=10, @@ -123,7 +124,7 @@ class BucketProxy(unittest.TestCase): for i in (1,9,13)] uri_extension = "s" + "E"*498 + "e" - bw, rb, final = self.make_bucket("test_readwrite", 1414) + bw, rb, sharefname = self.make_bucket("test_readwrite", 1414) bp = WriteBucketProxy(rb, data_size=95, segment_size=25, @@ -146,7 +147,7 @@ class BucketProxy(unittest.TestCase): # now read everything back def _start_reading(res): - br = BucketReader(final) + br = BucketReader(sharefname) rb = RemoteBucket() rb.target = br rbp = ReadBucketProxy(rb) @@ -212,16 +213,40 @@ class Server(unittest.TestCase): renew_secret, cancel_secret, sharenums, size, FakeCanary()) + def test_dont_overfill_dirs(self): + """ + This test asserts that if you add a second share whose storage index + shares lots of leading bits with an extant share (but isn't the exact + same storage index), this won't add an entry to the share directory. + """ + ss = self.create("test_dont_overfill_dirs") + already, writers = self.allocate(ss, "storageindex", [0], 10) + for i, wb in writers.items(): + wb.remote_write(0, "%10d" % i) + wb.remote_close() + storedir = os.path.join(self.workdir("test_dont_overfill_dirs"), + "shares") + children_of_storedir = set(os.listdir(storedir)) + + # Now store another one under another storageindex that has leading + # chars the same as the first storageindex. 
+ already, writers = self.allocate(ss, "storageindey", [0], 10) + for i, wb in writers.items(): + wb.remote_write(0, "%10d" % i) + wb.remote_close() + storedir = os.path.join(self.workdir("test_dont_overfill_dirs"), + "shares") + new_children_of_storedir = set(os.listdir(storedir)) + self.failUnlessEqual(children_of_storedir, new_children_of_storedir) + def test_remove_incoming(self): ss = self.create("test_remove_incoming") already, writers = self.allocate(ss, "vid", range(3), 10) for i,wb in writers.items(): wb.remote_write(0, "%10d" % i) wb.remote_close() - incomingdir = os.path.join(self.workdir("test_remove_incoming"), - "shares", "incoming") - leftover_dirs = os.listdir(incomingdir) - self.failUnlessEqual(leftover_dirs, []) + incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome))) + self.failIf(os.path.exists(incomingdir)) def test_allocate(self): ss = self.create("test_allocate") @@ -785,7 +810,7 @@ class MutableServer(unittest.TestCase): # create a random non-numeric file in the bucket directory, to # exercise the code that's supposed to ignore those. 
bucket_dir = os.path.join(self.workdir("test_leases"), - "shares", idlib.b2a("si1")) + "shares", storage_index_to_dir("si1")) f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w") f.write("you ought to be ignoring me\n") f.close() diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 75de3da62..8bcb70698 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -372,7 +372,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): for i in range(self.numclients): incdir = os.path.join(self.getdir("client%d" % i), "storage", "shares", "incoming") - self.failUnlessEqual(os.listdir(incdir), []) + self.failIf(os.path.exists(incdir) and os.listdir(incdir)) d.addCallback(_disconnected) def _wait_for_reconnect(res): @@ -442,11 +442,11 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): if not filenames: continue pieces = dirpath.split(os.sep) - if pieces[-3] == "storage" and pieces[-2] == "shares": - # we're sitting in .../storage/shares/$SINDEX , and there + if pieces[-4] == "storage" and pieces[-3] == "shares": + # we're sitting in .../storage/shares/$START/$SINDEX , and there # are sharefiles here - assert pieces[-4].startswith("client") - client_num = int(pieces[-4][-1]) + assert pieces[-5].startswith("client") + client_num = int(pieces[-5][-1]) storage_index_s = pieces[-1] storage_index = idlib.a2b(storage_index_s) for sharename in filenames: @@ -1115,9 +1115,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): if not filenames: continue pieces = dirpath.split(os.sep) - if pieces[-3] == "storage" and pieces[-2] == "shares": - # we're sitting in .../storage/shares/$SINDEX , and there are - # sharefiles here + if pieces[-4] == "storage" and pieces[-3] == "shares": + # we're sitting in .../storage/shares/$START/$SINDEX , and there + # are sharefiles here filename = os.path.join(dirpath, filenames[0]) # peek at the magic to see if it is a chk share magic = open(filename, 
"rb").read(4)