620 lines
27 KiB
Python
Raw Normal View History

import os, re, weakref, struct, time
from foolscap.api import Referenceable
from twisted.application import service
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, IStatsProducer
from allmydata.util import fileutil, log, time_format
import allmydata # for __full_version__
from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
create_mutable_sharefile
from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
# On-disk layout of the storage directory:
#
#   storage/
#   storage/shares/incoming
#     incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which
#     are moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM on success
#   storage/shares/$START/$STORAGEINDEX
#   storage/shares/$START/$STORAGEINDEX/$SHARENUM
#
# "$START" is the first 10 bits of $STORAGEINDEX (that is, its first 2
# base-32 characters). A $SHARENUM filename must match NUM_RE: one or more
# decimal digits and nothing else.
NUM_RE = re.compile("^[0-9]+$")
class StorageServer(service.MultiService, Referenceable):
    """Storage-server service that keeps shares on local disk.

    Implements the remote storage protocol (RIStorageServer) and the stats
    producer interface (IStatsProducer). Immutable shares are written by
    BucketWriters into <storedir>/shares/incoming and read by BucketReaders;
    mutable shares are accessed through MutableShareFile. Two crawlers run
    as child services: a BucketCountingCrawler and a lease checker
    (LeaseCheckerClass).
    """
    implements(RIStorageServer, IStatsProducer)
    name = 'storage'
    LeaseCheckerClass = LeaseCheckingCrawler

    # On Windows, pywin32 provides GetDiskFreeSpaceEx (used by
    # get_disk_stats via self.win32api). The SetErrorMode flags suppress the
    # critical-error and open-file-error message boxes.
    windows = False
    try:
        import win32api, win32con
        windows = True
        # <http://msdn.microsoft.com/en-us/library/ms680621%28VS.85%29.aspx>
        win32api.SetErrorMode(win32con.SEM_FAILCRITICALERRORS |
                              win32con.SEM_NOOPENFILEERRORBOX)
    except ImportError:
        pass

    def __init__(self, storedir, nodeid, reserved_space=0,
                 discard_storage=False, readonly_storage=False,
                 stats_provider=None,
                 expiration_enabled=False,
                 expiration_mode="age",
                 expiration_override_lease_duration=None,
                 expiration_cutoff_date=None,
                 expiration_sharetypes=("mutable", "immutable")):
        """Create the server, its share directories and crawler services.

        storedir: base directory. Shares live in storedir/shares; crawler
            state is kept in bucket_counter.state, lease_checker.state and
            lease_checker.history inside storedir.
        nodeid: this server's node id, a 20-character str.
        reserved_space: bytes of free disk space that must be left unused
            (subtracted from free space in get_disk_stats).
        discard_storage: if true, newly created BucketWriters get
            throw_out_all_data = True.
        readonly_storage: if true, get_available_space() returns 0 and
            get_stats() reports the server as not accepting immutable shares.
        stats_provider: optional; if given, this server registers itself as
            a producer and reports counters through it.
        expiration_*: passed unchanged to LeaseCheckerClass.
        """
        service.MultiService.__init__(self)
        assert isinstance(nodeid, str)
        assert len(nodeid) == 20
        self.my_nodeid = nodeid
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        # we don't actually create the corruption-advisory dir until necessary
        self.corruption_advisory_dir = os.path.join(storedir,
                                                    "corruption-advisories")
        self.reserved_space = int(reserved_space)
        self.no_storage = discard_storage
        self.readonly_storage = readonly_storage
        self.stats_provider = stats_provider
        if self.stats_provider:
            self.stats_provider.register_producer(self)
        self.incomingdir = os.path.join(sharedir, 'incoming')
        # Anything left in incoming/ is an upload that never completed, so
        # it is discarded before the directory is recreated empty.
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        # BucketWriters that are still open. Weak keys, so an entry goes away
        # if its writer is garbage-collected without being closed.
        self._active_writers = weakref.WeakKeyDictionary()
        log.msg("StorageServer created", facility="tahoe.storage")

        if reserved_space and self.get_available_space() is None:
            log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
                    umid="0wZ27w", level=log.UNUSUAL)

        # recent latency samples (seconds) per operation category
        self.latencies = {"allocate": [], # immutable
                          "write": [],
                          "close": [],
                          "read": [],
                          "get": [],
                          "writev": [], # mutable
                          "readv": [],
                          "add-lease": [], # both
                          "renew": [],
                          "cancel": [],
                          }
        self.add_bucket_counter()

        statefile = os.path.join(self.storedir, "lease_checker.state")
        historyfile = os.path.join(self.storedir, "lease_checker.history")
        klass = self.LeaseCheckerClass
        self.lease_checker = klass(self, statefile, historyfile,
                                   expiration_enabled, expiration_mode,
                                   expiration_override_lease_duration,
                                   expiration_cutoff_date,
                                   expiration_sharetypes)
        self.lease_checker.setServiceParent(self)

    def add_bucket_counter(self):
        """Create the BucketCountingCrawler and attach it as a child service."""
        statefile = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = BucketCountingCrawler(self, statefile)
        self.bucket_counter.setServiceParent(self)

    def count(self, name, delta=1):
        """Add delta to the 'storage_server.<name>' counter, if stats are on."""
        if self.stats_provider:
            self.stats_provider.count("storage_server." + name, delta)

    def add_latency(self, category, latency):
        """Record one latency sample, keeping the 1000 most recent per category."""
        a = self.latencies[category]
        a.append(latency)
        if len(a) > 1000:
            self.latencies[category] = a[-1000:]

    def get_latencies(self):
        """Return a dict, indexed by category, that contains a dict of
        latency numbers for each category. Each dict will contain the
        following keys: mean, 01_0_percentile, 10_0_percentile,
        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
        99_0_percentile, 99_9_percentile. If no samples have been collected
        for the given category, then that category name will not be present
        in the return value."""
        # note that Amazon's Dynamo paper says they use 99.9% percentile.
        output = {}
        for category in self.latencies:
            if not self.latencies[category]:
                continue
            stats = {}
            samples = self.latencies[category][:]
            samples.sort()
            count = len(samples)
            stats["mean"] = sum(samples) / count
            # int(p * count) < count for every p < 1, so indexes stay valid
            stats["01_0_percentile"] = samples[int(0.01 * count)]
            stats["10_0_percentile"] = samples[int(0.1 * count)]
            stats["50_0_percentile"] = samples[int(0.5 * count)]
            stats["90_0_percentile"] = samples[int(0.9 * count)]
            stats["95_0_percentile"] = samples[int(0.95 * count)]
            stats["99_0_percentile"] = samples[int(0.99 * count)]
            stats["99_9_percentile"] = samples[int(0.999 * count)]
            output[category] = stats
        return output

    def log(self, *args, **kwargs):
        """log.msg() wrapper that defaults facility to 'tahoe.storage'."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.storage"
        return log.msg(*args, **kwargs)

    def _clean_incomplete(self):
        """Remove the incoming/ directory and everything under it."""
        fileutil.rm_dir(self.incomingdir)

    def get_disk_stats(self):
        """Return disk statistics for the storage disk, in the form of a dict
        with the following fields.
          total:            total bytes on disk
          free_for_root:    bytes actually free on disk
          free_for_nonroot: bytes free for "a non-privileged user" [Unix] or
                              the current user [Windows]; might take into
                              account quotas depending on platform
          used:             bytes used on disk
          avail:            bytes available excluding reserved space
        An AttributeError can occur if the OS has no API to get disk information.
        An EnvironmentError can occur if the OS call fails."""

        if self.windows:
            # For Windows systems, where os.statvfs is not available, use GetDiskFreeSpaceEx.
            # <http://docs.activestate.com/activepython/2.5/pywin32/win32api__GetDiskFreeSpaceEx_meth.html>
            #
            # Although the docs say that the argument should be the root directory
            # of a disk, GetDiskFreeSpaceEx actually accepts any path on that disk
            # (like its Win32 equivalent).

            (free_for_nonroot, total, free_for_root) = self.win32api.GetDiskFreeSpaceEx(self.storedir)
        else:
            # For Unix-like systems.
            # <http://docs.python.org/library/os.html#os.statvfs>
            # <http://opengroup.org/onlinepubs/7990989799/xsh/fstatvfs.html>
            # <http://opengroup.org/onlinepubs/7990989799/xsh/sysstatvfs.h.html>
            s = os.statvfs(self.storedir)

            # on my mac laptop:
            #  statvfs(2) is a wrapper around statfs(2).
            #    statvfs.f_frsize = statfs.f_bsize :
            #     "minimum unit of allocation" (statvfs)
            #     "fundamental file system block size" (statfs)
            #    statvfs.f_bsize = statfs.f_iosize = stat.st_blocks : preferred IO size
            # on an encrypted home directory ("FileVault"), it gets f_blocks
            # wrong, and s.f_blocks*s.f_frsize is twice the size of my disk,
            # but s.f_bavail*s.f_frsize is correct

            total = s.f_frsize * s.f_blocks
            free_for_root = s.f_frsize * s.f_bfree
            free_for_nonroot = s.f_frsize * s.f_bavail

        # valid for all platforms:
        used = total - free_for_root
        avail = max(free_for_nonroot - self.reserved_space, 0)

        return { 'total': total, 'free_for_root': free_for_root,
                 'free_for_nonroot': free_for_nonroot,
                 'used': used, 'avail': avail, }

    def get_stats(self):
        """Return a dict of numeric 'storage_server.*' statistics."""
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = self.get_disk_stats()
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            # no disk-stats API on this platform: assume we can write
            writeable = True
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        if self.readonly_storage:
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats

    def get_available_space(self):
        """Returns available space for share storage in bytes, or None if no
        API to get this information is available. Returns 0 when the server
        is read-only or when the OS call for disk statistics fails."""

        if self.readonly_storage:
            return 0
        try:
            return self.get_disk_stats()['avail']
        except AttributeError:
            return None
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            return 0

    def allocated_size(self):
        """Sum of allocated_size() over all still-open BucketWriters."""
        space = 0
        for bw in self._active_writers:
            space += bw.allocated_size()
        return space

    def remote_get_version(self):
        """Return the protocol-version dict advertised to clients.

        maximum-immutable-share-size is the currently available space, or
        2**64 when this platform cannot report disk statistics.
        """
        remaining_space = self.get_available_space()
        if remaining_space is None:
            # We're on a platform that has no API to get disk stats.
            remaining_space = 2**64

        version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                    { "maximum-immutable-share-size": remaining_space,
                      "tolerates-immutable-read-overrun": True,
                      "delete-mutable-shares-with-zero-length-writev": True,
                      },
                    "application-version": str(allmydata.__full_version__),
                    }
        return version

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Prepare to receive immutable shares for storage_index.

        Returns (alreadygot, bucketwriters):
          alreadygot: set of share numbers already stored here for this
              storage index (all of them, not only those in sharenums);
              each of those shares has its lease added or renewed.
          bucketwriters: dict mapping share number -> BucketWriter for each
              requested share that was accepted.
        A requested share is not given a writer if it is already stored, if
        an upload of it is in progress (present in incoming/), or if there
        is not enough remaining space for allocated_size bytes.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters

    def _iter_share_files(self, storage_index):
        """Yield a MutableShareFile or ShareFile for each share of
        storage_index, chosen by sniffing the file header; files matching
        neither header are skipped."""
        for shnum, filename in self._get_bucket_shares(storage_index):
            f = open(filename, 'rb')
            try:
                header = f.read(32)
            finally:
                # close even if the read fails
                f.close()
            if header[:32] == MutableShareFile.MAGIC:
                sf = MutableShareFile(filename, self)
                # note: if the share has been migrated, the renew_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
            elif header[:4] == struct.pack(">L", 1):
                # big-endian 1 in the first 4 bytes: presumably the
                # immutable share container version number
                sf = ShareFile(filename)
            else:
                continue # non-sharefile
            yield sf

    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                         owner_num=1):
        """Add (or renew) a 31-day lease on every share of storage_index."""
        start = time.time()
        self.count("add-lease")
        new_expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               new_expire_time, self.my_nodeid)
        for sf in self._iter_share_files(storage_index):
            sf.add_or_renew_lease(lease_info)
        self.add_latency("add-lease", time.time() - start)
        return None

    def remote_renew_lease(self, storage_index, renew_secret):
        """Extend the lease matching renew_secret on every share by 31 days.

        Raises IndexError if there are no shares for storage_index.
        """
        start = time.time()
        self.count("renew")
        new_expire_time = time.time() + 31*24*60*60
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            found_buckets = True
            sf.renew_lease(renew_secret, new_expire_time)
        self.add_latency("renew", time.time() - start)
        if not found_buckets:
            raise IndexError("no such lease to renew")

    def remote_cancel_lease(self, storage_index, cancel_secret):
        """Cancel the lease matching cancel_secret on every share.

        Removes the bucket directory if it is left empty and reports the
        freed bytes to the stats provider. Raises IndexError if there are no
        shares for storage_index.
        """
        start = time.time()
        self.count("cancel")

        total_space_freed = 0
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            # note: if we can't find a lease on one share, we won't bother
            # looking in the others. Unless something broke internally
            # (perhaps we ran out of disk space while adding a lease), the
            # leases on all shares will be identical.
            found_buckets = True
            # this raises IndexError if the lease wasn't present XXXX
            total_space_freed += sf.cancel_lease(cancel_secret)

        if found_buckets:
            storagedir = os.path.join(self.sharedir,
                                      storage_index_to_dir(storage_index))
            if not os.listdir(storagedir):
                os.rmdir(storagedir)

        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_freed',
                                      total_space_freed)
        self.add_latency("cancel", time.time() - start)
        if not found_buckets:
            raise IndexError("no such storage index")

    def bucket_writer_closed(self, bw, consumed_size):
        """Called when a BucketWriter closes: count the bytes it added and
        stop tracking it as an active writer."""
        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_added', consumed_size)
        del self._active_writers[bw]

    def _get_bucket_shares(self, storage_index):
        """Return a list of (shnum, pathname) tuples for files that hold
        shares for this storage_index. In each tuple, 'shnum' will always be
        the integer form of the last component of 'pathname'."""
        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    filename = os.path.join(storagedir, f)
                    yield (int(f), filename)
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass

    def remote_get_buckets(self, storage_index):
        """Return a dict mapping share number -> BucketReader for every
        share stored for storage_index (empty if there are none)."""
        start = time.time()
        self.count("get")
        si_s = si_b2a(storage_index)
        log.msg("storage: get_buckets %s" % si_s)
        bucketreaders = {} # k: sharenum, v: BucketReader
        for shnum, filename in self._get_bucket_shares(storage_index):
            bucketreaders[shnum] = BucketReader(self, filename,
                                                storage_index, shnum)
        self.add_latency("get", time.time() - start)
        return bucketreaders

    def get_leases(self, storage_index):
        """Provide an iterator that yields all of the leases attached to this
        bucket. Each lease is returned as a LeaseInfo instance.

        This method is not for client use.
        """

        # since all shares get the same lease data, we just grab the leases
        # from the first share
        try:
            shnum, filename = self._get_bucket_shares(storage_index).next()
            sf = ShareFile(filename)
            return sf.get_leases()
        except StopIteration:
            return iter([])

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
        """Test, read and write mutable shares of one slot.

        secrets: (write_enabler, renew_secret, cancel_secret). The write
            enabler is checked against every existing share first.
        test_and_write_vectors: dict mapping sharenum ->
            (testv, datav, new_length). Missing shares are tested as if
            empty. A new_length of 0 deletes that share.
        read_vector: applied to every existing share before any write.

        Writes (and lease updates) are applied only if every test vector
        passes. Returns (testv_is_good, read_data), where read_data maps
        sharenum -> the readv() result for that share.
        """
        start = time.time()
        self.count("writev")
        si_s = si_b2a(storage_index)
        log.msg("storage: slot_writev %s" % si_s)
        si_dir = storage_index_to_dir(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        shares = {}
        if os.path.isdir(bucketdir):
            for sharenum_s in os.listdir(bucketdir):
                try:
                    sharenum = int(sharenum_s)
                except ValueError:
                    continue
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                msf.check_write_enabler(write_enabler, si_s)
                shares[sharenum] = msf
        # write_enabler is good for all existing shares.

        # Now evaluate test vectors.
        testv_is_good = True
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
                    testv_is_good = False
                    break
            else:
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    self.log("testv failed (empty): [%d] %r" % (sharenum,
                                                                testv))
                    testv_is_good = False
                    break

        # now gather the read vectors, before we do any writes
        read_data = {}
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)

        ownerid = 1 # TODO
        expire_time = time.time() + 31*24*60*60   # one month
        lease_info = LeaseInfo(ownerid,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        if testv_is_good:
            # now apply the write vectors
            for sharenum in test_and_write_vectors:
                (testv, datav, new_length) = test_and_write_vectors[sharenum]
                if new_length == 0:
                    if sharenum in shares:
                        shares[sharenum].unlink()
                else:
                    if sharenum not in shares:
                        # allocate a new share
                        allocated_size = 2000 # arbitrary, really
                        share = self._allocate_slot_share(bucketdir, secrets,
                                                          sharenum,
                                                          allocated_size,
                                                          owner_num=0)
                        shares[sharenum] = share
                    shares[sharenum].writev(datav, new_length)
                    # and update the lease
                    shares[sharenum].add_or_renew_lease(lease_info)

            # Delete the bucket directory if it exists but is now empty
            # (every share in it was truncated to zero length). This does
            # not depend on the loop variable new_length, which is unbound
            # when test_and_write_vectors is empty. The directory may not
            # exist at all if a client asked us to truncate a share we were
            # never holding, so check for it before listing it.
            if os.path.isdir(bucketdir) and not os.listdir(bucketdir):
                os.rmdir(bucketdir)

        # all done
        self.add_latency("writev", time.time() - start)
        return (testv_is_good, read_data)

    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                             allocated_size, owner_num=0):
        """Create a new, empty mutable share file bucketdir/<sharenum>.

        Only the write enabler from secrets is used; allocated_size and
        owner_num are accepted but unused here.
        """
        (write_enabler, renew_secret, cancel_secret) = secrets
        my_nodeid = self.my_nodeid
        fileutil.make_dirs(bucketdir)
        filename = os.path.join(bucketdir, "%d" % sharenum)
        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
                                         self)
        return share

    def remote_slot_readv(self, storage_index, shares, readv):
        """Apply readv to mutable shares of storage_index.

        shares: share numbers to read; an empty value means all shares.
        Returns a dict mapping sharenum -> readv() result (empty if the
        slot has no bucket directory).
        """
        start = time.time()
        self.count("readv")
        si_s = si_b2a(storage_index)
        lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
                     facility="tahoe.storage", level=log.OPERATIONAL)
        si_dir = storage_index_to_dir(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        if not os.path.isdir(bucketdir):
            self.add_latency("readv", time.time() - start)
            return {}
        datavs = {}
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                continue
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                datavs[sharenum] = msf.readv(readv)
        log.msg("returning shares %s" % (datavs.keys(),),
                facility="tahoe.storage", level=log.NOISY, parent=lp)
        self.add_latency("readv", time.time() - start)
        return datavs

    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
                                    reason):
        """Record a client's report of a corrupt share.

        Writes a small report file into the corruption-advisories directory
        (created on first use), named after the current UTC time, the
        storage index and the share number, and logs the report.
        """
        fileutil.make_dirs(self.corruption_advisory_dir)
        now = time_format.iso_utc(sep="T")
        si_s = si_b2a(storage_index)
        # windows can't handle colons in the filename. Strip them from the
        # filename only: the directory part may legitimately contain one
        # (e.g. a "C:" drive letter), and removing it would break the path.
        basename = ("%s--%s-%d" % (now, si_s, shnum)).replace(":","")
        fn = os.path.join(self.corruption_advisory_dir, basename)
        f = open(fn, "w")
        try:
            f.write("report: Share Corruption\n")
            f.write("type: %s\n" % share_type)
            f.write("storage_index: %s\n" % si_s)
            f.write("share_number: %d\n" % shnum)
            f.write("\n")
            f.write(reason)
            f.write("\n")
        finally:
            # close the report even if a write fails
            f.close()
        log.msg(format=("client claims corruption in (%(share_type)s) " +
                        "%(si)s-%(shnum)d: %(reason)s"),
                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
                level=log.SCARY, umid="SGx2fA")
        return None