storage: replace sizelimit with reserved_space, make the stats 'disk_avail' number incorporate this reservation

This commit is contained in:
Brian Warner 2008-12-01 17:24:21 -07:00
parent 39a41f1d85
commit cfba882b30
12 changed files with 172 additions and 131 deletions

12
NEWS
View File

@ -65,6 +65,17 @@ tahoe.cfg now has controls for the encoding parameters: "shares.needed" and
"shares.total" in the "[client]" section. The default parameters are still
3-of-10.
The inefficient storage 'sizelimit' control (which established an upper bound
on the amount of space that a storage server is allowed to consume) has been
replaced by a lightweight 'reserved_space' control (which establishes a lower
bound on the amount of remaining space). The storage server will reject all
writes that would cause the remaining disk space (as measured by a '/bin/df'
equivalent) to drop below this value. The "[storage]reserved_space="
tahoe.cfg parameter controls this setting. (Note that this only affects
immutable shares: there is an outstanding bug in which reserved_space does
not prevent the allocation of new mutable shares, nor the growth of
existing mutable shares.)
** CLI Changes
This release adds the 'tahoe create-alias' command, which is a combination of
@ -317,6 +328,7 @@ docs/frontends/FTP-and-SFTP.txt for configuration details. (#512, #531)
The Mac GUI in src/allmydata/gui/ has been improved.
* Release 1.2.0 (2008-07-21)
** Security

View File

@ -287,23 +287,16 @@ readonly = (boolean, optional)
written and modified anyway. See ticket #390 for the current status of this
bug. The default value is False.
sizelimit = (str, optional)
reserved_space = (str, optional)
If provided, this value establishes an upper bound (in bytes) on the amount
of storage consumed by share data (data that your node holds on behalf of
clients that are uploading files to the grid). To avoid providing more than
100MB of data to other clients, set this key to "100MB". Note that this is a
fairly loose bound, and the node may occasionally use slightly more storage
than this. To enforce a stronger (and possibly more reliable) limit, use a
symlink to place the 'storage/' directory on a separate size-limited
filesystem, and/or use per-user OS/filesystem quotas. If a size limit is
specified then Tahoe will do a "du" at startup (traversing all the storage
and summing the sizes of the files), which can take a long time if there are
a lot of shares stored.
If provided, this value defines how much disk space is reserved: the storage
server will not accept any share which causes the amount of free space (as
measured by 'df', or more specifically statvfs(2)) to drop below this value.
This string contains a number, with an optional case-insensitive scale
suffix like "K" or "M" or "G", and an optional "B" suffix. So "100MB",
"100M", "100000000B", "100000000", and "100000kb" all mean the same thing.
suffix like "K" or "M" or "G", and an optional "B" or "iB" suffix. So
"100MB", "100M", "100000000B", "100000000", and "100000kb" all mean the same
thing. Likewise, "1MiB", "1024KiB", and "1048576B" all mean the same thing.
== Running A Helper ==

View File

@ -1,5 +1,5 @@
import os, stat, time, re, weakref
import os, stat, time, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node
@ -19,6 +19,7 @@ from allmydata.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, cachedir
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.uri import LiteralFileURI
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable.node import MutableFileNode, MutableWatcher
@ -105,7 +106,6 @@ class Client(node.Node, pollmixin.PollMixin):
self.set_config("storage", "enabled", "false")
if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
self.set_config("storage", "readonly", "true")
copy("sizelimit", "storage", "sizelimit")
if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
self.set_config("storage", "debug_discard", "true")
if os.path.exists(os.path.join(self.basedir, "run_helper")):
@ -151,27 +151,22 @@ class Client(node.Node, pollmixin.PollMixin):
storedir = os.path.join(self.basedir, self.STOREDIR)
sizelimit = None
data = self.get_config("storage", "sizelimit", None)
if data:
m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
if not m:
log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
else:
number, suffix = m.groups()
suffix = suffix.upper()
if suffix.endswith("B"):
suffix = suffix[:-1]
multiplier = {"": 1,
"K": 1000,
"M": 1000 * 1000,
"G": 1000 * 1000 * 1000,
}[suffix]
sizelimit = int(number) * multiplier
data = self.get_config("storage", "reserved_space", None)
reserved = None
try:
reserved = parse_abbreviated_size(data)
except ValueError:
log.msg("[storage]reserved_space= contains unparseable value %s"
% data)
if reserved is None:
reserved = 0
discard = self.get_config("storage", "debug_discard", False,
boolean=True)
ss = StorageServer(storedir, sizelimit, discard, readonly,
self.stats_provider)
ss = StorageServer(storedir,
reserved_space=reserved,
discard_storage=discard,
readonly_storage=readonly,
stats_provider=self.stats_provider)
self.add_service(ss)
d = self.when_tub_ready()
# we can't do registerReference until the Tub is ready

View File

@ -114,7 +114,7 @@ def create_client(basedir, config, out=sys.stdout, err=sys.stderr):
storage_enabled = not config.get("no-storage", None)
c.write("enabled = %s\n" % boolstr[storage_enabled])
c.write("#readonly =\n")
c.write("#sizelimit =\n")
c.write("#reserved_space =\n")
c.write("\n")
c.write("[helper]\n")

View File

@ -768,7 +768,7 @@ class StorageServer(service.MultiService, Referenceable):
"application-version": str(allmydata.__version__),
}
def __init__(self, storedir, sizelimit=None,
def __init__(self, storedir, reserved_space=0,
discard_storage=False, readonly_storage=False,
stats_provider=None):
service.MultiService.__init__(self)
@ -779,7 +779,7 @@ class StorageServer(service.MultiService, Referenceable):
# we don't actually create the corruption-advisory dir until necessary
self.corruption_advisory_dir = os.path.join(storedir,
"corruption-advisories")
self.sizelimit = sizelimit
self.reserved_space = int(reserved_space)
self.no_storage = discard_storage
self.readonly_storage = readonly_storage
self.stats_provider = stats_provider
@ -789,14 +789,13 @@ class StorageServer(service.MultiService, Referenceable):
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
self._active_writers = weakref.WeakKeyDictionary()
lp = log.msg("StorageServer created, now measuring space..",
facility="tahoe.storage")
self.consumed = None
if self.sizelimit:
self.consumed = fileutil.du(self.sharedir)
log.msg(format="space measurement done, consumed=%(consumed)d bytes",
consumed=self.consumed,
parent=lp, facility="tahoe.storage")
lp = log.msg("StorageServer created", facility="tahoe.storage")
if reserved_space:
if self.get_available_space() is None:
log.msg("warning: [storage]reserved_space= is set, but this platform does not support statvfs(2), so this reservation cannot be honored",
umin="0wZ27w", level=log.UNUSUAL)
self.latencies = {"allocate": [], # immutable
"write": [],
"close": [],
@ -868,21 +867,26 @@ class StorageServer(service.MultiService, Referenceable):
def get_stats(self):
stats = { 'storage_server.allocated': self.allocated_size(), }
if self.consumed is not None:
stats['storage_server.consumed'] = self.consumed
for category,ld in self.get_latencies().items():
for name,v in ld.items():
stats['storage_server.latencies.%s.%s' % (category, name)] = v
writeable = True
try:
s = os.statvfs(self.storedir)
disk_total = s.f_bsize * s.f_blocks
disk_used = s.f_bsize * (s.f_blocks - s.f_bfree)
# spacetime predictors should look at the slope of disk_used.
disk_avail = s.f_bsize * s.f_bavail # available to non-root users
# TODO: include our local policy here: if we stop accepting
# shares when the available space drops below 1GB, then include
# that fact in disk_avail.
#
# include our local policy here: if we stop accepting shares when
# the available space drops below 1GB, then include that fact in
# disk_avail.
disk_avail -= self.reserved_space
disk_avail = max(disk_avail, 0)
if self.readonly_storage:
disk_avail = 0
if disk_avail == 0:
writeable = False
# spacetime predictors should use disk_avail / (d(disk_used)/dt)
stats["storage_server.disk_total"] = disk_total
stats["storage_server.disk_used"] = disk_used
@ -890,10 +894,29 @@ class StorageServer(service.MultiService, Referenceable):
except AttributeError:
# os.statvfs is only available on unix
pass
stats["storage_server.accepting_immutable_shares"] = writeable
return stats
def stat_disk(self, d):
    """Return the number of bytes available to non-root users on the
    filesystem holding directory 'd', as measured by statvfs(2).

    Raises AttributeError on platforms without os.statvfs (e.g. windows);
    callers are expected to catch that.
    """
    vfs = os.statvfs(d)
    # f_bavail counts blocks available to unprivileged users; f_bfree would
    # also include the blocks reserved for root.
    return vfs.f_bsize * vfs.f_bavail
def get_available_space(self):
    """Report how many bytes of disk space this server will still accept
    shares into.

    Returns the filesystem's available space minus the configured
    reservation, 0 when the server is read-only, or None when the platform
    cannot measure free space (os.statvfs is unix-only).
    """
    try:
        measured = self.stat_disk(self.storedir)
    except AttributeError:
        # no statvfs on this platform (e.g. windows): cannot be measured
        return None
    if self.readonly_storage:
        return 0
    return measured - self.reserved_space
def allocated_size(self):
space = self.consumed or 0
space = 0
for bw in self._active_writers:
space += bw.allocated_size()
return space
@ -927,10 +950,14 @@ class StorageServer(service.MultiService, Referenceable):
expire_time, self.my_nodeid)
space_per_bucket = allocated_size
no_limits = self.sizelimit is None
yes_limits = not no_limits
if yes_limits:
remaining_space = self.sizelimit - self.allocated_size()
remaining_space = self.get_available_space()
limited = remaining_space is not None
if limited:
# this is a bit conservative, since some of this allocated_size()
# has already been written to disk, where it will show up in
# get_available_space.
remaining_space -= self.allocated_size()
# fill alreadygot with all shares that we have, not just the ones
# they asked about: this will save them a lot of work. Add or update
@ -941,10 +968,7 @@ class StorageServer(service.MultiService, Referenceable):
sf = ShareFile(fn)
sf.add_or_renew_lease(lease_info)
if self.readonly_storage:
# we won't accept new shares
self.add_latency("allocate", time.time() - start)
return alreadygot, bucketwriters
# self.readonly_storage causes remaining_space=0
for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
@ -958,7 +982,7 @@ class StorageServer(service.MultiService, Referenceable):
# occurs while the first is still in progress, the second
# uploader will use different storage servers.
pass
elif no_limits or remaining_space >= space_per_bucket:
elif (not limited) or (remaining_space >= space_per_bucket):
# ok! we need to create the new share file.
bw = BucketWriter(self, incominghome, finalhome,
space_per_bucket, lease_info, canary)
@ -966,7 +990,7 @@ class StorageServer(service.MultiService, Referenceable):
bw.throw_out_all_data = True
bucketwriters[shnum] = bw
self._active_writers[bw] = 1
if yes_limits:
if limited:
remaining_space -= space_per_bucket
else:
# bummer! not enough space to accept this bucket
@ -1047,8 +1071,6 @@ class StorageServer(service.MultiService, Referenceable):
if not os.listdir(storagedir):
os.rmdir(storagedir)
if self.consumed is not None:
self.consumed -= total_space_freed
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_freed',
total_space_freed)
@ -1057,8 +1079,6 @@ class StorageServer(service.MultiService, Referenceable):
raise IndexError("no such storage index")
def bucket_writer_closed(self, bw, consumed_size):
if self.consumed is not None:
self.consumed += consumed_size
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_added', consumed_size)
del self._active_writers[bw]

View File

@ -198,8 +198,8 @@ class SystemFramework(pollmixin.PollMixin):
if self.mode in ("receive",):
# for this mode, the client-under-test gets all the shares,
# so our internal nodes can refuse requests
f = open(os.path.join(nodedir, "sizelimit"), "w")
f.write("0\n")
f = open(os.path.join(nodedir, "readonly_storage"), "w")
f.write("\n")
f.close()
c = self.add_service(client.Client(basedir=nodedir))
self.nodes.append(c)

View File

@ -407,7 +407,6 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# client[0] runs a webserver and a helper, no key_generator
write("webport", "tcp:0:interface=127.0.0.1")
write("run_helper", "yes")
write("sizelimit", "10GB")
write("keepalive_timeout", "600")
if i == 3:
# client[3] runs a webserver and uses a helper, uses

View File

@ -338,7 +338,7 @@ class Put(SystemTestMixin, CLITestMixin, unittest.TestCase):
def _uploaded(res):
(stdout, stderr) = res
self.failUnless("waiting for file data on stdin.." in stderr)
self.failUnless("200 OK" in stderr)
self.failUnless("200 OK" in stderr, stderr)
self.readcap = stdout
self.failUnless(self.readcap.startswith("URI:CHK:"))
d.addCallback(_uploaded)

View File

@ -74,52 +74,71 @@ class Basic(unittest.TestCase):
cancel_secret = c.get_cancel_secret()
self.failUnless(base32.b2a(cancel_secret))
def test_sizelimit_1(self):
basedir = "client.Basic.test_sizelimit_1"
os.mkdir(basedir)
open(os.path.join(basedir, "introducer.furl"), "w").write("")
open(os.path.join(basedir, "vdrive.furl"), "w").write("")
open(os.path.join(basedir, "sizelimit"), "w").write("1000")
c = client.Client(basedir)
self.failUnlessEqual(c.getServiceNamed("storage").sizelimit, 1000)
BASECONFIG = ("[client]\n"
"introducer.furl = \n"
)
def test_sizelimit_2(self):
basedir = "client.Basic.test_sizelimit_2"
def test_reserved_1(self):
basedir = "client.Basic.test_reserved_1"
os.mkdir(basedir)
open(os.path.join(basedir, "introducer.furl"), "w").write("")
open(os.path.join(basedir, "vdrive.furl"), "w").write("")
open(os.path.join(basedir, "sizelimit"), "w").write("10K")
f = open(os.path.join(basedir, "tahoe.cfg"), "w")
f.write(self.BASECONFIG)
f.write("[storage]\n")
f.write("enabled = true\n")
f.write("reserved_space = 1000\n")
f.close()
c = client.Client(basedir)
self.failUnlessEqual(c.getServiceNamed("storage").sizelimit, 10*1000)
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 1000)
def test_sizelimit_3(self):
basedir = "client.Basic.test_sizelimit_3"
def test_reserved_2(self):
basedir = "client.Basic.test_reserved_2"
os.mkdir(basedir)
open(os.path.join(basedir, "introducer.furl"), "w").write("")
open(os.path.join(basedir, "vdrive.furl"), "w").write("")
open(os.path.join(basedir, "sizelimit"), "w").write("5mB")
f = open(os.path.join(basedir, "tahoe.cfg"), "w")
f.write(self.BASECONFIG)
f.write("[storage]\n")
f.write("enabled = true\n")
f.write("reserved_space = 10K\n")
f.close()
c = client.Client(basedir)
self.failUnlessEqual(c.getServiceNamed("storage").sizelimit,
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 10*1000)
def test_reserved_3(self):
basedir = "client.Basic.test_reserved_3"
os.mkdir(basedir)
f = open(os.path.join(basedir, "tahoe.cfg"), "w")
f.write(self.BASECONFIG)
f.write("[storage]\n")
f.write("enabled = true\n")
f.write("reserved_space = 5mB\n")
f.close()
c = client.Client(basedir)
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space,
5*1000*1000)
def test_sizelimit_4(self):
basedir = "client.Basic.test_sizelimit_4"
def test_reserved_4(self):
basedir = "client.Basic.test_reserved_4"
os.mkdir(basedir)
open(os.path.join(basedir, "introducer.furl"), "w").write("")
open(os.path.join(basedir, "vdrive.furl"), "w").write("")
open(os.path.join(basedir, "sizelimit"), "w").write("78Gb")
f = open(os.path.join(basedir, "tahoe.cfg"), "w")
f.write(self.BASECONFIG)
f.write("[storage]\n")
f.write("enabled = true\n")
f.write("reserved_space = 78Gb\n")
f.close()
c = client.Client(basedir)
self.failUnlessEqual(c.getServiceNamed("storage").sizelimit,
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space,
78*1000*1000*1000)
def test_sizelimit_bad(self):
basedir = "client.Basic.test_sizelimit_bad"
def test_reserved_bad(self):
basedir = "client.Basic.test_reserved_bad"
os.mkdir(basedir)
open(os.path.join(basedir, "introducer.furl"), "w").write("")
open(os.path.join(basedir, "vdrive.furl"), "w").write("")
open(os.path.join(basedir, "sizelimit"), "w").write("bogus")
f = open(os.path.join(basedir, "tahoe.cfg"), "w")
f.write(self.BASECONFIG)
f.write("[storage]\n")
f.write("enabled = true\n")
f.write("reserved_space = bogus\n")
f.close()
c = client.Client(basedir)
self.failUnlessEqual(c.getServiceNamed("storage").sizelimit, None)
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
def _permute(self, c, key):
return [ peerid

View File

@ -225,6 +225,10 @@ class BucketProxy(unittest.TestCase):
return self._do_test_readwrite("test_readwrite_v2",
0x44, WriteBucketProxy_v2, ReadBucketProxy)
class FakeDiskStorageServer(StorageServer):
    # Test double: overrides stat_disk so tests can control the apparent
    # free-disk measurement via the DISKAVAIL attribute instead of making
    # real os.statvfs calls.
    def stat_disk(self, d):
        # 'd' is ignored; tests set self.DISKAVAIL directly.
        return self.DISKAVAIL
class Server(unittest.TestCase):
def setUp(self):
@ -237,10 +241,10 @@ class Server(unittest.TestCase):
basedir = os.path.join("storage", "Server", name)
return basedir
def create(self, name, sizelimit=None):
def create(self, name, reserved_space=0, klass=StorageServer):
workdir = self.workdir(name)
ss = StorageServer(workdir, sizelimit,
stats_provider=FakeStatsProvider())
ss = klass(workdir, reserved_space=reserved_space,
stats_provider=FakeStatsProvider())
ss.setNodeID("\x00" * 20)
ss.setServiceParent(self.sparent)
return ss
@ -365,8 +369,13 @@ class Server(unittest.TestCase):
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
def test_sizelimits(self):
ss = self.create("test_sizelimits", 5000)
def test_reserved_space(self):
ss = self.create("test_reserved_space", reserved_space=10000,
klass=FakeDiskStorageServer)
# the FakeDiskStorageServer doesn't do real statvfs() calls
ss.DISKAVAIL = 15000
# 15k available, 10k reserved, leaves 5k for shares
# a newly created and filled share incurs this much overhead, beyond
# the size we request.
OVERHEAD = 3*4
@ -402,6 +411,11 @@ class Server(unittest.TestCase):
self.failUnlessEqual(len(ss._active_writers), 0)
allocated = 1001 + OVERHEAD + LEASE_SIZE
# we have to manually increase DISKAVAIL, since we're not doing real
# disk measurements
ss.DISKAVAIL -= allocated
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
@ -414,19 +428,6 @@ class Server(unittest.TestCase):
ss.disownServiceParent()
del ss
# creating a new StorageServer in the same directory should see the
# same usage.
# metadata that goes into the share file is counted upon share close,
# as well as at startup. metadata that goes into other files will not
# be counted until the next startup, so if we were creating any
# extra-file metadata, the allocation would be more than 'allocated'
# and this test would need to be changed.
ss = self.create("test_sizelimits", 5000)
already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
self.failUnlessEqual(len(writers4), 39)
self.failUnlessEqual(len(ss._active_writers), 39)
def test_seek(self):
basedir = self.workdir("test_seek_behavior")
fileutil.make_dirs(basedir)
@ -571,6 +572,11 @@ class Server(unittest.TestCase):
self.failUnlessEqual(already, set())
self.failUnlessEqual(writers, {})
stats = ss.get_stats()
self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
False)
self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
def test_discard(self):
# discard is really only used for other tests, but we test it anyways
workdir = self.workdir("test_discard")
@ -651,9 +657,9 @@ class MutableServer(unittest.TestCase):
basedir = os.path.join("storage", "MutableServer", name)
return basedir
def create(self, name, sizelimit=None):
def create(self, name):
workdir = self.workdir(name)
ss = StorageServer(workdir, sizelimit)
ss = StorageServer(workdir)
ss.setServiceParent(self.sparent)
ss.setNodeID("\x00" * 20)
return ss
@ -1010,7 +1016,7 @@ class MutableServer(unittest.TestCase):
self.failUnlessEqual(a.expiration_time, b.expiration_time)
def test_leases(self):
ss = self.create("test_leases", sizelimit=1000*1000)
ss = self.create("test_leases")
def secrets(n):
return ( self.write_enabler("we1"),
self.renew_secret("we1-%d" % n),
@ -1146,9 +1152,9 @@ class Stats(unittest.TestCase):
basedir = os.path.join("storage", "Server", name)
return basedir
def create(self, name, sizelimit=None):
def create(self, name):
workdir = self.workdir(name)
ss = StorageServer(workdir, sizelimit)
ss = StorageServer(workdir)
ss.setNodeID("\x00" * 20)
ss.setServiceParent(self.sparent)
return ss

View File

@ -249,7 +249,6 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
add_to_sparent=True))
def _added(extra_node):
self.extra_node = extra_node
extra_node.getServiceNamed("storage").sizelimit = 0
d.addCallback(_added)
HELPER_DATA = "Data that needs help to upload" * 1000

View File

@ -153,10 +153,8 @@ class Root(rend.Page):
ss = client.getServiceNamed("storage")
allocated_s = abbreviate_size(ss.allocated_size())
allocated = "about %s allocated" % allocated_s
sizelimit = "no size limit"
if ss.sizelimit is not None:
sizelimit = "size limit is %s" % abbreviate_size(ss.sizelimit)
ul[T.li["Storage Server: %s, %s" % (allocated, sizelimit)]]
reserved = "%s reserved" % abbreviate_size(ss.reserved_space)
ul[T.li["Storage Server: %s, %s" % (allocated, reserved)]]
except KeyError:
ul[T.li["Not running storage server"]]