mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-07 10:56:49 +00:00
factor out "Tub maker"
This adds Node._create_tub(), which knows how to make a Tub with all the right options and connection handlers that were specified in tahoe.cfg (the connection handlers are disabled for now, but they'll get implemented soon). The new Node.create_main_tub() calls it. This main Tub is used: * to connect to the Introducer * to host the Helper (if enabled) * to host the Storage Server (if enabled) Node._create_tub() is also passed into the StorageFarmBroker, which passes it into each NativeStorageServer, to create the (separate) Tub for each server connection. _create_tub knows about the options, and NativeStorageServer can override the connection handlers. This way we don't need to pass tub options or default handlers into Client, StorageFarmBroker, or NativeStorageServer. A number of tests create NativeStorageServer objects: these were updated to match the new arguments. test_storage_client was simplified because we no longer need to mock out the Tub() constructor.
This commit is contained in:
parent
8cf53d2d12
commit
cac99569e9
@ -353,9 +353,9 @@ class Client(node.Node, pollmixin.PollMixin):
|
||||
ps = self.get_config("client", "peers.preferred", "").split(",")
|
||||
preferred_peers = tuple([p.strip() for p in ps if p != ""])
|
||||
sb = storage_client.StorageFarmBroker(permute_peers=True,
|
||||
tub_maker=self._create_tub,
|
||||
preferred_peers=preferred_peers,
|
||||
tub_options=self.tub_options,
|
||||
tub_handlers=self.tub_handlers)
|
||||
)
|
||||
self.storage_broker = sb
|
||||
sb.setServiceParent(self)
|
||||
|
||||
|
@ -83,7 +83,8 @@ class Node(service.MultiService):
|
||||
assert type(self.nickname) is unicode
|
||||
|
||||
self.init_tempdir()
|
||||
self.create_tub()
|
||||
self.set_tub_options()
|
||||
self.create_main_tub()
|
||||
self.create_control_tub()
|
||||
self.create_log_tub()
|
||||
self.logSource="Node"
|
||||
@ -163,6 +164,31 @@ class Node(service.MultiService):
|
||||
twlog.msg(e)
|
||||
raise e
|
||||
|
||||
def set_tub_options(self):
|
||||
self.tub_options = {
|
||||
"logLocalFailures": True,
|
||||
"logRemoteFailures": True,
|
||||
"expose-remote-exception-types": False,
|
||||
}
|
||||
|
||||
# see #521 for a discussion of how to pick these timeout values.
|
||||
keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
|
||||
if keepalive_timeout_s:
|
||||
self.tub_options["keepaliveTimeout"] = int(keepalive_timeout_s)
|
||||
disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
|
||||
if disconnect_timeout_s:
|
||||
# N.B.: this is in seconds, so use "1800" to get 30min
|
||||
self.tub_options["disconnectTimeout"] = int(disconnect_timeout_s)
|
||||
|
||||
def _create_tub(self, handler_overrides={}, **kwargs):
|
||||
assert not handler_overrides
|
||||
# Create a Tub with the right options and handlers. It will be
|
||||
# ephemeral unless the caller provides certFile=
|
||||
tub = Tub(**kwargs)
|
||||
for (name, value) in self.tub_options.items():
|
||||
tub.setOption(name, value)
|
||||
return tub
|
||||
|
||||
def _convert_tub_port(self, s):
|
||||
if re.search(r'^\d+$', s):
|
||||
return "tcp:%d" % int(s)
|
||||
@ -200,26 +226,9 @@ class Node(service.MultiService):
|
||||
new_locations.append(loc)
|
||||
return ",".join(new_locations)
|
||||
|
||||
def create_tub(self):
|
||||
def create_main_tub(self):
|
||||
certfile = os.path.join(self.basedir, "private", self.CERTFILE)
|
||||
self.tub = Tub(certFile=certfile)
|
||||
self.tub_handlers = {}
|
||||
self.tub_options = {
|
||||
"logLocalFailures": True,
|
||||
"logRemoteFailures": True,
|
||||
"expose-remote-exception-types": False,
|
||||
}
|
||||
|
||||
# see #521 for a discussion of how to pick these timeout values.
|
||||
keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
|
||||
if keepalive_timeout_s:
|
||||
self.tub_options["keepaliveTimeout"] = int(keepalive_timeout_s)
|
||||
disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
|
||||
if disconnect_timeout_s:
|
||||
# N.B.: this is in seconds, so use "1800" to get 30min
|
||||
self.tub_options["disconnectTimeout"] = int(disconnect_timeout_s)
|
||||
for (name, value) in self.tub_options.items():
|
||||
self.tub.setOption(name, value)
|
||||
self.tub = self._create_tub(certFile=certfile)
|
||||
|
||||
self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
|
||||
self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
|
||||
|
@ -34,7 +34,7 @@ from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from twisted.application import service
|
||||
|
||||
from foolscap.api import Tub, eventually
|
||||
from foolscap.api import eventually
|
||||
from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
|
||||
from allmydata.util import log, base32
|
||||
from allmydata.util.assertutil import precondition
|
||||
@ -67,13 +67,12 @@ class StorageFarmBroker(service.MultiService):
|
||||
I'm also responsible for subscribing to the IntroducerClient to find out
|
||||
about new servers as they are announced by the Introducer.
|
||||
"""
|
||||
def __init__(self, permute_peers, preferred_peers=(), tub_options={}, tub_handlers={}):
|
||||
def __init__(self, permute_peers, tub_maker, preferred_peers=()):
|
||||
service.MultiService.__init__(self)
|
||||
assert permute_peers # False not implemented yet
|
||||
self.permute_peers = permute_peers
|
||||
self._tub_maker = tub_maker
|
||||
self.preferred_peers = preferred_peers
|
||||
self._tub_options = tub_options
|
||||
self._tub_handlers = tub_handlers
|
||||
|
||||
# self.servers maps serverid -> IServer, and keeps track of all the
|
||||
# storage servers that we've heard about. Each descriptor manages its
|
||||
@ -88,10 +87,9 @@ class StorageFarmBroker(service.MultiService):
|
||||
def set_static_servers(self, servers):
|
||||
for (server_id, server) in servers.items():
|
||||
self._static_server_ids.add(server_id)
|
||||
handlers = self._tub_handlers.copy()
|
||||
handlers.update(server.get("connections", {}))
|
||||
handler_overrides = server.get("connections", {})
|
||||
s = NativeStorageServer(server_id, server["ann"],
|
||||
self._tub_options, handlers)
|
||||
self._tub_maker, handler_overrides)
|
||||
s.on_status_changed(lambda _: self._got_connection())
|
||||
s.setServiceParent(self)
|
||||
self.servers[server_id] = s
|
||||
@ -110,7 +108,7 @@ class StorageFarmBroker(service.MultiService):
|
||||
|
||||
# these two are used in unit tests
|
||||
def test_add_rref(self, serverid, rref, ann):
|
||||
s = NativeStorageServer(serverid, ann.copy(), self._tub_options, self._tub_handlers)
|
||||
s = NativeStorageServer(serverid, ann.copy(), self._tub_maker, {})
|
||||
s.rref = rref
|
||||
s._is_connected = True
|
||||
self.servers[serverid] = s
|
||||
@ -151,8 +149,7 @@ class StorageFarmBroker(service.MultiService):
|
||||
facility="tahoe.storage_broker", umid="AlxzqA",
|
||||
level=log.UNUSUAL)
|
||||
return
|
||||
s = NativeStorageServer(server_id, ann,
|
||||
self._tub_options, self._tub_handlers)
|
||||
s = NativeStorageServer(server_id, ann, self._tub_maker, {})
|
||||
s.on_status_changed(lambda _: self._got_connection())
|
||||
server_id = s.get_serverid()
|
||||
old = self.servers.get(server_id)
|
||||
@ -276,12 +273,12 @@ class NativeStorageServer(service.MultiService):
|
||||
"application-version": "unknown: no get_version()",
|
||||
}
|
||||
|
||||
def __init__(self, server_id, ann, tub_options={}, tub_handlers={}):
|
||||
def __init__(self, server_id, ann, tub_maker, handler_overrides):
|
||||
service.MultiService.__init__(self)
|
||||
self._server_id = server_id
|
||||
self.announcement = ann
|
||||
self._tub_options = tub_options
|
||||
self._tub_handlers = tub_handlers
|
||||
self._tub_maker = tub_maker
|
||||
self._handler_overrides = handler_overrides
|
||||
|
||||
assert "anonymous-storage-FURL" in ann, ann
|
||||
furl = str(ann["anonymous-storage-FURL"])
|
||||
@ -387,11 +384,7 @@ class NativeStorageServer(service.MultiService):
|
||||
|
||||
|
||||
def start_connecting(self, trigger_cb):
|
||||
self._tub = Tub()
|
||||
for (name, value) in self._tub_options.items():
|
||||
self._tub.setOption(name, value)
|
||||
|
||||
# XXX todo: set tub handlers
|
||||
self._tub = self._tub_maker(self._handler_overrides)
|
||||
self._tub.setServiceParent(self)
|
||||
furl = str(self.announcement["anonymous-storage-FURL"])
|
||||
self._trigger_cb = trigger_cb
|
||||
|
@ -202,7 +202,7 @@ def make_storagebroker(s=None, num_peers=10):
|
||||
s = FakeStorage()
|
||||
peerids = [tagged_hash("peerid", "%d" % i)[:20]
|
||||
for i in range(num_peers)]
|
||||
storage_broker = StorageFarmBroker(True)
|
||||
storage_broker = StorageFarmBroker(True, None)
|
||||
for peerid in peerids:
|
||||
fss = FakeStorageServer(peerid, s)
|
||||
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
|
||||
|
@ -180,7 +180,7 @@ class NoNetworkStorageBroker:
|
||||
|
||||
class NoNetworkClient(Client):
|
||||
|
||||
def create_tub(self):
|
||||
def create_main_tub(self):
|
||||
pass
|
||||
def init_introducer_client(self):
|
||||
pass
|
||||
|
@ -22,7 +22,7 @@ class FakeClient:
|
||||
class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
|
||||
|
||||
def create_fake_client(self):
|
||||
sb = StorageFarmBroker(True)
|
||||
sb = StorageFarmBroker(True, None)
|
||||
# s.get_name() (the "short description") will be "v0-00000000".
|
||||
# s.get_longname() will include the -long suffix.
|
||||
servers = [("v0-00000000-long", "\x00"*20, "peer-0"),
|
||||
@ -41,7 +41,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
|
||||
"my-version": "ver",
|
||||
"oldest-supported": "oldest",
|
||||
}
|
||||
s = NativeStorageServer(server_id, ann)
|
||||
s = NativeStorageServer(server_id, ann, None, None)
|
||||
sb.test_add_server(server_id, s)
|
||||
c = FakeClient()
|
||||
c.storage_broker = sb
|
||||
|
@ -236,7 +236,7 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test
|
||||
return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
|
||||
|
||||
def test_permute(self):
|
||||
sb = StorageFarmBroker(True)
|
||||
sb = StorageFarmBroker(True, None)
|
||||
for k in ["%d" % i for i in range(5)]:
|
||||
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
|
||||
"permutation-seed-base32": base32.b2a(k) }
|
||||
@ -248,7 +248,7 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test
|
||||
self.failUnlessReallyEqual(self._permute(sb, "one"), [])
|
||||
|
||||
def test_permute_with_preferred(self):
|
||||
sb = StorageFarmBroker(True, preferred_peers=['1','4'])
|
||||
sb = StorageFarmBroker(True, None, preferred_peers=['1','4'])
|
||||
for k in ["%d" % i for i in range(5)]:
|
||||
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
|
||||
"permutation-seed-base32": base32.b2a(k) }
|
||||
|
@ -115,13 +115,13 @@ def upload_data(uploader, data, convergence):
|
||||
class AssistedUpload(unittest.TestCase):
|
||||
timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
|
||||
def setUp(self):
|
||||
self.tub = t = Tub()
|
||||
t.setOption("expose-remote-exception-types", False)
|
||||
self.s = FakeClient()
|
||||
self.s.storage_broker = StorageFarmBroker(True)
|
||||
self.s.storage_broker = StorageFarmBroker(True, lambda h: self.tub)
|
||||
self.s.secret_holder = client.SecretHolder("lease secret", "converge")
|
||||
self.s.startService()
|
||||
|
||||
self.tub = t = Tub()
|
||||
t.setOption("expose-remote-exception-types", False)
|
||||
t.setServiceParent(self.s)
|
||||
self.s.tub = t
|
||||
# we never actually use this for network traffic, so it can use a
|
||||
|
@ -1,5 +1,5 @@
|
||||
import hashlib
|
||||
from mock import Mock, patch
|
||||
from mock import Mock
|
||||
from allmydata.util import base32
|
||||
|
||||
from twisted.trial import unittest
|
||||
@ -40,13 +40,13 @@ class TestNativeStorageServer(unittest.TestCase):
|
||||
ann = {"anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x",
|
||||
"permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3",
|
||||
}
|
||||
nss = NativeStorageServer("server_id", ann)
|
||||
nss = NativeStorageServer("server_id", ann, None, {})
|
||||
self.assertEqual(nss.get_nickname(), "")
|
||||
|
||||
class TestStorageFarmBroker(unittest.TestCase):
|
||||
|
||||
def test_static_servers(self):
|
||||
broker = StorageFarmBroker(True)
|
||||
broker = StorageFarmBroker(True, lambda h: Mock())
|
||||
|
||||
key_s = 'v0-1234-{}'.format(1)
|
||||
ann = {
|
||||
@ -76,7 +76,7 @@ class TestStorageFarmBroker(unittest.TestCase):
|
||||
self.assertEqual(s2.get_permutation_seed(), permseed)
|
||||
|
||||
def test_static_permutation_seed_pubkey(self):
|
||||
broker = StorageFarmBroker(True)
|
||||
broker = StorageFarmBroker(True, lambda h: Mock())
|
||||
server_id = "v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia"
|
||||
k = "4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia"
|
||||
ann = {
|
||||
@ -87,7 +87,7 @@ class TestStorageFarmBroker(unittest.TestCase):
|
||||
self.assertEqual(s.get_permutation_seed(), base32.a2b(k))
|
||||
|
||||
def test_static_permutation_seed_explicit(self):
|
||||
broker = StorageFarmBroker(True)
|
||||
broker = StorageFarmBroker(True, lambda h: Mock())
|
||||
server_id = "v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia"
|
||||
k = "w5gl5igiexhwmftwzhai5jy2jixn7yx7"
|
||||
ann = {
|
||||
@ -99,7 +99,7 @@ class TestStorageFarmBroker(unittest.TestCase):
|
||||
self.assertEqual(s.get_permutation_seed(), base32.a2b(k))
|
||||
|
||||
def test_static_permutation_seed_hashed(self):
|
||||
broker = StorageFarmBroker(True)
|
||||
broker = StorageFarmBroker(True, lambda h: Mock())
|
||||
server_id = "unparseable"
|
||||
ann = {
|
||||
"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
|
||||
@ -112,7 +112,10 @@ class TestStorageFarmBroker(unittest.TestCase):
|
||||
@inlineCallbacks
|
||||
def test_threshold_reached(self):
|
||||
introducer = Mock()
|
||||
broker = StorageFarmBroker(True)
|
||||
new_tubs = []
|
||||
def make_tub(*args, **kwargs):
|
||||
return new_tubs.pop()
|
||||
broker = StorageFarmBroker(True, make_tub)
|
||||
done = broker.when_connected_enough(5)
|
||||
broker.use_introducer(introducer)
|
||||
# subscribes to "storage" to learn of new storage nodes
|
||||
@ -130,10 +133,10 @@ class TestStorageFarmBroker(unittest.TestCase):
|
||||
def add_one_server(x):
|
||||
data["anonymous-storage-FURL"] = "pb://{}@nowhere/fake".format(base32.b2a(str(x)))
|
||||
tub = Mock()
|
||||
with patch("allmydata.storage_client.Tub", side_effect=[tub]):
|
||||
got_announcement('v0-1234-{}'.format(x), data)
|
||||
self.assertEqual(tub.mock_calls[-1][0], 'connectTo')
|
||||
got_connection = tub.mock_calls[-1][1][1]
|
||||
new_tubs.append(tub)
|
||||
got_announcement('v0-1234-{}'.format(x), data)
|
||||
self.assertEqual(tub.mock_calls[-1][0], 'connectTo')
|
||||
got_connection = tub.mock_calls[-1][1][1]
|
||||
rref = Mock()
|
||||
rref.callRemote = Mock(return_value=succeed(1234))
|
||||
got_connection(rref)
|
||||
|
@ -198,7 +198,7 @@ class FakeClient:
|
||||
mode = dict([i,mode] for i in range(num_servers))
|
||||
servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
|
||||
for fakeid in range(self.num_servers) ]
|
||||
self.storage_broker = StorageFarmBroker(permute_peers=True)
|
||||
self.storage_broker = StorageFarmBroker(permute_peers=True, tub_maker=None)
|
||||
for (serverid, rref) in servers:
|
||||
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
|
||||
"permutation-seed-base32": base32.b2a(serverid) }
|
||||
|
@ -25,7 +25,7 @@ class RenderServiceRow(unittest.TestCase):
|
||||
ann = {"anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x",
|
||||
"permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3",
|
||||
}
|
||||
s = NativeStorageServer("server_id", ann)
|
||||
s = NativeStorageServer("server_id", ann, None, {})
|
||||
|
||||
r = FakeRoot()
|
||||
ctx = FakeContext()
|
||||
|
@ -233,7 +233,7 @@ class FakeClient(Client):
|
||||
self._secret_holder = SecretHolder("lease secret", "convergence secret")
|
||||
self.helper = None
|
||||
self.convergence = "some random string"
|
||||
self.storage_broker = StorageFarmBroker(permute_peers=True)
|
||||
self.storage_broker = StorageFarmBroker(permute_peers=True, tub_maker=None)
|
||||
# fake knowledge of another server
|
||||
self.storage_broker.test_add_server("other_nodeid",
|
||||
FakeDisplayableServer(
|
||||
|
Loading…
x
Reference in New Issue
Block a user