pull 'StorageFarmBroker' out of __init__

This means also pulling out introducer-clients and some
related utility methods
This commit is contained in:
meejah 2018-01-30 18:04:08 -07:00
parent 42c39d435a
commit 26007f363b
6 changed files with 142 additions and 117 deletions

View File

@ -1,6 +1,7 @@
import os, stat, time, weakref import os, stat, time, weakref
from allmydata import node from allmydata import node
from base64 import urlsafe_b64encode from base64 import urlsafe_b64encode
from functools import partial
from zope.interface import implementer from zope.interface import implementer
from twisted.internet import reactor, defer from twisted.internet import reactor, defer
@ -108,6 +109,7 @@ the files. See the 'configuration.rst' documentation file for details.
def _make_secret(): def _make_secret():
return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n" return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
class SecretHolder: class SecretHolder:
def __init__(self, lease_secret, convergence_secret): def __init__(self, lease_secret, convergence_secret):
self._lease_secret = lease_secret self._lease_secret = lease_secret
@ -206,15 +208,15 @@ def create_client(basedir=u".", _client_factory=None):
# pre-requisites # pre-requisites
config = read_config(basedir, u"client.port", _valid_config_sections=_valid_config_sections) config = read_config(basedir, u"client.port", _valid_config_sections=_valid_config_sections)
return create_client_from_config(basedir, config) return create_client_from_config(config)
# this can/should be async # this can/should be async
# @defer.inlineCallbacks # @defer.inlineCallbacks
def create_client_from_config(basedir, config): def create_client_from_config(config):
i2p_provider = create_i2p_provider(reactor, basedir, config) i2p_provider = create_i2p_provider(reactor, config)
tor_provider = create_tor_provider(reactor, basedir, config) tor_provider = create_tor_provider(reactor, config)
handlers = create_connection_handlers(reactor, basedir, config, i2p_provider, tor_provider) handlers = create_connection_handlers(reactor, config, i2p_provider, tor_provider)
default_connection_handlers, foolscap_connection_handlers = handlers default_connection_handlers, foolscap_connection_handlers = handlers
tub_options = create_tub_options(config) tub_options = create_tub_options(config)
@ -223,17 +225,112 @@ def create_client_from_config(basedir, config):
foolscap_connection_handlers, i2p_provider, tor_provider, foolscap_connection_handlers, i2p_provider, tor_provider,
) )
control_tub = create_control_tub() control_tub = create_control_tub()
return defer.succeed(
_Client( introducer_clients, introducer_furls = create_introducer_clients(basedir, config, main_tub)
storage_broker = create_storage_farm_broker(
config, default_connection_handlers, foolscap_connection_handlers,
tub_options, introducer_clients
)
client = _Client(
config, config,
main_tub, main_tub,
control_tub, control_tub,
i2p_provider, i2p_provider,
tor_provider, tor_provider,
introducer_clients,
introducer_furls,
storage_broker,
basedir, basedir,
tub_is_listening=is_listening, tub_is_listening=is_listening,
) )
i2p_provider.setServiceParent(client)
tor_provider.setServiceParent(client)
return defer.succeed(client)
def _sequencer(config):
seqnum_s = config.get_config_from_file("announcement-seqnum")
if not seqnum_s:
seqnum_s = "0"
seqnum = int(seqnum_s.strip())
seqnum += 1 # increment
node._write_config(config.get_config_path(), "announcement-seqnum", "%d\n" % seqnum)
nonce = _make_secret().strip()
return seqnum, nonce
def create_introducer_clients(basedir, config, main_tub):
# Returns both of these:
introducer_clients = []
introducer_furls = []
introducers_yaml_filename = os.path.join(basedir, "private", "introducers.yaml")
introducers_filepath = FilePath(introducers_yaml_filename)
try:
with introducers_filepath.open() as f:
introducers_yaml = yamlutil.safe_load(f)
introducers = introducers_yaml.get("introducers", {})
log.msg("found %d introducers in private/introducers.yaml" %
len(introducers))
except EnvironmentError:
introducers = {}
if "default" in introducers.keys():
raise ValueError("'default' introducer furl cannot be specified in introducers.yaml; please fix impossible configuration.")
# read furl from tahoe.cfg
tahoe_cfg_introducer_furl = config.get_config("client", "introducer.furl", None)
if tahoe_cfg_introducer_furl == "None":
raise ValueError("tahoe.cfg has invalid 'introducer.furl = None':"
" to disable it, use 'introducer.furl ='"
" or omit the key entirely")
if tahoe_cfg_introducer_furl:
introducers[u'default'] = {'furl':tahoe_cfg_introducer_furl}
for petname, introducer in introducers.items():
introducer_cache_filepath = FilePath(os.path.join(basedir, "private", "introducer_{}_cache.yaml".format(petname)))
ic = IntroducerClient(
main_tub,
introducer['furl'].encode("ascii"),
config.nickname,
str(allmydata.__full_version__),
str(_Client.OLDEST_SUPPORTED_VERSION),
config.get_app_versions(),
partial(_sequencer, config),
introducer_cache_filepath,
) )
introducer_clients.append(ic)
introducer_furls.append(introducer['furl'])
return introducer_clients, introducer_furls
def create_storage_farm_broker(config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients):
"""
create a StorageFarmBroker object, for use by Uploader/Downloader
(and everybody else who wants to use storage servers)
"""
ps = config.get_config("client", "peers.preferred", "").split(",")
preferred_peers = tuple([p.strip() for p in ps if p != ""])
def tub_creator(handler_overrides={}, **kwargs):
return create_tub(
tub_options,
default_connection_handlers,
foolscap_connection_handlers,
handler_overrides=handler_overrides,
**kwargs
)
sb = storage_client.StorageFarmBroker(
permute_peers=True,
tub_maker=tub_creator,
preferred_peers=preferred_peers,
)
for ic in introducer_clients:
sb.use_introducer(ic)
return sb
@implementer(IStatsProducer) @implementer(IStatsProducer)
@ -258,15 +355,25 @@ class _Client(node.Node, pollmixin.PollMixin):
"max_segment_size": 128*KiB, "max_segment_size": 128*KiB,
} }
def __init__(self, config, main_tub, control_tub, i2p_provider, tor_provider, tub_is_listening): def __init__(self, config, main_tub, control_tub, i2p_provider, tor_provider, introducer_clients, introducer_furls,
storage_farm_broker, tub_is_listening):
"""
Use create_client() to instantiate one of these.
"""
node.Node.__init__(self, config, main_tub, control_tub, i2p_provider, tor_provider, tub_is_listening) node.Node.__init__(self, config, main_tub, control_tub, i2p_provider, tor_provider, tub_is_listening)
# All tub.registerReference must happen *after* we upcall, since
# that's what does tub.setLocation()
self._magic_folders = dict() self._magic_folders = dict()
self.started_timestamp = time.time() self.started_timestamp = time.time()
self.logSource="Client" self.logSource = "Client"
self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy() self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
self.init_introducer_clients()
self.introducer_clients = introducer_clients
self.introducer_furls = introducer_furls # appears completely unused (but for tests?)
for ic in introducer_clients:
ic.setServiceParent(self)
self.storage_broker = storage_farm_broker
self.storage_broker.setServiceParent(self)
self.init_stats_provider() self.init_stats_provider()
self.init_secrets() self.init_secrets()
self.init_node_key() self.init_node_key()
@ -304,56 +411,6 @@ class _Client(node.Node, pollmixin.PollMixin):
if webport: if webport:
self.init_web(webport) # strports string self.init_web(webport) # strports string
def _sequencer(self):
seqnum_s = self.config.get_config_from_file("announcement-seqnum")
if not seqnum_s:
seqnum_s = "0"
seqnum = int(seqnum_s.strip())
seqnum += 1 # increment
self.config.write_config_file("announcement-seqnum", "%d\n" % seqnum)
nonce = _make_secret().strip()
return seqnum, nonce
def init_introducer_clients(self):
self.introducer_clients = []
self.introducer_furls = []
introducers_yaml_filename = self.config.get_private_path("introducers.yaml")
introducers_filepath = FilePath(introducers_yaml_filename)
try:
with introducers_filepath.open() as f:
introducers_yaml = yamlutil.safe_load(f)
introducers = introducers_yaml.get("introducers", {})
log.msg("found %d introducers in private/introducers.yaml" %
len(introducers))
except EnvironmentError:
introducers = {}
if "default" in introducers.keys():
raise ValueError("'default' introducer furl cannot be specified in introducers.yaml; please fix impossible configuration.")
# read furl from tahoe.cfg
tahoe_cfg_introducer_furl = self.config.get_config("client", "introducer.furl", None)
if tahoe_cfg_introducer_furl == "None":
raise ValueError("tahoe.cfg has invalid 'introducer.furl = None':"
" to disable it, use 'introducer.furl ='"
" or omit the key entirely")
if tahoe_cfg_introducer_furl:
introducers[u'default'] = {'furl':tahoe_cfg_introducer_furl}
for petname, introducer in introducers.items():
introducer_cache_filepath = FilePath(self.config.get_private_path("introducer_{}_cache.yaml".format(petname)))
ic = IntroducerClient(self.tub, introducer['furl'].encode("ascii"),
self.nickname,
str(allmydata.__full_version__),
str(self.OLDEST_SUPPORTED_VERSION),
node.get_app_versions(), self._sequencer,
introducer_cache_filepath)
self.introducer_clients.append(ic)
self.introducer_furls.append(introducer['furl'])
ic.setServiceParent(self)
def init_stats_provider(self): def init_stats_provider(self):
gatherer_furl = self.config.get_config("client", "stats_gatherer.furl", None) gatherer_furl = self.config.get_config("client", "stats_gatherer.furl", None)
self.stats_provider = StatsProvider(self, gatherer_furl) self.stats_provider = StatsProvider(self, gatherer_furl)
@ -492,7 +549,6 @@ class _Client(node.Node, pollmixin.PollMixin):
# for the CLI to authenticate to local JSON endpoints # for the CLI to authenticate to local JSON endpoints
self._create_auth_token() self._create_auth_token()
self.init_client_storage_broker()
self.history = History(self.stats_provider) self.history = History(self.stats_provider)
self.terminator = Terminator() self.terminator = Terminator()
self.terminator.setServiceParent(self) self.terminator.setServiceParent(self)
@ -523,38 +579,6 @@ class _Client(node.Node, pollmixin.PollMixin):
urlsafe_b64encode(os.urandom(32)) + '\n', urlsafe_b64encode(os.urandom(32)) + '\n',
) )
def init_client_storage_broker(self):
# create a StorageFarmBroker object, for use by Uploader/Downloader
# (and everybody else who wants to use storage servers)
ps = self.config.get_config("client", "peers.preferred", "").split(",")
preferred_peers = tuple([p.strip() for p in ps if p != ""])
# this is temporary; create_client() should create a
# storage-broker and pass it in -- that method already has
# all these objects created...
default_connection_handlers, foolscap_connection_handlers = create_connection_handlers(reactor, self.basedir, self.config, self._i2p_provider, self._tor_provider)
def tub_creator(handler_overrides={}, **kwargs):
tub_options = create_tub_options(self.config)
return create_tub(
tub_options,
default_connection_handlers,
foolscap_connection_handlers,
handler_overrides=handler_overrides,
**kwargs
)
sb = storage_client.StorageFarmBroker(
permute_peers=True,
tub_maker=tub_creator,
preferred_peers=preferred_peers,
)
self.storage_broker = sb
sb.setServiceParent(self)
for ic in self.introducer_clients:
sb.use_introducer(ic)
def get_storage_broker(self): def get_storage_broker(self):
return self.storage_broker return self.storage_broker

View File

@ -59,6 +59,8 @@ def create_introducer(basedir=u"."):
default_connection_handlers, foolscap_connection_handlers = create_connection_handlers(reactor, basedir, config, i2p_provider, tor_provider) default_connection_handlers, foolscap_connection_handlers = create_connection_handlers(reactor, basedir, config, i2p_provider, tor_provider)
tub_options = create_tub_options(config) tub_options = create_tub_options(config)
# we don't remember these because the Introducer doesn't make
# outbound connections.
i2p_provider = None i2p_provider = None
tor_provider = None tor_provider = None
main_tub, is_listening = create_main_tub( main_tub, is_listening = create_main_tub(

View File

@ -273,12 +273,6 @@ class _Config(object):
def validate(self, valid_config_sections): def validate(self, valid_config_sections):
configutil.validate_config(self._config_fname, self.config, valid_config_sections) configutil.validate_config(self._config_fname, self.config, valid_config_sections)
def write_config_file(self, name, value, mode="w"):
"""
writes the given 'value' into a file called 'name' in the config
directory
"""
fn = os.path.join(self._basedir, name)
try: try:
fileutil.write(fn, value, mode) fileutil.write(fn, value, mode)
except EnvironmentError: except EnvironmentError:
@ -675,13 +669,9 @@ class Node(service.MultiService):
self.nickname = config.nickname # XXX stopgap self.nickname = config.nickname # XXX stopgap
# this can go away once Client.init_client_storage_broker is moved into create_client() # this can go away once Client.init_client_storage_broker is moved into create_client()
# (tests sometimes have None here)
self._i2p_provider = i2p_provider self._i2p_provider = i2p_provider
self._tor_provider = tor_provider self._tor_provider = tor_provider
# tests can provide None
if i2p_provider:
self._i2p_provider.setServiceParent(self)
if tor_provider:
self._tor_provider.setServiceParent(self)
self.init_tempdir() self.init_tempdir()

View File

@ -169,6 +169,8 @@ class NoNetworkServer(object):
@implementer(IStorageBroker) @implementer(IStorageBroker)
class NoNetworkStorageBroker(object): class NoNetworkStorageBroker(object):
def setServiceParent(self, _):
pass
def get_servers_for_psi(self, peer_selection_index): def get_servers_for_psi(self, peer_selection_index):
def _permuted(server): def _permuted(server):
seed = server.get_permutation_seed() seed = server.get_permutation_seed()
@ -187,7 +189,12 @@ class NoNetworkStorageBroker(object):
def create_no_network_client(basedir): def create_no_network_client(basedir):
return create_client(basedir, _client_factory=_NoNetworkClient) c = create_client(basedir, _client_factory=_NoNetworkClient)
# XXX we should probably make a way to pass this in instead of
# changing it later.. also, a reference-cycle (but, existed before :/)
c.storage_broker = NoNetworkStorageBroker()
storage_broker.client = c
return c
class _NoNetworkClient(_Client): class _NoNetworkClient(_Client):

View File

@ -6,9 +6,10 @@ from twisted.internet.interfaces import IStreamClientEndpoint
from foolscap.connections import tcp from foolscap.connections import tcp
from ..node import Node, PrivacyError, config_from_string from ..node import Node, PrivacyError, config_from_string
from ..node import create_connection_handlers from ..node import create_connection_handlers
from ..node import create_i2p_provider, create_tor_provider
from ..node import create_main_tub, _tub_portlocation from ..node import create_main_tub, _tub_portlocation
from ..util import connection_status from ..util import connection_status
from ..util.i2p_provider import create as create_i2p_provider
from ..util.tor_provider import create as create_tor_provider
class FakeNode(Node): class FakeNode(Node):

View File

@ -26,6 +26,8 @@ from allmydata.util import fileutil, iputil
from allmydata.util import i2p_provider, tor_provider from allmydata.util import i2p_provider, tor_provider
from allmydata.util.namespace import Namespace from allmydata.util.namespace import Namespace
from allmydata.util.configutil import UnknownConfigError from allmydata.util.configutil import UnknownConfigError
from allmydata.util.i2p_provider import create as create_i2p_provider
from allmydata.util.tor_provider import create as create_tor_provider
import allmydata.test.common_util as testutil import allmydata.test.common_util as testutil
@ -36,7 +38,6 @@ class LoggingMultiService(service.MultiService):
def testing_tub(config_data=''): def testing_tub(config_data=''):
from twisted.internet import reactor from twisted.internet import reactor
from allmydata.node import create_i2p_provider, create_tor_provider
basedir = 'dummy_basedir' basedir = 'dummy_basedir'
config = config_from_string(config_data, 'DEFAULT_PORTNUMFILE_BLANK', basedir) config = config_from_string(config_data, 'DEFAULT_PORTNUMFILE_BLANK', basedir)
fileutil.make_dirs(os.path.join(basedir, 'private')) fileutil.make_dirs(os.path.join(basedir, 'private'))