create Tub inside NativeStorageServer, not broker

This makes IServer instances responsible for their own network
connections, which will help when we add HTTP-based servers in the
future. The StorageFarmBroker should not care about how the IServer uses
the network, it just provides the announcement (and local config).
This commit is contained in:
Brian Warner 2016-05-03 15:09:13 -07:00
parent e147aa3da2
commit 76f6d2d21a
3 changed files with 32 additions and 19 deletions

View File

@ -459,7 +459,7 @@ class IDisplayableServer(Interface):
class IServer(IDisplayableServer):
"""I live in the client, and represent a single server."""
def start_connecting(tub, trigger_cb):
def start_connecting(trigger_cb):
pass
def get_rref():

View File

@ -109,7 +109,6 @@ class StorageFarmBroker(service.MultiService):
self.preferred_peers = preferred_peers
self._tub_options = tub_options
self.tubs = {} # self.tubs maps serverid -> Tub
# self.servers maps serverid -> IServer, and keeps track of all the
# storage servers that we've heard about. Each descriptor manages its
# own Reconnector, and will give us a RemoteReference when we ask
@ -139,14 +138,6 @@ class StorageFarmBroker(service.MultiService):
self.introducer_client = ic = introducer_client
ic.subscribe_to("storage", self._got_announcement)
def _ensure_tub_created(self, serverid):
if serverid in self.tubs:
return
self.tubs[serverid] = t = Tub()
for (name, value) in self._tub_options.items():
t.setOption(name, value)
self.tubs[serverid].setServiceParent(self)
def _got_connection(self):
# this is called by NativeStorageClient when it is connected
self._server_listeners.notify()
@ -156,7 +147,7 @@ class StorageFarmBroker(service.MultiService):
precondition(isinstance(key_s, str), key_s)
precondition(key_s.startswith("v0-"), key_s)
assert ann["service-name"] == "storage"
s = NativeStorageServer(key_s, ann)
s = NativeStorageServer(key_s, ann, self._tub_options)
s.on_status_changed(lambda _: self._got_connection())
serverid = s.get_serverid()
old = self.servers.get(serverid)
@ -166,10 +157,23 @@ class StorageFarmBroker(service.MultiService):
# replacement
del self.servers[serverid]
old.stop_connecting()
old.disownServiceParent()
# NOTE: this disownServiceParent() returns a Deferred that
# doesn't fire until Tub.stopService fires, which will wait for
# any existing connections to be shut down. This doesn't
# generally matter for normal runtime, but unit tests can run
# into DirtyReactorErrors if they don't block on these. If a test
# replaces one server with a newer version, then terminates
# before the old one has been shut down, it might get
# DirtyReactorErrors. The fix would be to gather these Deferreds
# into a structure that will block StorageFarmBroker.stopService
# until they have fired (but hopefully don't keep reference
# cycles around when they fire earlier than that, which will
# almost always be the case for normal runtime).
# now we forget about them and start using the new one
self.servers[serverid] = s
self._ensure_tub_created(serverid)
s.start_connecting(self.tubs[serverid], self._trigger_connections)
s.setServiceParent(self)
s.start_connecting(self._trigger_connections)
# the descriptor will manage their own Reconnector, and each time we
# need servers, we'll ask them if they're connected or not.
@ -227,7 +231,7 @@ class StubServer:
def get_nickname(self):
return "?"
class NativeStorageServer:
class NativeStorageServer(service.MultiService):
"""I hold information about a storage server that we want to connect to.
If we are connected, I hold the RemoteReference, their host address, and
the their version information. I remember information about when we were
@ -255,9 +259,11 @@ class NativeStorageServer:
"application-version": "unknown: no get_version()",
}
def __init__(self, key_s, ann):
def __init__(self, key_s, ann, tub_options={}):
service.MultiService.__init__(self)
self.key_s = key_s
self.announcement = ann
self._tub_options = tub_options
assert "anonymous-storage-FURL" in ann, ann
furl = str(ann["anonymous-storage-FURL"])
@ -354,10 +360,15 @@ class NativeStorageServer:
available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
return available_space
def start_connecting(self, tub, trigger_cb):
def start_connecting(self, trigger_cb):
self._tub = Tub()
for (name, value) in self._tub_options.items():
self._tub.setOption(name, value)
self._tub.setServiceParent(self)
furl = str(self.announcement["anonymous-storage-FURL"])
self._trigger_cb = trigger_cb
self._reconnector = tub.connectTo(furl, self._got_connection)
self._reconnector = self._tub.connectTo(furl, self._got_connection)
def _got_connection(self, rref):
lp = log.msg(format="got connection to %(name)s, getting versions",

View File

@ -9,8 +9,10 @@ from allmydata.storage_client import StorageFarmBroker, ConnectedEnough
class NativeStorageServerWithVersion(NativeStorageServer):
def __init__(self,version):
self.version=version
def __init__(self, version):
# note: these instances won't work for anything other than
# get_available_space() because we don't upcall
self.version = version
def get_version(self):
return self.version