"""
Ported to Python 3.
"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals

from future.utils import bytes_to_native_str, PY2
if PY2:
    # Omit open() to get native behavior where open("w") always accepts native
    # strings. Omit bytes so we don't leak future's custom bytes.
    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min  # noqa: F401
else:
    from typing import Dict

import os, re

from foolscap.api import Referenceable
from foolscap.ipb import IRemoteReference

from twisted.application import service
from twisted.internet import reactor

from zope.interface import implementer
from allmydata.interfaces import RIStorageServer, IStatsProducer
from allmydata.util import fileutil, idlib, log, time_format
import allmydata # for __full_version__

from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
     create_mutable_sharefile
from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler

# storage/
# storage/shares/incoming
#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
# storage/shares/$START/$STORAGEINDEX
# storage/shares/$START/$STORAGEINDEX/$SHARENUM

# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
# base-32 chars).
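
# For example (illustrative storage index): a share numbered 0 whose base-32
# storage index starts with "aa" is uploaded to
# storage/shares/incoming/aa/$STORAGEINDEX/0 and then moved to
# storage/shares/aa/$STORAGEINDEX/0 once it is complete.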

# $SHARENUM matches this regex:
NUM_RE = re.compile("^[0-9]+$")

# Number of seconds to add to expiration time on lease renewal.
# For now it's not actually configurable, but maybe someday.
DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
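# (i.e. 31 days, or 2,678,400 seconds)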


@implementer(RIStorageServer, IStatsProducer)
class StorageServer(service.MultiService, Referenceable):
    """
    A filesystem-based implementation of ``RIStorageServer``.
    """
    name = 'storage'
    # only the tests change this to anything else
    LeaseCheckerClass = LeaseCheckingCrawler

    def __init__(self, storedir, nodeid, reserved_space=0,
                 discard_storage=False, readonly_storage=False,
                 stats_provider=None,
                 expiration_enabled=False,
                 expiration_mode="age",
                 expiration_override_lease_duration=None,
                 expiration_cutoff_date=None,
                 expiration_sharetypes=("mutable", "immutable"),
                 clock=reactor):
        service.MultiService.__init__(self)
        assert isinstance(nodeid, bytes)
        assert len(nodeid) == 20
        self.my_nodeid = nodeid
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        self.corruption_advisory_dir = os.path.join(storedir,
                                                    "corruption-advisories")
        fileutil.make_dirs(self.corruption_advisory_dir)
        self.reserved_space = int(reserved_space)
        self.no_storage = discard_storage
        self.readonly_storage = readonly_storage
        self.stats_provider = stats_provider
        if self.stats_provider:
            self.stats_provider.register_producer(self)
        self.incomingdir = os.path.join(sharedir, 'incoming')
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        log.msg("StorageServer created", facility="tahoe.storage")

        if reserved_space:
            if self.get_available_space() is None:
                log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
                        umid="0wZ27w", level=log.UNUSUAL)
        self.latencies = {"allocate": [], # immutable
                          "write": [],
                          "close": [],
                          "read": [],
                          "get": [],
                          "writev": [], # mutable
                          "readv": [],
                          "add-lease": [], # both
                          "renew": [],
                          "cancel": [],
                          }
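        # Each list above holds raw per-operation latency samples (seconds).
        # add_latency() keeps at most the most recent 1000 samples per
        # category and get_latencies() summarizes them.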
        self.add_bucket_counter()

        statefile = os.path.join(self.storedir, "lease_checker.state")
        historyfile = os.path.join(self.storedir, "lease_checker.history")
        klass = self.LeaseCheckerClass
        self.lease_checker = klass(self, statefile, historyfile,
                                   expiration_enabled, expiration_mode,
                                   expiration_override_lease_duration,
                                   expiration_cutoff_date,
                                   expiration_sharetypes)
        self.lease_checker.setServiceParent(self)
        self._clock = clock

        # Currently being-written BucketWriters. For Foolscap, lifetime is
        # tied to connection: when disconnection happens, the BucketWriters
        # are removed. For HTTP, this makes no sense, so there will be
        # timeout-based cleanup; see
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807.

        # Map in-progress filesystem path -> BucketWriter:
        self._bucket_writers = {}  # type: Dict[str,BucketWriter]

        # Canaries and disconnect markers for BucketWriters created via Foolscap:
        self._bucket_writer_disconnect_markers = {}  # type: Dict[BucketWriter,(IRemoteReference, object)]

    def stopService(self):
        # Cancel any in-progress uploads:
        for bw in list(self._bucket_writers.values()):
            bw.disconnected()
        return service.MultiService.stopService(self)

    def __repr__(self):
        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)

    def have_shares(self):
        # quick test to decide if we need to commit to an implicit
        # permutation-seed or if we should use a new one
        return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))

    def add_bucket_counter(self):
        statefile = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = BucketCountingCrawler(self, statefile)
        self.bucket_counter.setServiceParent(self)

    def count(self, name, delta=1):
        if self.stats_provider:
            self.stats_provider.count("storage_server." + name, delta)

    def add_latency(self, category, latency):
        a = self.latencies[category]
        a.append(latency)
        if len(a) > 1000:
            self.latencies[category] = a[-1000:]

    def get_latencies(self):
        """Return a dict, indexed by category, that contains a dict of
        latency numbers for each category. If there are sufficient samples
        for unambiguous interpretation, each dict will contain the
        following keys: mean, 01_0_percentile, 10_0_percentile,
        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
        99_0_percentile, 99_9_percentile. If there are insufficient
        samples for a given percentile to be interpreted unambiguously
        that percentile will be reported as None. If no samples have been
        collected for the given category, then that category name will
        not be present in the return value."""
        # note that Amazon's Dynamo paper says they use 99.9% percentile.
        output = {}
        for category in self.latencies:
            if not self.latencies[category]:
                continue
            stats = {}
            samples = self.latencies[category][:]
            count = len(samples)
            stats["samplesize"] = count
            samples.sort()
            if count > 1:
                stats["mean"] = sum(samples) / count
            else:
                stats["mean"] = None

            orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10), \
                             (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10), \
                             (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100), \
                             (0.999, "99_9_percentile", 1000)]

            for percentile, percentilestring, minnumtoobserve in orderstatlist:
                if count >= minnumtoobserve:
                    stats[percentilestring] = samples[int(percentile * count)]
                else:
                    stats[percentilestring] = None

            output[category] = stats
        return output

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.storage"
        return log.msg(*args, **kwargs)
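
    # Called from __init__ before incoming/ is recreated: discards any
    # partially-uploaded shares left over from a previous run.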
    def _clean_incomplete(self):
        fileutil.rm_dir(self.incomingdir)

    def get_stats(self):
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        for category, ld in self.get_latencies().items():
            for name, v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            writeable = True
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        if self.readonly_storage:
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats

    def get_available_space(self):
        """Returns available space for share storage in bytes, or None if no
        API to get this information is available."""
        if self.readonly_storage:
            return 0
        return fileutil.get_available_space(self.sharedir, self.reserved_space)
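
    # allocated_size() reports space promised to in-progress uploads, some of
    # which may not have been written to disk yet.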
    def allocated_size(self):
        space = 0
        for bw in self._bucket_writers.values():
            space += bw.allocated_size()
        return space
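
    # remote_get_version() backs the Foolscap get_version() call declared by
    # RIStorageServer; the v1 sub-dict advertises this server's limits and
    # capabilities to clients.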
    def remote_get_version(self):
        remaining_space = self.get_available_space()
        if remaining_space is None:
            # We're on a platform that has no API to get disk stats.
            remaining_space = 2**64

        # Unicode strings might be nicer, but for now sticking to bytes since
        # this is what the wire protocol has always been.
        version = { b"http://allmydata.org/tahoe/protocols/storage/v1" :
                    { b"maximum-immutable-share-size": remaining_space,
                      b"maximum-mutable-share-size": MAX_MUTABLE_SHARE_SIZE,
                      b"available-space": remaining_space,
                      b"tolerates-immutable-read-overrun": True,
                      b"delete-mutable-shares-with-zero-length-writev": True,
                      b"fills-holes-with-zero-bytes": True,
                      b"prevents-read-past-end-of-share-data": True,
                      },
                    b"application-version": allmydata.__full_version__.encode("utf-8"),
                    }
        return version

    def _allocate_buckets(self, storage_index,
                          renew_secret, cancel_secret,
                          sharenums, allocated_size,
                          owner_num=0, renew_leases=True):
        """
        Generic bucket allocation API.

        :param bool renew_leases: If and only if this is ``True`` then renew a
            secret-matching lease on (or, if none match, add a new lease to)
            existing shares in this bucket.  Any *new* shares are given a new
            lease regardless.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = self._clock.seconds()
        self.count("allocate")
        alreadygot = {}
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %r" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot[shnum] = ShareFile(fn)
            if renew_leases:
                sf = ShareFile(fn)
                sf.add_or_renew_lease(remaining_space, lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # For Foolscap we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info,
                                  clock=self._clock)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._bucket_writers[incominghome] = bw
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", self._clock.seconds() - start)
        return set(alreadygot), bucketwriters

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Foolscap-specific ``allocate_buckets()`` API."""
        alreadygot, bucketwriters = self._allocate_buckets(
            storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
            owner_num=owner_num, renew_leases=True,
        )
        # Abort BucketWriters if disconnection happens.
        for bw in bucketwriters.values():
            disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
            self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
        return alreadygot, bucketwriters
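
    # _iter_share_files() yields a MutableShareFile or ShareFile for each
    # share stored under the given storage index, skipping files whose header
    # is not a recognized share format.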
    def _iter_share_files(self, storage_index):
        for shnum, filename in self._get_bucket_shares(storage_index):
            with open(filename, 'rb') as f:
                header = f.read(32)
            if MutableShareFile.is_valid_header(header):
                sf = MutableShareFile(filename, self)
                # note: if the share has been migrated, the renew_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
            elif ShareFile.is_valid_header(header):
                sf = ShareFile(filename)
            else:
                continue # non-sharefile
            yield sf

    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                         owner_num=1):
        start = self._clock.seconds()
        self.count("add-lease")
        new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               new_expire_time, self.my_nodeid)
        for sf in self._iter_share_files(storage_index):
            sf.add_or_renew_lease(self.get_available_space(), lease_info)
        self.add_latency("add-lease", self._clock.seconds() - start)
        return None

    def remote_renew_lease(self, storage_index, renew_secret):
        start = self._clock.seconds()
        self.count("renew")
        new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            found_buckets = True
            sf.renew_lease(renew_secret, new_expire_time)
        self.add_latency("renew", self._clock.seconds() - start)
        if not found_buckets:
            raise IndexError("no such lease to renew")
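
    # Callback invoked by a BucketWriter once its share file has been closed:
    # update byte-count stats and drop our bookkeeping for the writer.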
    def bucket_writer_closed(self, bw, consumed_size):
        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_added', consumed_size)
        del self._bucket_writers[bw.incominghome]
        if bw in self._bucket_writer_disconnect_markers:
            canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
            canary.dontNotifyOnDisconnect(disconnect_marker)

    def _get_bucket_shares(self, storage_index):
        """Return a list of (shnum, pathname) tuples for files that hold
        shares for this storage_index. In each tuple, 'shnum' will always be
        the integer form of the last component of 'pathname'."""
        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    filename = os.path.join(storagedir, f)
                    yield (int(f), filename)
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass

    def remote_get_buckets(self, storage_index):
        start = self._clock.seconds()
        self.count("get")
        si_s = si_b2a(storage_index)
        log.msg("storage: get_buckets %r" % si_s)
        bucketreaders = {} # k: sharenum, v: BucketReader
        for shnum, filename in self._get_bucket_shares(storage_index):
            bucketreaders[shnum] = BucketReader(self, filename,
                                                storage_index, shnum)
        self.add_latency("get", self._clock.seconds() - start)
        return bucketreaders

    def get_leases(self, storage_index):
        """Provide an iterator that yields all of the leases attached to this
        bucket. Each lease is returned as a LeaseInfo instance.

        This method is not for client use.

        :note: Only for immutable shares.
        """
        # since all shares get the same lease data, we just grab the leases
        # from the first share
        try:
            shnum, filename = next(self._get_bucket_shares(storage_index))
            sf = ShareFile(filename)
            return sf.get_leases()
        except StopIteration:
            return iter([])

    def get_slot_leases(self, storage_index):
        """
        This method is not for client use.

        :note: Only for mutable shares.

        :return: An iterable of the leases attached to this slot.
        """
        for _, share_filename in self._get_bucket_shares(storage_index):
            share = MutableShareFile(share_filename)
            return share.get_leases()
        return []

    def _collect_mutable_shares_for_storage_index(self, bucketdir, write_enabler, si_s):
        """
        Gather up existing mutable shares for the given storage index.

        :param bytes bucketdir: The filesystem path containing shares for the
            given storage index.

        :param bytes write_enabler: The write enabler secret for the shares.

        :param bytes si_s: The storage index in encoded (base32) form.

        :raise BadWriteEnablerError: If the write enabler is not correct for
            any of the collected shares.

        :return dict[int, MutableShareFile]: The collected shares in a mapping
            from integer share numbers to ``MutableShareFile`` instances.
        """
        shares = {}
        if os.path.isdir(bucketdir):
            # shares exist if there is a file for them
            for sharenum_s in os.listdir(bucketdir):
                try:
                    sharenum = int(sharenum_s)
                except ValueError:
                    continue
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                msf.check_write_enabler(write_enabler, si_s)
                shares[sharenum] = msf
        return shares

    def _evaluate_test_vectors(self, test_and_write_vectors, shares):
        """
        Execute test vectors against share data.

        :param test_and_write_vectors: See
            ``allmydata.interfaces.TestAndWriteVectorsForShares``.

        :param dict[int, MutableShareFile] shares: The shares against which to
            execute the vectors.

        :return bool: ``True`` if and only if all of the test vectors succeed
            against the given shares.
        """
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
                    return False
            else:
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    self.log("testv failed (empty): [%d] %r" % (sharenum,
                                                                testv))
                    return False
        return True

    def _evaluate_read_vectors(self, read_vector, shares):
        """
        Execute read vectors against share data.

        :param read_vector: See ``allmydata.interfaces.ReadVector``.

        :param dict[int, MutableShareFile] shares: The shares against which to
            execute the vector.

        :return dict[int, bytes]: The data read from the shares.
        """
        read_data = {}
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)
        return read_data

    def _evaluate_write_vectors(self, bucketdir, secrets, test_and_write_vectors, shares):
        """
        Execute write vectors against share data.

        :param bytes bucketdir: The parent directory holding the shares. This
            is removed if the last share is removed from it. If shares are
            created, they are created in it.

        :param secrets: A tuple of ``WriteEnablerSecret``,
            ``LeaseRenewSecret``, and ``LeaseCancelSecret``. These secrets
            are used to initialize new shares.

        :param test_and_write_vectors: See
            ``allmydata.interfaces.TestAndWriteVectorsForShares``.

        :param dict[int, MutableShareFile] shares: The shares against which to
            execute the vectors.

        :return dict[int, MutableShareFile]: The shares which still exist
            after applying the vectors.
        """
        remaining_shares = {}

        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if new_length == 0:
                if sharenum in shares:
                    shares[sharenum].unlink()
            else:
                if sharenum not in shares:
                    # allocate a new share
                    share = self._allocate_slot_share(bucketdir, secrets,
                                                      sharenum,
                                                      owner_num=0)
                    shares[sharenum] = share
                shares[sharenum].writev(datav, new_length)
                remaining_shares[sharenum] = shares[sharenum]

            if new_length == 0:
                # delete bucket directories that exist but are empty. They
                # might not exist if a client showed up and asked us to
                # truncate a share we weren't even holding.
                if os.path.exists(bucketdir) and [] == os.listdir(bucketdir):
                    os.rmdir(bucketdir)

        return remaining_shares

    def _make_lease_info(self, renew_secret, cancel_secret):
        """
        :return LeaseInfo: Information for a new lease for a share.
        """
        ownerid = 1 # TODO
        expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
        lease_info = LeaseInfo(ownerid,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)
        return lease_info

    def _add_or_renew_leases(self, shares, lease_info):
        """
        Put the given lease onto the given shares.

        :param Iterable[Union[MutableShareFile, ShareFile]] shares: The shares
            to put the lease onto.

        :param LeaseInfo lease_info: The lease to put on the shares.
        """
        for share in shares:
            share.add_or_renew_lease(self.get_available_space(), lease_info)

    def slot_testv_and_readv_and_writev(  # type: ignore # warner/foolscap#78
            self,
            storage_index,
            secrets,
            test_and_write_vectors,
            read_vector,
            renew_leases,
    ):
        """
        Read data from shares and conditionally write some data to them.

        :param bool renew_leases: If and only if this is ``True`` and the test
            vectors pass then shares mentioned in ``test_and_write_vectors``
            that still exist after the changes are made will also have a
            secret-matching lease renewed (or, if none match, a new lease
            added).

        See ``allmydata.interfaces.RIStorageServer`` for details about other
        parameters and return value.
        """
        start = self._clock.seconds()
        self.count("writev")
        si_s = si_b2a(storage_index)
        log.msg("storage: slot_writev %r" % si_s)

        si_dir = storage_index_to_dir(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        bucketdir = os.path.join(self.sharedir, si_dir)

        # If collection succeeds we know the write_enabler is good for all
        # existing shares.
        shares = self._collect_mutable_shares_for_storage_index(
            bucketdir,
            write_enabler,
            si_s,
        )

        # Now evaluate test vectors.
        testv_is_good = self._evaluate_test_vectors(
            test_and_write_vectors,
            shares,
        )

        # now gather the read vectors, before we do any writes
        read_data = self._evaluate_read_vectors(
            read_vector,
            shares,
        )

        if testv_is_good:
            # now apply the write vectors
            remaining_shares = self._evaluate_write_vectors(
                bucketdir,
                secrets,
                test_and_write_vectors,
                shares,
            )
            if renew_leases:
                lease_info = self._make_lease_info(renew_secret, cancel_secret)
                self._add_or_renew_leases(remaining_shares.values(), lease_info)

        # all done
        self.add_latency("writev", self._clock.seconds() - start)
        return (testv_is_good, read_data)
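
    # The Foolscap wrapper below always renews leases; callers that do not
    # want lease renewal call slot_testv_and_readv_and_writev() directly with
    # renew_leases=False.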
    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
        return self.slot_testv_and_readv_and_writev(
            storage_index,
            secrets,
            test_and_write_vectors,
            read_vector,
            renew_leases=True,
        )

    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                             owner_num=0):
        (write_enabler, renew_secret, cancel_secret) = secrets
        my_nodeid = self.my_nodeid
        fileutil.make_dirs(bucketdir)
        filename = os.path.join(bucketdir, "%d" % sharenum)
        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
                                         self)
        return share

    def remote_slot_readv(self, storage_index, shares, readv):
        start = self._clock.seconds()
        self.count("readv")
        si_s = si_b2a(storage_index)
        lp = log.msg("storage: slot_readv %r %r" % (si_s, shares),
                     facility="tahoe.storage", level=log.OPERATIONAL)
        si_dir = storage_index_to_dir(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        if not os.path.isdir(bucketdir):
            self.add_latency("readv", self._clock.seconds() - start)
            return {}
        datavs = {}
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                continue
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                datavs[sharenum] = msf.readv(readv)
        log.msg("returning shares %s" % (list(datavs.keys()),),
                facility="tahoe.storage", level=log.NOISY, parent=lp)
        self.add_latency("readv", self._clock.seconds() - start)
        return datavs

    def _share_exists(self, storage_index, shnum):
        """
        Check local share storage to see if a matching share exists.

        :param bytes storage_index: The storage index to inspect.
        :param int shnum: The share number to check for.

        :return bool: ``True`` if a share with the given number exists at the
            given storage index, ``False`` otherwise.
        """
        for existing_sharenum, ignored in self._get_bucket_shares(storage_index):
            if existing_sharenum == shnum:
                return True
        return False
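
    # Clients call advise_corrupt_share() when they notice a damaged share.
    # We log the claim and, if we actually hold that share and have the disk
    # space, write a corruption-advisory report to disk.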
    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
                                    reason):
        # This is a remote API, I believe, so this has to be bytes for legacy
        # protocol backwards compatibility reasons.
        assert isinstance(share_type, bytes)
        assert isinstance(reason, bytes), "%r is not bytes" % (reason,)

        si_s = si_b2a(storage_index)

        if not self._share_exists(storage_index, shnum):
            log.msg(
                format=(
                    "discarding client corruption claim for %(si)s/%(shnum)d "
                    "which I do not have"
                ),
                si=si_s,
                shnum=shnum,
            )
            return

        log.msg(format=("client claims corruption in (%(share_type)s) " +
                        "%(si)s-%(shnum)d: %(reason)s"),
                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
                level=log.SCARY, umid="SGx2fA")

        report = render_corruption_report(share_type, si_s, shnum, reason)
        if len(report) > self.get_available_space():
            return None

        now = time_format.iso_utc(sep="T")
        report_path = get_corruption_report_path(
            self.corruption_advisory_dir,
            now,
            si_s,
            shnum,
        )
        with open(report_path, "w") as f:
            f.write(report)

        return None


CORRUPTION_REPORT_FORMAT = """\
report: Share Corruption
type: {type}
storage_index: {storage_index}
share_number: {share_number}
{reason}
"""

def render_corruption_report(share_type, si_s, shnum, reason):
    """
    Create a string that explains a corruption report using freeform text.

    :param bytes share_type: The type of the share which the report is about.

    :param bytes si_s: The encoded representation of the storage index which
        the report is about.

    :param int shnum: The share number which the report is about.

    :param bytes reason: The reason given by the client for the corruption
        report.
    """
    return CORRUPTION_REPORT_FORMAT.format(
        type=bytes_to_native_str(share_type),
        storage_index=bytes_to_native_str(si_s),
        share_number=shnum,
        reason=bytes_to_native_str(reason),
    )
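
# For example (illustrative values), render_corruption_report(b"immutable",
# b"aaaaaaaa", 7, b"hash failure") fills the template above with type
# "immutable", storage index "aaaaaaaa", share number 7, and that reason text.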

def get_corruption_report_path(base_dir, now, si_s, shnum):
    """
    Determine the path to which a certain corruption report should be written.

    :param str base_dir: The directory beneath which to construct the path.

    :param str now: The time of the report.

    :param str si_s: The encoded representation of the storage index which the
        report is about.

    :param int shnum: The share number which the report is about.

    :return str: A path to which the report can be written.
    """
    # windows can't handle colons in the filename
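    # For example (illustrative values): base_dir "corruption-advisories",
    # now "2021-01-01T12:00:00", si_s b"aaaaaaaa", and shnum 3 produce
    # "corruption-advisories/2021-01-01T120000--aaaaaaaa-3".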
    return os.path.join(
        base_dir,
        ("%s--%s-%d" % (now, str(si_s, "utf-8"), shnum)).replace(":", "")
    )