mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-06-18 23:38:18 +00:00
Merge branch 'master' into 2916.grid-manager-proposal.5
This commit is contained in:
@ -7,24 +7,36 @@ from allmydata import node
|
||||
from base64 import urlsafe_b64encode
|
||||
from functools import partial
|
||||
from errno import ENOENT, EPERM
|
||||
from ConfigParser import NoSectionError
|
||||
|
||||
from foolscap.furl import (
|
||||
decode_furl,
|
||||
)
|
||||
|
||||
import attr
|
||||
from zope.interface import implementer
|
||||
|
||||
from twisted.plugin import (
|
||||
getPlugins,
|
||||
)
|
||||
from twisted.internet import reactor, defer
|
||||
from twisted.application import service
|
||||
from twisted.application.internet import TimerService
|
||||
from twisted.python.filepath import FilePath
|
||||
from twisted.python.failure import Failure
|
||||
from pycryptopp.publickey import rsa
|
||||
|
||||
import allmydata
|
||||
from allmydata.crypto import rsa, ed25519
|
||||
from allmydata.crypto.util import remove_prefix
|
||||
from allmydata.storage.server import StorageServer
|
||||
from allmydata import storage_client
|
||||
from allmydata.immutable.upload import Uploader
|
||||
from allmydata.immutable.offloaded import Helper
|
||||
from allmydata.control import ControlServer
|
||||
from allmydata.introducer.client import IntroducerClient
|
||||
from allmydata.util import (hashutil, base32, pollmixin, log, keyutil, idlib,
|
||||
yamlutil)
|
||||
from allmydata.util import (
|
||||
hashutil, base32, pollmixin, log, idlib,
|
||||
yamlutil, configutil,
|
||||
)
|
||||
from allmydata.util.encodingutil import (get_filesystem_encoding,
|
||||
from_utf8_or_none)
|
||||
from allmydata.util.abbreviate import parse_abbreviated_size
|
||||
@ -33,7 +45,14 @@ from allmydata.util.i2p_provider import create as create_i2p_provider
|
||||
from allmydata.util.tor_provider import create as create_tor_provider
|
||||
from allmydata.stats import StatsProvider
|
||||
from allmydata.history import History
|
||||
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION, DEFAULT_MAX_SEGMENT_SIZE
|
||||
from allmydata.interfaces import (
|
||||
IStatsProducer,
|
||||
SDMF_VERSION,
|
||||
MDMF_VERSION,
|
||||
DEFAULT_MAX_SEGMENT_SIZE,
|
||||
IFoolscapStoragePlugin,
|
||||
IAnnounceableStorageServer,
|
||||
)
|
||||
from allmydata.nodemaker import NodeMaker
|
||||
from allmydata.blacklist import Blacklist
|
||||
|
||||
@ -44,9 +63,20 @@ GiB=1024*MiB
|
||||
TiB=1024*GiB
|
||||
PiB=1024*TiB
|
||||
|
||||
def _valid_config_sections():
|
||||
cfg = node._common_config_sections()
|
||||
cfg.update({
|
||||
def _is_valid_section(section_name):
|
||||
"""
|
||||
Check for valid dynamic configuration section names.
|
||||
|
||||
Currently considers all possible storage server plugin sections valid.
|
||||
"""
|
||||
return (
|
||||
section_name.startswith(b"storageserver.plugins.") or
|
||||
section_name.startswith(b"storageclient.plugins.")
|
||||
)
|
||||
|
||||
|
||||
_client_config = configutil.ValidConfiguration(
|
||||
static_valid_sections={
|
||||
"client": (
|
||||
"helper.furl",
|
||||
"introducer.furl",
|
||||
@ -57,6 +87,7 @@ def _valid_config_sections():
|
||||
"shares.needed",
|
||||
"shares.total",
|
||||
"stats_gatherer.furl",
|
||||
"storage.plugins",
|
||||
),
|
||||
"grid_managers": None, # means "any options valid"
|
||||
"grid_manager_certificates": None,
|
||||
@ -72,6 +103,7 @@ def _valid_config_sections():
|
||||
"storage": (
|
||||
"debug_discard",
|
||||
"enabled",
|
||||
"anonymous",
|
||||
"expire.cutoff_date",
|
||||
"expire.enabled",
|
||||
"expire.immutable",
|
||||
@ -82,6 +114,7 @@ def _valid_config_sections():
|
||||
"readonly",
|
||||
"reserved_space",
|
||||
"storage_dir",
|
||||
"plugins",
|
||||
"grid_management",
|
||||
),
|
||||
"sftpd": (
|
||||
@ -95,14 +128,16 @@ def _valid_config_sections():
|
||||
"helper": (
|
||||
"enabled",
|
||||
),
|
||||
"magic_folder": (
|
||||
"download.umask",
|
||||
"enabled",
|
||||
"local.directory",
|
||||
"poll_interval",
|
||||
),
|
||||
})
|
||||
return cfg
|
||||
},
|
||||
is_valid_section=_is_valid_section,
|
||||
# Anything in a valid section is a valid item, for now.
|
||||
is_valid_item=lambda section, ignored: _is_valid_section(section),
|
||||
)
|
||||
|
||||
|
||||
def _valid_config():
|
||||
cfg = node._common_valid_config()
|
||||
return cfg.update(_client_config)
|
||||
|
||||
# this is put into README in new node-directories
|
||||
CLIENT_README = """
|
||||
@ -122,7 +157,7 @@ def _make_secret():
|
||||
return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
|
||||
|
||||
|
||||
class SecretHolder:
|
||||
class SecretHolder(object):
|
||||
def __init__(self, lease_secret, convergence_secret):
|
||||
self._lease_secret = lease_secret
|
||||
self._convergence_secret = convergence_secret
|
||||
@ -136,7 +171,7 @@ class SecretHolder:
|
||||
def get_convergence_secret(self):
|
||||
return self._convergence_secret
|
||||
|
||||
class KeyGenerator:
|
||||
class KeyGenerator(object):
|
||||
"""I create RSA keys for mutable files. Each call to generate() returns a
|
||||
single keypair. The keysize is specified first by the keysize= argument
|
||||
to generate(), then with a default set by set_default_keysize(), then
|
||||
@ -163,8 +198,7 @@ class KeyGenerator:
|
||||
keysize = keysize or self.default_keysize
|
||||
# RSA key generation for a 2048 bit key takes between 0.8 and 3.2
|
||||
# secs
|
||||
signer = rsa.generate(keysize)
|
||||
verifier = signer.get_verifying_key()
|
||||
signer, verifier = rsa.create_signing_keypair(keysize)
|
||||
return defer.succeed( (verifier, signer) )
|
||||
|
||||
class Terminator(service.Service):
|
||||
@ -189,10 +223,16 @@ def read_config(basedir, portnumfile, generated_files=[]):
|
||||
return node.read_config(
|
||||
basedir, portnumfile,
|
||||
generated_files=generated_files,
|
||||
_valid_config_sections=_valid_config_sections,
|
||||
_valid_config=_valid_config(),
|
||||
)
|
||||
|
||||
|
||||
config_from_string = partial(
|
||||
node.config_from_string,
|
||||
_valid_config=_valid_config(),
|
||||
)
|
||||
|
||||
|
||||
def create_client(basedir=u".", _client_factory=None):
|
||||
"""
|
||||
Creates a new client instance (a subclass of Node).
|
||||
@ -214,10 +254,11 @@ def create_client(basedir=u".", _client_factory=None):
|
||||
_client_factory=_client_factory,
|
||||
)
|
||||
except Exception:
|
||||
return Failure()
|
||||
return defer.fail()
|
||||
|
||||
|
||||
def create_client_from_config(config, _client_factory=None):
|
||||
@defer.inlineCallbacks
|
||||
def create_client_from_config(config, _client_factory=None, _introducer_factory=None):
|
||||
"""
|
||||
Creates a new client instance (a subclass of Node). Most code
|
||||
should probably use `create_client` instead.
|
||||
@ -229,46 +270,175 @@ def create_client_from_config(config, _client_factory=None):
|
||||
|
||||
:param _client_factory: for testing; the class to instantiate
|
||||
instead of _Client
|
||||
|
||||
:param _introducer_factory: for testing; the class to instantiate instead
|
||||
of IntroducerClient
|
||||
"""
|
||||
try:
|
||||
if _client_factory is None:
|
||||
_client_factory = _Client
|
||||
if _client_factory is None:
|
||||
_client_factory = _Client
|
||||
|
||||
i2p_provider = create_i2p_provider(reactor, config)
|
||||
tor_provider = create_tor_provider(reactor, config)
|
||||
handlers = node.create_connection_handlers(reactor, config, i2p_provider, tor_provider)
|
||||
default_connection_handlers, foolscap_connection_handlers = handlers
|
||||
tub_options = node.create_tub_options(config)
|
||||
i2p_provider = create_i2p_provider(reactor, config)
|
||||
tor_provider = create_tor_provider(reactor, config)
|
||||
handlers = node.create_connection_handlers(reactor, config, i2p_provider, tor_provider)
|
||||
default_connection_handlers, foolscap_connection_handlers = handlers
|
||||
tub_options = node.create_tub_options(config)
|
||||
|
||||
main_tub = node.create_main_tub(
|
||||
config, tub_options, default_connection_handlers,
|
||||
foolscap_connection_handlers, i2p_provider, tor_provider,
|
||||
)
|
||||
control_tub = node.create_control_tub()
|
||||
main_tub = node.create_main_tub(
|
||||
config, tub_options, default_connection_handlers,
|
||||
foolscap_connection_handlers, i2p_provider, tor_provider,
|
||||
)
|
||||
control_tub = node.create_control_tub()
|
||||
|
||||
introducer_clients = create_introducer_clients(config, main_tub)
|
||||
storage_broker = create_storage_farm_broker(
|
||||
config, default_connection_handlers, foolscap_connection_handlers,
|
||||
tub_options, introducer_clients
|
||||
)
|
||||
introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
|
||||
storage_broker = create_storage_farm_broker(
|
||||
config, default_connection_handlers, foolscap_connection_handlers,
|
||||
tub_options, introducer_clients
|
||||
)
|
||||
|
||||
client = _client_factory(
|
||||
client = _client_factory(
|
||||
config,
|
||||
main_tub,
|
||||
control_tub,
|
||||
i2p_provider,
|
||||
tor_provider,
|
||||
introducer_clients,
|
||||
storage_broker,
|
||||
)
|
||||
|
||||
# Initialize storage separately after creating the client. This is
|
||||
# necessary because we need to pass a reference to the client in to the
|
||||
# storage plugins to allow them to initialize themselves (specifically,
|
||||
# they may want the anonymous IStorageServer implementation so they don't
|
||||
# have to duplicate all of its basic storage functionality). A better way
|
||||
# to do this, eventually, may be to create that implementation first and
|
||||
# then pass it in to both storage plugin creation and the client factory.
|
||||
# This avoids making a partially initialized client object escape the
|
||||
# client factory and removes the circular dependency between these
|
||||
# objects.
|
||||
storage_plugins = yield _StoragePlugins.from_config(
|
||||
client.get_anonymous_storage_server,
|
||||
config,
|
||||
)
|
||||
client.init_storage(storage_plugins.announceable_storage_servers)
|
||||
|
||||
i2p_provider.setServiceParent(client)
|
||||
tor_provider.setServiceParent(client)
|
||||
for ic in introducer_clients:
|
||||
ic.setServiceParent(client)
|
||||
storage_broker.setServiceParent(client)
|
||||
defer.returnValue(client)
|
||||
|
||||
|
||||
@attr.s
|
||||
class _StoragePlugins(object):
|
||||
"""
|
||||
Functionality related to getting storage plugins set up and ready for use.
|
||||
|
||||
:ivar list[IAnnounceableStorageServer] announceable_storage_servers: The
|
||||
announceable storage servers that should be used according to node
|
||||
configuration.
|
||||
"""
|
||||
announceable_storage_servers = attr.ib()
|
||||
|
||||
@classmethod
|
||||
@defer.inlineCallbacks
|
||||
def from_config(cls, get_anonymous_storage_server, config):
|
||||
"""
|
||||
Load and configured storage plugins.
|
||||
|
||||
:param get_anonymous_storage_server: A no-argument callable which
|
||||
returns the node's anonymous ``IStorageServer`` implementation.
|
||||
|
||||
:param _Config config: The node's configuration.
|
||||
|
||||
:return: A ``_StoragePlugins`` initialized from the given
|
||||
configuration.
|
||||
"""
|
||||
storage_plugin_names = cls._get_enabled_storage_plugin_names(config)
|
||||
plugins = list(cls._collect_storage_plugins(storage_plugin_names))
|
||||
unknown_plugin_names = storage_plugin_names - {plugin.name for plugin in plugins}
|
||||
if unknown_plugin_names:
|
||||
raise configutil.UnknownConfigError(
|
||||
"Storage plugins {} are enabled but not known on this system.".format(
|
||||
unknown_plugin_names,
|
||||
),
|
||||
)
|
||||
announceable_storage_servers = yield cls._create_plugin_storage_servers(
|
||||
get_anonymous_storage_server,
|
||||
config,
|
||||
main_tub,
|
||||
control_tub,
|
||||
i2p_provider,
|
||||
tor_provider,
|
||||
introducer_clients,
|
||||
storage_broker,
|
||||
plugins,
|
||||
)
|
||||
i2p_provider.setServiceParent(client)
|
||||
tor_provider.setServiceParent(client)
|
||||
for ic in introducer_clients:
|
||||
ic.setServiceParent(client)
|
||||
storage_broker.setServiceParent(client)
|
||||
return defer.succeed(client)
|
||||
except Exception:
|
||||
return Failure()
|
||||
defer.returnValue(cls(
|
||||
announceable_storage_servers,
|
||||
))
|
||||
|
||||
@classmethod
|
||||
def _get_enabled_storage_plugin_names(cls, config):
|
||||
"""
|
||||
Get the names of storage plugins that are enabled in the configuration.
|
||||
"""
|
||||
return set(
|
||||
config.get_config(
|
||||
"storage", "plugins", b""
|
||||
).decode("ascii").split(u",")
|
||||
) - {u""}
|
||||
|
||||
@classmethod
|
||||
def _collect_storage_plugins(cls, storage_plugin_names):
|
||||
"""
|
||||
Get the storage plugins with names matching those given.
|
||||
"""
|
||||
return list(
|
||||
plugin
|
||||
for plugin
|
||||
in getPlugins(IFoolscapStoragePlugin)
|
||||
if plugin.name in storage_plugin_names
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _create_plugin_storage_servers(cls, get_anonymous_storage_server, config, plugins):
|
||||
"""
|
||||
Cause each storage plugin to instantiate its storage server and return
|
||||
them all.
|
||||
|
||||
:return: A ``Deferred`` that fires with storage servers instantiated
|
||||
by all of the given storage server plugins.
|
||||
"""
|
||||
return defer.gatherResults(
|
||||
list(
|
||||
plugin.get_storage_server(
|
||||
cls._get_storage_plugin_configuration(config, plugin.name),
|
||||
get_anonymous_storage_server,
|
||||
).addCallback(
|
||||
partial(
|
||||
_add_to_announcement,
|
||||
{u"name": plugin.name},
|
||||
),
|
||||
)
|
||||
for plugin
|
||||
# The order is fairly arbitrary and it is not meant to convey
|
||||
# anything but providing *some* stable ordering makes the data
|
||||
# a little easier to deal with (mainly in tests and when
|
||||
# manually inspecting it).
|
||||
in sorted(plugins, key=lambda p: p.name)
|
||||
),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _get_storage_plugin_configuration(cls, config, storage_plugin_name):
|
||||
"""
|
||||
Load the configuration for a storage server plugin with the given name.
|
||||
|
||||
:return dict[bytes, bytes]: The matching configuration.
|
||||
"""
|
||||
try:
|
||||
config = config.items(
|
||||
"storageserver.plugins." + storage_plugin_name,
|
||||
)
|
||||
except NoSectionError:
|
||||
config = []
|
||||
return dict(config)
|
||||
|
||||
|
||||
|
||||
def _sequencer(config):
|
||||
@ -288,12 +458,18 @@ def _sequencer(config):
|
||||
return seqnum, nonce
|
||||
|
||||
|
||||
def create_introducer_clients(config, main_tub):
|
||||
def create_introducer_clients(config, main_tub, _introducer_factory=None):
|
||||
"""
|
||||
Read, validate and parse any 'introducers.yaml' configuration.
|
||||
|
||||
:param _introducer_factory: for testing; the class to instantiate instead
|
||||
of IntroducerClient
|
||||
|
||||
:returns: a list of IntroducerClient instances
|
||||
"""
|
||||
if _introducer_factory is None:
|
||||
_introducer_factory = IntroducerClient
|
||||
|
||||
# we return this list
|
||||
introducer_clients = []
|
||||
|
||||
@ -339,7 +515,7 @@ def create_introducer_clients(config, main_tub):
|
||||
|
||||
for petname, introducer in introducers.items():
|
||||
introducer_cache_filepath = FilePath(config.get_private_path("introducer_{}_cache.yaml".format(petname)))
|
||||
ic = IntroducerClient(
|
||||
ic = _introducer_factory(
|
||||
main_tub,
|
||||
introducer['furl'].encode("ascii"),
|
||||
config.nickname,
|
||||
@ -370,8 +546,9 @@ def create_storage_farm_broker(config, default_connection_handlers, foolscap_con
|
||||
:param list introducer_clients: IntroducerClient instances if
|
||||
we're connecting to any
|
||||
"""
|
||||
ps = config.get_config("client", "peers.preferred", "").split(",")
|
||||
preferred_peers = tuple([p.strip() for p in ps if p != ""])
|
||||
storage_client_config = storage_client.StorageClientConfig.from_node_config(
|
||||
config,
|
||||
)
|
||||
|
||||
def tub_creator(handler_overrides=None, **kwargs):
|
||||
return node.create_tub(
|
||||
@ -401,9 +578,10 @@ def create_storage_farm_broker(config, default_connection_handlers, foolscap_con
|
||||
sb = storage_client.StorageFarmBroker(
|
||||
permute_peers=True,
|
||||
tub_maker=tub_creator,
|
||||
node_config=config,
|
||||
storage_client_config=storage_client_config,
|
||||
preferred_peers=preferred_peers,
|
||||
grid_manager_keys=grid_manager_keys,
|
||||
## node_pubkey=my_pubkey,
|
||||
)
|
||||
for ic in introducer_clients:
|
||||
sb.use_introducer(ic)
|
||||
@ -438,6 +616,89 @@ def _load_grid_manager_certificates(config):
|
||||
return grid_manager_certificates
|
||||
|
||||
|
||||
def _register_reference(key, config, tub, referenceable):
|
||||
"""
|
||||
Register a referenceable in a tub with a stable fURL.
|
||||
|
||||
Stability is achieved by storing the fURL in the configuration the first
|
||||
time and then reading it back on for future calls.
|
||||
|
||||
:param bytes key: An identifier for this reference which can be used to
|
||||
identify its fURL in the configuration.
|
||||
|
||||
:param _Config config: The configuration to use for fURL persistence.
|
||||
|
||||
:param Tub tub: The tub in which to register the reference.
|
||||
|
||||
:param Referenceable referenceable: The referenceable to register in the
|
||||
Tub.
|
||||
|
||||
:return bytes: The fURL at which the object is registered.
|
||||
"""
|
||||
persisted_furl = config.get_private_config(
|
||||
key,
|
||||
default=None,
|
||||
)
|
||||
name = None
|
||||
if persisted_furl is not None:
|
||||
_, _, name = decode_furl(persisted_furl)
|
||||
registered_furl = tub.registerReference(
|
||||
referenceable,
|
||||
name=name,
|
||||
)
|
||||
if persisted_furl is None:
|
||||
config.write_private_config(key, registered_furl)
|
||||
return registered_furl
|
||||
|
||||
|
||||
@implementer(IAnnounceableStorageServer)
|
||||
@attr.s
|
||||
class AnnounceableStorageServer(object):
|
||||
announcement = attr.ib()
|
||||
storage_server = attr.ib()
|
||||
|
||||
|
||||
|
||||
def _add_to_announcement(information, announceable_storage_server):
|
||||
"""
|
||||
Create a new ``AnnounceableStorageServer`` based on
|
||||
``announceable_storage_server`` with ``information`` added to its
|
||||
``announcement``.
|
||||
"""
|
||||
updated_announcement = announceable_storage_server.announcement.copy()
|
||||
updated_announcement.update(information)
|
||||
return AnnounceableStorageServer(
|
||||
updated_announcement,
|
||||
announceable_storage_server.storage_server,
|
||||
)
|
||||
|
||||
|
||||
def storage_enabled(config):
|
||||
"""
|
||||
Is storage enabled according to the given configuration object?
|
||||
|
||||
:param _Config config: The configuration to inspect.
|
||||
|
||||
:return bool: ``True`` if storage is enabled, ``False`` otherwise.
|
||||
"""
|
||||
return config.get_config(b"storage", b"enabled", True, boolean=True)
|
||||
|
||||
|
||||
def anonymous_storage_enabled(config):
|
||||
"""
|
||||
Is anonymous access to storage enabled according to the given
|
||||
configuration object?
|
||||
|
||||
:param _Config config: The configuration to inspect.
|
||||
|
||||
:return bool: ``True`` if storage is enabled, ``False`` otherwise.
|
||||
"""
|
||||
return (
|
||||
storage_enabled(config) and
|
||||
config.get_config(b"storage", b"anonymous", True, boolean=True)
|
||||
)
|
||||
|
||||
|
||||
@implementer(IStatsProducer)
|
||||
class _Client(node.Node, pollmixin.PollMixin):
|
||||
|
||||
@ -467,7 +728,6 @@ class _Client(node.Node, pollmixin.PollMixin):
|
||||
"""
|
||||
node.Node.__init__(self, config, main_tub, control_tub, i2p_provider, tor_provider)
|
||||
|
||||
self._magic_folders = dict()
|
||||
self.started_timestamp = time.time()
|
||||
self.logSource = "Client"
|
||||
self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
|
||||
@ -478,7 +738,6 @@ class _Client(node.Node, pollmixin.PollMixin):
|
||||
self.init_stats_provider()
|
||||
self.init_secrets()
|
||||
self.init_node_key()
|
||||
self.init_storage()
|
||||
self.init_control()
|
||||
self._key_generator = KeyGenerator()
|
||||
key_gen_furl = config.get_config("client", "key_generator.furl", None)
|
||||
@ -494,7 +753,6 @@ class _Client(node.Node, pollmixin.PollMixin):
|
||||
self.init_helper()
|
||||
self.init_ftp_server()
|
||||
self.init_sftp_server()
|
||||
self.init_magic_folder()
|
||||
|
||||
# If the node sees an exit_trigger file, it will poll every second to see
|
||||
# whether the file still exists, and what its mtime is. If the file does not
|
||||
@ -533,21 +791,30 @@ class _Client(node.Node, pollmixin.PollMixin):
|
||||
# we only create the key once. On all subsequent runs, we re-use the
|
||||
# existing key
|
||||
def _make_key():
|
||||
sk_vs,vk_vs = keyutil.make_keypair()
|
||||
return sk_vs+"\n"
|
||||
sk_vs = self.config.get_or_create_private_config("node.privkey", _make_key)
|
||||
sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
|
||||
self.config.write_config_file("node.pubkey", vk_vs+"\n")
|
||||
self._node_key = sk
|
||||
private_key, _ = ed25519.create_signing_keypair()
|
||||
return ed25519.string_from_signing_key(private_key) + "\n"
|
||||
|
||||
private_key_str = self.config.get_or_create_private_config("node.privkey", _make_key)
|
||||
private_key, public_key = ed25519.signing_keypair_from_string(private_key_str)
|
||||
public_key_str = ed25519.string_from_verifying_key(public_key)
|
||||
self.config.write_config_file("node.pubkey", public_key_str + "\n")
|
||||
self._node_private_key = private_key
|
||||
self._node_public_key = public_key
|
||||
|
||||
def get_long_nodeid(self):
|
||||
# this matches what IServer.get_longname() says about us elsewhere
|
||||
vk_bytes = self._node_key.get_verifying_key_bytes()
|
||||
return "v0-"+base32.b2a(vk_bytes)
|
||||
vk_string = ed25519.string_from_verifying_key(self._node_public_key)
|
||||
return remove_prefix(vk_string, "pub-")
|
||||
|
||||
def get_long_tubid(self):
|
||||
return idlib.nodeid_b2a(self.nodeid)
|
||||
|
||||
def get_web_service(self):
|
||||
"""
|
||||
:return: a reference to our web server
|
||||
"""
|
||||
return self.getServiceNamed("webish")
|
||||
|
||||
def _init_permutation_seed(self, ss):
|
||||
seed = self.config.get_config_from_file("permutation-seed")
|
||||
if not seed:
|
||||
@ -564,18 +831,30 @@ class _Client(node.Node, pollmixin.PollMixin):
|
||||
else:
|
||||
# otherwise, we're free to use the more natural seed of our
|
||||
# pubkey-based serverid
|
||||
vk_bytes = self._node_key.get_verifying_key_bytes()
|
||||
vk_string = ed25519.string_from_verifying_key(self._node_public_key)
|
||||
vk_bytes = remove_prefix(vk_string, ed25519.PUBLIC_KEY_PREFIX)
|
||||
seed = base32.b2a(vk_bytes)
|
||||
self.config.write_config_file("permutation-seed", seed+"\n")
|
||||
return seed.strip()
|
||||
|
||||
def init_storage(self):
|
||||
# should we run a storage server (and publish it for others to use)?
|
||||
if not self.config.get_config("storage", "enabled", True, boolean=True):
|
||||
return
|
||||
if not self._is_tub_listening():
|
||||
raise ValueError("config error: storage is enabled, but tub "
|
||||
"is not listening ('tub.port=' is empty)")
|
||||
def get_anonymous_storage_server(self):
|
||||
"""
|
||||
Get the anonymous ``IStorageServer`` implementation for this node.
|
||||
|
||||
Note this will return an object even if storage is disabled on this
|
||||
node (but the object will not be exposed, peers will not be able to
|
||||
access it, and storage will remain disabled).
|
||||
|
||||
The one and only instance for this node is always returned. It is
|
||||
created first if necessary.
|
||||
"""
|
||||
try:
|
||||
ss = self.getServiceNamed(StorageServer.name)
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
return ss
|
||||
|
||||
readonly = self.config.get_config("storage", "readonly", False, boolean=True)
|
||||
|
||||
config_storedir = self.get_config(
|
||||
@ -630,6 +909,40 @@ class _Client(node.Node, pollmixin.PollMixin):
|
||||
expiration_sharetypes=expiration_sharetypes,
|
||||
)
|
||||
ss.setServiceParent(self)
|
||||
return ss
|
||||
|
||||
def init_storage(self, announceable_storage_servers):
|
||||
# should we run a storage server (and publish it for others to use)?
|
||||
if not storage_enabled(self.config):
|
||||
return
|
||||
if not self._is_tub_listening():
|
||||
raise ValueError("config error: storage is enabled, but tub "
|
||||
"is not listening ('tub.port=' is empty)")
|
||||
|
||||
ss = self.get_anonymous_storage_server()
|
||||
announcement = {
|
||||
"permutation-seed-base32": self._init_permutation_seed(ss),
|
||||
}
|
||||
|
||||
if anonymous_storage_enabled(self.config):
|
||||
furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
|
||||
furl = self.tub.registerReference(ss, furlFile=furl_file)
|
||||
announcement["anonymous-storage-FURL"] = furl
|
||||
|
||||
enabled_storage_servers = self._enable_storage_servers(
|
||||
announceable_storage_servers,
|
||||
)
|
||||
storage_options = list(
|
||||
storage_server.announcement
|
||||
for storage_server
|
||||
in enabled_storage_servers
|
||||
)
|
||||
plugins_announcement = {}
|
||||
if storage_options:
|
||||
# Only add the new key if there are any plugins enabled.
|
||||
plugins_announcement[u"storage-options"] = storage_options
|
||||
|
||||
announcement.update(plugins_announcement)
|
||||
|
||||
grid_manager_certificates = []
|
||||
|
||||
@ -650,7 +963,45 @@ class _Client(node.Node, pollmixin.PollMixin):
|
||||
"grid-manager-certificates": grid_manager_certificates,
|
||||
}
|
||||
for ic in self.introducer_clients:
|
||||
ic.publish("storage", ann, self._node_key)
|
||||
ic.publish("storage", announcement, self._node_private_key)
|
||||
|
||||
def get_client_storage_plugin_web_resources(self):
|
||||
"""
|
||||
Get all of the client-side ``IResource`` implementations provided by
|
||||
enabled storage plugins.
|
||||
|
||||
:return dict[bytes, IResource provider]: The implementations.
|
||||
"""
|
||||
return self.storage_broker.get_client_storage_plugin_web_resources(
|
||||
self.config,
|
||||
)
|
||||
|
||||
def _enable_storage_servers(self, announceable_storage_servers):
|
||||
"""
|
||||
Register and announce the given storage servers.
|
||||
"""
|
||||
for announceable in announceable_storage_servers:
|
||||
yield self._enable_storage_server(announceable)
|
||||
|
||||
def _enable_storage_server(self, announceable_storage_server):
|
||||
"""
|
||||
Register a storage server.
|
||||
"""
|
||||
config_key = b"storage-plugin.{}.furl".format(
|
||||
# Oops, why don't I have a better handle on this value?
|
||||
announceable_storage_server.announcement[u"name"],
|
||||
)
|
||||
furl = _register_reference(
|
||||
config_key,
|
||||
self.config,
|
||||
self.tub,
|
||||
announceable_storage_server.storage_server,
|
||||
)
|
||||
announceable_storage_server = _add_to_announcement(
|
||||
{u"storage-server-FURL": furl},
|
||||
announceable_storage_server,
|
||||
)
|
||||
return announceable_storage_server
|
||||
|
||||
def init_client(self):
|
||||
helper_furl = self.config.get_config("client", "helper.furl", None)
|
||||
@ -682,9 +1033,6 @@ class _Client(node.Node, pollmixin.PollMixin):
|
||||
This returns a local authentication token, which is just some
|
||||
random data in "api_auth_token" which must be echoed to API
|
||||
calls.
|
||||
|
||||
Currently only the URI '/magic' for magic-folder status; other
|
||||
endpoints are invited to include this as well, as appropriate.
|
||||
"""
|
||||
return self.config.get_private_config('api_auth_token')
|
||||
|
||||
@ -802,40 +1150,6 @@ class _Client(node.Node, pollmixin.PollMixin):
|
||||
sftp_portstr, pubkey_file, privkey_file)
|
||||
s.setServiceParent(self)
|
||||
|
||||
def init_magic_folder(self):
|
||||
#print "init_magic_folder"
|
||||
if self.config.get_config("drop_upload", "enabled", False, boolean=True):
|
||||
raise node.OldConfigOptionError(
|
||||
"The [drop_upload] section must be renamed to [magic_folder].\n"
|
||||
"See docs/frontends/magic-folder.rst for more information."
|
||||
)
|
||||
|
||||
if self.config.get_config("magic_folder", "enabled", False, boolean=True):
|
||||
from allmydata.frontends import magic_folder
|
||||
|
||||
try:
|
||||
magic_folders = magic_folder.load_magic_folders(self.config._basedir)
|
||||
except Exception as e:
|
||||
log.msg("Error loading magic-folder config: {}".format(e))
|
||||
raise
|
||||
|
||||
# start processing the upload queue when we've connected to
|
||||
# enough servers
|
||||
threshold = min(self.encoding_params["k"],
|
||||
self.encoding_params["happy"] + 1)
|
||||
|
||||
for (name, mf_config) in magic_folders.items():
|
||||
self.log("Starting magic_folder '{}'".format(name))
|
||||
s = magic_folder.MagicFolder.from_config(self, name, mf_config)
|
||||
self._magic_folders[name] = s
|
||||
s.setServiceParent(self)
|
||||
|
||||
connected_d = self.storage_broker.when_connected_enough(threshold)
|
||||
def connected_enough(ign, mf):
|
||||
mf.ready() # returns a Deferred we ignore
|
||||
return None
|
||||
connected_d.addCallback(connected_enough, s)
|
||||
|
||||
def _check_exit_trigger(self, exit_trigger_file):
|
||||
if os.path.exists(exit_trigger_file):
|
||||
mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
|
||||
|
Reference in New Issue
Block a user