diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 82e0546a5..4e557cd07 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -251,6 +251,7 @@ def create_client(basedir=u".", _client_factory=None): return defer.fail() +@defer.inlineCallbacks def create_client_from_config(config, _client_factory=None, _introducer_factory=None): """ Creates a new client instance (a subclass of Node). Most code @@ -267,47 +268,151 @@ def create_client_from_config(config, _client_factory=None, _introducer_factory= :param _introducer_factory: for testing; the class to instantiate instead of IntroducerClient """ - try: - if _client_factory is None: - _client_factory = _Client + if _client_factory is None: + _client_factory = _Client - i2p_provider = create_i2p_provider(reactor, config) - tor_provider = create_tor_provider(reactor, config) - handlers = node.create_connection_handlers(reactor, config, i2p_provider, tor_provider) - default_connection_handlers, foolscap_connection_handlers = handlers - tub_options = node.create_tub_options(config) + i2p_provider = create_i2p_provider(reactor, config) + tor_provider = create_tor_provider(reactor, config) + handlers = node.create_connection_handlers(reactor, config, i2p_provider, tor_provider) + default_connection_handlers, foolscap_connection_handlers = handlers + tub_options = node.create_tub_options(config) - main_tub = node.create_main_tub( - config, tub_options, default_connection_handlers, - foolscap_connection_handlers, i2p_provider, tor_provider, - ) - control_tub = node.create_control_tub() + main_tub = node.create_main_tub( + config, tub_options, default_connection_handlers, + foolscap_connection_handlers, i2p_provider, tor_provider, + ) + control_tub = node.create_control_tub() - introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory) - storage_broker = create_storage_farm_broker( - config, default_connection_handlers, foolscap_connection_handlers, - tub_options, introducer_clients - ) + introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory) + storage_broker = create_storage_farm_broker( + config, default_connection_handlers, foolscap_connection_handlers, + tub_options, introducer_clients + ) - client = _client_factory( + client = _client_factory( + config, + main_tub, + control_tub, + i2p_provider, + tor_provider, + introducer_clients, + storage_broker, + ) + + # Initialize storage separately after creating the client. This is + # necessary because we need to pass a reference to the client in to the + # storage plugins to allow them to initialize themselves (specifically, + # they may want the anonymous IStorageServer implementation so they don't + # have to duplicate all of its basic storage functionality). A better way + # to do this, eventually, may be to create that implementation first and + # then pass it in to both storage plugin creation and the client factory. + # This avoids making a partially initialized client object escape the + # client factory and removes the circular dependency between these + # objects. + storage_plugins = yield _StoragePlugins.from_config( + client.get_anonymous_storage_server, + config, + ) + client.init_storage(storage_plugins.announceable_storage_servers) + + i2p_provider.setServiceParent(client) + tor_provider.setServiceParent(client) + for ic in introducer_clients: + ic.setServiceParent(client) + storage_broker.setServiceParent(client) + defer.returnValue(client) + + +@attr.s +class _StoragePlugins(object): + announceable_storage_servers = attr.ib() + + @classmethod + @defer.inlineCallbacks + def from_config(cls, get_anonymous_storage_server, config): + """ + Load and configured storage plugins. + """ + storage_plugin_names = cls._get_enabled_storage_plugin_names(config) + plugins = list(cls._collect_storage_plugins(storage_plugin_names)) + # TODO Handle missing plugins + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3118 + announceable_storage_servers = yield cls._create_plugin_storage_servers( + get_anonymous_storage_server, config, - main_tub, - control_tub, - i2p_provider, - tor_provider, - introducer_clients, - storage_broker, + plugins, ) - i2p_provider.setServiceParent(client) - tor_provider.setServiceParent(client) - for ic in introducer_clients: - ic.setServiceParent(client) - storage_broker.setServiceParent(client) - d = client.init_storage_plugins() - d.addCallback(lambda ignored: client) - return d - except Exception: - return defer.fail() + defer.returnValue(cls( + announceable_storage_servers, + )) + + @classmethod + def _get_enabled_storage_plugin_names(cls, config): + """ + Get the names of storage plugins that are enabled in the configuration. + """ + return set( + config.get_config( + "storage", "plugins", b"" + ).decode("ascii").split(u",") + ) + + @classmethod + def _collect_storage_plugins(cls, storage_plugin_names): + """ + Get the storage plugins with names matching those given. + """ + return list( + plugin + for plugin + in getPlugins(IFoolscapStoragePlugin) + if plugin.name in storage_plugin_names + ) + + @classmethod + def _create_plugin_storage_servers(cls, get_anonymous_storage_server, config, plugins): + """ + Cause each storage plugin to instantiate its storage server and return + them all. + + :return: A ``Deferred`` that fires with storage servers instantiated + by all of the given storage server plugins. + """ + return defer.gatherResults( + list( + plugin.get_storage_server( + cls._get_storage_plugin_configuration(config, plugin.name), + get_anonymous_storage_server, + ).addCallback( + partial( + _add_to_announcement, + {u"name": plugin.name}, + ), + ) + for plugin + # The order is fairly arbitrary and it is not meant to convey + # anything but providing *some* stable ordering makes the data + # a little easier to deal with (mainly in tests and when + # manually inspecting it). + in sorted(plugins, key=lambda p: p.name) + ), + ) + + @classmethod + def _get_storage_plugin_configuration(cls, config, storage_plugin_name): + """ + Load the configuration for a storage server plugin with the given name. + + :return dict[bytes, bytes]: The matching configuration. + """ + try: + config = config.items( + "storageserver.plugins." + storage_plugin_name, + ) + except NoSectionError: + config = [] + return dict(config) + def _sequencer(config): @@ -480,6 +585,21 @@ class AnnounceableStorageServer(object): +def _add_to_announcement(information, announceable_storage_server): + """ + Create a new ``AnnounceableStorageServer`` based on + ``announceable_storage_server`` with ``information`` added to its + ``announcement``. + """ + updated_announcement = announceable_storage_server.announcement.copy() + updated_announcement.update(information) + return AnnounceableStorageServer( + updated_announcement, + announceable_storage_server.storage_server, + ) + + + @implementer(IStatsProducer) class _Client(node.Node, pollmixin.PollMixin): @@ -520,7 +640,6 @@ class _Client(node.Node, pollmixin.PollMixin): self.init_stats_provider() self.init_secrets() self.init_node_key() - self.init_storage() self.init_control() self._key_generator = KeyGenerator() key_gen_furl = config.get_config("client", "key_generator.furl", None) @@ -615,13 +734,24 @@ class _Client(node.Node, pollmixin.PollMixin): self.config.write_config_file("permutation-seed", seed+"\n") return seed.strip() - def init_storage(self): - # should we run a storage server (and publish it for others to use)? - if not self.config.get_config("storage", "enabled", True, boolean=True): - return - if not self._is_tub_listening(): - raise ValueError("config error: storage is enabled, but tub " - "is not listening ('tub.port=' is empty)") + def get_anonymous_storage_server(self): + """ + Get the anonymous ``IStorageServer`` implementation for this node. + + Note this will return an object even if storage is disabled on this + node (but the object will not be exposed, peers will not be able to + access it, and storage will remain disabled). + + The one and only instance for this node is always returned. It is + created first if necessary. + """ + try: + ss = self.getServiceNamed(StorageServer.name) + except KeyError: + pass + else: + return ss + readonly = self.config.get_config("storage", "readonly", False, boolean=True) config_storedir = self.get_config( @@ -674,108 +804,44 @@ class _Client(node.Node, pollmixin.PollMixin): expiration_cutoff_date=cutoff_date, expiration_sharetypes=expiration_sharetypes) ss.setServiceParent(self) + return ss + def init_storage(self, announceable_storage_servers): + # should we run a storage server (and publish it for others to use)? + if not self.config.get_config("storage", "enabled", True, boolean=True): + return + if not self._is_tub_listening(): + raise ValueError("config error: storage is enabled, but tub " + "is not listening ('tub.port=' is empty)") + + ss = self.get_anonymous_storage_server() furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding()) furl = self.tub.registerReference(ss, furlFile=furl_file) - ann = {"anonymous-storage-FURL": furl, - "permutation-seed-base32": self._init_permutation_seed(ss), - } + + anonymous_announcement = { + "anonymous-storage-FURL": furl, + "permutation-seed-base32": self._init_permutation_seed(ss), + } + + enabled_storage_servers = self._enable_storage_servers( + announceable_storage_servers, + ) + plugins_announcement = {} + storage_options = list( + storage_server.announcement + for storage_server + in enabled_storage_servers + ) + if storage_options: + # Only add the new key if there are any plugins enabled. + plugins_announcement[u"storage-options"] = storage_options + + total_announcement = {} + total_announcement.update(anonymous_announcement) + total_announcement.update(plugins_announcement) + for ic in self.introducer_clients: - ic.publish("storage", ann, self._node_private_key) - - - @defer.inlineCallbacks - def init_storage_plugins(self): - """ - Load, register, and announce any configured storage plugins. - """ - storage_plugin_names = self._get_enabled_storage_plugin_names() - plugins = list(self._collect_storage_plugins(storage_plugin_names)) - # TODO Handle missing plugins - # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3118 - announceable_storage_servers = yield self._create_plugin_storage_servers(plugins) - self._enable_storage_servers(announceable_storage_servers) - - - def _get_enabled_storage_plugin_names(self): - """ - Get the names of storage plugins that are enabled in the configuration. - """ - return set( - self.config.get_config( - "storage", "plugins", b"" - ).decode("ascii").split(u",") - ) - - - def _collect_storage_plugins(self, storage_plugin_names): - """ - Get the storage plugins with names matching those given. - """ - return list( - plugin - for plugin - in getPlugins(IFoolscapStoragePlugin) - if plugin.name in storage_plugin_names - ) - - - def _create_plugin_storage_servers(self, plugins): - """ - Cause each storage plugin to instantiate its storage server and return - them all. - - :return: A ``Deferred`` that fires with storage servers instantiated - by all of the given storage server plugins. - """ - return defer.gatherResults( - list( - plugin.get_storage_server( - self._get_storage_plugin_configuration(plugin.name), - lambda: self.getServiceNamed(StorageServer.name), - ).addCallback( - partial( - self._add_to_announcement, - {u"name": plugin.name}, - ), - ) - for plugin - # The order is fairly arbitrary and it is not meant to convey - # anything but providing *some* stable ordering makes the data - # a little easier to deal with (mainly in tests and when - # manually inspecting it). - in sorted(plugins, key=lambda p: p.name) - ), - ) - - - def _add_to_announcement(self, information, announceable_storage_server): - """ - Create a new ``AnnounceableStorageServer`` based on - ``announceable_storage_server`` with ``information`` added to its - ``announcement``. - """ - updated_announcement = announceable_storage_server.announcement.copy() - updated_announcement.update(information) - return AnnounceableStorageServer( - updated_announcement, - announceable_storage_server.storage_server, - ) - - - def _get_storage_plugin_configuration(self, storage_plugin_name): - """ - Load the configuration for a storage server plugin with the given name. - - :return dict[bytes, bytes]: The matching configuration. - """ - try: - config = self.config.items( - "storageserver.plugins." + storage_plugin_name, - ) - except NoSectionError: - config = [] - return dict(config) + ic.publish("storage", total_announcement, self._node_private_key) def _enable_storage_servers(self, announceable_storage_servers): @@ -783,12 +849,12 @@ class _Client(node.Node, pollmixin.PollMixin): Register and announce the given storage servers. """ for announceable in announceable_storage_servers: - self._enable_storage_server(announceable) + yield self._enable_storage_server(announceable) def _enable_storage_server(self, announceable_storage_server): """ - Register and announce a storage server. + Register a storage server. """ config_key = b"storage-plugin.{}.furl".format( # Oops, why don't I have a better handle on this value? @@ -800,16 +866,11 @@ class _Client(node.Node, pollmixin.PollMixin): self.tub, announceable_storage_server.storage_server, ) - announceable_storage_server = self._add_to_announcement( + announceable_storage_server = _add_to_announcement( {u"storage-server-FURL": furl}, announceable_storage_server, ) - for ic in self.introducer_clients: - ic.publish( - "storage", - announceable_storage_server.announcement, - self._node_key, - ) + return announceable_storage_server def init_client(self):