mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-08 22:12:43 +00:00
storage: make two levels of share directories so as not to exceed certain filesystems's limitations on directory size
The filesystem which gets my vote for most undeservedly popular is ext3, and it has a hard limit of 32,000 entries in a directory. Many other filesystems (even ones that I like more than I like ext3) have either hard limits or bad performance consequences or weird edge cases when you get too many entries in a single directory. This patch makes it so that there is a layer of intermediate directories between the "shares" directory and the actual storage-index directory (the one whose name contains the entire storage index (z-base-32 encoded) and which contains one or more share files named by their share number). The intermediate directories are named by the first 14 bits of the storage index, which means there are at most 16384 of them. (This also means that the intermediate directory names are not a leading prefix of the storage-index directory names -- to do that would have required us to have intermediate directories limited to either 1024 (2-char), which is too few, or 32768 (3-chars of a full 5 bits each), which would overrun ext3's funny hard limit of 32,000.)) This closes #150, and please see the "convertshares.py" script attached to #150 to convert your old tahoe-0.7.0 storage/shares directory into a new tahoe-0.8.0 storage/shares directory.
This commit is contained in:
parent
bf25a041f3
commit
79c439d026
@ -17,10 +17,14 @@ class DataTooLargeError(Exception):
|
|||||||
|
|
||||||
# storage/
|
# storage/
|
||||||
# storage/shares/incoming
|
# storage/shares/incoming
|
||||||
# incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be
|
# incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
|
||||||
# moved to storage/shares/$STORAGEINDEX/$SHARENUM upon success
|
# be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
|
||||||
# storage/shares/$STORAGEINDEX
|
# storage/shares/$START/$STORAGEINDEX
|
||||||
# storage/shares/$STORAGEINDEX/$SHARENUM
|
# storage/shares/$START/$STORAGEINDEX/$SHARENUM
|
||||||
|
|
||||||
|
# Where "$START" denotes the first 14 bits worth of $STORAGEINDEX (that's 3
|
||||||
|
# base-32 chars, but the last one has only 4 bits in it -- i.e. only 16 possible
|
||||||
|
# chars in the last position).
|
||||||
|
|
||||||
# $SHARENUM matches this regex:
|
# $SHARENUM matches this regex:
|
||||||
NUM_RE=re.compile("^[0-9]+$")
|
NUM_RE=re.compile("^[0-9]+$")
|
||||||
@ -42,6 +46,9 @@ NUM_RE=re.compile("^[0-9]+$")
|
|||||||
# B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
|
# B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
|
||||||
# B+0x48: next lease, or end of record
|
# B+0x48: next lease, or end of record
|
||||||
|
|
||||||
|
def storage_index_to_dir(storageindex):
|
||||||
|
return os.path.join(idlib.b2a_l(storageindex[:2], 14), idlib.b2a(storageindex))
|
||||||
|
|
||||||
class ShareFile:
|
class ShareFile:
|
||||||
LEASE_SIZE = struct.calcsize(">L32s32sL")
|
LEASE_SIZE = struct.calcsize(">L32s32sL")
|
||||||
|
|
||||||
@ -174,12 +181,13 @@ class BucketWriter(Referenceable):
|
|||||||
self.closed = False
|
self.closed = False
|
||||||
self.throw_out_all_data = False
|
self.throw_out_all_data = False
|
||||||
# touch the file, so later callers will see that we're working on it.
|
# touch the file, so later callers will see that we're working on it.
|
||||||
|
assert not os.path.exists(incominghome)
|
||||||
|
fileutil.make_dirs(os.path.dirname(incominghome))
|
||||||
# Also construct the metadata.
|
# Also construct the metadata.
|
||||||
assert not os.path.exists(self.incominghome)
|
f = open(incominghome, 'wb')
|
||||||
f = open(self.incominghome, 'wb')
|
|
||||||
f.write(struct.pack(">LLL", 1, size, 0))
|
f.write(struct.pack(">LLL", 1, size, 0))
|
||||||
f.close()
|
f.close()
|
||||||
self._sharefile = ShareFile(self.incominghome)
|
self._sharefile = ShareFile(incominghome)
|
||||||
# also, add our lease to the file now, so that other ones can be
|
# also, add our lease to the file now, so that other ones can be
|
||||||
# added by simultaneous uploaders
|
# added by simultaneous uploaders
|
||||||
self._sharefile.add_lease(lease_info)
|
self._sharefile.add_lease(lease_info)
|
||||||
@ -195,7 +203,15 @@ class BucketWriter(Referenceable):
|
|||||||
|
|
||||||
def remote_close(self):
|
def remote_close(self):
|
||||||
precondition(not self.closed)
|
precondition(not self.closed)
|
||||||
|
|
||||||
|
fileutil.make_dirs(os.path.dirname(self.finalhome))
|
||||||
fileutil.rename(self.incominghome, self.finalhome)
|
fileutil.rename(self.incominghome, self.finalhome)
|
||||||
|
try:
|
||||||
|
os.rmdir(os.path.dirname(self.incominghome))
|
||||||
|
os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
|
||||||
|
os.rmdir(os.path.dirname(os.path.dirname(os.path.dirname(self.incominghome))))
|
||||||
|
except EnvironmentError:
|
||||||
|
pass
|
||||||
self._sharefile = None
|
self._sharefile = None
|
||||||
self.closed = True
|
self.closed = True
|
||||||
self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
|
self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
|
||||||
@ -203,12 +219,6 @@ class BucketWriter(Referenceable):
|
|||||||
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
|
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
|
||||||
self.ss.bucket_writer_closed(self, filelen)
|
self.ss.bucket_writer_closed(self, filelen)
|
||||||
|
|
||||||
# if we were the last share to be moved, remove the incoming/
|
|
||||||
# directory that was our parent
|
|
||||||
parentdir = os.path.split(self.incominghome)[0]
|
|
||||||
if not os.listdir(parentdir):
|
|
||||||
os.rmdir(parentdir)
|
|
||||||
|
|
||||||
def _disconnected(self):
|
def _disconnected(self):
|
||||||
if not self.closed:
|
if not self.closed:
|
||||||
self._abort()
|
self._abort()
|
||||||
@ -235,8 +245,8 @@ class BucketWriter(Referenceable):
|
|||||||
class BucketReader(Referenceable):
|
class BucketReader(Referenceable):
|
||||||
implements(RIBucketReader)
|
implements(RIBucketReader)
|
||||||
|
|
||||||
def __init__(self, home):
|
def __init__(self, sharefname):
|
||||||
self._share_file = ShareFile(home)
|
self._share_file = ShareFile(sharefname)
|
||||||
|
|
||||||
def remote_read(self, offset, length):
|
def remote_read(self, offset, length):
|
||||||
return self._share_file.read_share_data(offset, length)
|
return self._share_file.read_share_data(offset, length)
|
||||||
@ -719,7 +729,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
# to a particular owner.
|
# to a particular owner.
|
||||||
alreadygot = set()
|
alreadygot = set()
|
||||||
bucketwriters = {} # k: shnum, v: BucketWriter
|
bucketwriters = {} # k: shnum, v: BucketWriter
|
||||||
si_s = idlib.b2a(storage_index)
|
si_s = storage_index_to_dir(storage_index)
|
||||||
|
|
||||||
# in this implementation, the lease information (including secrets)
|
# in this implementation, the lease information (including secrets)
|
||||||
# goes into the share files themselves. It could also be put into a
|
# goes into the share files themselves. It could also be put into a
|
||||||
@ -752,7 +762,6 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
pass
|
pass
|
||||||
elif no_limits or remaining_space >= space_per_bucket:
|
elif no_limits or remaining_space >= space_per_bucket:
|
||||||
# ok! we need to create the new share file.
|
# ok! we need to create the new share file.
|
||||||
fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
|
|
||||||
bw = BucketWriter(self, incominghome, finalhome,
|
bw = BucketWriter(self, incominghome, finalhome,
|
||||||
space_per_bucket, lease_info, canary)
|
space_per_bucket, lease_info, canary)
|
||||||
if self.no_storage:
|
if self.no_storage:
|
||||||
@ -792,7 +801,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
raise IndexError("no such lease to renew")
|
raise IndexError("no such lease to renew")
|
||||||
|
|
||||||
def remote_cancel_lease(self, storage_index, cancel_secret):
|
def remote_cancel_lease(self, storage_index, cancel_secret):
|
||||||
storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
|
storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
|
||||||
|
|
||||||
remaining_files = 0
|
remaining_files = 0
|
||||||
total_space_freed = 0
|
total_space_freed = 0
|
||||||
@ -844,7 +853,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
"""Return a list of (shnum, pathname) tuples for files that hold
|
"""Return a list of (shnum, pathname) tuples for files that hold
|
||||||
shares for this storage_index. In each tuple, 'shnum' will always be
|
shares for this storage_index. In each tuple, 'shnum' will always be
|
||||||
the integer form of the last component of 'pathname'."""
|
the integer form of the last component of 'pathname'."""
|
||||||
storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
|
storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
|
||||||
try:
|
try:
|
||||||
for f in os.listdir(storagedir):
|
for f in os.listdir(storagedir):
|
||||||
if NUM_RE.match(f):
|
if NUM_RE.match(f):
|
||||||
@ -855,7 +864,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def _get_incoming_shares(self, storage_index):
|
def _get_incoming_shares(self, storage_index):
|
||||||
incomingdir = os.path.join(self.incomingdir, idlib.b2a(storage_index))
|
incomingdir = os.path.join(self.incomingdir, storage_index_to_dir(storage_index))
|
||||||
try:
|
try:
|
||||||
for f in os.listdir(incomingdir):
|
for f in os.listdir(incomingdir):
|
||||||
if NUM_RE.match(f):
|
if NUM_RE.match(f):
|
||||||
@ -891,7 +900,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
secrets,
|
secrets,
|
||||||
test_and_write_vectors,
|
test_and_write_vectors,
|
||||||
read_vector):
|
read_vector):
|
||||||
si_s = idlib.b2a(storage_index)
|
si_s = storage_index_to_dir(storage_index)
|
||||||
(write_enabler, renew_secret, cancel_secret) = secrets
|
(write_enabler, renew_secret, cancel_secret) = secrets
|
||||||
# shares exist if there is a file for them
|
# shares exist if there is a file for them
|
||||||
bucketdir = os.path.join(self.sharedir, si_s)
|
bucketdir = os.path.join(self.sharedir, si_s)
|
||||||
@ -968,7 +977,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
return share
|
return share
|
||||||
|
|
||||||
def remote_slot_readv(self, storage_index, shares, readv):
|
def remote_slot_readv(self, storage_index, shares, readv):
|
||||||
si_s = idlib.b2a(storage_index)
|
si_s = storage_index_to_dir(storage_index)
|
||||||
# shares exist if there is a file for them
|
# shares exist if there is a file for them
|
||||||
bucketdir = os.path.join(self.sharedir, si_s)
|
bucketdir = os.path.join(self.sharedir, si_s)
|
||||||
if not os.path.isdir(bucketdir):
|
if not os.path.isdir(bucketdir):
|
||||||
|
@ -7,7 +7,8 @@ import itertools
|
|||||||
from allmydata import interfaces
|
from allmydata import interfaces
|
||||||
from allmydata.util import fileutil, hashutil, idlib
|
from allmydata.util import fileutil, hashutil, idlib
|
||||||
from allmydata.storage import BucketWriter, BucketReader, \
|
from allmydata.storage import BucketWriter, BucketReader, \
|
||||||
WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile
|
WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
|
||||||
|
storage_index_to_dir
|
||||||
from allmydata.interfaces import BadWriteEnablerError
|
from allmydata.interfaces import BadWriteEnablerError
|
||||||
from allmydata.test.common import LoggingServiceParent
|
from allmydata.test.common import LoggingServiceParent
|
||||||
|
|
||||||
@ -56,7 +57,7 @@ class Bucket(unittest.TestCase):
|
|||||||
bw.remote_close()
|
bw.remote_close()
|
||||||
|
|
||||||
# now read from it
|
# now read from it
|
||||||
br = BucketReader(final)
|
br = BucketReader(bw.finalhome)
|
||||||
self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
|
self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
|
||||||
self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
|
self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
|
||||||
self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
|
self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
|
||||||
@ -93,7 +94,7 @@ class BucketProxy(unittest.TestCase):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def test_create(self):
|
def test_create(self):
|
||||||
bw, rb, final = self.make_bucket("test_create", 500)
|
bw, rb, sharefname = self.make_bucket("test_create", 500)
|
||||||
bp = WriteBucketProxy(rb,
|
bp = WriteBucketProxy(rb,
|
||||||
data_size=300,
|
data_size=300,
|
||||||
segment_size=10,
|
segment_size=10,
|
||||||
@ -123,7 +124,7 @@ class BucketProxy(unittest.TestCase):
|
|||||||
for i in (1,9,13)]
|
for i in (1,9,13)]
|
||||||
uri_extension = "s" + "E"*498 + "e"
|
uri_extension = "s" + "E"*498 + "e"
|
||||||
|
|
||||||
bw, rb, final = self.make_bucket("test_readwrite", 1414)
|
bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
|
||||||
bp = WriteBucketProxy(rb,
|
bp = WriteBucketProxy(rb,
|
||||||
data_size=95,
|
data_size=95,
|
||||||
segment_size=25,
|
segment_size=25,
|
||||||
@ -146,7 +147,7 @@ class BucketProxy(unittest.TestCase):
|
|||||||
|
|
||||||
# now read everything back
|
# now read everything back
|
||||||
def _start_reading(res):
|
def _start_reading(res):
|
||||||
br = BucketReader(final)
|
br = BucketReader(sharefname)
|
||||||
rb = RemoteBucket()
|
rb = RemoteBucket()
|
||||||
rb.target = br
|
rb.target = br
|
||||||
rbp = ReadBucketProxy(rb)
|
rbp = ReadBucketProxy(rb)
|
||||||
@ -212,16 +213,40 @@ class Server(unittest.TestCase):
|
|||||||
renew_secret, cancel_secret,
|
renew_secret, cancel_secret,
|
||||||
sharenums, size, FakeCanary())
|
sharenums, size, FakeCanary())
|
||||||
|
|
||||||
|
def test_dont_overfill_dirs(self):
|
||||||
|
"""
|
||||||
|
This test asserts that if you add a second share whose storage index
|
||||||
|
share lots of leading bits with an extant share (but isn't the exact
|
||||||
|
same storage index), this won't add an entry to the share directory.
|
||||||
|
"""
|
||||||
|
ss = self.create("test_dont_overfill_dirs")
|
||||||
|
already, writers = self.allocate(ss, "storageindex", [0], 10)
|
||||||
|
for i, wb in writers.items():
|
||||||
|
wb.remote_write(0, "%10d" % i)
|
||||||
|
wb.remote_close()
|
||||||
|
storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
|
||||||
|
"shares")
|
||||||
|
children_of_storedir = set(os.listdir(storedir))
|
||||||
|
|
||||||
|
# Now store another one under another storageindex that has leading
|
||||||
|
# chars the same as the first storageindex.
|
||||||
|
already, writers = self.allocate(ss, "storageindey", [0], 10)
|
||||||
|
for i, wb in writers.items():
|
||||||
|
wb.remote_write(0, "%10d" % i)
|
||||||
|
wb.remote_close()
|
||||||
|
storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
|
||||||
|
"shares")
|
||||||
|
new_children_of_storedir = set(os.listdir(storedir))
|
||||||
|
self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
|
||||||
|
|
||||||
def test_remove_incoming(self):
|
def test_remove_incoming(self):
|
||||||
ss = self.create("test_remove_incoming")
|
ss = self.create("test_remove_incoming")
|
||||||
already, writers = self.allocate(ss, "vid", range(3), 10)
|
already, writers = self.allocate(ss, "vid", range(3), 10)
|
||||||
for i,wb in writers.items():
|
for i,wb in writers.items():
|
||||||
wb.remote_write(0, "%10d" % i)
|
wb.remote_write(0, "%10d" % i)
|
||||||
wb.remote_close()
|
wb.remote_close()
|
||||||
incomingdir = os.path.join(self.workdir("test_remove_incoming"),
|
incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
|
||||||
"shares", "incoming")
|
self.failIf(os.path.exists(incomingdir))
|
||||||
leftover_dirs = os.listdir(incomingdir)
|
|
||||||
self.failUnlessEqual(leftover_dirs, [])
|
|
||||||
|
|
||||||
def test_allocate(self):
|
def test_allocate(self):
|
||||||
ss = self.create("test_allocate")
|
ss = self.create("test_allocate")
|
||||||
@ -785,7 +810,7 @@ class MutableServer(unittest.TestCase):
|
|||||||
# create a random non-numeric file in the bucket directory, to
|
# create a random non-numeric file in the bucket directory, to
|
||||||
# exercise the code that's supposed to ignore those.
|
# exercise the code that's supposed to ignore those.
|
||||||
bucket_dir = os.path.join(self.workdir("test_leases"),
|
bucket_dir = os.path.join(self.workdir("test_leases"),
|
||||||
"shares", idlib.b2a("si1"))
|
"shares", storage_index_to_dir("si1"))
|
||||||
f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
|
f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
|
||||||
f.write("you ought to be ignoring me\n")
|
f.write("you ought to be ignoring me\n")
|
||||||
f.close()
|
f.close()
|
||||||
|
@ -372,7 +372,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
|
|||||||
for i in range(self.numclients):
|
for i in range(self.numclients):
|
||||||
incdir = os.path.join(self.getdir("client%d" % i),
|
incdir = os.path.join(self.getdir("client%d" % i),
|
||||||
"storage", "shares", "incoming")
|
"storage", "shares", "incoming")
|
||||||
self.failUnlessEqual(os.listdir(incdir), [])
|
self.failIf(os.path.exists(incdir) and os.listdir(incdir))
|
||||||
d.addCallback(_disconnected)
|
d.addCallback(_disconnected)
|
||||||
|
|
||||||
def _wait_for_reconnect(res):
|
def _wait_for_reconnect(res):
|
||||||
@ -442,11 +442,11 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
|
|||||||
if not filenames:
|
if not filenames:
|
||||||
continue
|
continue
|
||||||
pieces = dirpath.split(os.sep)
|
pieces = dirpath.split(os.sep)
|
||||||
if pieces[-3] == "storage" and pieces[-2] == "shares":
|
if pieces[-4] == "storage" and pieces[-3] == "shares":
|
||||||
# we're sitting in .../storage/shares/$SINDEX , and there
|
# we're sitting in .../storage/shares/$START/$SINDEX , and there
|
||||||
# are sharefiles here
|
# are sharefiles here
|
||||||
assert pieces[-4].startswith("client")
|
assert pieces[-5].startswith("client")
|
||||||
client_num = int(pieces[-4][-1])
|
client_num = int(pieces[-5][-1])
|
||||||
storage_index_s = pieces[-1]
|
storage_index_s = pieces[-1]
|
||||||
storage_index = idlib.a2b(storage_index_s)
|
storage_index = idlib.a2b(storage_index_s)
|
||||||
for sharename in filenames:
|
for sharename in filenames:
|
||||||
@ -1115,9 +1115,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
|
|||||||
if not filenames:
|
if not filenames:
|
||||||
continue
|
continue
|
||||||
pieces = dirpath.split(os.sep)
|
pieces = dirpath.split(os.sep)
|
||||||
if pieces[-3] == "storage" and pieces[-2] == "shares":
|
if pieces[-4] == "storage" and pieces[-3] == "shares":
|
||||||
# we're sitting in .../storage/shares/$SINDEX , and there are
|
# we're sitting in .../storage/shares/$START/$SINDEX , and there
|
||||||
# sharefiles here
|
# are sharefiles here
|
||||||
filename = os.path.join(dirpath, filenames[0])
|
filename = os.path.join(dirpath, filenames[0])
|
||||||
# peek at the magic to see if it is a chk share
|
# peek at the magic to see if it is a chk share
|
||||||
magic = open(filename, "rb").read(4)
|
magic = open(filename, "rb").read(4)
|
||||||
|
Loading…
Reference in New Issue
Block a user