Split StorageServer into generic part and Foolscap part.

This commit is contained in:
Itamar Turner-Trauring 2021-12-01 09:55:44 -05:00
parent 6e1f6f68ca
commit 50e21a9034

View File

@ -12,7 +12,7 @@ if PY2:
# strings. Omit bytes so we don't leak future's custom bytes. # strings. Omit bytes so we don't leak future's custom bytes.
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401 from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401
else: else:
from typing import Dict from typing import Dict, Tuple
import os, re import os, re
import six import six
@ -56,12 +56,11 @@ NUM_RE=re.compile("^[0-9]+$")
DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60 DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
@implementer(RIStorageServer, IStatsProducer) @implementer(IStatsProducer)
class StorageServer(service.MultiService, Referenceable): class StorageServer(service.MultiService):
""" """
A filesystem-based implementation of ``RIStorageServer``. Implement the business logic for the storage server.
""" """
name = 'storage'
LeaseCheckerClass = LeaseCheckingCrawler LeaseCheckerClass = LeaseCheckingCrawler
def __init__(self, storedir, nodeid, reserved_space=0, def __init__(self, storedir, nodeid, reserved_space=0,
@ -125,16 +124,8 @@ class StorageServer(service.MultiService, Referenceable):
self.lease_checker.setServiceParent(self) self.lease_checker.setServiceParent(self)
self._clock = clock self._clock = clock
# Currently being-written Bucketwriters. For Foolscap, lifetime is tied
# to connection: when disconnection happens, the BucketWriters are
# removed. For HTTP, this makes no sense, so there will be
# timeout-based cleanup; see
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807.
# Map in-progress filesystem path -> BucketWriter: # Map in-progress filesystem path -> BucketWriter:
self._bucket_writers = {} # type: Dict[str,BucketWriter] self._bucket_writers = {} # type: Dict[str,BucketWriter]
# Canaries and disconnect markers for BucketWriters created via Foolscap:
self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)]
def stopService(self): def stopService(self):
# Cancel any in-progress uploads: # Cancel any in-progress uploads:
@ -263,7 +254,7 @@ class StorageServer(service.MultiService, Referenceable):
space += bw.allocated_size() space += bw.allocated_size()
return space return space
def remote_get_version(self): def get_version(self):
remaining_space = self.get_available_space() remaining_space = self.get_available_space()
if remaining_space is None: if remaining_space is None:
# We're on a platform that has no API to get disk stats. # We're on a platform that has no API to get disk stats.
@ -284,7 +275,7 @@ class StorageServer(service.MultiService, Referenceable):
} }
return version return version
def _allocate_buckets(self, storage_index, def allocate_buckets(self, storage_index,
renew_secret, cancel_secret, renew_secret, cancel_secret,
sharenums, allocated_size, sharenums, allocated_size,
owner_num=0, renew_leases=True): owner_num=0, renew_leases=True):
@ -371,21 +362,6 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("allocate", self._clock.seconds() - start) self.add_latency("allocate", self._clock.seconds() - start)
return alreadygot, bucketwriters return alreadygot, bucketwriters
def remote_allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
canary, owner_num=0):
"""Foolscap-specific ``allocate_buckets()`` API."""
alreadygot, bucketwriters = self._allocate_buckets(
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
owner_num=owner_num, renew_leases=True,
)
# Abort BucketWriters if disconnection happens.
for bw in bucketwriters.values():
disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
return alreadygot, bucketwriters
def _iter_share_files(self, storage_index): def _iter_share_files(self, storage_index):
for shnum, filename in self._get_bucket_shares(storage_index): for shnum, filename in self._get_bucket_shares(storage_index):
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
@ -401,8 +377,7 @@ class StorageServer(service.MultiService, Referenceable):
continue # non-sharefile continue # non-sharefile
yield sf yield sf
def remote_add_lease(self, storage_index, renew_secret, cancel_secret, def add_lease(self, storage_index, renew_secret, cancel_secret, owner_num=1):
owner_num=1):
start = self._clock.seconds() start = self._clock.seconds()
self.count("add-lease") self.count("add-lease")
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
@ -414,7 +389,7 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("add-lease", self._clock.seconds() - start) self.add_latency("add-lease", self._clock.seconds() - start)
return None return None
def remote_renew_lease(self, storage_index, renew_secret): def renew_lease(self, storage_index, renew_secret):
start = self._clock.seconds() start = self._clock.seconds()
self.count("renew") self.count("renew")
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
@ -448,7 +423,7 @@ class StorageServer(service.MultiService, Referenceable):
# Commonly caused by there being no buckets at all. # Commonly caused by there being no buckets at all.
pass pass
def remote_get_buckets(self, storage_index): def get_buckets(self, storage_index):
start = self._clock.seconds() start = self._clock.seconds()
self.count("get") self.count("get")
si_s = si_b2a(storage_index) si_s = si_b2a(storage_index)
@ -698,18 +673,6 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("writev", self._clock.seconds() - start) self.add_latency("writev", self._clock.seconds() - start)
return (testv_is_good, read_data) return (testv_is_good, read_data)
def remote_slot_testv_and_readv_and_writev(self, storage_index,
secrets,
test_and_write_vectors,
read_vector):
return self.slot_testv_and_readv_and_writev(
storage_index,
secrets,
test_and_write_vectors,
read_vector,
renew_leases=True,
)
def _allocate_slot_share(self, bucketdir, secrets, sharenum, def _allocate_slot_share(self, bucketdir, secrets, sharenum,
owner_num=0): owner_num=0):
(write_enabler, renew_secret, cancel_secret) = secrets (write_enabler, renew_secret, cancel_secret) = secrets
@ -720,7 +683,7 @@ class StorageServer(service.MultiService, Referenceable):
self) self)
return share return share
def remote_slot_readv(self, storage_index, shares, readv): def slot_readv(self, storage_index, shares, readv):
start = self._clock.seconds() start = self._clock.seconds()
self.count("readv") self.count("readv")
si_s = si_b2a(storage_index) si_s = si_b2a(storage_index)
@ -747,8 +710,8 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("readv", self._clock.seconds() - start) self.add_latency("readv", self._clock.seconds() - start)
return datavs return datavs
def remote_advise_corrupt_share(self, share_type, storage_index, shnum, def advise_corrupt_share(self, share_type, storage_index, shnum,
reason): reason):
# This is a remote API, I believe, so this has to be bytes for legacy # This is a remote API, I believe, so this has to be bytes for legacy
# protocol backwards compatibility reasons. # protocol backwards compatibility reasons.
assert isinstance(share_type, bytes) assert isinstance(share_type, bytes)
@ -774,3 +737,69 @@ class StorageServer(service.MultiService, Referenceable):
share_type=share_type, si=si_s, shnum=shnum, reason=reason, share_type=share_type, si=si_s, shnum=shnum, reason=reason,
level=log.SCARY, umid="SGx2fA") level=log.SCARY, umid="SGx2fA")
return None return None
@implementer(RIStorageServer)
class FoolscapStorageServer(Referenceable):  # type: ignore # warner/foolscap#78
    """
    A filesystem-based implementation of ``RIStorageServer``.

    This is a thin Foolscap wrapper: every ``remote_*`` method delegates to
    the wrapped ``StorageServer``, which holds the actual business logic.

    For Foolscap, BucketWriter lifetime is tied to connection: when
    disconnection happens, the BucketWriters are removed.
    """
    name = 'storage'

    def __init__(self, storage_server):  # type: (StorageServer) -> None
        self._server = storage_server

        # Canaries and disconnect markers for BucketWriters created via
        # Foolscap, so the notifyOnDisconnect registration can be found
        # (and cancelled) later:
        self._bucket_writer_disconnect_markers = {}  # type: Dict[BucketWriter,Tuple[IRemoteReference, object]]

    def remote_get_version(self):
        # Fixed: delegate to the wrapped server. This class has no
        # get_version() of its own, so ``self.get_version()`` would have
        # raised AttributeError.
        return self._server.get_version()

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Foolscap-specific ``allocate_buckets()`` API."""
        alreadygot, bucketwriters = self._server.allocate_buckets(
            storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
            owner_num=owner_num, renew_leases=True,
        )
        # Abort BucketWriters if disconnection happens.
        for bw in bucketwriters.values():
            disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
            self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
        return alreadygot, bucketwriters

    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                         owner_num=1):
        # NOTE(review): owner_num is accepted for wire-protocol
        # compatibility but deliberately not forwarded — confirm the
        # underlying add_lease() default is the desired behavior.
        return self._server.add_lease(storage_index, renew_secret, cancel_secret)

    def remote_renew_lease(self, storage_index, renew_secret):
        return self._server.renew_lease(storage_index, renew_secret)

    def remote_get_buckets(self, storage_index):
        return self._server.get_buckets(storage_index)

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
        # The Foolscap protocol always renews leases as a side-effect of
        # a mutable write.
        return self._server.slot_testv_and_readv_and_writev(
            storage_index,
            secrets,
            test_and_write_vectors,
            read_vector,
            renew_leases=True,
        )

    def remote_slot_readv(self, storage_index, shares, readv):
        # Fixed: the original passed ``self`` as an extra positional
        # argument, shifting every parameter of slot_readv() by one.
        return self._server.slot_readv(storage_index, shares, readv)

    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
                                    reason):
        return self._server.advise_corrupt_share(share_type, storage_index, shnum,
                                                 reason)