mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-06-19 07:48:11 +00:00
Improve loading of static servers
This follows the latest comments in ticket:2788, moving the static server definitions from "connections.yaml" to "servers.yaml". It removes the "connections" and "introducers" blocks from that file, leaving it responsible for just static servers (I think connections and introducers can be configured from tahoe.cfg). This feeds all the static server specs to the StorageFarmBroker in a single call, rather than delivering them as simulated introducer announcements. It cleans up the way handlers are specified too (the handler dictionary is ignored, but that will change soon).
This commit is contained in:
@ -8,7 +8,6 @@ from twisted.application import service
|
|||||||
from twisted.application.internet import TimerService
|
from twisted.application.internet import TimerService
|
||||||
from twisted.python.filepath import FilePath
|
from twisted.python.filepath import FilePath
|
||||||
from pycryptopp.publickey import rsa
|
from pycryptopp.publickey import rsa
|
||||||
from foolscap.api import eventually
|
|
||||||
|
|
||||||
import allmydata
|
import allmydata
|
||||||
from allmydata.storage.server import StorageServer
|
from allmydata.storage.server import StorageServer
|
||||||
@ -128,7 +127,6 @@ class Client(node.Node, pollmixin.PollMixin):
|
|||||||
self.started_timestamp = time.time()
|
self.started_timestamp = time.time()
|
||||||
self.logSource="Client"
|
self.logSource="Client"
|
||||||
self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
|
self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
|
||||||
self.load_connections()
|
|
||||||
self.init_introducer_client()
|
self.init_introducer_client()
|
||||||
self.init_stats_provider()
|
self.init_stats_provider()
|
||||||
self.init_secrets()
|
self.init_secrets()
|
||||||
@ -140,6 +138,7 @@ class Client(node.Node, pollmixin.PollMixin):
|
|||||||
if key_gen_furl:
|
if key_gen_furl:
|
||||||
log.msg("[client]key_generator.furl= is now ignored, see #2783")
|
log.msg("[client]key_generator.furl= is now ignored, see #2783")
|
||||||
self.init_client()
|
self.init_client()
|
||||||
|
self.load_static_servers()
|
||||||
self.helper = None
|
self.helper = None
|
||||||
if self.get_config("helper", "enabled", False, boolean=True):
|
if self.get_config("helper", "enabled", False, boolean=True):
|
||||||
self.init_helper()
|
self.init_helper()
|
||||||
@ -186,21 +185,6 @@ class Client(node.Node, pollmixin.PollMixin):
|
|||||||
self.introducer_client = ic
|
self.introducer_client = ic
|
||||||
ic.setServiceParent(self)
|
ic.setServiceParent(self)
|
||||||
|
|
||||||
def load_connections(self):
|
|
||||||
"""
|
|
||||||
Load the connections.yaml file if it exists, otherwise
|
|
||||||
create a default configuration.
|
|
||||||
"""
|
|
||||||
fn = os.path.join(self.basedir, "private", "connections.yaml")
|
|
||||||
connections_filepath = FilePath(fn)
|
|
||||||
try:
|
|
||||||
with connections_filepath.open() as f:
|
|
||||||
self.connections_config = yamlutil.safe_load(f)
|
|
||||||
except EnvironmentError:
|
|
||||||
self.connections_config = { 'servers' : {} }
|
|
||||||
content = yamlutil.safe_dump(self.connections_config)
|
|
||||||
connections_filepath.setContent(content)
|
|
||||||
|
|
||||||
def init_stats_provider(self):
|
def init_stats_provider(self):
|
||||||
gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
|
gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
|
||||||
self.stats_provider = StatsProvider(self, gatherer_furl)
|
self.stats_provider = StatsProvider(self, gatherer_furl)
|
||||||
@ -375,17 +359,28 @@ class Client(node.Node, pollmixin.PollMixin):
|
|||||||
self.storage_broker = sb
|
self.storage_broker = sb
|
||||||
sb.setServiceParent(self)
|
sb.setServiceParent(self)
|
||||||
|
|
||||||
# utilize the loaded static server specifications
|
|
||||||
for key, server in self.connections_config['servers'].items():
|
|
||||||
handlers = server.get("transport_handlers")
|
|
||||||
eventually(self.storage_broker.got_static_announcement,
|
|
||||||
key, server['announcement'], handlers)
|
|
||||||
|
|
||||||
sb.use_introducer(self.introducer_client)
|
sb.use_introducer(self.introducer_client)
|
||||||
|
|
||||||
def get_storage_broker(self):
|
def get_storage_broker(self):
|
||||||
return self.storage_broker
|
return self.storage_broker
|
||||||
|
|
||||||
|
def load_static_servers(self):
|
||||||
|
"""
|
||||||
|
Load the servers.yaml file if it exists, and provide the static
|
||||||
|
server data to the StorageFarmBroker.
|
||||||
|
"""
|
||||||
|
fn = os.path.join(self.basedir, "private", "servers.yaml")
|
||||||
|
servers_filepath = FilePath(fn)
|
||||||
|
try:
|
||||||
|
with servers_filepath.open() as f:
|
||||||
|
servers_yaml = yamlutil.safe_load(f)
|
||||||
|
static_servers = servers_yaml.get("storage", {})
|
||||||
|
log.msg("found %d static servers in private/servers.yaml" %
|
||||||
|
len(static_servers))
|
||||||
|
self.storage_broker.set_static_servers(static_servers)
|
||||||
|
except EnvironmentError:
|
||||||
|
pass
|
||||||
|
|
||||||
def init_blacklist(self):
|
def init_blacklist(self):
|
||||||
fn = os.path.join(self.basedir, "access.blacklist")
|
fn = os.path.join(self.basedir, "access.blacklist")
|
||||||
self.blacklist = Blacklist(fn)
|
self.blacklist = Blacklist(fn)
|
||||||
|
@ -80,11 +80,23 @@ class StorageFarmBroker(service.MultiService):
|
|||||||
# own Reconnector, and will give us a RemoteReference when we ask
|
# own Reconnector, and will give us a RemoteReference when we ask
|
||||||
# them for it.
|
# them for it.
|
||||||
self.servers = {}
|
self.servers = {}
|
||||||
self.static_servers = []
|
self._static_server_ids = set() # ignore announcements for these
|
||||||
self.introducer_client = None
|
self.introducer_client = None
|
||||||
self._threshold_listeners = [] # tuples of (threshold, Deferred)
|
self._threshold_listeners = [] # tuples of (threshold, Deferred)
|
||||||
self._connected_high_water_mark = 0
|
self._connected_high_water_mark = 0
|
||||||
|
|
||||||
|
def set_static_servers(self, servers):
|
||||||
|
for (server_id, server) in servers.items():
|
||||||
|
self._static_server_ids.add(server_id)
|
||||||
|
handlers = self._tub_handlers.copy()
|
||||||
|
handlers.update(server.get("connections", {}))
|
||||||
|
s = NativeStorageServer(server_id, server["ann"],
|
||||||
|
self._tub_options, handlers)
|
||||||
|
s.on_status_changed(lambda _: self._got_connection())
|
||||||
|
s.setServiceParent(self)
|
||||||
|
self.servers[server_id] = s
|
||||||
|
s.start_connecting(self._trigger_connections)
|
||||||
|
|
||||||
def when_connected_enough(self, threshold):
|
def when_connected_enough(self, threshold):
|
||||||
"""
|
"""
|
||||||
:returns: a Deferred that fires if/when our high water mark for
|
:returns: a Deferred that fires if/when our high water mark for
|
||||||
@ -128,24 +140,23 @@ class StorageFarmBroker(service.MultiService):
|
|||||||
remaining.append( (threshold, d) )
|
remaining.append( (threshold, d) )
|
||||||
self._threshold_listeners = remaining
|
self._threshold_listeners = remaining
|
||||||
|
|
||||||
def got_static_announcement(self, key_s, ann, handlers):
|
def _got_announcement(self, key_s, ann):
|
||||||
server_id = key_s
|
|
||||||
assert server_id not in self.static_servers # XXX
|
|
||||||
self.static_servers.append(server_id)
|
|
||||||
self._got_announcement(key_s, ann, handlers=handlers)
|
|
||||||
|
|
||||||
def _got_announcement(self, key_s, ann, handlers=None):
|
|
||||||
precondition(isinstance(key_s, str), key_s)
|
precondition(isinstance(key_s, str), key_s)
|
||||||
precondition(key_s.startswith("v0-"), key_s)
|
precondition(key_s.startswith("v0-"), key_s)
|
||||||
precondition(ann["service-name"] == "storage", ann["service-name"])
|
precondition(ann["service-name"] == "storage", ann["service-name"])
|
||||||
if handlers is not None:
|
server_id = key_s
|
||||||
s = NativeStorageServer(key_s, ann, self._tub_options, handlers)
|
if server_id in self._static_server_ids:
|
||||||
else:
|
log.msg(format="ignoring announcement for static server '%(id)s'",
|
||||||
s = NativeStorageServer(key_s, ann, self._tub_options, self._tub_handlers)
|
id=server_id,
|
||||||
|
facility="tahoe.storage_broker", umid="AlxzqA",
|
||||||
|
level=log.UNUSUAL)
|
||||||
|
return
|
||||||
|
s = NativeStorageServer(server_id, ann,
|
||||||
|
self._tub_options, self._tub_handlers)
|
||||||
s.on_status_changed(lambda _: self._got_connection())
|
s.on_status_changed(lambda _: self._got_connection())
|
||||||
server_id = s.get_serverid()
|
server_id = s.get_serverid()
|
||||||
old = self.servers.get(server_id)
|
old = self.servers.get(server_id)
|
||||||
if old and server_id not in self.static_servers:
|
if old:
|
||||||
if old.get_announcement() == ann:
|
if old.get_announcement() == ann:
|
||||||
return # duplicate
|
return # duplicate
|
||||||
# replacement
|
# replacement
|
||||||
|
@ -38,7 +38,7 @@ class TestNativeStorageServer(unittest.TestCase):
|
|||||||
|
|
||||||
class TestStorageFarmBroker(unittest.TestCase):
|
class TestStorageFarmBroker(unittest.TestCase):
|
||||||
|
|
||||||
def test_static_announcement(self):
|
def test_static_servers(self):
|
||||||
broker = StorageFarmBroker(True)
|
broker = StorageFarmBroker(True)
|
||||||
|
|
||||||
key_s = 'v0-1234-{}'.format(1)
|
key_s = 'v0-1234-{}'.format(1)
|
||||||
@ -47,10 +47,26 @@ class TestStorageFarmBroker(unittest.TestCase):
|
|||||||
"anonymous-storage-FURL": "pb://{}@nowhere/fake".format(base32.b2a(str(1))),
|
"anonymous-storage-FURL": "pb://{}@nowhere/fake".format(base32.b2a(str(1))),
|
||||||
"permutation-seed-base32": "aaaaaaaaaaaaaaaaaaaaaaaa",
|
"permutation-seed-base32": "aaaaaaaaaaaaaaaaaaaaaaaa",
|
||||||
}
|
}
|
||||||
broker.got_static_announcement(key_s, ann, None)
|
permseed = base32.a2b("aaaaaaaaaaaaaaaaaaaaaaaa")
|
||||||
self.failUnlessEqual(len(broker.static_servers), 1)
|
broker.set_static_servers({key_s: {"ann": ann}})
|
||||||
self.failUnlessEqual(broker.servers[key_s].announcement, ann)
|
self.failUnlessEqual(len(broker._static_server_ids), 1)
|
||||||
self.failUnlessEqual(broker.servers[key_s].get_serverid(), key_s)
|
s = broker.servers[key_s]
|
||||||
|
self.failUnlessEqual(s.announcement, ann)
|
||||||
|
self.failUnlessEqual(s.get_serverid(), key_s)
|
||||||
|
self.assertEqual(s.get_permutation_seed(), permseed)
|
||||||
|
|
||||||
|
# if the Introducer announces the same thing, we're supposed to
|
||||||
|
# ignore it
|
||||||
|
|
||||||
|
ann2 = {
|
||||||
|
"service-name": "storage",
|
||||||
|
"anonymous-storage-FURL": "pb://{}@nowhere/fake2".format(base32.b2a(str(1))),
|
||||||
|
"permutation-seed-base32": "bbbbbbbbbbbbbbbbbbbbbbbb",
|
||||||
|
}
|
||||||
|
broker._got_announcement(key_s, ann2)
|
||||||
|
s2 = broker.servers[key_s]
|
||||||
|
self.assertIdentical(s2, s)
|
||||||
|
self.assertEqual(s2.get_permutation_seed(), permseed)
|
||||||
|
|
||||||
@inlineCallbacks
|
@inlineCallbacks
|
||||||
def test_threshold_reached(self):
|
def test_threshold_reached(self):
|
||||||
|
Reference in New Issue
Block a user