From cac99569e9a642b98902bb431e1e6a706cc5bc76 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sat, 27 Aug 2016 16:53:31 -0700 Subject: [PATCH] factor out "Tub maker" This adds Node._create_tub(), which knows how to make a Tub with all the right options and connection handlers that were specified in tahoe.cfg (the connection handlers are disabled for now, but they'll get implemented soon). The new Node.create_main_tub() calls it. This main Tub is used: * to connect to the Introducer * to host the Helper (if enabled) * to host the Storage Server (if enabled) Node._create_tub() is also passed into the StorageFarmBroker, which passes it into each NativeStorageServer, to create the (separate) Tub for each server connection. _create_tub knows about the options, and NativeStorageServer can override the connection handlers. This way we don't need to pass tub options or default handlers into Client, StorageFarmBroker, or NativeStorageServer. A number of tests create NativeStorageServer objects: these were updated to match the new arguments. test_storage_client was simplified because we no longer need to mock out the Tub() constructor. --- src/allmydata/client.py | 4 +- src/allmydata/node.py | 49 ++++++++++++++--------- src/allmydata/storage_client.py | 29 +++++--------- src/allmydata/test/mutable/util.py | 2 +- src/allmydata/test/no_network.py | 2 +- src/allmydata/test/test_checker.py | 4 +- src/allmydata/test/test_client.py | 4 +- src/allmydata/test/test_helper.py | 6 +-- src/allmydata/test/test_storage_client.py | 25 +++++++----- src/allmydata/test/test_upload.py | 2 +- src/allmydata/test/web/test_root.py | 2 +- src/allmydata/test/web/test_web.py | 2 +- 12 files changed, 68 insertions(+), 63 deletions(-) diff --git a/src/allmydata/client.py b/src/allmydata/client.py index b96bcffb4..6b98c4329 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -353,9 +353,9 @@ class Client(node.Node, pollmixin.PollMixin): ps = self.get_config("client", "peers.preferred", "").split(",") preferred_peers = tuple([p.strip() for p in ps if p != ""]) sb = storage_client.StorageFarmBroker(permute_peers=True, + tub_maker=self._create_tub, preferred_peers=preferred_peers, - tub_options=self.tub_options, - tub_handlers=self.tub_handlers) + ) self.storage_broker = sb sb.setServiceParent(self) diff --git a/src/allmydata/node.py b/src/allmydata/node.py index c7295b6ed..c35ffcc9d 100644 --- a/src/allmydata/node.py +++ b/src/allmydata/node.py @@ -83,7 +83,8 @@ class Node(service.MultiService): assert type(self.nickname) is unicode self.init_tempdir() - self.create_tub() + self.set_tub_options() + self.create_main_tub() self.create_control_tub() self.create_log_tub() self.logSource="Node" @@ -163,6 +164,31 @@ class Node(service.MultiService): twlog.msg(e) raise e + def set_tub_options(self): + self.tub_options = { + "logLocalFailures": True, + "logRemoteFailures": True, + "expose-remote-exception-types": False, + } + + # see #521 for a discussion of how to pick these timeout values. + keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "") + if keepalive_timeout_s: + self.tub_options["keepaliveTimeout"] = int(keepalive_timeout_s) + disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "") + if disconnect_timeout_s: + # N.B.: this is in seconds, so use "1800" to get 30min + self.tub_options["disconnectTimeout"] = int(disconnect_timeout_s) + + def _create_tub(self, handler_overrides={}, **kwargs): + assert not handler_overrides + # Create a Tub with the right options and handlers. It will be + # ephemeral unless the caller provides certFile= + tub = Tub(**kwargs) + for (name, value) in self.tub_options.items(): + tub.setOption(name, value) + return tub + def _convert_tub_port(self, s): if re.search(r'^\d+$', s): return "tcp:%d" % int(s) @@ -200,26 +226,9 @@ class Node(service.MultiService): new_locations.append(loc) return ",".join(new_locations) - def create_tub(self): + def create_main_tub(self): certfile = os.path.join(self.basedir, "private", self.CERTFILE) - self.tub = Tub(certFile=certfile) - self.tub_handlers = {} - self.tub_options = { - "logLocalFailures": True, - "logRemoteFailures": True, - "expose-remote-exception-types": False, - } - - # see #521 for a discussion of how to pick these timeout values. - keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "") - if keepalive_timeout_s: - self.tub_options["keepaliveTimeout"] = int(keepalive_timeout_s) - disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "") - if disconnect_timeout_s: - # N.B.: this is in seconds, so use "1800" to get 30min - self.tub_options["disconnectTimeout"] = int(disconnect_timeout_s) - for (name, value) in self.tub_options.items(): - self.tub.setOption(name, value) + self.tub = self._create_tub(certFile=certfile) self.nodeid = b32decode(self.tub.tubID.upper()) # binary format self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n") diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 784a04c9f..78c5b6360 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -34,7 +34,7 @@ from zope.interface import implements from twisted.internet import defer from twisted.application import service -from foolscap.api import Tub, eventually +from foolscap.api import eventually from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer from allmydata.util import log, base32 from allmydata.util.assertutil import precondition @@ -67,13 +67,12 @@ class StorageFarmBroker(service.MultiService): I'm also responsible for subscribing to the IntroducerClient to find out about new servers as they are announced by the Introducer. """ - def __init__(self, permute_peers, preferred_peers=(), tub_options={}, tub_handlers={}): + def __init__(self, permute_peers, tub_maker, preferred_peers=()): service.MultiService.__init__(self) assert permute_peers # False not implemented yet self.permute_peers = permute_peers + self._tub_maker = tub_maker self.preferred_peers = preferred_peers - self._tub_options = tub_options - self._tub_handlers = tub_handlers # self.servers maps serverid -> IServer, and keeps track of all the # storage servers that we've heard about. Each descriptor manages its @@ -88,10 +87,9 @@ class StorageFarmBroker(service.MultiService): def set_static_servers(self, servers): for (server_id, server) in servers.items(): self._static_server_ids.add(server_id) - handlers = self._tub_handlers.copy() - handlers.update(server.get("connections", {})) + handler_overrides = server.get("connections", {}) s = NativeStorageServer(server_id, server["ann"], - self._tub_options, handlers) + self._tub_maker, handler_overrides) s.on_status_changed(lambda _: self._got_connection()) s.setServiceParent(self) self.servers[server_id] = s @@ -110,7 +108,7 @@ class StorageFarmBroker(service.MultiService): # these two are used in unit tests def test_add_rref(self, serverid, rref, ann): - s = NativeStorageServer(serverid, ann.copy(), self._tub_options, self._tub_handlers) + s = NativeStorageServer(serverid, ann.copy(), self._tub_maker, {}) s.rref = rref s._is_connected = True self.servers[serverid] = s @@ -151,8 +149,7 @@ class StorageFarmBroker(service.MultiService): facility="tahoe.storage_broker", umid="AlxzqA", level=log.UNUSUAL) return - s = NativeStorageServer(server_id, ann, - self._tub_options, self._tub_handlers) + s = NativeStorageServer(server_id, ann, self._tub_maker, {}) s.on_status_changed(lambda _: self._got_connection()) server_id = s.get_serverid() old = self.servers.get(server_id) @@ -276,12 +273,12 @@ class NativeStorageServer(service.MultiService): "application-version": "unknown: no get_version()", } - def __init__(self, server_id, ann, tub_options={}, tub_handlers={}): + def __init__(self, server_id, ann, tub_maker, handler_overrides): service.MultiService.__init__(self) self._server_id = server_id self.announcement = ann - self._tub_options = tub_options - self._tub_handlers = tub_handlers + self._tub_maker = tub_maker + self._handler_overrides = handler_overrides assert "anonymous-storage-FURL" in ann, ann furl = str(ann["anonymous-storage-FURL"]) @@ -387,11 +384,7 @@ class NativeStorageServer(service.MultiService): def start_connecting(self, trigger_cb): - self._tub = Tub() - for (name, value) in self._tub_options.items(): - self._tub.setOption(name, value) - - # XXX todo: set tub handlers + self._tub = self._tub_maker(self._handler_overrides) self._tub.setServiceParent(self) furl = str(self.announcement["anonymous-storage-FURL"]) self._trigger_cb = trigger_cb diff --git a/src/allmydata/test/mutable/util.py b/src/allmydata/test/mutable/util.py index 4ea86b0f8..5dee87803 100644 --- a/src/allmydata/test/mutable/util.py +++ b/src/allmydata/test/mutable/util.py @@ -202,7 +202,7 @@ def make_storagebroker(s=None, num_peers=10): s = FakeStorage() peerids = [tagged_hash("peerid", "%d" % i)[:20] for i in range(num_peers)] - storage_broker = StorageFarmBroker(True) + storage_broker = StorageFarmBroker(True, None) for peerid in peerids: fss = FakeStorageServer(peerid, s) ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid), diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index 5707c50ca..d728b6819 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -180,7 +180,7 @@ class NoNetworkStorageBroker: class NoNetworkClient(Client): - def create_tub(self): + def create_main_tub(self): pass def init_introducer_client(self): pass diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index 475974c05..731ae7b41 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -22,7 +22,7 @@ class FakeClient: class WebResultsRendering(unittest.TestCase, WebRenderingMixin): def create_fake_client(self): - sb = StorageFarmBroker(True) + sb = StorageFarmBroker(True, None) # s.get_name() (the "short description") will be "v0-00000000". # s.get_longname() will include the -long suffix. servers = [("v0-00000000-long", "\x00"*20, "peer-0"), @@ -41,7 +41,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin): "my-version": "ver", "oldest-supported": "oldest", } - s = NativeStorageServer(server_id, ann) + s = NativeStorageServer(server_id, ann, None, None) sb.test_add_server(server_id, s) c = FakeClient() c.storage_broker = sb diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index 5acf80c93..d554dd146 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -236,7 +236,7 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test return [ s.get_longname() for s in sb.get_servers_for_psi(key) ] def test_permute(self): - sb = StorageFarmBroker(True) + sb = StorageFarmBroker(True, None) for k in ["%d" % i for i in range(5)]: ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake", "permutation-seed-base32": base32.b2a(k) } @@ -248,7 +248,7 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test self.failUnlessReallyEqual(self._permute(sb, "one"), []) def test_permute_with_preferred(self): - sb = StorageFarmBroker(True, preferred_peers=['1','4']) + sb = StorageFarmBroker(True, None, preferred_peers=['1','4']) for k in ["%d" % i for i in range(5)]: ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake", "permutation-seed-base32": base32.b2a(k) } diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index b0fe357db..f423d4712 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -115,13 +115,13 @@ def upload_data(uploader, data, convergence): class AssistedUpload(unittest.TestCase): timeout = 240 # It takes longer than 120 seconds on Francois's arm box. def setUp(self): + self.tub = t = Tub() + t.setOption("expose-remote-exception-types", False) self.s = FakeClient() - self.s.storage_broker = StorageFarmBroker(True) + self.s.storage_broker = StorageFarmBroker(True, lambda h: self.tub) self.s.secret_holder = client.SecretHolder("lease secret", "converge") self.s.startService() - self.tub = t = Tub() - t.setOption("expose-remote-exception-types", False) t.setServiceParent(self.s) self.s.tub = t # we never actually use this for network traffic, so it can use a diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index 8c53ec630..592689ef9 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -1,5 +1,5 @@ import hashlib -from mock import Mock, patch +from mock import Mock from allmydata.util import base32 from twisted.trial import unittest @@ -40,13 +40,13 @@ class TestNativeStorageServer(unittest.TestCase): ann = {"anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x", "permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3", } - nss = NativeStorageServer("server_id", ann) + nss = NativeStorageServer("server_id", ann, None, {}) self.assertEqual(nss.get_nickname(), "") class TestStorageFarmBroker(unittest.TestCase): def test_static_servers(self): - broker = StorageFarmBroker(True) + broker = StorageFarmBroker(True, lambda h: Mock()) key_s = 'v0-1234-{}'.format(1) ann = { @@ -76,7 +76,7 @@ class TestStorageFarmBroker(unittest.TestCase): self.assertEqual(s2.get_permutation_seed(), permseed) def test_static_permutation_seed_pubkey(self): - broker = StorageFarmBroker(True) + broker = StorageFarmBroker(True, lambda h: Mock()) server_id = "v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia" k = "4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia" ann = { @@ -87,7 +87,7 @@ class TestStorageFarmBroker(unittest.TestCase): self.assertEqual(s.get_permutation_seed(), base32.a2b(k)) def test_static_permutation_seed_explicit(self): - broker = StorageFarmBroker(True) + broker = StorageFarmBroker(True, lambda h: Mock()) server_id = "v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia" k = "w5gl5igiexhwmftwzhai5jy2jixn7yx7" ann = { @@ -99,7 +99,7 @@ class TestStorageFarmBroker(unittest.TestCase): self.assertEqual(s.get_permutation_seed(), base32.a2b(k)) def test_static_permutation_seed_hashed(self): - broker = StorageFarmBroker(True) + broker = StorageFarmBroker(True, lambda h: Mock()) server_id = "unparseable" ann = { "anonymous-storage-FURL": "pb://abcde@nowhere/fake", @@ -112,7 +112,10 @@ class TestStorageFarmBroker(unittest.TestCase): @inlineCallbacks def test_threshold_reached(self): introducer = Mock() - broker = StorageFarmBroker(True) + new_tubs = [] + def make_tub(*args, **kwargs): + return new_tubs.pop() + broker = StorageFarmBroker(True, make_tub) done = broker.when_connected_enough(5) broker.use_introducer(introducer) # subscribes to "storage" to learn of new storage nodes @@ -130,10 +133,10 @@ class TestStorageFarmBroker(unittest.TestCase): def add_one_server(x): data["anonymous-storage-FURL"] = "pb://{}@nowhere/fake".format(base32.b2a(str(x))) tub = Mock() - with patch("allmydata.storage_client.Tub", side_effect=[tub]): - got_announcement('v0-1234-{}'.format(x), data) - self.assertEqual(tub.mock_calls[-1][0], 'connectTo') - got_connection = tub.mock_calls[-1][1][1] + new_tubs.append(tub) + got_announcement('v0-1234-{}'.format(x), data) + self.assertEqual(tub.mock_calls[-1][0], 'connectTo') + got_connection = tub.mock_calls[-1][1][1] rref = Mock() rref.callRemote = Mock(return_value=succeed(1234)) got_connection(rref) diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index c8c27be92..fadc66d4c 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -198,7 +198,7 @@ class FakeClient: mode = dict([i,mode] for i in range(num_servers)) servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid])) for fakeid in range(self.num_servers) ] - self.storage_broker = StorageFarmBroker(permute_peers=True) + self.storage_broker = StorageFarmBroker(permute_peers=True, tub_maker=None) for (serverid, rref) in servers: ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid), "permutation-seed-base32": base32.b2a(serverid) } diff --git a/src/allmydata/test/web/test_root.py b/src/allmydata/test/web/test_root.py index c9e5784c1..e0bfa779b 100644 --- a/src/allmydata/test/web/test_root.py +++ b/src/allmydata/test/web/test_root.py @@ -25,7 +25,7 @@ class RenderServiceRow(unittest.TestCase): ann = {"anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x", "permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3", } - s = NativeStorageServer("server_id", ann) + s = NativeStorageServer("server_id", ann, None, {}) r = FakeRoot() ctx = FakeContext() diff --git a/src/allmydata/test/web/test_web.py b/src/allmydata/test/web/test_web.py index a2d3fba32..f7d5d5e14 100644 --- a/src/allmydata/test/web/test_web.py +++ b/src/allmydata/test/web/test_web.py @@ -233,7 +233,7 @@ class FakeClient(Client): self._secret_holder = SecretHolder("lease secret", "convergence secret") self.helper = None self.convergence = "some random string" - self.storage_broker = StorageFarmBroker(permute_peers=True) + self.storage_broker = StorageFarmBroker(permute_peers=True, tub_maker=None) # fake knowledge of another server self.storage_broker.test_add_server("other_nodeid", FakeDisplayableServer(