2009-06-01 21:06:04 +00:00
|
|
|
|
|
|
|
"""
|
|
|
|
I contain the client-side code which speaks to storage servers, in particular
|
|
|
|
the foolscap-based server implemented in src/allmydata/storage/*.py .
|
|
|
|
"""
|
|
|
|
|
|
|
|
# roadmap:
|
|
|
|
#
|
2009-06-23 02:10:47 +00:00
|
|
|
# 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
|
|
|
|
# create it, change uploader/servermap to get rrefs from it. ServerFarm calls
|
|
|
|
# IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
|
|
|
|
# to clients. webapi status pages call broker.get_info_about_serverid.
|
2009-06-01 21:06:04 +00:00
|
|
|
#
|
2009-06-23 02:10:47 +00:00
|
|
|
# 2: move get_info methods to the descriptor, webapi status pages call
|
|
|
|
# broker.get_descriptor_for_serverid().get_info
|
2009-06-01 21:06:04 +00:00
|
|
|
#
|
2009-06-23 02:10:47 +00:00
|
|
|
# 3?later?: store descriptors in UploadResults/etc instead of serverids,
|
|
|
|
# webapi status pages call descriptor.get_info and don't use storage_broker
|
|
|
|
# or Client
|
2009-06-01 21:06:04 +00:00
|
|
|
#
|
2009-06-23 02:10:47 +00:00
|
|
|
# 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
|
|
|
|
# optional. This closes #467
|
|
|
|
#
|
|
|
|
# 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
|
|
|
|
# clients. Clients stop doing callRemote(), use NativeStorageClient methods
|
|
|
|
# instead (which might do something else, i.e. http or whatever). The
|
|
|
|
# introducer and tahoe.cfg only create NativeStorageClients for now.
|
|
|
|
#
|
|
|
|
# 6: implement other sorts of IStorageClient classes: S3, etc
|
2009-06-01 21:06:04 +00:00
|
|
|
|
2020-09-21 14:45:05 +00:00
|
|
|
from past.builtins import unicode
|
2009-12-14 21:27:03 +00:00
|
|
|
|
2016-08-27 18:27:58 +00:00
|
|
|
import re, time, hashlib
|
2020-10-26 15:30:12 +00:00
|
|
|
|
|
|
|
# On Python 2 this will be the backport.
|
|
|
|
from configparser import NoSectionError
|
|
|
|
|
2019-05-31 17:40:51 +00:00
|
|
|
import attr
|
2019-07-02 18:57:20 +00:00
|
|
|
from zope.interface import (
|
|
|
|
Attribute,
|
|
|
|
Interface,
|
|
|
|
implementer,
|
|
|
|
)
|
2016-04-26 17:44:58 +00:00
|
|
|
from twisted.internet import defer
|
2016-05-02 15:23:07 +00:00
|
|
|
from twisted.application import service
|
2019-06-28 18:25:50 +00:00
|
|
|
from twisted.plugin import (
|
|
|
|
getPlugins,
|
|
|
|
)
|
2019-06-13 13:08:42 +00:00
|
|
|
from eliot import (
|
|
|
|
log_call,
|
|
|
|
)
|
2016-08-27 23:53:31 +00:00
|
|
|
from foolscap.api import eventually
|
2019-06-19 18:19:37 +00:00
|
|
|
from foolscap.reconnector import (
|
|
|
|
ReconnectionInfo,
|
|
|
|
)
|
2019-05-31 17:40:51 +00:00
|
|
|
from allmydata.interfaces import (
|
|
|
|
IStorageBroker,
|
|
|
|
IDisplayableServer,
|
|
|
|
IServer,
|
|
|
|
IStorageServer,
|
2019-06-28 18:25:50 +00:00
|
|
|
IFoolscapStoragePlugin,
|
2019-05-31 17:40:51 +00:00
|
|
|
)
|
2016-12-08 23:15:49 +00:00
|
|
|
from allmydata.util import log, base32, connection_status
|
2011-02-21 01:58:04 +00:00
|
|
|
from allmydata.util.assertutil import precondition
|
2016-07-22 00:23:22 +00:00
|
|
|
from allmydata.util.observer import ObserverList
|
2009-06-23 02:10:47 +00:00
|
|
|
from allmydata.util.rrefutil import add_version_to_remote_reference
|
2016-09-27 03:42:42 +00:00
|
|
|
from allmydata.util.hashutil import permute_server_hash
|
2020-11-04 18:09:55 +00:00
|
|
|
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
|
2020-10-30 18:34:23 +00:00
|
|
|
|
2009-06-23 02:10:47 +00:00
|
|
|
|
|
|
|
# who is responsible for de-duplication?
|
|
|
|
# both?
|
|
|
|
# IC remembers the unpacked announcements it receives, to provide for late
|
|
|
|
# subscribers and to remove duplicates
|
|
|
|
|
|
|
|
# if a client subscribes after startup, will they receive old announcements?
|
|
|
|
# yes
|
|
|
|
|
|
|
|
# who will be responsible for signature checking?
|
|
|
|
# make it be IntroducerClient, so they can push the filter outwards and
|
|
|
|
# reduce inbound network traffic
|
|
|
|
|
|
|
|
# what should the interface between StorageFarmBroker and IntroducerClient
|
|
|
|
# look like?
|
|
|
|
# don't pass signatures: only pass validated blessed-objects
|
2009-06-01 21:06:04 +00:00
|
|
|
|
2019-06-28 18:24:58 +00:00
|
|
|
@attr.s
|
|
|
|
class StorageClientConfig(object):
|
2019-07-02 14:05:02 +00:00
|
|
|
"""
|
|
|
|
Configuration for a node acting as a storage client.
|
|
|
|
|
2019-07-05 12:48:14 +00:00
|
|
|
:ivar preferred_peers: An iterable of the server-ids (``bytes``) of the
|
|
|
|
storage servers where share placement is preferred, in order of
|
|
|
|
decreasing preference. See the *[client]peers.preferred*
|
|
|
|
documentation for details.
|
2019-07-02 14:05:02 +00:00
|
|
|
|
2020-10-30 18:34:23 +00:00
|
|
|
:ivar dict[unicode, dict[unicode, unicode]] storage_plugins: A mapping from
|
2019-07-02 14:05:02 +00:00
|
|
|
names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the
|
|
|
|
respective configuration.
|
|
|
|
"""
|
2019-06-28 18:24:58 +00:00
|
|
|
preferred_peers = attr.ib(default=())
|
2019-06-28 18:25:50 +00:00
|
|
|
storage_plugins = attr.ib(default=attr.Factory(dict))
|
|
|
|
|
2019-06-28 18:24:58 +00:00
|
|
|
@classmethod
|
|
|
|
def from_node_config(cls, config):
|
2019-07-02 14:05:02 +00:00
|
|
|
"""
|
|
|
|
Create a ``StorageClientConfig`` from a complete Tahoe-LAFS node
|
|
|
|
configuration.
|
|
|
|
|
|
|
|
:param _Config config: The loaded Tahoe-LAFS node configuration.
|
|
|
|
"""
|
2020-10-30 18:34:23 +00:00
|
|
|
ps = config.get_config("client", "peers.preferred", "").split(",")
|
|
|
|
preferred_peers = tuple([p.strip() for p in ps if p != ""])
|
2016-04-26 07:22:52 +00:00
|
|
|
|
2019-06-28 18:25:50 +00:00
|
|
|
enabled_storage_plugins = (
|
|
|
|
name.strip()
|
|
|
|
for name
|
|
|
|
in config.get_config(
|
2020-10-30 18:34:23 +00:00
|
|
|
"client",
|
|
|
|
"storage.plugins",
|
|
|
|
"",
|
|
|
|
).split(u",")
|
2019-06-28 18:25:50 +00:00
|
|
|
if name.strip()
|
|
|
|
)
|
|
|
|
|
|
|
|
storage_plugins = {}
|
|
|
|
for plugin_name in enabled_storage_plugins:
|
|
|
|
try:
|
2020-10-30 18:34:23 +00:00
|
|
|
plugin_config = config.items("storageclient.plugins." + plugin_name)
|
2019-06-28 18:25:50 +00:00
|
|
|
except NoSectionError:
|
2019-07-03 16:08:58 +00:00
|
|
|
plugin_config = []
|
|
|
|
storage_plugins[plugin_name] = dict(plugin_config)
|
2019-06-28 18:25:50 +00:00
|
|
|
|
2019-06-28 18:24:58 +00:00
|
|
|
return cls(
|
|
|
|
preferred_peers,
|
2019-06-28 18:25:50 +00:00
|
|
|
storage_plugins,
|
2019-06-28 18:24:58 +00:00
|
|
|
)
|
2019-06-28 18:25:50 +00:00
|
|
|
|
|
|
|
|
2017-02-27 17:56:49 +00:00
|
|
|
@implementer(IStorageBroker)
|
2016-05-02 15:23:07 +00:00
|
|
|
class StorageFarmBroker(service.MultiService):
|
2009-06-01 21:06:04 +00:00
|
|
|
"""I live on the client, and know about storage servers. For each server
|
|
|
|
that is participating in a grid, I either maintain a connection to it or
|
|
|
|
remember enough information to establish a connection to it on demand.
|
|
|
|
I'm also responsible for subscribing to the IntroducerClient to find out
|
|
|
|
about new servers as they are announced by the Introducer.
|
2019-07-02 14:07:21 +00:00
|
|
|
|
|
|
|
:ivar StorageClientConfig storage_client_config: Values from the node
|
|
|
|
configuration file relating to storage behavior.
|
2009-06-01 21:06:04 +00:00
|
|
|
"""
|
2019-06-28 18:24:58 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def preferred_peers(self):
|
|
|
|
return self.storage_client_config.preferred_peers
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
self,
|
|
|
|
permute_peers,
|
|
|
|
tub_maker,
|
2019-08-19 20:09:26 +00:00
|
|
|
node_config,
|
2019-06-28 18:24:58 +00:00
|
|
|
storage_client_config=None,
|
|
|
|
):
|
2016-05-02 15:23:07 +00:00
|
|
|
service.MultiService.__init__(self)
|
2009-06-01 21:06:04 +00:00
|
|
|
assert permute_peers # False not implemented yet
|
|
|
|
self.permute_peers = permute_peers
|
2016-08-27 23:53:31 +00:00
|
|
|
self._tub_maker = tub_maker
|
2019-06-28 18:24:58 +00:00
|
|
|
|
2019-08-19 20:09:26 +00:00
|
|
|
self.node_config = node_config
|
|
|
|
|
2019-06-28 18:24:58 +00:00
|
|
|
if storage_client_config is None:
|
|
|
|
storage_client_config = StorageClientConfig()
|
|
|
|
self.storage_client_config = storage_client_config
|
2016-05-02 15:23:07 +00:00
|
|
|
|
2011-02-21 01:58:04 +00:00
|
|
|
# self.servers maps serverid -> IServer, and keeps track of all the
|
|
|
|
# storage servers that we've heard about. Each descriptor manages its
|
|
|
|
# own Reconnector, and will give us a RemoteReference when we ask
|
|
|
|
# them for it.
|
2020-10-30 18:34:23 +00:00
|
|
|
self.servers = BytesKeyDict()
|
2016-08-27 00:31:02 +00:00
|
|
|
self._static_server_ids = set() # ignore announcements for these
|
2009-06-01 21:06:04 +00:00
|
|
|
self.introducer_client = None
|
2016-07-22 00:23:22 +00:00
|
|
|
self._threshold_listeners = [] # tuples of (threshold, Deferred)
|
|
|
|
self._connected_high_water_mark = 0
|
2016-04-26 07:22:52 +00:00
|
|
|
|
2019-06-13 13:08:42 +00:00
|
|
|
@log_call(action_type=u"storage-client:broker:set-static-servers")
|
2016-08-27 00:31:02 +00:00
|
|
|
def set_static_servers(self, servers):
|
2019-06-21 12:38:57 +00:00
|
|
|
# Sorting the items gives us a deterministic processing order. This
|
|
|
|
# doesn't really matter but it makes the logging behavior more
|
|
|
|
# predictable and easier to test (and at least one test does depend on
|
|
|
|
# this sorted order).
|
|
|
|
for (server_id, server) in sorted(servers.items()):
|
2019-06-13 13:08:42 +00:00
|
|
|
try:
|
2020-10-02 15:57:27 +00:00
|
|
|
storage_server = self._make_storage_server(
|
|
|
|
server_id.encode("utf-8"),
|
|
|
|
server,
|
|
|
|
)
|
2019-06-13 13:08:42 +00:00
|
|
|
except Exception:
|
2019-06-28 18:27:03 +00:00
|
|
|
# TODO: The _make_storage_server failure is logged but maybe
|
|
|
|
# we should write a traceback here. Notably, tests don't
|
|
|
|
# automatically fail just because we hit this case. Well
|
|
|
|
# written tests will still fail if a surprising exception
|
|
|
|
# arrives here but they might be harder to debug without this
|
|
|
|
# information.
|
2020-10-30 18:34:23 +00:00
|
|
|
raise
|
2019-06-13 13:08:42 +00:00
|
|
|
else:
|
2020-10-30 18:34:23 +00:00
|
|
|
if isinstance(server_id, unicode):
|
|
|
|
server_id = server_id.encode("utf-8")
|
2019-06-13 13:08:42 +00:00
|
|
|
self._static_server_ids.add(server_id)
|
|
|
|
self.servers[server_id] = storage_server
|
|
|
|
storage_server.setServiceParent(self)
|
|
|
|
storage_server.start_connecting(self._trigger_connections)
|
|
|
|
|
2019-08-19 15:21:03 +00:00
|
|
|
def get_client_storage_plugin_web_resources(self, node_config):
|
2019-07-24 19:37:24 +00:00
|
|
|
"""
|
|
|
|
Get all of the client-side ``IResource`` implementations provided by
|
|
|
|
enabled storage plugins.
|
2019-08-19 15:21:03 +00:00
|
|
|
|
|
|
|
:param allmydata.node._Config node_config: The complete node
|
|
|
|
configuration for the node from which these web resources will be
|
|
|
|
served.
|
|
|
|
|
|
|
|
:return dict[unicode, IResource]: Resources for all of the plugins.
|
2019-07-24 19:37:24 +00:00
|
|
|
"""
|
|
|
|
plugins = {
|
|
|
|
plugin.name: plugin
|
|
|
|
for plugin
|
|
|
|
in getPlugins(IFoolscapStoragePlugin)
|
|
|
|
}
|
|
|
|
return {
|
2019-08-19 15:21:03 +00:00
|
|
|
name: plugins[name].get_client_resource(node_config)
|
2019-07-24 19:37:24 +00:00
|
|
|
for (name, config)
|
|
|
|
in self.storage_client_config.storage_plugins.items()
|
|
|
|
}
|
|
|
|
|
2019-06-13 13:08:42 +00:00
|
|
|
@log_call(
|
|
|
|
action_type=u"storage-client:broker:make-storage-server",
|
|
|
|
include_args=["server_id"],
|
|
|
|
include_result=False,
|
|
|
|
)
|
|
|
|
def _make_storage_server(self, server_id, server):
|
2020-10-02 15:57:27 +00:00
|
|
|
"""
|
|
|
|
Create a new ``IServer`` for the given storage server announcement.
|
|
|
|
|
|
|
|
:param bytes server_id: The unique identifier for the server.
|
|
|
|
|
|
|
|
:param dict server: The server announcement. See ``Static Server
|
|
|
|
Definitions`` in the configuration documentation for details about
|
|
|
|
the structure and contents.
|
|
|
|
|
|
|
|
:return IServer: The object-y representation of the server described
|
|
|
|
by the given announcement.
|
|
|
|
"""
|
|
|
|
assert isinstance(server_id, bytes)
|
2019-06-13 13:08:42 +00:00
|
|
|
handler_overrides = server.get("connections", {})
|
2019-06-28 18:27:47 +00:00
|
|
|
s = NativeStorageServer(
|
|
|
|
server_id,
|
|
|
|
server["ann"],
|
|
|
|
self._tub_maker,
|
|
|
|
handler_overrides,
|
2019-08-19 20:09:26 +00:00
|
|
|
self.node_config,
|
2019-06-28 18:27:47 +00:00
|
|
|
self.storage_client_config,
|
|
|
|
)
|
2019-06-13 13:08:42 +00:00
|
|
|
s.on_status_changed(lambda _: self._got_connection())
|
|
|
|
return s
|
2016-08-27 00:31:02 +00:00
|
|
|
|
2016-07-22 00:23:22 +00:00
|
|
|
def when_connected_enough(self, threshold):
|
|
|
|
"""
|
|
|
|
:returns: a Deferred that fires if/when our high water mark for
|
|
|
|
number of connected servers becomes (or ever was) above
|
|
|
|
"threshold".
|
|
|
|
"""
|
|
|
|
d = defer.Deferred()
|
|
|
|
self._threshold_listeners.append( (threshold, d) )
|
|
|
|
self._check_connected_high_water_mark()
|
|
|
|
return d
|
2009-06-23 02:10:47 +00:00
|
|
|
|
|
|
|
# these two are used in unit tests
|
new introducer: signed extensible dictionary-based messages! refs #466
This introduces new client and server halves to the Introducer (renaming the
old one with a _V1 suffix). Both have fallbacks to accomodate talking to a
different version: the publishing client switches on whether the server's
.get_version() advertises V2 support, the server switches on which
subscription method was invoked by the subscribing client.
The V2 protocol sends a three-tuple of (serialized announcement dictionary,
signature, pubkey) for each announcement. The V2 server dispatches messages
to subscribers according to the service-name, and throws errors for invalid
signatures, but does not otherwise examine the messages. The V2 receiver's
subscription callback will receive a (serverid, ann_dict) pair. The
'serverid' will be equal to the pubkey if all of the following are true:
the originating client is V2, and was told a privkey to use
the announcement went through a V2 server
the signature is valid
If not, 'serverid' will be equal to the tubid portion of the announced FURL,
as was the case for V1 receivers.
Servers will create a keypair if one does not exist yet, stored in
private/server.privkey .
The signed announcement dictionary puts the server FURL in a key named
"anonymous-storage-FURL", which anticipates upcoming Accounting-related
changes in the server advertisements. It also provides a key named
"permutation-seed-base32" to tell clients what permutation seed to use. This
is computed at startup, using tubid if there are existing shares, otherwise
the pubkey, to retain share-order compatibility for existing servers.
2011-11-20 10:21:32 +00:00
|
|
|
def test_add_rref(self, serverid, rref, ann):
|
2019-06-28 18:50:46 +00:00
|
|
|
s = self._make_storage_server(
|
2020-10-02 15:57:27 +00:00
|
|
|
serverid,
|
2019-06-28 18:50:46 +00:00
|
|
|
{"ann": ann.copy()},
|
|
|
|
)
|
2019-05-31 15:09:20 +00:00
|
|
|
s._rref = rref
|
2012-06-15 01:48:55 +00:00
|
|
|
s._is_connected = True
|
2011-02-27 02:10:56 +00:00
|
|
|
self.servers[serverid] = s
|
|
|
|
|
2016-08-27 00:29:39 +00:00
|
|
|
def test_add_server(self, server_id, s):
|
2016-04-26 17:44:58 +00:00
|
|
|
s.on_status_changed(lambda _: self._got_connection())
|
2016-08-27 00:29:39 +00:00
|
|
|
self.servers[server_id] = s
|
2009-06-23 02:10:47 +00:00
|
|
|
|
2009-06-01 21:06:04 +00:00
|
|
|
def use_introducer(self, introducer_client):
|
|
|
|
self.introducer_client = ic = introducer_client
|
2009-06-23 02:10:47 +00:00
|
|
|
ic.subscribe_to("storage", self._got_announcement)
|
|
|
|
|
2016-04-26 17:44:58 +00:00
|
|
|
def _got_connection(self):
|
2019-06-28 18:28:32 +00:00
|
|
|
# this is called by NativeStorageServer when it is connected
|
2016-07-22 00:23:22 +00:00
|
|
|
self._check_connected_high_water_mark()
|
|
|
|
|
|
|
|
def _check_connected_high_water_mark(self):
|
|
|
|
current = len(self.get_connected_servers())
|
|
|
|
if current > self._connected_high_water_mark:
|
|
|
|
self._connected_high_water_mark = current
|
|
|
|
|
|
|
|
remaining = []
|
|
|
|
for threshold, d in self._threshold_listeners:
|
|
|
|
if self._connected_high_water_mark >= threshold:
|
|
|
|
eventually(d.callback, None)
|
|
|
|
else:
|
|
|
|
remaining.append( (threshold, d) )
|
|
|
|
self._threshold_listeners = remaining
|
2016-04-26 17:44:58 +00:00
|
|
|
|
2020-10-02 17:49:36 +00:00
|
|
|
def _should_ignore_announcement(self, server_id, ann):
|
|
|
|
"""
|
|
|
|
Determine whether a new storage announcement should be discarded or used
|
|
|
|
to update our collection of storage servers.
|
|
|
|
|
|
|
|
:param bytes server_id: The unique identifier for the storage server
|
|
|
|
which made the announcement.
|
|
|
|
|
|
|
|
:param dict ann: The announcement.
|
|
|
|
|
|
|
|
:return bool: ``True`` if the announcement should be ignored,
|
|
|
|
``False`` if it should be used to update our local storage server
|
|
|
|
state.
|
|
|
|
"""
|
|
|
|
# Let local static configuration always override any announcement for
|
|
|
|
# a particular server.
|
2016-08-27 00:31:02 +00:00
|
|
|
if server_id in self._static_server_ids:
|
|
|
|
log.msg(format="ignoring announcement for static server '%(id)s'",
|
|
|
|
id=server_id,
|
|
|
|
facility="tahoe.storage_broker", umid="AlxzqA",
|
|
|
|
level=log.UNUSUAL)
|
2020-10-02 17:49:36 +00:00
|
|
|
return True
|
|
|
|
|
|
|
|
try:
|
|
|
|
old = self.servers[server_id]
|
|
|
|
except KeyError:
|
|
|
|
# We don't know anything about this server. Let's use the
|
|
|
|
# announcement to change that.
|
|
|
|
return False
|
|
|
|
else:
|
|
|
|
# Determine if this announcement is at all difference from the
|
|
|
|
# announcement we already have for the server. If it is the same,
|
|
|
|
# we don't need to change anything.
|
|
|
|
return old.get_announcement() == ann
|
|
|
|
|
|
|
|
def _got_announcement(self, key_s, ann):
|
|
|
|
"""
|
|
|
|
This callback is given to the introducer and called any time an
|
|
|
|
announcement is received which has a valid signature and does not have
|
|
|
|
a sequence number less than or equal to a previous sequence number
|
|
|
|
seen for that server by that introducer.
|
|
|
|
|
|
|
|
Note sequence numbers are not considered between different introducers
|
|
|
|
so if we use more than one introducer it is possible for them to
|
|
|
|
deliver us stale announcements in some cases.
|
|
|
|
"""
|
2020-10-30 18:21:16 +00:00
|
|
|
precondition(isinstance(key_s, bytes), key_s)
|
|
|
|
precondition(key_s.startswith(b"v0-"), key_s)
|
2020-10-02 17:49:36 +00:00
|
|
|
precondition(ann["service-name"] == "storage", ann["service-name"])
|
|
|
|
server_id = key_s
|
|
|
|
|
|
|
|
if self._should_ignore_announcement(server_id, ann):
|
2016-08-27 00:31:02 +00:00
|
|
|
return
|
2020-10-02 15:57:27 +00:00
|
|
|
|
2019-06-28 18:26:50 +00:00
|
|
|
s = self._make_storage_server(
|
2020-10-02 15:57:27 +00:00
|
|
|
server_id,
|
2019-06-28 18:26:50 +00:00
|
|
|
{u"ann": ann},
|
|
|
|
)
|
2020-10-02 17:49:36 +00:00
|
|
|
|
|
|
|
try:
|
|
|
|
old = self.servers.pop(server_id)
|
|
|
|
except KeyError:
|
|
|
|
pass
|
|
|
|
else:
|
|
|
|
# It's a replacement, get rid of the old one.
|
2009-06-23 02:10:47 +00:00
|
|
|
old.stop_connecting()
|
2016-05-03 22:09:13 +00:00
|
|
|
old.disownServiceParent()
|
|
|
|
# NOTE: this disownServiceParent() returns a Deferred that
|
|
|
|
# doesn't fire until Tub.stopService fires, which will wait for
|
|
|
|
# any existing connections to be shut down. This doesn't
|
|
|
|
# generally matter for normal runtime, but unit tests can run
|
|
|
|
# into DirtyReactorErrors if they don't block on these. If a test
|
|
|
|
# replaces one server with a newer version, then terminates
|
|
|
|
# before the old one has been shut down, it might get
|
|
|
|
# DirtyReactorErrors. The fix would be to gather these Deferreds
|
|
|
|
# into a structure that will block StorageFarmBroker.stopService
|
|
|
|
# until they have fired (but hopefully don't keep reference
|
|
|
|
# cycles around when they fire earlier than that, which will
|
|
|
|
# almost always be the case for normal runtime).
|
2020-10-02 17:49:36 +00:00
|
|
|
|
2016-04-26 17:44:58 +00:00
|
|
|
# now we forget about them and start using the new one
|
2016-05-03 22:09:13 +00:00
|
|
|
s.setServiceParent(self)
|
2016-08-24 21:11:58 +00:00
|
|
|
self.servers[server_id] = s
|
2016-05-03 22:09:13 +00:00
|
|
|
s.start_connecting(self._trigger_connections)
|
2009-06-23 02:10:47 +00:00
|
|
|
# the descriptor will manage their own Reconnector, and each time we
|
|
|
|
# need servers, we'll ask them if they're connected or not.
|
|
|
|
|
|
|
|
def _trigger_connections(self):
|
|
|
|
# when one connection is established, reset the timers on all others,
|
|
|
|
# to trigger a reconnection attempt in one second. This is intended
|
|
|
|
# to accelerate server connections when we've been offline for a
|
|
|
|
# while. The goal is to avoid hanging out for a long time with
|
|
|
|
# connections to only a subset of the servers, which would increase
|
|
|
|
# the chances that we'll put shares in weird places (and not update
|
|
|
|
# existing shares of mutable files). See #374 for more details.
|
2011-02-21 01:58:04 +00:00
|
|
|
for dsc in self.servers.values():
|
2009-06-23 02:10:47 +00:00
|
|
|
dsc.try_to_connect()
|
|
|
|
|
2011-02-21 01:58:04 +00:00
|
|
|
def get_servers_for_psi(self, peer_selection_index):
|
|
|
|
# return a list of server objects (IServers)
|
2009-06-01 21:06:04 +00:00
|
|
|
assert self.permute_peers == True
|
2015-12-01 18:47:50 +00:00
|
|
|
connected_servers = self.get_connected_servers()
|
|
|
|
preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
|
2011-02-21 01:58:04 +00:00
|
|
|
def _permuted(server):
|
|
|
|
seed = server.get_permutation_seed()
|
2015-12-01 18:47:50 +00:00
|
|
|
is_unpreferred = server not in preferred_servers
|
2016-09-27 03:42:42 +00:00
|
|
|
return (is_unpreferred,
|
|
|
|
permute_server_hash(peer_selection_index, seed))
|
2015-12-01 18:47:50 +00:00
|
|
|
return sorted(connected_servers, key=_permuted)
|
2009-06-01 21:06:04 +00:00
|
|
|
|
|
|
|
def get_all_serverids(self):
|
2011-08-01 17:44:52 +00:00
|
|
|
return frozenset(self.servers.keys())
|
2009-06-23 02:10:47 +00:00
|
|
|
|
2011-02-21 01:58:04 +00:00
|
|
|
def get_connected_servers(self):
|
2012-06-15 01:48:55 +00:00
|
|
|
return frozenset([s for s in self.servers.values() if s.is_connected()])
|
2011-02-21 01:58:04 +00:00
|
|
|
|
|
|
|
def get_known_servers(self):
|
2011-08-01 17:44:52 +00:00
|
|
|
return frozenset(self.servers.values())
|
2009-06-01 21:06:04 +00:00
|
|
|
|
|
|
|
def get_nickname_for_serverid(self, serverid):
|
2011-02-21 01:58:04 +00:00
|
|
|
if serverid in self.servers:
|
|
|
|
return self.servers[serverid].get_nickname()
|
2009-06-01 21:06:04 +00:00
|
|
|
return None
|
|
|
|
|
2012-05-22 04:17:27 +00:00
|
|
|
def get_stub_server(self, serverid):
|
|
|
|
if serverid in self.servers:
|
|
|
|
return self.servers[serverid]
|
make IServer.get_serverid() use pubkey, not tubid
This is a change I've wanted to make for many years, because when we get
to HTTP-based servers, we won't have tubids for them. What held me back
was that there's code all over the place that uses the serverid for
various purposes, so I wasn't sure it was safe. I did a big push a few
years ago to use IServer instances instead of serverids in most
places (in #1363), and to split out the values that actually depend upon
tubid into separate accessors (like get_lease_seed and
get_foolscap_write_enabler_seed), which I think took care of all the
important uses.
There are a number of places that use get_serverid() as dictionary key
to track shares (Checker results, mutable servermap). I believe these
are happy to use pubkeys instead of tubids: the only thing they do with
get_serverid() is to compare it to other values obtained from
get_serverid(). A few places in the WUI used serverid to compute display
values: these were fixed.
The main trouble was the Helper: it returns a HelperUploadResults (a
Copyable) with a share->server mapping that's keyed by whatever the
Helper's get_serverid() returns. If the uploader and the helper are on
different sides of this change, the Helper could return values that the
uploader won't recognize. This is cosmetic: that mapping is only used to
display the upload results on the "Recent and Active Operations" page.
I've added code to StorageFarmBroker.get_stub_server() to fall back to
tubids when looking up a server, so this should still work correctly
when the uploader is new and the Helper is old. If the Helper is new and
the uploader is old, the upload results will show unusual server ids.
refs ticket:1363
2016-08-26 19:16:17 +00:00
|
|
|
# some time before 1.12, we changed "serverid" to be "key_s" (the
|
|
|
|
# printable verifying key, used in V2 announcements), instead of the
|
|
|
|
# tubid. When the immutable uploader delegates work to a Helper,
|
|
|
|
# get_stub_server() is used to map the returning server identifiers
|
|
|
|
# to IDisplayableServer instances (to get a name, for display on the
|
|
|
|
# Upload Results web page). If the Helper is running 1.12 or newer,
|
|
|
|
# it will send pubkeys, but if it's still running 1.11, it will send
|
|
|
|
# tubids. This clause maps the old tubids to our existing servers.
|
|
|
|
for s in self.servers.values():
|
|
|
|
if isinstance(s, NativeStorageServer):
|
2019-06-19 18:19:37 +00:00
|
|
|
if serverid == s.get_tubid():
|
make IServer.get_serverid() use pubkey, not tubid
This is a change I've wanted to make for many years, because when we get
to HTTP-based servers, we won't have tubids for them. What held me back
was that there's code all over the place that uses the serverid for
various purposes, so I wasn't sure it was safe. I did a big push a few
years ago to use IServer instances instead of serverids in most
places (in #1363), and to split out the values that actually depend upon
tubid into separate accessors (like get_lease_seed and
get_foolscap_write_enabler_seed), which I think took care of all the
important uses.
There are a number of places that use get_serverid() as dictionary key
to track shares (Checker results, mutable servermap). I believe these
are happy to use pubkeys instead of tubids: the only thing they do with
get_serverid() is to compare it to other values obtained from
get_serverid(). A few places in the WUI used serverid to compute display
values: these were fixed.
The main trouble was the Helper: it returns a HelperUploadResults (a
Copyable) with a share->server mapping that's keyed by whatever the
Helper's get_serverid() returns. If the uploader and the helper are on
different sides of this change, the Helper could return values that the
uploader won't recognize. This is cosmetic: that mapping is only used to
display the upload results on the "Recent and Active Operations" page.
I've added code to StorageFarmBroker.get_stub_server() to fall back to
tubids when looking up a server, so this should still work correctly
when the uploader is new and the Helper is old. If the Helper is new and
the uploader is old, the upload results will show unusual server ids.
refs ticket:1363
2016-08-26 19:16:17 +00:00
|
|
|
return s
|
2012-05-22 04:17:27 +00:00
|
|
|
return StubServer(serverid)
|
|
|
|
|
2017-02-27 17:56:49 +00:00
|
|
|
@implementer(IDisplayableServer)
|
|
|
|
class StubServer(object):
|
2012-05-22 04:17:27 +00:00
|
|
|
def __init__(self, serverid):
|
|
|
|
self.serverid = serverid # binary tubid
|
|
|
|
def get_serverid(self):
|
|
|
|
return self.serverid
|
|
|
|
def get_name(self):
|
|
|
|
return base32.b2a(self.serverid)[:8]
|
|
|
|
def get_longname(self):
|
|
|
|
return base32.b2a(self.serverid)
|
|
|
|
def get_nickname(self):
|
|
|
|
return "?"
|
|
|
|
|
2019-06-21 19:26:08 +00:00
|
|
|
|
2019-07-02 18:57:20 +00:00
|
|
|
class IFoolscapStorageServer(Interface):
|
|
|
|
"""
|
|
|
|
An internal interface that mediates between ``NativeStorageServer`` and
|
|
|
|
Foolscap-based ``IStorageServer`` implementations.
|
|
|
|
"""
|
|
|
|
nickname = Attribute("""
|
|
|
|
A name for this server for presentation to users.
|
|
|
|
""")
|
|
|
|
permutation_seed = Attribute("""
|
|
|
|
A stable value associated with this server which a client can use as an
|
|
|
|
input to the server selection permutation ordering.
|
|
|
|
""")
|
|
|
|
tubid = Attribute("""
|
|
|
|
The identifier for the Tub in which the server is run.
|
|
|
|
""")
|
|
|
|
storage_server = Attribute("""
|
|
|
|
An IStorageServer provide which implements a concrete Foolscap-based
|
|
|
|
protocol for communicating with the server.
|
|
|
|
""")
|
|
|
|
name = Attribute("""
|
|
|
|
Another name for this server for presentation to users.
|
|
|
|
""")
|
|
|
|
longname = Attribute("""
|
|
|
|
*Another* name for this server for presentation to users.
|
|
|
|
""")
|
|
|
|
lease_seed = Attribute("""
|
|
|
|
A stable value associated with this server which a client can use as an
|
|
|
|
input to a lease secret generation function.
|
|
|
|
""")
|
|
|
|
|
|
|
|
def connect_to(tub, got_connection):
|
|
|
|
"""
|
|
|
|
Attempt to establish and maintain a connection to the server.
|
|
|
|
|
|
|
|
:param Tub tub: A Foolscap Tub from which the connection is to
|
|
|
|
originate.
|
|
|
|
|
|
|
|
:param got_connection: A one-argument callable which is called with a
|
|
|
|
Foolscap ``RemoteReference`` when a connection is established.
|
|
|
|
This may be called multiple times if the connection is lost and
|
|
|
|
then re-established.
|
|
|
|
|
|
|
|
:return foolscap.reconnector.Reconnector: An object which manages the
|
|
|
|
connection and reconnection attempts.
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
@implementer(IFoolscapStorageServer)
|
2019-06-21 19:26:08 +00:00
|
|
|
@attr.s(frozen=True)
|
2019-07-02 18:57:20 +00:00
|
|
|
class _FoolscapStorage(object):
|
2019-06-19 18:19:37 +00:00
|
|
|
"""
|
2019-07-02 18:57:20 +00:00
|
|
|
Abstraction for connecting to a storage server exposed via Foolscap.
|
2019-06-19 18:19:37 +00:00
|
|
|
"""
|
2019-06-21 19:26:08 +00:00
|
|
|
nickname = attr.ib()
|
|
|
|
permutation_seed = attr.ib()
|
|
|
|
tubid = attr.ib()
|
|
|
|
|
2019-08-20 13:28:05 +00:00
|
|
|
storage_server = attr.ib(validator=attr.validators.provides(IStorageServer))
|
2019-07-02 18:57:20 +00:00
|
|
|
|
2019-06-21 19:26:08 +00:00
|
|
|
_furl = attr.ib()
|
|
|
|
_short_description = attr.ib()
|
|
|
|
_long_description = attr.ib()
|
|
|
|
|
2019-07-02 18:57:20 +00:00
|
|
|
|
2019-06-21 19:26:08 +00:00
|
|
|
@property
|
|
|
|
def name(self):
|
|
|
|
return self._short_description
|
|
|
|
|
|
|
|
@property
|
|
|
|
def longname(self):
|
|
|
|
return self._long_description
|
|
|
|
|
|
|
|
@property
|
|
|
|
def lease_seed(self):
|
|
|
|
return self.tubid
|
|
|
|
|
2019-06-19 18:19:37 +00:00
|
|
|
@classmethod
|
2019-07-02 18:57:20 +00:00
|
|
|
def from_announcement(cls, server_id, furl, ann, storage_server):
|
2019-06-19 18:19:37 +00:00
|
|
|
"""
|
2019-07-02 18:57:20 +00:00
|
|
|
Create an instance from a fURL and an announcement like::
|
2019-06-19 18:19:37 +00:00
|
|
|
|
2019-07-02 18:57:20 +00:00
|
|
|
{"permutation-seed-base32": "...",
|
2019-06-19 18:19:37 +00:00
|
|
|
"nickname": "...",
|
|
|
|
}
|
|
|
|
|
|
|
|
*nickname* is optional.
|
|
|
|
"""
|
2020-09-21 14:45:05 +00:00
|
|
|
m = re.match(br'pb://(\w+)@', furl)
|
2019-06-21 19:26:08 +00:00
|
|
|
assert m, furl
|
2019-06-19 18:19:37 +00:00
|
|
|
tubid_s = m.group(1).lower()
|
2019-06-21 19:26:08 +00:00
|
|
|
tubid = base32.a2b(tubid_s)
|
2019-06-19 18:19:37 +00:00
|
|
|
if "permutation-seed-base32" in ann:
|
2020-09-21 15:01:51 +00:00
|
|
|
seed = ann["permutation-seed-base32"]
|
|
|
|
if isinstance(seed, unicode):
|
|
|
|
seed = seed.encode("utf-8")
|
|
|
|
ps = base32.a2b(seed)
|
2020-10-30 18:34:23 +00:00
|
|
|
elif re.search(br'^v0-[0-9a-zA-Z]{52}$', server_id):
|
2019-06-19 18:19:37 +00:00
|
|
|
ps = base32.a2b(server_id[3:])
|
|
|
|
else:
|
|
|
|
log.msg("unable to parse serverid '%(server_id)s as pubkey, "
|
|
|
|
"hashing it to get permutation-seed, "
|
|
|
|
"may not converge with other clients",
|
|
|
|
server_id=server_id,
|
|
|
|
facility="tahoe.storage_broker",
|
|
|
|
level=log.UNUSUAL, umid="qu86tw")
|
|
|
|
ps = hashlib.sha256(server_id).digest()
|
2019-06-21 19:26:08 +00:00
|
|
|
permutation_seed = ps
|
2019-06-19 18:19:37 +00:00
|
|
|
|
|
|
|
assert server_id
|
2019-06-21 19:26:08 +00:00
|
|
|
long_description = server_id
|
2020-09-21 15:01:51 +00:00
|
|
|
if server_id.startswith(b"v0-"):
|
2019-06-19 18:19:37 +00:00
|
|
|
# remove v0- prefix from abbreviated name
|
2019-06-21 19:26:08 +00:00
|
|
|
short_description = server_id[3:3+8]
|
2019-06-19 18:19:37 +00:00
|
|
|
else:
|
2019-06-21 19:26:08 +00:00
|
|
|
short_description = server_id[:8]
|
|
|
|
nickname = ann.get("nickname", "")
|
|
|
|
|
|
|
|
return cls(
|
|
|
|
nickname=nickname,
|
|
|
|
permutation_seed=permutation_seed,
|
|
|
|
tubid=tubid,
|
2019-07-02 18:57:20 +00:00
|
|
|
storage_server=storage_server,
|
|
|
|
furl=furl,
|
2019-06-21 19:26:08 +00:00
|
|
|
short_description=short_description,
|
|
|
|
long_description=long_description,
|
|
|
|
)
|
2019-06-19 18:19:37 +00:00
|
|
|
|
|
|
|
def connect_to(self, tub, got_connection):
|
|
|
|
return tub.connectTo(self._furl, got_connection)
|
|
|
|
|
|
|
|
|
2019-07-02 18:57:20 +00:00
|
|
|
@implementer(IFoolscapStorageServer)
|
2019-06-19 18:19:37 +00:00
|
|
|
class _NullStorage(object):
|
|
|
|
"""
|
|
|
|
Abstraction for *not* communicating with a storage server of a type with
|
|
|
|
which we can't communicate.
|
|
|
|
"""
|
2019-06-21 19:26:08 +00:00
|
|
|
nickname = ""
|
2020-08-26 14:53:02 +00:00
|
|
|
permutation_seed = hashlib.sha256(b"").digest()
|
|
|
|
tubid = hashlib.sha256(b"").digest()
|
2019-07-02 18:57:20 +00:00
|
|
|
storage_server = None
|
|
|
|
|
2020-08-26 14:53:02 +00:00
|
|
|
lease_seed = hashlib.sha256(b"").digest()
|
2019-06-19 18:19:37 +00:00
|
|
|
|
2019-06-21 19:26:08 +00:00
|
|
|
name = "<unsupported>"
|
|
|
|
longname = "<storage with unsupported protocol>"
|
2019-06-19 18:19:37 +00:00
|
|
|
|
|
|
|
def connect_to(self, tub, got_connection):
|
|
|
|
return NonReconnector()
|
|
|
|
|
|
|
|
|
|
|
|
class NonReconnector(object):
|
|
|
|
"""
|
|
|
|
A ``foolscap.reconnector.Reconnector``-alike that doesn't do anything.
|
|
|
|
"""
|
|
|
|
def stopConnecting(self):
|
|
|
|
pass
|
|
|
|
|
|
|
|
def reset(self):
|
|
|
|
pass
|
|
|
|
|
|
|
|
def getReconnectionInfo(self):
|
|
|
|
return ReconnectionInfo()
|
|
|
|
|
|
|
|
_null_storage = _NullStorage()
|
|
|
|
|
2019-06-13 13:08:42 +00:00
|
|
|
|
2019-06-28 18:27:47 +00:00
|
|
|
class AnnouncementNotMatched(Exception):
|
|
|
|
"""
|
|
|
|
A storage server announcement wasn't matched by any of the locally enabled
|
|
|
|
plugins.
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
2019-08-19 20:09:26 +00:00
|
|
|
def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref):
|
2019-06-28 18:27:47 +00:00
|
|
|
"""
|
|
|
|
Construct an ``IStorageServer`` from the most locally-preferred plugin
|
|
|
|
that is offered in the given announcement.
|
2019-08-19 20:09:26 +00:00
|
|
|
|
|
|
|
:param allmydata.node._Config node_config: The node configuration to
|
|
|
|
pass to the plugin.
|
2019-06-28 18:27:47 +00:00
|
|
|
"""
|
|
|
|
plugins = {
|
|
|
|
plugin.name: plugin
|
|
|
|
for plugin
|
|
|
|
in getPlugins(IFoolscapStoragePlugin)
|
|
|
|
}
|
|
|
|
storage_options = announcement.get(u"storage-options", [])
|
|
|
|
for plugin_name, plugin_config in config.storage_plugins.items():
|
|
|
|
try:
|
|
|
|
plugin = plugins[plugin_name]
|
|
|
|
except KeyError:
|
|
|
|
raise ValueError("{} not installed".format(plugin_name))
|
|
|
|
for option in storage_options:
|
|
|
|
if plugin_name == option[u"name"]:
|
2019-07-02 18:57:20 +00:00
|
|
|
furl = option[u"storage-server-FURL"]
|
|
|
|
return furl, plugin.get_storage_client(
|
2019-08-19 20:09:26 +00:00
|
|
|
node_config,
|
2019-06-28 18:27:47 +00:00
|
|
|
option,
|
2019-07-02 19:46:17 +00:00
|
|
|
get_rref,
|
2019-06-28 18:27:47 +00:00
|
|
|
)
|
|
|
|
raise AnnouncementNotMatched()
|
|
|
|
|
|
|
|
|
2017-02-27 17:56:49 +00:00
|
|
|
@implementer(IServer)
|
2016-05-03 22:09:13 +00:00
|
|
|
class NativeStorageServer(service.MultiService):
|
2009-06-23 02:10:47 +00:00
|
|
|
"""I hold information about a storage server that we want to connect to.
|
|
|
|
If we are connected, I hold the RemoteReference, their host address, and
|
|
|
|
the their version information. I remember information about when we were
|
|
|
|
last connected too, even if we aren't currently connected.
|
|
|
|
|
|
|
|
@ivar last_connect_time: when we last established a connection
|
|
|
|
@ivar last_loss_time: when we last lost a connection
|
|
|
|
|
|
|
|
@ivar version: the server's versiondict, from the most recent announcement
|
|
|
|
@ivar nickname: the server's self-reported nickname (unicode), same
|
|
|
|
|
|
|
|
@ivar rref: the RemoteReference, if connected, otherwise None
|
|
|
|
@ivar remote_host: the IAddress, if connected, otherwise None
|
|
|
|
"""
|
|
|
|
|
2020-11-04 18:09:55 +00:00
|
|
|
VERSION_DEFAULTS = UnicodeKeyDict({
|
|
|
|
"http://allmydata.org/tahoe/protocols/storage/v1" :
|
|
|
|
UnicodeKeyDict({ "maximum-immutable-share-size": 2**32 - 1,
|
|
|
|
"maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
|
|
|
|
"tolerates-immutable-read-overrun": False,
|
|
|
|
"delete-mutable-shares-with-zero-length-writev": False,
|
|
|
|
"available-space": None,
|
|
|
|
}),
|
|
|
|
"application-version": "unknown: no get_version()",
|
|
|
|
})
|
2009-06-23 02:10:47 +00:00
|
|
|
|
2019-08-19 20:09:26 +00:00
|
|
|
def __init__(self, server_id, ann, tub_maker, handler_overrides, node_config, config=StorageClientConfig()):
|
2016-05-03 22:09:13 +00:00
|
|
|
service.MultiService.__init__(self)
|
2020-09-21 14:45:05 +00:00
|
|
|
assert isinstance(server_id, bytes)
|
2016-08-27 00:29:39 +00:00
|
|
|
self._server_id = server_id
|
new introducer: signed extensible dictionary-based messages! refs #466
This introduces new client and server halves to the Introducer (renaming the
old one with a _V1 suffix). Both have fallbacks to accomodate talking to a
different version: the publishing client switches on whether the server's
.get_version() advertises V2 support, the server switches on which
subscription method was invoked by the subscribing client.
The V2 protocol sends a three-tuple of (serialized announcement dictionary,
signature, pubkey) for each announcement. The V2 server dispatches messages
to subscribers according to the service-name, and throws errors for invalid
signatures, but does not otherwise examine the messages. The V2 receiver's
subscription callback will receive a (serverid, ann_dict) pair. The
'serverid' will be equal to the pubkey if all of the following are true:
the originating client is V2, and was told a privkey to use
the announcement went through a V2 server
the signature is valid
If not, 'serverid' will be equal to the tubid portion of the announced FURL,
as was the case for V1 receivers.
Servers will create a keypair if one does not exist yet, stored in
private/server.privkey .
The signed announcement dictionary puts the server FURL in a key named
"anonymous-storage-FURL", which anticipates upcoming Accounting-related
changes in the server advertisements. It also provides a key named
"permutation-seed-base32" to tell clients what permutation seed to use. This
is computed at startup, using tubid if there are existing shares, otherwise
the pubkey, to retain share-order compatibility for existing servers.
2011-11-20 10:21:32 +00:00
|
|
|
self.announcement = ann
|
2016-08-27 23:53:31 +00:00
|
|
|
self._tub_maker = tub_maker
|
|
|
|
self._handler_overrides = handler_overrides
|
2009-06-01 21:06:04 +00:00
|
|
|
|
2019-08-19 20:09:26 +00:00
|
|
|
self._storage = self._make_storage_system(node_config, config, ann)
|
new introducer: signed extensible dictionary-based messages! refs #466
This introduces new client and server halves to the Introducer (renaming the
old one with a _V1 suffix). Both have fallbacks to accomodate talking to a
different version: the publishing client switches on whether the server's
.get_version() advertises V2 support, the server switches on which
subscription method was invoked by the subscribing client.
The V2 protocol sends a three-tuple of (serialized announcement dictionary,
signature, pubkey) for each announcement. The V2 server dispatches messages
to subscribers according to the service-name, and throws errors for invalid
signatures, but does not otherwise examine the messages. The V2 receiver's
subscription callback will receive a (serverid, ann_dict) pair. The
'serverid' will be equal to the pubkey if all of the following are true:
the originating client is V2, and was told a privkey to use
the announcement went through a V2 server
the signature is valid
If not, 'serverid' will be equal to the tubid portion of the announced FURL,
as was the case for V1 receivers.
Servers will create a keypair if one does not exist yet, stored in
private/server.privkey .
The signed announcement dictionary puts the server FURL in a key named
"anonymous-storage-FURL", which anticipates upcoming Accounting-related
changes in the server advertisements. It also provides a key named
"permutation-seed-base32" to tell clients what permutation seed to use. This
is computed at startup, using tubid if there are existing shares, otherwise
the pubkey, to retain share-order compatibility for existing servers.
2011-11-20 10:21:32 +00:00
|
|
|
|
2009-06-23 02:10:47 +00:00
|
|
|
self.last_connect_time = None
|
|
|
|
self.last_loss_time = None
|
|
|
|
self.remote_host = None
|
2019-05-31 15:09:20 +00:00
|
|
|
self._rref = None
|
2012-06-15 01:48:55 +00:00
|
|
|
self._is_connected = False
|
2009-06-23 02:10:47 +00:00
|
|
|
self._reconnector = None
|
|
|
|
self._trigger_cb = None
|
2016-04-26 17:44:58 +00:00
|
|
|
self._on_status_changed = ObserverList()
|
|
|
|
|
2019-08-19 20:09:26 +00:00
|
|
|
def _make_storage_system(self, node_config, config, ann):
|
2019-06-28 18:27:47 +00:00
|
|
|
"""
|
2019-08-19 20:09:26 +00:00
|
|
|
:param allmydata.node._Config node_config: The node configuration to pass
|
|
|
|
to any configured storage plugins.
|
|
|
|
|
2019-06-28 18:27:47 +00:00
|
|
|
:param StorageClientConfig config: Configuration specifying desired
|
|
|
|
storage client behavior.
|
2019-07-02 18:57:20 +00:00
|
|
|
|
|
|
|
:param dict ann: The storage announcement from the storage server we
|
|
|
|
are meant to communicate with.
|
|
|
|
|
|
|
|
:return IFoolscapStorageServer: An object enabling communication via
|
|
|
|
Foolscap with the server which generated the announcement.
|
2019-06-28 18:27:47 +00:00
|
|
|
"""
|
2019-07-02 18:57:20 +00:00
|
|
|
# Try to match the announcement against a plugin.
|
2019-06-28 18:27:47 +00:00
|
|
|
try:
|
2019-07-02 19:46:17 +00:00
|
|
|
furl, storage_server = _storage_from_foolscap_plugin(
|
2019-08-19 20:09:26 +00:00
|
|
|
node_config,
|
2019-07-02 19:46:17 +00:00
|
|
|
config,
|
|
|
|
ann,
|
|
|
|
# Pass in an accessor for our _rref attribute. The value of
|
|
|
|
# the attribute may change over time as connections are lost
|
|
|
|
# and re-established. The _StorageServer should always be
|
|
|
|
# able to get the most up-to-date value.
|
|
|
|
self.get_rref,
|
|
|
|
)
|
2019-06-28 18:27:47 +00:00
|
|
|
except AnnouncementNotMatched:
|
2019-07-02 18:57:20 +00:00
|
|
|
# Nope.
|
|
|
|
pass
|
|
|
|
else:
|
|
|
|
return _FoolscapStorage.from_announcement(
|
|
|
|
self._server_id,
|
|
|
|
furl.encode("utf-8"),
|
|
|
|
ann,
|
|
|
|
storage_server,
|
|
|
|
)
|
|
|
|
|
|
|
|
# Try to match the announcement against the anonymous access scheme.
|
|
|
|
try:
|
|
|
|
furl = ann[u"anonymous-storage-FURL"]
|
|
|
|
except KeyError:
|
|
|
|
# Nope
|
|
|
|
pass
|
|
|
|
else:
|
2020-09-21 15:01:51 +00:00
|
|
|
if isinstance(furl, unicode):
|
|
|
|
furl = furl.encode("utf-8")
|
2019-07-02 19:46:17 +00:00
|
|
|
# See comment above for the _storage_from_foolscap_plugin case
|
|
|
|
# about passing in get_rref.
|
2019-07-02 18:57:20 +00:00
|
|
|
storage_server = _StorageServer(get_rref=self.get_rref)
|
|
|
|
return _FoolscapStorage.from_announcement(
|
|
|
|
self._server_id,
|
2020-09-21 15:01:51 +00:00
|
|
|
furl,
|
2019-07-02 18:57:20 +00:00
|
|
|
ann,
|
|
|
|
storage_server,
|
|
|
|
)
|
|
|
|
|
|
|
|
# Nothing matched so we can't talk to this server.
|
|
|
|
return _null_storage
|
2019-06-19 18:19:37 +00:00
|
|
|
|
|
|
|
def get_permutation_seed(self):
|
2019-06-21 19:26:08 +00:00
|
|
|
return self._storage.permutation_seed
|
2019-06-19 18:19:37 +00:00
|
|
|
def get_name(self): # keep methodname short
|
|
|
|
# TODO: decide who adds [] in the short description. It should
|
|
|
|
# probably be the output side, not here.
|
2019-06-21 19:26:08 +00:00
|
|
|
return self._storage.name
|
2019-06-19 18:19:37 +00:00
|
|
|
def get_longname(self):
|
2019-06-21 19:26:08 +00:00
|
|
|
return self._storage.longname
|
2019-06-19 18:19:37 +00:00
|
|
|
def get_tubid(self):
|
2019-06-21 19:26:08 +00:00
|
|
|
return self._storage.tubid
|
2019-06-19 18:19:37 +00:00
|
|
|
def get_lease_seed(self):
|
2019-06-21 19:26:08 +00:00
|
|
|
return self._storage.lease_seed
|
2019-06-19 18:19:37 +00:00
|
|
|
def get_foolscap_write_enabler_seed(self):
|
2019-06-21 19:26:08 +00:00
|
|
|
return self._storage.tubid
|
2019-06-19 18:19:37 +00:00
|
|
|
def get_nickname(self):
|
2019-06-21 19:26:08 +00:00
|
|
|
return self._storage.nickname
|
2019-06-19 18:19:37 +00:00
|
|
|
|
2016-04-26 17:44:58 +00:00
|
|
|
def on_status_changed(self, status_changed):
|
|
|
|
"""
|
|
|
|
:param status_changed: a callable taking a single arg (the
|
|
|
|
NativeStorageServer) that is notified when we become connected
|
|
|
|
"""
|
|
|
|
return self._on_status_changed.subscribe(status_changed)
|
2009-06-23 02:10:47 +00:00
|
|
|
|
2012-04-04 18:14:09 +00:00
|
|
|
# Special methods used by copy.copy() and copy.deepcopy(). When those are
|
|
|
|
# used in allmydata.immutable.filenode to copy CheckResults during
|
|
|
|
# repair, we want it to treat the IServer instances as singletons, and
|
|
|
|
# not attempt to duplicate them..
|
|
|
|
def __copy__(self):
|
|
|
|
return self
|
|
|
|
def __deepcopy__(self, memodict):
|
|
|
|
return self
|
|
|
|
|
2011-02-27 02:11:32 +00:00
|
|
|
def __repr__(self):
|
2011-08-01 17:44:28 +00:00
|
|
|
return "<NativeStorageServer for %s>" % self.get_name()
|
2009-06-23 02:10:47 +00:00
|
|
|
def get_serverid(self):
|
2016-08-27 00:29:39 +00:00
|
|
|
return self._server_id
|
2011-02-27 02:11:32 +00:00
|
|
|
def get_version(self):
|
2019-05-31 15:09:20 +00:00
|
|
|
if self._rref:
|
|
|
|
return self._rref.version
|
2011-02-27 02:11:32 +00:00
|
|
|
return None
|
2009-06-23 02:10:47 +00:00
|
|
|
def get_announcement(self):
|
|
|
|
return self.announcement
|
|
|
|
def get_remote_host(self):
|
|
|
|
return self.remote_host
|
2016-12-08 23:15:49 +00:00
|
|
|
|
|
|
|
def get_connection_status(self):
|
|
|
|
last_received = None
|
2019-05-31 15:09:20 +00:00
|
|
|
if self._rref:
|
|
|
|
last_received = self._rref.getDataLastReceivedAt()
|
2016-12-08 23:15:49 +00:00
|
|
|
return connection_status.from_foolscap_reconnector(self._reconnector,
|
|
|
|
last_received)
|
|
|
|
|
2012-06-15 01:48:55 +00:00
|
|
|
def is_connected(self):
|
|
|
|
return self._is_connected
|
2009-06-23 02:10:47 +00:00
|
|
|
|
2014-11-20 22:46:20 +00:00
|
|
|
def get_available_space(self):
|
|
|
|
version = self.get_version()
|
|
|
|
if version is None:
|
|
|
|
return None
|
2020-11-04 18:09:55 +00:00
|
|
|
protocol_v1_version = version.get('http://allmydata.org/tahoe/protocols/storage/v1', UnicodeKeyDict())
|
|
|
|
available_space = protocol_v1_version.get(u'available-space')
|
2014-11-20 22:46:20 +00:00
|
|
|
if available_space is None:
|
2020-11-04 18:09:55 +00:00
|
|
|
available_space = protocol_v1_version.get(u'maximum-immutable-share-size', None)
|
2014-11-20 22:46:20 +00:00
|
|
|
return available_space
|
|
|
|
|
2016-05-03 22:09:13 +00:00
|
|
|
def start_connecting(self, trigger_cb):
|
2016-08-27 23:53:31 +00:00
|
|
|
self._tub = self._tub_maker(self._handler_overrides)
|
2016-05-03 22:09:13 +00:00
|
|
|
self._tub.setServiceParent(self)
|
2009-06-23 02:10:47 +00:00
|
|
|
self._trigger_cb = trigger_cb
|
2019-06-19 18:19:37 +00:00
|
|
|
self._reconnector = self._storage.connect_to(self._tub, self._got_connection)
|
2009-06-23 02:10:47 +00:00
|
|
|
|
|
|
|
def _got_connection(self, rref):
|
2011-02-27 02:11:32 +00:00
|
|
|
lp = log.msg(format="got connection to %(name)s, getting versions",
|
2011-08-01 17:44:28 +00:00
|
|
|
name=self.get_name(),
|
2009-06-23 02:10:47 +00:00
|
|
|
facility="tahoe.storage_broker", umid="coUECQ")
|
|
|
|
if self._trigger_cb:
|
|
|
|
eventually(self._trigger_cb)
|
|
|
|
default = self.VERSION_DEFAULTS
|
|
|
|
d = add_version_to_remote_reference(rref, default)
|
|
|
|
d.addCallback(self._got_versioned_service, lp)
|
2016-04-26 17:44:58 +00:00
|
|
|
d.addCallback(lambda ign: self._on_status_changed.notify(self))
|
2009-06-23 02:10:47 +00:00
|
|
|
d.addErrback(log.err, format="storageclient._got_connection",
|
2011-08-01 17:44:28 +00:00
|
|
|
name=self.get_name(), umid="Sdq3pg")
|
2009-06-23 02:10:47 +00:00
|
|
|
|
|
|
|
def _got_versioned_service(self, rref, lp):
|
2011-02-27 02:11:32 +00:00
|
|
|
log.msg(format="%(name)s provided version info %(version)s",
|
2011-08-01 17:44:28 +00:00
|
|
|
name=self.get_name(), version=rref.version,
|
2009-06-23 02:10:47 +00:00
|
|
|
facility="tahoe.storage_broker", umid="SWmJYg",
|
|
|
|
level=log.NOISY, parent=lp)
|
|
|
|
|
|
|
|
self.last_connect_time = time.time()
|
2016-09-02 11:58:51 +00:00
|
|
|
self.remote_host = rref.getLocationHints()
|
2019-05-31 15:09:20 +00:00
|
|
|
self._rref = rref
|
2012-06-15 01:48:55 +00:00
|
|
|
self._is_connected = True
|
2009-06-23 02:10:47 +00:00
|
|
|
rref.notifyOnDisconnect(self._lost)
|
|
|
|
|
|
|
|
def get_rref(self):
|
2019-05-31 15:09:20 +00:00
|
|
|
return self._rref
|
2009-06-23 02:10:47 +00:00
|
|
|
|
2019-05-31 17:40:51 +00:00
|
|
|
def get_storage_server(self):
|
2019-06-11 20:32:29 +00:00
|
|
|
"""
|
|
|
|
See ``IServer.get_storage_server``.
|
|
|
|
"""
|
2019-05-31 17:40:51 +00:00
|
|
|
if self._rref is None:
|
|
|
|
return None
|
2019-07-02 18:57:20 +00:00
|
|
|
return self._storage.storage_server
|
2019-05-31 17:40:51 +00:00
|
|
|
|
2009-06-23 02:10:47 +00:00
|
|
|
def _lost(self):
|
2011-08-01 17:44:28 +00:00
|
|
|
log.msg(format="lost connection to %(name)s", name=self.get_name(),
|
2009-06-23 02:10:47 +00:00
|
|
|
facility="tahoe.storage_broker", umid="zbRllw")
|
|
|
|
self.last_loss_time = time.time()
|
2019-05-31 15:09:20 +00:00
|
|
|
# self._rref is now stale: all callRemote()s will get a
|
2012-06-15 01:48:55 +00:00
|
|
|
# DeadReferenceError. We leave the stale reference in place so that
|
|
|
|
# uploader/downloader code (which received this IServer through
|
|
|
|
# get_connected_servers() or get_servers_for_psi()) can continue to
|
|
|
|
# use s.get_rref().callRemote() and not worry about it being None.
|
|
|
|
self._is_connected = False
|
2009-06-23 02:10:47 +00:00
|
|
|
self.remote_host = None
|
|
|
|
|
|
|
|
def stop_connecting(self):
|
|
|
|
# used when this descriptor has been superceded by another
|
|
|
|
self._reconnector.stopConnecting()
|
|
|
|
|
|
|
|
def try_to_connect(self):
|
|
|
|
# used when the broker wants us to hurry up
|
|
|
|
self._reconnector.reset()
|
|
|
|
|
2009-06-01 21:06:04 +00:00
|
|
|
class UnknownServerTypeError(Exception):
|
|
|
|
pass
|
2019-05-31 17:40:51 +00:00
|
|
|
|
|
|
|
|
|
|
|
@implementer(IStorageServer)
|
|
|
|
@attr.s
|
|
|
|
class _StorageServer(object):
|
|
|
|
"""
|
|
|
|
``_StorageServer`` is a direct pass-through to an ``RIStorageServer`` via
|
|
|
|
a ``RemoteReference``.
|
|
|
|
"""
|
|
|
|
_get_rref = attr.ib()
|
|
|
|
|
|
|
|
@property
|
|
|
|
def _rref(self):
|
|
|
|
return self._get_rref()
|
|
|
|
|
|
|
|
def get_version(self):
|
|
|
|
return self._rref.callRemote(
|
|
|
|
"get_version",
|
|
|
|
)
|
|
|
|
|
|
|
|
def allocate_buckets(
|
|
|
|
self,
|
|
|
|
storage_index,
|
|
|
|
renew_secret,
|
|
|
|
cancel_secret,
|
|
|
|
sharenums,
|
|
|
|
allocated_size,
|
|
|
|
canary,
|
|
|
|
):
|
|
|
|
return self._rref.callRemote(
|
|
|
|
"allocate_buckets",
|
|
|
|
storage_index,
|
|
|
|
renew_secret,
|
|
|
|
cancel_secret,
|
|
|
|
sharenums,
|
|
|
|
allocated_size,
|
|
|
|
canary,
|
|
|
|
)
|
|
|
|
|
|
|
|
def add_lease(
|
|
|
|
self,
|
|
|
|
storage_index,
|
|
|
|
renew_secret,
|
|
|
|
cancel_secret,
|
|
|
|
):
|
|
|
|
return self._rref.callRemote(
|
|
|
|
"add_lease",
|
|
|
|
storage_index,
|
|
|
|
renew_secret,
|
|
|
|
cancel_secret,
|
|
|
|
)
|
|
|
|
|
|
|
|
def renew_lease(
|
|
|
|
self,
|
|
|
|
storage_index,
|
|
|
|
renew_secret,
|
|
|
|
):
|
|
|
|
return self._rref.callRemote(
|
|
|
|
"renew_lease",
|
|
|
|
storage_index,
|
|
|
|
renew_secret,
|
|
|
|
)
|
|
|
|
|
|
|
|
def get_buckets(
|
|
|
|
self,
|
|
|
|
storage_index,
|
|
|
|
):
|
|
|
|
return self._rref.callRemote(
|
|
|
|
"get_buckets",
|
|
|
|
storage_index,
|
|
|
|
)
|
|
|
|
|
|
|
|
def slot_readv(
|
|
|
|
self,
|
|
|
|
storage_index,
|
|
|
|
shares,
|
|
|
|
readv,
|
|
|
|
):
|
|
|
|
return self._rref.callRemote(
|
|
|
|
"slot_readv",
|
|
|
|
storage_index,
|
|
|
|
shares,
|
|
|
|
readv,
|
|
|
|
)
|
|
|
|
|
|
|
|
def slot_testv_and_readv_and_writev(
|
|
|
|
self,
|
|
|
|
storage_index,
|
|
|
|
secrets,
|
|
|
|
tw_vectors,
|
|
|
|
r_vector,
|
|
|
|
):
|
|
|
|
return self._rref.callRemote(
|
|
|
|
"slot_testv_and_readv_and_writev",
|
|
|
|
storage_index,
|
|
|
|
secrets,
|
|
|
|
tw_vectors,
|
|
|
|
r_vector,
|
|
|
|
)
|
|
|
|
|
|
|
|
def advise_corrupt_share(
|
|
|
|
self,
|
|
|
|
share_type,
|
|
|
|
storage_index,
|
|
|
|
shnum,
|
|
|
|
reason,
|
|
|
|
):
|
|
|
|
return self._rref.callRemoteOnly(
|
|
|
|
"advise_corrupt_share",
|
|
|
|
share_type,
|
|
|
|
storage_index,
|
|
|
|
shnum,
|
|
|
|
reason,
|
|
|
|
)
|