Change the shape of the storage announcement(s)

Instead of generating a sequence of announcements like:

    - anonymous storage server announcement
    - plugin 1 storage server announcement
    - ...
    - plugin N storage server announcement

The client now generates a single announcement like:

    - anonymous storage server details
    - storage-options
      - plugin 1 storage server details
      - ...
      - plugin N storage server details
This commit is contained in:
Jean-Paul Calderone 2019-06-28 11:55:13 -04:00
parent 07bf8a3b8c
commit 53861e2a0f

View File

@ -251,6 +251,7 @@ def create_client(basedir=u".", _client_factory=None):
return defer.fail()
@defer.inlineCallbacks
def create_client_from_config(config, _client_factory=None, _introducer_factory=None):
"""
Creates a new client instance (a subclass of Node). Most code
@ -267,47 +268,151 @@ def create_client_from_config(config, _client_factory=None, _introducer_factory=
:param _introducer_factory: for testing; the class to instantiate instead
of IntroducerClient
"""
try:
if _client_factory is None:
_client_factory = _Client
if _client_factory is None:
_client_factory = _Client
i2p_provider = create_i2p_provider(reactor, config)
tor_provider = create_tor_provider(reactor, config)
handlers = node.create_connection_handlers(reactor, config, i2p_provider, tor_provider)
default_connection_handlers, foolscap_connection_handlers = handlers
tub_options = node.create_tub_options(config)
i2p_provider = create_i2p_provider(reactor, config)
tor_provider = create_tor_provider(reactor, config)
handlers = node.create_connection_handlers(reactor, config, i2p_provider, tor_provider)
default_connection_handlers, foolscap_connection_handlers = handlers
tub_options = node.create_tub_options(config)
main_tub = node.create_main_tub(
config, tub_options, default_connection_handlers,
foolscap_connection_handlers, i2p_provider, tor_provider,
)
control_tub = node.create_control_tub()
main_tub = node.create_main_tub(
config, tub_options, default_connection_handlers,
foolscap_connection_handlers, i2p_provider, tor_provider,
)
control_tub = node.create_control_tub()
introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
storage_broker = create_storage_farm_broker(
config, default_connection_handlers, foolscap_connection_handlers,
tub_options, introducer_clients
)
introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
storage_broker = create_storage_farm_broker(
config, default_connection_handlers, foolscap_connection_handlers,
tub_options, introducer_clients
)
client = _client_factory(
client = _client_factory(
config,
main_tub,
control_tub,
i2p_provider,
tor_provider,
introducer_clients,
storage_broker,
)
# Initialize storage separately after creating the client. This is
# necessary because we need to pass a reference to the client in to the
# storage plugins to allow them to initialize themselves (specifically,
# they may want the anonymous IStorageServer implementation so they don't
# have to duplicate all of its basic storage functionality). A better way
# to do this, eventually, may be to create that implementation first and
# then pass it in to both storage plugin creation and the client factory.
# This avoids making a partially initialized client object escape the
# client factory and removes the circular dependency between these
# objects.
storage_plugins = yield _StoragePlugins.from_config(
client.get_anonymous_storage_server,
config,
)
client.init_storage(storage_plugins.announceable_storage_servers)
i2p_provider.setServiceParent(client)
tor_provider.setServiceParent(client)
for ic in introducer_clients:
ic.setServiceParent(client)
storage_broker.setServiceParent(client)
defer.returnValue(client)
@attr.s
class _StoragePlugins(object):
announceable_storage_servers = attr.ib()
@classmethod
@defer.inlineCallbacks
def from_config(cls, get_anonymous_storage_server, config):
"""
Load and configured storage plugins.
"""
storage_plugin_names = cls._get_enabled_storage_plugin_names(config)
plugins = list(cls._collect_storage_plugins(storage_plugin_names))
# TODO Handle missing plugins
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3118
announceable_storage_servers = yield cls._create_plugin_storage_servers(
get_anonymous_storage_server,
config,
main_tub,
control_tub,
i2p_provider,
tor_provider,
introducer_clients,
storage_broker,
plugins,
)
i2p_provider.setServiceParent(client)
tor_provider.setServiceParent(client)
for ic in introducer_clients:
ic.setServiceParent(client)
storage_broker.setServiceParent(client)
d = client.init_storage_plugins()
d.addCallback(lambda ignored: client)
return d
except Exception:
return defer.fail()
defer.returnValue(cls(
announceable_storage_servers,
))
@classmethod
def _get_enabled_storage_plugin_names(cls, config):
"""
Get the names of storage plugins that are enabled in the configuration.
"""
return set(
config.get_config(
"storage", "plugins", b""
).decode("ascii").split(u",")
)
@classmethod
def _collect_storage_plugins(cls, storage_plugin_names):
"""
Get the storage plugins with names matching those given.
"""
return list(
plugin
for plugin
in getPlugins(IFoolscapStoragePlugin)
if plugin.name in storage_plugin_names
)
@classmethod
def _create_plugin_storage_servers(cls, get_anonymous_storage_server, config, plugins):
"""
Cause each storage plugin to instantiate its storage server and return
them all.
:return: A ``Deferred`` that fires with storage servers instantiated
by all of the given storage server plugins.
"""
return defer.gatherResults(
list(
plugin.get_storage_server(
cls._get_storage_plugin_configuration(config, plugin.name),
get_anonymous_storage_server,
).addCallback(
partial(
_add_to_announcement,
{u"name": plugin.name},
),
)
for plugin
# The order is fairly arbitrary and it is not meant to convey
# anything but providing *some* stable ordering makes the data
# a little easier to deal with (mainly in tests and when
# manually inspecting it).
in sorted(plugins, key=lambda p: p.name)
),
)
@classmethod
def _get_storage_plugin_configuration(cls, config, storage_plugin_name):
"""
Load the configuration for a storage server plugin with the given name.
:return dict[bytes, bytes]: The matching configuration.
"""
try:
config = config.items(
"storageserver.plugins." + storage_plugin_name,
)
except NoSectionError:
config = []
return dict(config)
def _sequencer(config):
@ -480,6 +585,21 @@ class AnnounceableStorageServer(object):
def _add_to_announcement(information, announceable_storage_server):
"""
Create a new ``AnnounceableStorageServer`` based on
``announceable_storage_server`` with ``information`` added to its
``announcement``.
"""
updated_announcement = announceable_storage_server.announcement.copy()
updated_announcement.update(information)
return AnnounceableStorageServer(
updated_announcement,
announceable_storage_server.storage_server,
)
@implementer(IStatsProducer)
class _Client(node.Node, pollmixin.PollMixin):
@ -520,7 +640,6 @@ class _Client(node.Node, pollmixin.PollMixin):
self.init_stats_provider()
self.init_secrets()
self.init_node_key()
self.init_storage()
self.init_control()
self._key_generator = KeyGenerator()
key_gen_furl = config.get_config("client", "key_generator.furl", None)
@ -615,13 +734,24 @@ class _Client(node.Node, pollmixin.PollMixin):
self.config.write_config_file("permutation-seed", seed+"\n")
return seed.strip()
def init_storage(self):
# should we run a storage server (and publish it for others to use)?
if not self.config.get_config("storage", "enabled", True, boolean=True):
return
if not self._is_tub_listening():
raise ValueError("config error: storage is enabled, but tub "
"is not listening ('tub.port=' is empty)")
def get_anonymous_storage_server(self):
"""
Get the anonymous ``IStorageServer`` implementation for this node.
Note this will return an object even if storage is disabled on this
node (but the object will not be exposed, peers will not be able to
access it, and storage will remain disabled).
The one and only instance for this node is always returned. It is
created first if necessary.
"""
try:
ss = self.getServiceNamed(StorageServer.name)
except KeyError:
pass
else:
return ss
readonly = self.config.get_config("storage", "readonly", False, boolean=True)
config_storedir = self.get_config(
@ -674,108 +804,44 @@ class _Client(node.Node, pollmixin.PollMixin):
expiration_cutoff_date=cutoff_date,
expiration_sharetypes=expiration_sharetypes)
ss.setServiceParent(self)
return ss
def init_storage(self, announceable_storage_servers):
# should we run a storage server (and publish it for others to use)?
if not self.config.get_config("storage", "enabled", True, boolean=True):
return
if not self._is_tub_listening():
raise ValueError("config error: storage is enabled, but tub "
"is not listening ('tub.port=' is empty)")
ss = self.get_anonymous_storage_server()
furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
furl = self.tub.registerReference(ss, furlFile=furl_file)
ann = {"anonymous-storage-FURL": furl,
"permutation-seed-base32": self._init_permutation_seed(ss),
}
anonymous_announcement = {
"anonymous-storage-FURL": furl,
"permutation-seed-base32": self._init_permutation_seed(ss),
}
enabled_storage_servers = self._enable_storage_servers(
announceable_storage_servers,
)
plugins_announcement = {}
storage_options = list(
storage_server.announcement
for storage_server
in enabled_storage_servers
)
if storage_options:
# Only add the new key if there are any plugins enabled.
plugins_announcement[u"storage-options"] = storage_options
total_announcement = {}
total_announcement.update(anonymous_announcement)
total_announcement.update(plugins_announcement)
for ic in self.introducer_clients:
ic.publish("storage", ann, self._node_private_key)
@defer.inlineCallbacks
def init_storage_plugins(self):
"""
Load, register, and announce any configured storage plugins.
"""
storage_plugin_names = self._get_enabled_storage_plugin_names()
plugins = list(self._collect_storage_plugins(storage_plugin_names))
# TODO Handle missing plugins
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3118
announceable_storage_servers = yield self._create_plugin_storage_servers(plugins)
self._enable_storage_servers(announceable_storage_servers)
def _get_enabled_storage_plugin_names(self):
"""
Get the names of storage plugins that are enabled in the configuration.
"""
return set(
self.config.get_config(
"storage", "plugins", b""
).decode("ascii").split(u",")
)
def _collect_storage_plugins(self, storage_plugin_names):
"""
Get the storage plugins with names matching those given.
"""
return list(
plugin
for plugin
in getPlugins(IFoolscapStoragePlugin)
if plugin.name in storage_plugin_names
)
def _create_plugin_storage_servers(self, plugins):
"""
Cause each storage plugin to instantiate its storage server and return
them all.
:return: A ``Deferred`` that fires with storage servers instantiated
by all of the given storage server plugins.
"""
return defer.gatherResults(
list(
plugin.get_storage_server(
self._get_storage_plugin_configuration(plugin.name),
lambda: self.getServiceNamed(StorageServer.name),
).addCallback(
partial(
self._add_to_announcement,
{u"name": plugin.name},
),
)
for plugin
# The order is fairly arbitrary and it is not meant to convey
# anything but providing *some* stable ordering makes the data
# a little easier to deal with (mainly in tests and when
# manually inspecting it).
in sorted(plugins, key=lambda p: p.name)
),
)
def _add_to_announcement(self, information, announceable_storage_server):
"""
Create a new ``AnnounceableStorageServer`` based on
``announceable_storage_server`` with ``information`` added to its
``announcement``.
"""
updated_announcement = announceable_storage_server.announcement.copy()
updated_announcement.update(information)
return AnnounceableStorageServer(
updated_announcement,
announceable_storage_server.storage_server,
)
def _get_storage_plugin_configuration(self, storage_plugin_name):
"""
Load the configuration for a storage server plugin with the given name.
:return dict[bytes, bytes]: The matching configuration.
"""
try:
config = self.config.items(
"storageserver.plugins." + storage_plugin_name,
)
except NoSectionError:
config = []
return dict(config)
ic.publish("storage", total_announcement, self._node_private_key)
def _enable_storage_servers(self, announceable_storage_servers):
@ -783,12 +849,12 @@ class _Client(node.Node, pollmixin.PollMixin):
Register and announce the given storage servers.
"""
for announceable in announceable_storage_servers:
self._enable_storage_server(announceable)
yield self._enable_storage_server(announceable)
def _enable_storage_server(self, announceable_storage_server):
"""
Register and announce a storage server.
Register a storage server.
"""
config_key = b"storage-plugin.{}.furl".format(
# Oops, why don't I have a better handle on this value?
@ -800,16 +866,11 @@ class _Client(node.Node, pollmixin.PollMixin):
self.tub,
announceable_storage_server.storage_server,
)
announceable_storage_server = self._add_to_announcement(
announceable_storage_server = _add_to_announcement(
{u"storage-server-FURL": furl},
announceable_storage_server,
)
for ic in self.introducer_clients:
ic.publish(
"storage",
announceable_storage_server.announcement,
self._node_key,
)
return announceable_storage_server
def init_client(self):