""" I contain the client-side code which speaks to storage servers, in particular the foolscap-based server implemented in src/allmydata/storage/*.py . Ported to Python 3. """ from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals # roadmap: # # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to # create it, change uploader/servermap to get rrefs from it. ServerFarm calls # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs # to clients. webapi status pages call broker.get_info_about_serverid. # # 2: move get_info methods to the descriptor, webapi status pages call # broker.get_descriptor_for_serverid().get_info # # 3?later?: store descriptors in UploadResults/etc instead of serverids, # webapi status pages call descriptor.get_info and don't use storage_broker # or Client # # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer # optional. This closes #467 # # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other # clients. Clients stop doing callRemote(), use NativeStorageClient methods # instead (which might do something else, i.e. http or whatever). The # introducer and tahoe.cfg only create NativeStorageClients for now. # # 6: implement other sorts of IStorageClient classes: S3, etc from future.utils import PY2 if PY2: from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 from six import ensure_text import re, time, hashlib # On Python 2 this will be the backport. from configparser import NoSectionError import attr from zope.interface import ( Attribute, Interface, implementer, ) from twisted.internet import defer from twisted.application import service from twisted.plugin import ( getPlugins, ) from eliot import ( log_call, ) from foolscap.api import eventually from foolscap.reconnector import ( ReconnectionInfo, ) from allmydata.interfaces import ( IStorageBroker, IDisplayableServer, IServer, IStorageServer, IFoolscapStoragePlugin, ) from allmydata.util import log, base32, connection_status from allmydata.util.assertutil import precondition from allmydata.util.observer import ObserverList from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.hashutil import permute_server_hash from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict # who is responsible for de-duplication? # both? # IC remembers the unpacked announcements it receives, to provide for late # subscribers and to remove duplicates # if a client subscribes after startup, will they receive old announcements? # yes # who will be responsible for signature checking? # make it be IntroducerClient, so they can push the filter outwards and # reduce inbound network traffic # what should the interface between StorageFarmBroker and IntroducerClient # look like? # don't pass signatures: only pass validated blessed-objects @attr.s class StorageClientConfig(object): """ Configuration for a node acting as a storage client. :ivar preferred_peers: An iterable of the server-ids (``bytes``) of the storage servers where share placement is preferred, in order of decreasing preference. See the *[client]peers.preferred* documentation for details. :ivar dict[unicode, dict[unicode, unicode]] storage_plugins: A mapping from names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the respective configuration. """ preferred_peers = attr.ib(default=()) storage_plugins = attr.ib(default=attr.Factory(dict)) @classmethod def from_node_config(cls, config): """ Create a ``StorageClientConfig`` from a complete Tahoe-LAFS node configuration. :param _Config config: The loaded Tahoe-LAFS node configuration. """ ps = config.get_config("client", "peers.preferred", "").split(",") preferred_peers = tuple([p.strip() for p in ps if p != ""]) enabled_storage_plugins = ( name.strip() for name in config.get_config( "client", "storage.plugins", "", ).split(u",") if name.strip() ) storage_plugins = {} for plugin_name in enabled_storage_plugins: try: plugin_config = config.items("storageclient.plugins." + plugin_name) except NoSectionError: plugin_config = [] storage_plugins[plugin_name] = dict(plugin_config) return cls( preferred_peers, storage_plugins, ) @implementer(IStorageBroker) class StorageFarmBroker(service.MultiService): """I live on the client, and know about storage servers. For each server that is participating in a grid, I either maintain a connection to it or remember enough information to establish a connection to it on demand. I'm also responsible for subscribing to the IntroducerClient to find out about new servers as they are announced by the Introducer. :ivar _tub_maker: A one-argument callable which accepts a dictionary of "handler overrides" and returns a ``foolscap.api.Tub``. :ivar StorageClientConfig storage_client_config: Values from the node configuration file relating to storage behavior. """ @property def preferred_peers(self): return self.storage_client_config.preferred_peers def __init__( self, permute_peers, tub_maker, node_config, storage_client_config=None, ): service.MultiService.__init__(self) assert permute_peers # False not implemented yet self.permute_peers = permute_peers self._tub_maker = tub_maker self.node_config = node_config if storage_client_config is None: storage_client_config = StorageClientConfig() self.storage_client_config = storage_client_config # self.servers maps serverid -> IServer, and keeps track of all the # storage servers that we've heard about. Each descriptor manages its # own Reconnector, and will give us a RemoteReference when we ask # them for it. self.servers = BytesKeyDict() self._static_server_ids = set() # ignore announcements for these self.introducer_client = None self._threshold_listeners = [] # tuples of (threshold, Deferred) self._connected_high_water_mark = 0 @log_call(action_type=u"storage-client:broker:set-static-servers") def set_static_servers(self, servers): # Sorting the items gives us a deterministic processing order. This # doesn't really matter but it makes the logging behavior more # predictable and easier to test (and at least one test does depend on # this sorted order). servers = {ensure_text(key): value for (key, value) in servers.items()} for (server_id, server) in sorted(servers.items()): try: storage_server = self._make_storage_server( server_id.encode("utf-8"), server, ) except Exception: # TODO: The _make_storage_server failure is logged but maybe # we should write a traceback here. Notably, tests don't # automatically fail just because we hit this case. Well # written tests will still fail if a surprising exception # arrives here but they might be harder to debug without this # information. pass else: if isinstance(server_id, str): server_id = server_id.encode("utf-8") self._static_server_ids.add(server_id) self.servers[server_id] = storage_server storage_server.setServiceParent(self) storage_server.start_connecting(self._trigger_connections) def get_client_storage_plugin_web_resources(self, node_config): """ Get all of the client-side ``IResource`` implementations provided by enabled storage plugins. :param allmydata.node._Config node_config: The complete node configuration for the node from which these web resources will be served. :return dict[unicode, IResource]: Resources for all of the plugins. """ plugins = { plugin.name: plugin for plugin in getPlugins(IFoolscapStoragePlugin) } return UnicodeKeyDict({ name: plugins[name].get_client_resource(node_config) for (name, config) in self.storage_client_config.storage_plugins.items() }) @log_call( action_type=u"storage-client:broker:make-storage-server", include_args=["server_id"], include_result=False, ) def _make_storage_server(self, server_id, server): """ Create a new ``IServer`` for the given storage server announcement. :param bytes server_id: The unique identifier for the server. :param dict server: The server announcement. See ``Static Server Definitions`` in the configuration documentation for details about the structure and contents. :return IServer: The object-y representation of the server described by the given announcement. """ assert isinstance(server_id, bytes) handler_overrides = server.get("connections", {}) s = NativeStorageServer( server_id, server["ann"], self._tub_maker, handler_overrides, self.node_config, self.storage_client_config, ) s.on_status_changed(lambda _: self._got_connection()) return s def when_connected_enough(self, threshold): """ :returns: a Deferred that fires if/when our high water mark for number of connected servers becomes (or ever was) above "threshold". """ d = defer.Deferred() self._threshold_listeners.append( (threshold, d) ) self._check_connected_high_water_mark() return d # these two are used in unit tests def test_add_rref(self, serverid, rref, ann): s = self._make_storage_server( serverid, {"ann": ann.copy()}, ) s._rref = rref s._is_connected = True self.servers[serverid] = s def test_add_server(self, server_id, s): s.on_status_changed(lambda _: self._got_connection()) self.servers[server_id] = s def use_introducer(self, introducer_client): self.introducer_client = ic = introducer_client ic.subscribe_to("storage", self._got_announcement) def _got_connection(self): # this is called by NativeStorageServer when it is connected self._check_connected_high_water_mark() def _check_connected_high_water_mark(self): current = len(self.get_connected_servers()) if current > self._connected_high_water_mark: self._connected_high_water_mark = current remaining = [] for threshold, d in self._threshold_listeners: if self._connected_high_water_mark >= threshold: eventually(d.callback, None) else: remaining.append( (threshold, d) ) self._threshold_listeners = remaining def _should_ignore_announcement(self, server_id, ann): """ Determine whether a new storage announcement should be discarded or used to update our collection of storage servers. :param bytes server_id: The unique identifier for the storage server which made the announcement. :param dict ann: The announcement. :return bool: ``True`` if the announcement should be ignored, ``False`` if it should be used to update our local storage server state. """ # Let local static configuration always override any announcement for # a particular server. if server_id in self._static_server_ids: log.msg(format="ignoring announcement for static server '%(id)s'", id=server_id, facility="tahoe.storage_broker", umid="AlxzqA", level=log.UNUSUAL) return True try: old = self.servers[server_id] except KeyError: # We don't know anything about this server. Let's use the # announcement to change that. return False else: # Determine if this announcement is at all difference from the # announcement we already have for the server. If it is the same, # we don't need to change anything. return old.get_announcement() == ann def _got_announcement(self, key_s, ann): """ This callback is given to the introducer and called any time an announcement is received which has a valid signature and does not have a sequence number less than or equal to a previous sequence number seen for that server by that introducer. Note sequence numbers are not considered between different introducers so if we use more than one introducer it is possible for them to deliver us stale announcements in some cases. """ precondition(isinstance(key_s, bytes), key_s) precondition(key_s.startswith(b"v0-"), key_s) precondition(ann["service-name"] == "storage", ann["service-name"]) server_id = key_s if self._should_ignore_announcement(server_id, ann): return s = self._make_storage_server( server_id, {u"ann": ann}, ) try: old = self.servers.pop(server_id) except KeyError: pass else: # It's a replacement, get rid of the old one. old.stop_connecting() old.disownServiceParent() # NOTE: this disownServiceParent() returns a Deferred that # doesn't fire until Tub.stopService fires, which will wait for # any existing connections to be shut down. This doesn't # generally matter for normal runtime, but unit tests can run # into DirtyReactorErrors if they don't block on these. If a test # replaces one server with a newer version, then terminates # before the old one has been shut down, it might get # DirtyReactorErrors. The fix would be to gather these Deferreds # into a structure that will block StorageFarmBroker.stopService # until they have fired (but hopefully don't keep reference # cycles around when they fire earlier than that, which will # almost always be the case for normal runtime). # now we forget about them and start using the new one s.setServiceParent(self) self.servers[server_id] = s s.start_connecting(self._trigger_connections) # the descriptor will manage their own Reconnector, and each time we # need servers, we'll ask them if they're connected or not. def _trigger_connections(self): # when one connection is established, reset the timers on all others, # to trigger a reconnection attempt in one second. This is intended # to accelerate server connections when we've been offline for a # while. The goal is to avoid hanging out for a long time with # connections to only a subset of the servers, which would increase # the chances that we'll put shares in weird places (and not update # existing shares of mutable files). See #374 for more details. for dsc in list(self.servers.values()): dsc.try_to_connect() def get_servers_for_psi(self, peer_selection_index): # return a list of server objects (IServers) assert self.permute_peers == True connected_servers = self.get_connected_servers() preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers) def _permuted(server): seed = server.get_permutation_seed() is_unpreferred = server not in preferred_servers return (is_unpreferred, permute_server_hash(peer_selection_index, seed)) return sorted(connected_servers, key=_permuted) def get_all_serverids(self): return frozenset(self.servers.keys()) def get_connected_servers(self): return frozenset([s for s in self.servers.values() if s.is_connected()]) def get_known_servers(self): return frozenset(self.servers.values()) def get_nickname_for_serverid(self, serverid): if serverid in self.servers: return self.servers[serverid].get_nickname() return None def get_stub_server(self, serverid): if serverid in self.servers: return self.servers[serverid] # some time before 1.12, we changed "serverid" to be "key_s" (the # printable verifying key, used in V2 announcements), instead of the # tubid. When the immutable uploader delegates work to a Helper, # get_stub_server() is used to map the returning server identifiers # to IDisplayableServer instances (to get a name, for display on the # Upload Results web page). If the Helper is running 1.12 or newer, # it will send pubkeys, but if it's still running 1.11, it will send # tubids. This clause maps the old tubids to our existing servers. for s in list(self.servers.values()): if isinstance(s, NativeStorageServer): if serverid == s.get_tubid(): return s return StubServer(serverid) @implementer(IDisplayableServer) class StubServer(object): def __init__(self, serverid): assert isinstance(serverid, bytes) self.serverid = serverid # binary tubid def get_serverid(self): return self.serverid def get_name(self): return base32.b2a(self.serverid)[:8] def get_longname(self): return base32.b2a(self.serverid) def get_nickname(self): return "?" class IFoolscapStorageServer(Interface): """ An internal interface that mediates between ``NativeStorageServer`` and Foolscap-based ``IStorageServer`` implementations. """ nickname = Attribute(""" A name for this server for presentation to users. """) permutation_seed = Attribute(""" A stable value associated with this server which a client can use as an input to the server selection permutation ordering. """) tubid = Attribute(""" The identifier for the Tub in which the server is run. """) storage_server = Attribute(""" An IStorageServer provide which implements a concrete Foolscap-based protocol for communicating with the server. """) name = Attribute(""" Another name for this server for presentation to users. """) longname = Attribute(""" *Another* name for this server for presentation to users. """) lease_seed = Attribute(""" A stable value associated with this server which a client can use as an input to a lease secret generation function. """) def connect_to(tub, got_connection): """ Attempt to establish and maintain a connection to the server. :param Tub tub: A Foolscap Tub from which the connection is to originate. :param got_connection: A one-argument callable which is called with a Foolscap ``RemoteReference`` when a connection is established. This may be called multiple times if the connection is lost and then re-established. :return foolscap.reconnector.Reconnector: An object which manages the connection and reconnection attempts. """ @implementer(IFoolscapStorageServer) @attr.s(frozen=True) class _FoolscapStorage(object): """ Abstraction for connecting to a storage server exposed via Foolscap. """ nickname = attr.ib() permutation_seed = attr.ib() tubid = attr.ib() storage_server = attr.ib(validator=attr.validators.provides(IStorageServer)) _furl = attr.ib() _short_description = attr.ib() _long_description = attr.ib() @property def name(self): return self._short_description @property def longname(self): return self._long_description @property def lease_seed(self): return self.tubid @classmethod def from_announcement(cls, server_id, furl, ann, storage_server): """ Create an instance from a fURL and an announcement like:: {"permutation-seed-base32": "...", "nickname": "...", } *nickname* is optional. The furl will be a Unicode string on Python 3; on Python 2 it will be either a native (bytes) string or a Unicode string. """ furl = furl.encode("utf-8") m = re.match(br'pb://(\w+)@', furl) assert m, furl tubid_s = m.group(1).lower() tubid = base32.a2b(tubid_s) if "permutation-seed-base32" in ann: seed = ann["permutation-seed-base32"] if isinstance(seed, str): seed = seed.encode("utf-8") ps = base32.a2b(seed) elif re.search(br'^v0-[0-9a-zA-Z]{52}$', server_id): ps = base32.a2b(server_id[3:]) else: log.msg("unable to parse serverid '%(server_id)s as pubkey, " "hashing it to get permutation-seed, " "may not converge with other clients", server_id=server_id, facility="tahoe.storage_broker", level=log.UNUSUAL, umid="qu86tw") ps = hashlib.sha256(server_id).digest() permutation_seed = ps assert server_id long_description = server_id if server_id.startswith(b"v0-"): # remove v0- prefix from abbreviated name short_description = server_id[3:3+8] else: short_description = server_id[:8] nickname = ann.get("nickname", "") return cls( nickname=nickname, permutation_seed=permutation_seed, tubid=tubid, storage_server=storage_server, furl=furl, short_description=short_description, long_description=long_description, ) def connect_to(self, tub, got_connection): return tub.connectTo(self._furl, got_connection) @implementer(IFoolscapStorageServer) class _NullStorage(object): """ Abstraction for *not* communicating with a storage server of a type with which we can't communicate. """ nickname = "" permutation_seed = hashlib.sha256(b"").digest() tubid = hashlib.sha256(b"").digest() storage_server = None lease_seed = hashlib.sha256(b"").digest() name = "" longname = "" def connect_to(self, tub, got_connection): return NonReconnector() class NonReconnector(object): """ A ``foolscap.reconnector.Reconnector``-alike that doesn't do anything. """ def stopConnecting(self): pass def reset(self): pass def getReconnectionInfo(self): return ReconnectionInfo() _null_storage = _NullStorage() class AnnouncementNotMatched(Exception): """ A storage server announcement wasn't matched by any of the locally enabled plugins. """ def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref): """ Construct an ``IStorageServer`` from the most locally-preferred plugin that is offered in the given announcement. :param allmydata.node._Config node_config: The node configuration to pass to the plugin. """ plugins = { plugin.name: plugin for plugin in getPlugins(IFoolscapStoragePlugin) } storage_options = announcement.get(u"storage-options", []) for plugin_name, plugin_config in list(config.storage_plugins.items()): try: plugin = plugins[plugin_name] except KeyError: raise ValueError("{} not installed".format(plugin_name)) for option in storage_options: if plugin_name == option[u"name"]: furl = option[u"storage-server-FURL"] return furl, plugin.get_storage_client( node_config, option, get_rref, ) raise AnnouncementNotMatched() @implementer(IServer) class NativeStorageServer(service.MultiService): """I hold information about a storage server that we want to connect to. If we are connected, I hold the RemoteReference, their host address, and the their version information. I remember information about when we were last connected too, even if we aren't currently connected. @ivar last_connect_time: when we last established a connection @ivar last_loss_time: when we last lost a connection @ivar version: the server's versiondict, from the most recent announcement @ivar nickname: the server's self-reported nickname (unicode), same @ivar rref: the RemoteReference, if connected, otherwise None """ VERSION_DEFAULTS = UnicodeKeyDict({ "http://allmydata.org/tahoe/protocols/storage/v1" : UnicodeKeyDict({ "maximum-immutable-share-size": 2**32 - 1, "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2 "tolerates-immutable-read-overrun": False, "delete-mutable-shares-with-zero-length-writev": False, "available-space": None, }), "application-version": "unknown: no get_version()", }) def __init__(self, server_id, ann, tub_maker, handler_overrides, node_config, config=StorageClientConfig()): service.MultiService.__init__(self) assert isinstance(server_id, bytes) self._server_id = server_id self.announcement = ann self._tub_maker = tub_maker self._handler_overrides = handler_overrides self._storage = self._make_storage_system(node_config, config, ann) self.last_connect_time = None self.last_loss_time = None self._rref = None self._is_connected = False self._reconnector = None self._trigger_cb = None self._on_status_changed = ObserverList() def _make_storage_system(self, node_config, config, ann): """ :param allmydata.node._Config node_config: The node configuration to pass to any configured storage plugins. :param StorageClientConfig config: Configuration specifying desired storage client behavior. :param dict ann: The storage announcement from the storage server we are meant to communicate with. :return IFoolscapStorageServer: An object enabling communication via Foolscap with the server which generated the announcement. """ # Try to match the announcement against a plugin. try: furl, storage_server = _storage_from_foolscap_plugin( node_config, config, ann, # Pass in an accessor for our _rref attribute. The value of # the attribute may change over time as connections are lost # and re-established. The _StorageServer should always be # able to get the most up-to-date value. self.get_rref, ) except AnnouncementNotMatched: # Nope. pass else: return _FoolscapStorage.from_announcement( self._server_id, furl, ann, storage_server, ) # Try to match the announcement against the anonymous access scheme. try: furl = ann[u"anonymous-storage-FURL"] except KeyError: # Nope pass else: # See comment above for the _storage_from_foolscap_plugin case # about passing in get_rref. storage_server = _StorageServer(get_rref=self.get_rref) return _FoolscapStorage.from_announcement( self._server_id, furl, ann, storage_server, ) # Nothing matched so we can't talk to this server. return _null_storage def get_permutation_seed(self): return self._storage.permutation_seed def get_name(self): # keep methodname short # TODO: decide who adds [] in the short description. It should # probably be the output side, not here. return self._storage.name def get_longname(self): return self._storage.longname def get_tubid(self): return self._storage.tubid def get_lease_seed(self): return self._storage.lease_seed def get_foolscap_write_enabler_seed(self): return self._storage.tubid def get_nickname(self): return self._storage.nickname def on_status_changed(self, status_changed): """ :param status_changed: a callable taking a single arg (the NativeStorageServer) that is notified when we become connected """ return self._on_status_changed.subscribe(status_changed) # Special methods used by copy.copy() and copy.deepcopy(). When those are # used in allmydata.immutable.filenode to copy CheckResults during # repair, we want it to treat the IServer instances as singletons, and # not attempt to duplicate them.. def __copy__(self): return self def __deepcopy__(self, memodict): return self def __repr__(self): return "" % self.get_name() def get_serverid(self): return self._server_id def get_version(self): if self._rref: return self._rref.version return None def get_announcement(self): return self.announcement def get_connection_status(self): last_received = None if self._rref: last_received = self._rref.getDataLastReceivedAt() return connection_status.from_foolscap_reconnector(self._reconnector, last_received) def is_connected(self): return self._is_connected def get_available_space(self): version = self.get_version() if version is None: return None protocol_v1_version = version.get(b'http://allmydata.org/tahoe/protocols/storage/v1', BytesKeyDict()) available_space = protocol_v1_version.get(b'available-space') if available_space is None: available_space = protocol_v1_version.get(b'maximum-immutable-share-size', None) return available_space def start_connecting(self, trigger_cb): self._tub = self._tub_maker(self._handler_overrides) self._tub.setServiceParent(self) self._trigger_cb = trigger_cb self._reconnector = self._storage.connect_to(self._tub, self._got_connection) def _got_connection(self, rref): lp = log.msg(format="got connection to %(name)s, getting versions", name=self.get_name(), facility="tahoe.storage_broker", umid="coUECQ") if self._trigger_cb: eventually(self._trigger_cb) default = self.VERSION_DEFAULTS d = add_version_to_remote_reference(rref, default) d.addCallback(self._got_versioned_service, lp) d.addCallback(lambda ign: self._on_status_changed.notify(self)) d.addErrback(log.err, format="storageclient._got_connection", name=self.get_name(), umid="Sdq3pg") def _got_versioned_service(self, rref, lp): log.msg(format="%(name)s provided version info %(version)s", name=self.get_name(), version=rref.version, facility="tahoe.storage_broker", umid="SWmJYg", level=log.NOISY, parent=lp) self.last_connect_time = time.time() self._rref = rref self._is_connected = True rref.notifyOnDisconnect(self._lost) def get_rref(self): return self._rref def get_storage_server(self): """ See ``IServer.get_storage_server``. """ if self._rref is None: return None return self._storage.storage_server def _lost(self): log.msg(format="lost connection to %(name)s", name=self.get_name(), facility="tahoe.storage_broker", umid="zbRllw") self.last_loss_time = time.time() # self._rref is now stale: all callRemote()s will get a # DeadReferenceError. We leave the stale reference in place so that # uploader/downloader code (which received this IServer through # get_connected_servers() or get_servers_for_psi()) can continue to # use s.get_rref().callRemote() and not worry about it being None. self._is_connected = False def stop_connecting(self): # used when this descriptor has been superceded by another self._reconnector.stopConnecting() def try_to_connect(self): # used when the broker wants us to hurry up self._reconnector.reset() class UnknownServerTypeError(Exception): pass @implementer(IStorageServer) @attr.s class _StorageServer(object): """ ``_StorageServer`` is a direct pass-through to an ``RIStorageServer`` via a ``RemoteReference``. """ _get_rref = attr.ib() @property def _rref(self): return self._get_rref() def get_version(self): return self._rref.callRemote( "get_version", ) def allocate_buckets( self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size, canary, ): return self._rref.callRemote( "allocate_buckets", storage_index, renew_secret, cancel_secret, sharenums, allocated_size, canary, ) def add_lease( self, storage_index, renew_secret, cancel_secret, ): return self._rref.callRemote( "add_lease", storage_index, renew_secret, cancel_secret, ) def renew_lease( self, storage_index, renew_secret, ): return self._rref.callRemote( "renew_lease", storage_index, renew_secret, ) def get_buckets( self, storage_index, ): return self._rref.callRemote( "get_buckets", storage_index, ) def slot_readv( self, storage_index, shares, readv, ): return self._rref.callRemote( "slot_readv", storage_index, shares, readv, ) def slot_testv_and_readv_and_writev( self, storage_index, secrets, tw_vectors, r_vector, ): return self._rref.callRemote( "slot_testv_and_readv_and_writev", storage_index, secrets, tw_vectors, r_vector, ) def advise_corrupt_share( self, share_type, storage_index, shnum, reason, ): return self._rref.callRemoteOnly( "advise_corrupt_share", share_type, storage_index, shnum, reason, )