mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-21 02:01:31 +00:00
Merge branch 'storage_broker_tub.0'
This commit is contained in:
commit
9402b40da5
@ -359,8 +359,11 @@ class Client(node.Node, pollmixin.PollMixin):
|
||||
# (and everybody else who wants to use storage servers)
|
||||
ps = self.get_config("client", "peers.preferred", "").split(",")
|
||||
preferred_peers = tuple([p.strip() for p in ps if p != ""])
|
||||
sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True, preferred_peers=preferred_peers)
|
||||
sb = storage_client.StorageFarmBroker(permute_peers=True,
|
||||
preferred_peers=preferred_peers,
|
||||
tub_options=self.tub_options)
|
||||
self.storage_broker = sb
|
||||
sb.setServiceParent(self)
|
||||
|
||||
connection_threshold = min(self.encoding_params["k"],
|
||||
self.encoding_params["happy"] + 1)
|
||||
|
@ -459,7 +459,7 @@ class IDisplayableServer(Interface):
|
||||
|
||||
class IServer(IDisplayableServer):
|
||||
"""I live in the client, and represent a single server."""
|
||||
def start_connecting(tub, trigger_cb):
|
||||
def start_connecting(trigger_cb):
|
||||
pass
|
||||
|
||||
def get_rref():
|
||||
|
@ -201,18 +201,22 @@ class Node(service.MultiService):
|
||||
def create_tub(self):
|
||||
certfile = os.path.join(self.basedir, "private", self.CERTFILE)
|
||||
self.tub = Tub(certFile=certfile)
|
||||
self.tub.setOption("logLocalFailures", True)
|
||||
self.tub.setOption("logRemoteFailures", True)
|
||||
self.tub.setOption("expose-remote-exception-types", False)
|
||||
self.tub_options = {
|
||||
"logLocalFailures": True,
|
||||
"logRemoteFailures": True,
|
||||
"expose-remote-exception-types": False,
|
||||
}
|
||||
|
||||
# see #521 for a discussion of how to pick these timeout values.
|
||||
keepalive_timeout_s = self.get_config("node", "timeout.keepalive", "")
|
||||
if keepalive_timeout_s:
|
||||
self.tub.setOption("keepaliveTimeout", int(keepalive_timeout_s))
|
||||
self.tub_options["keepaliveTimeout"] = int(keepalive_timeout_s)
|
||||
disconnect_timeout_s = self.get_config("node", "timeout.disconnect", "")
|
||||
if disconnect_timeout_s:
|
||||
# N.B.: this is in seconds, so use "1800" to get 30min
|
||||
self.tub.setOption("disconnectTimeout", int(disconnect_timeout_s))
|
||||
self.tub_options["disconnectTimeout"] = int(disconnect_timeout_s)
|
||||
for (name, value) in self.tub_options.items():
|
||||
self.tub.setOption(name, value)
|
||||
|
||||
self.nodeid = b32decode(self.tub.tubID.upper()) # binary format
|
||||
self.write_config("my_nodeid", b32encode(self.nodeid).lower() + "\n")
|
||||
|
@ -32,7 +32,9 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
|
||||
import re, time
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from foolscap.api import eventually
|
||||
from twisted.application import service
|
||||
|
||||
from foolscap.api import Tub, eventually
|
||||
from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
|
||||
from allmydata.util import log, base32
|
||||
from allmydata.util.assertutil import precondition
|
||||
@ -92,7 +94,7 @@ class ConnectedEnough(object):
|
||||
|
||||
|
||||
|
||||
class StorageFarmBroker:
|
||||
class StorageFarmBroker(service.MultiService):
|
||||
implements(IStorageBroker)
|
||||
"""I live on the client, and know about storage servers. For each server
|
||||
that is participating in a grid, I either maintain a connection to it or
|
||||
@ -100,11 +102,13 @@ class StorageFarmBroker:
|
||||
I'm also responsible for subscribing to the IntroducerClient to find out
|
||||
about new servers as they are announced by the Introducer.
|
||||
"""
|
||||
def __init__(self, tub, permute_peers, preferred_peers=()):
|
||||
self.tub = tub
|
||||
def __init__(self, permute_peers, preferred_peers=(), tub_options={}):
|
||||
service.MultiService.__init__(self)
|
||||
assert permute_peers # False not implemented yet
|
||||
self.permute_peers = permute_peers
|
||||
self.preferred_peers = preferred_peers
|
||||
self._tub_options = tub_options
|
||||
|
||||
# self.servers maps serverid -> IServer, and keeps track of all the
|
||||
# storage servers that we've heard about. Each descriptor manages its
|
||||
# own Reconnector, and will give us a RemoteReference when we ask
|
||||
@ -140,7 +144,7 @@ class StorageFarmBroker:
|
||||
precondition(isinstance(key_s, str), key_s)
|
||||
precondition(key_s.startswith("v0-"), key_s)
|
||||
assert ann["service-name"] == "storage"
|
||||
s = NativeStorageServer(key_s, ann)
|
||||
s = NativeStorageServer(key_s, ann, self._tub_options)
|
||||
s.on_status_changed(lambda _: self._got_connection())
|
||||
serverid = s.get_serverid()
|
||||
old = self.servers.get(serverid)
|
||||
@ -150,9 +154,23 @@ class StorageFarmBroker:
|
||||
# replacement
|
||||
del self.servers[serverid]
|
||||
old.stop_connecting()
|
||||
old.disownServiceParent()
|
||||
# NOTE: this disownServiceParent() returns a Deferred that
|
||||
# doesn't fire until Tub.stopService fires, which will wait for
|
||||
# any existing connections to be shut down. This doesn't
|
||||
# generally matter for normal runtime, but unit tests can run
|
||||
# into DirtyReactorErrors if they don't block on these. If a test
|
||||
# replaces one server with a newer version, then terminates
|
||||
# before the old one has been shut down, it might get
|
||||
# DirtyReactorErrors. The fix would be to gather these Deferreds
|
||||
# into a structure that will block StorageFarmBroker.stopService
|
||||
# until they have fired (but hopefully don't keep reference
|
||||
# cycles around when they fire earlier than that, which will
|
||||
# almost always be the case for normal runtime).
|
||||
# now we forget about them and start using the new one
|
||||
self.servers[serverid] = s
|
||||
s.start_connecting(self.tub, self._trigger_connections)
|
||||
s.setServiceParent(self)
|
||||
s.start_connecting(self._trigger_connections)
|
||||
# the descriptor will manage their own Reconnector, and each time we
|
||||
# need servers, we'll ask them if they're connected or not.
|
||||
|
||||
@ -210,7 +228,7 @@ class StubServer:
|
||||
def get_nickname(self):
|
||||
return "?"
|
||||
|
||||
class NativeStorageServer:
|
||||
class NativeStorageServer(service.MultiService):
|
||||
"""I hold information about a storage server that we want to connect to.
|
||||
If we are connected, I hold the RemoteReference, their host address, and
|
||||
the their version information. I remember information about when we were
|
||||
@ -238,9 +256,11 @@ class NativeStorageServer:
|
||||
"application-version": "unknown: no get_version()",
|
||||
}
|
||||
|
||||
def __init__(self, key_s, ann):
|
||||
def __init__(self, key_s, ann, tub_options={}):
|
||||
service.MultiService.__init__(self)
|
||||
self.key_s = key_s
|
||||
self.announcement = ann
|
||||
self._tub_options = tub_options
|
||||
|
||||
assert "anonymous-storage-FURL" in ann, ann
|
||||
furl = str(ann["anonymous-storage-FURL"])
|
||||
@ -337,10 +357,15 @@ class NativeStorageServer:
|
||||
available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
|
||||
return available_space
|
||||
|
||||
def start_connecting(self, tub, trigger_cb):
|
||||
def start_connecting(self, trigger_cb):
|
||||
self._tub = Tub()
|
||||
for (name, value) in self._tub_options.items():
|
||||
self._tub.setOption(name, value)
|
||||
self._tub.setServiceParent(self)
|
||||
|
||||
furl = str(self.announcement["anonymous-storage-FURL"])
|
||||
self._trigger_cb = trigger_cb
|
||||
self._reconnector = tub.connectTo(furl, self._got_connection)
|
||||
self._reconnector = self._tub.connectTo(furl, self._got_connection)
|
||||
|
||||
def _got_connection(self, rref):
|
||||
lp = log.msg(format="got connection to %(name)s, getting versions",
|
||||
|
@ -22,7 +22,7 @@ class FakeClient:
|
||||
class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
|
||||
|
||||
def create_fake_client(self):
|
||||
sb = StorageFarmBroker(None, True)
|
||||
sb = StorageFarmBroker(True)
|
||||
# s.get_name() (the "short description") will be "v0-00000000".
|
||||
# s.get_longname() will include the -long suffix.
|
||||
# s.get_peerid() (i.e. tubid) will be "aaa.." or "777.." or "ceir.."
|
||||
|
@ -236,7 +236,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
|
||||
return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
|
||||
|
||||
def test_permute(self):
|
||||
sb = StorageFarmBroker(None, True)
|
||||
sb = StorageFarmBroker(True)
|
||||
for k in ["%d" % i for i in range(5)]:
|
||||
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
|
||||
"permutation-seed-base32": base32.b2a(k) }
|
||||
@ -248,7 +248,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
|
||||
self.failUnlessReallyEqual(self._permute(sb, "one"), [])
|
||||
|
||||
def test_permute_with_preferred(self):
|
||||
sb = StorageFarmBroker(None, True, ['1','4'])
|
||||
sb = StorageFarmBroker(True, ['1','4'])
|
||||
for k in ["%d" % i for i in range(5)]:
|
||||
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
|
||||
"permutation-seed-base32": base32.b2a(k) }
|
||||
|
@ -116,7 +116,7 @@ class AssistedUpload(unittest.TestCase):
|
||||
timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
|
||||
def setUp(self):
|
||||
self.s = FakeClient()
|
||||
self.s.storage_broker = StorageFarmBroker(None, True)
|
||||
self.s.storage_broker = StorageFarmBroker(True)
|
||||
self.s.secret_holder = client.SecretHolder("lease secret", "converge")
|
||||
self.s.startService()
|
||||
|
||||
|
@ -234,7 +234,7 @@ def make_storagebroker(s=None, num_peers=10):
|
||||
s = FakeStorage()
|
||||
peerids = [tagged_hash("peerid", "%d" % i)[:20]
|
||||
for i in range(num_peers)]
|
||||
storage_broker = StorageFarmBroker(None, True)
|
||||
storage_broker = StorageFarmBroker(True)
|
||||
for peerid in peerids:
|
||||
fss = FakeStorageServer(peerid, s)
|
||||
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
|
||||
|
@ -1,4 +1,4 @@
|
||||
from mock import Mock
|
||||
from mock import Mock, patch
|
||||
from allmydata.util import base32
|
||||
|
||||
from twisted.trial import unittest
|
||||
@ -9,8 +9,10 @@ from allmydata.storage_client import StorageFarmBroker, ConnectedEnough
|
||||
|
||||
|
||||
class NativeStorageServerWithVersion(NativeStorageServer):
|
||||
def __init__(self,version):
|
||||
self.version=version
|
||||
def __init__(self, version):
|
||||
# note: these instances won't work for anything other than
|
||||
# get_available_space() because we don't upcall
|
||||
self.version = version
|
||||
def get_version(self):
|
||||
return self.version
|
||||
|
||||
@ -38,14 +40,15 @@ class TestStorageFarmBroker(unittest.TestCase):
|
||||
|
||||
@inlineCallbacks
|
||||
def test_threshold_reached(self):
|
||||
tub = Mock()
|
||||
introducer = Mock()
|
||||
broker = StorageFarmBroker(tub, True)
|
||||
broker = StorageFarmBroker(True)
|
||||
done = ConnectedEnough(broker, 5).when_connected_enough()
|
||||
broker.use_introducer(introducer)
|
||||
# subscribes to "storage" to learn of new storage nodes
|
||||
subscribe = introducer.mock_calls[0]
|
||||
self.assertEqual(subscribe[0], 'subscribe_to')
|
||||
self.assertEqual(subscribe[1][0], 'storage')
|
||||
got_announcement = subscribe[1][1]
|
||||
|
||||
data = {
|
||||
"service-name": "storage",
|
||||
@ -54,12 +57,12 @@ class TestStorageFarmBroker(unittest.TestCase):
|
||||
}
|
||||
|
||||
def add_one_server(x):
|
||||
self.assertEqual(introducer.mock_calls[-1][1][0], 'storage')
|
||||
got_announce = introducer.mock_calls[-1][1][1]
|
||||
data["anonymous-storage-FURL"] = "pb://{}@nowhere/fake".format(base32.b2a(str(x)))
|
||||
got_announce('v0-1234-{}'.format(x), data)
|
||||
self.assertEqual(tub.mock_calls[-1][0], 'connectTo')
|
||||
got_connection = tub.mock_calls[-1][1][1]
|
||||
tub = Mock()
|
||||
with patch("allmydata.storage_client.Tub", side_effect=[tub]):
|
||||
got_announcement('v0-1234-{}'.format(x), data)
|
||||
self.assertEqual(tub.mock_calls[-1][0], 'connectTo')
|
||||
got_connection = tub.mock_calls[-1][1][1]
|
||||
rref = Mock()
|
||||
rref.callRemote = Mock(return_value=succeed(1234))
|
||||
got_connection(rref)
|
||||
|
@ -198,7 +198,7 @@ class FakeClient:
|
||||
mode = dict([i,mode] for i in range(num_servers))
|
||||
servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
|
||||
for fakeid in range(self.num_servers) ]
|
||||
self.storage_broker = StorageFarmBroker(None, permute_peers=True)
|
||||
self.storage_broker = StorageFarmBroker(permute_peers=True)
|
||||
for (serverid, rref) in servers:
|
||||
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
|
||||
"permutation-seed-base32": base32.b2a(serverid) }
|
||||
|
@ -252,7 +252,7 @@ class FakeClient(Client):
|
||||
self._secret_holder = SecretHolder("lease secret", "convergence secret")
|
||||
self.helper = None
|
||||
self.convergence = "some random string"
|
||||
self.storage_broker = StorageFarmBroker(None, permute_peers=True)
|
||||
self.storage_broker = StorageFarmBroker(permute_peers=True)
|
||||
# fake knowledge of another server
|
||||
self.storage_broker.test_add_server("other_nodeid",
|
||||
FakeDisplayableServer(
|
||||
|
25
topfiles/2759.docs
Normal file
25
topfiles/2759.docs
Normal file
@ -0,0 +1,25 @@
|
||||
Tahoe now uses a separate Foolscap tub for each outbound storage server
|
||||
connection. This has two benefits:
|
||||
|
||||
* a slight privacy improvement: storage servers can no longer compare client
|
||||
TubIDs to confirm/deny that two clients are the same (but note there are
|
||||
other reliable signals: timing correlations, interest in the same shares,
|
||||
future Accounting identifiers)
|
||||
* this enables future per-server connection options, like using Tor for some
|
||||
servers but direct TCP connections for others (#517).
|
||||
|
||||
and a few drawbacks:
|
||||
|
||||
* It causes a small performance hit to generate new TLS keys (2048-bit RSA)
|
||||
for each connection. On a modern computer, this adds 75ms per server.
|
||||
* It breaks a NAT-bypass trick which enabled storage servers to run behind
|
||||
NAT boxes, which was only useful if all the *clients* of the storage server
|
||||
had public IP addresses, and those clients were also configured as servers.
|
||||
The trick was to configure the NAT-bound server as a client too: its
|
||||
outbound connections to the "servers" would be used in the opposite
|
||||
direction to provide storgae service to the clients (Foolscap doesn't care
|
||||
who initiated the connection, as long as both ends have the right TLS
|
||||
keys). We decided that this trick is not sufficiently general to preserve.
|
||||
* Server logs that record a TubID are no longer so easy to use: until
|
||||
Accounting (#666) lands and a client-id is used for log messages, it will
|
||||
be difficult to identify exactly which client the log is referencing.
|
Loading…
x
Reference in New Issue
Block a user