From 1b64ab5e857b907215ba5c7c1154ffd823003c3b Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Thu, 21 Jul 2016 17:23:22 -0700 Subject: [PATCH] simplify when_connected_enough() This seems happier as a method on StorageBroker, rather than a completely separate helper class. --- src/allmydata/client.py | 14 ++--- src/allmydata/storage_client.py | 67 +++++++++-------------- src/allmydata/test/test_storage_client.py | 4 +- 3 files changed, 35 insertions(+), 50 deletions(-) diff --git a/src/allmydata/client.py b/src/allmydata/client.py index bb3c70751..455176134 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -491,14 +491,12 @@ class Client(node.Node, pollmixin.PollMixin): s.setServiceParent(self) s.startService() - # start processing the upload queue when we've connected to enough servers - connection_threshold = min(self.encoding_params["k"], - self.encoding_params["happy"] + 1) - connected = storage_client.ConnectedEnough( - self.storage_broker, - connection_threshold, - ) - connected.when_connected_enough().addCallback(lambda ign: s.ready()) + # start processing the upload queue when we've connected to + # enough servers + threshold = min(self.encoding_params["k"], + self.encoding_params["happy"] + 1) + d = self.storage_broker.when_connected_enough(threshold) + d.addCallback(lambda ign: s.ready()) def _check_exit_trigger(self, exit_trigger_file): if os.path.exists(exit_trigger_file): diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index d4162fdec..2d1cf56a7 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -38,7 +38,7 @@ from foolscap.api import Tub, eventually from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer from allmydata.util import log, base32 from allmydata.util.assertutil import precondition -from allmydata.util.observer import OneShotObserverList, ObserverList +from allmydata.util.observer import ObserverList from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.hashutil import sha1 @@ -59,41 +59,6 @@ from allmydata.util.hashutil import sha1 # don't pass signatures: only pass validated blessed-objects -class ConnectedEnough(object): - def __init__(self, storage_farm_broker, threshold): - self._broker = storage_farm_broker - - self._threshold = int(threshold) - if self._threshold <= 0: - raise ValueError("threshold must be positive") - self._threshold_passed = False - - self._observers = OneShotObserverList() - self._broker.on_servers_changed(self._check_enough_connected) - - def when_connected_enough(self): - """ - :returns: a Deferred that fires if/when our high water mark for - number of connected servers becomes (or ever was) above - "threshold". - """ - if self._threshold_passed: - return defer.succeed(None) - return self._observers.when_fired() - - def _check_enough_connected(self): - """ - internal helper - """ - if self._threshold_passed: - return - num_servers = len(self._broker.get_connected_servers()) - if num_servers >= self._threshold: - self._threshold_passed = True - self._observers.fire(None) - - - class StorageFarmBroker(service.MultiService): implements(IStorageBroker) """I live on the client, and know about storage servers. For each server @@ -115,10 +80,19 @@ class StorageFarmBroker(service.MultiService): # them for it. self.servers = {} self.introducer_client = None - self._server_listeners = ObserverList() + self._threshold_listeners = [] # tuples of (threshold, Deferred) + self._connected_high_water_mark = 0 - def on_servers_changed(self, callback): - self._server_listeners.subscribe(callback) + def when_connected_enough(self, threshold): + """ + :returns: a Deferred that fires if/when our high water mark for + number of connected servers becomes (or ever was) above + "threshold". + """ + d = defer.Deferred() + self._threshold_listeners.append( (threshold, d) ) + self._check_connected_high_water_mark() + return d # these two are used in unit tests def test_add_rref(self, serverid, rref, ann): @@ -137,7 +111,20 @@ class StorageFarmBroker(service.MultiService): def _got_connection(self): # this is called by NativeStorageClient when it is connected - self._server_listeners.notify() + self._check_connected_high_water_mark() + + def _check_connected_high_water_mark(self): + current = len(self.get_connected_servers()) + if current > self._connected_high_water_mark: + self._connected_high_water_mark = current + + remaining = [] + for threshold, d in self._threshold_listeners: + if self._connected_high_water_mark >= threshold: + eventually(d.callback, None) + else: + remaining.append( (threshold, d) ) + self._threshold_listeners = remaining def _got_announcement(self, key_s, ann): if key_s is not None: diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index f55592c99..a80a3ed8c 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -5,7 +5,7 @@ from twisted.trial import unittest from twisted.internet.defer import succeed, inlineCallbacks from allmydata.storage_client import NativeStorageServer -from allmydata.storage_client import StorageFarmBroker, ConnectedEnough +from allmydata.storage_client import StorageFarmBroker class NativeStorageServerWithVersion(NativeStorageServer): @@ -42,7 +42,7 @@ class TestStorageFarmBroker(unittest.TestCase): def test_threshold_reached(self): introducer = Mock() broker = StorageFarmBroker(True) - done = ConnectedEnough(broker, 5).when_connected_enough() + done = broker.when_connected_enough(5) broker.use_introducer(introducer) # subscribes to "storage" to learn of new storage nodes subscribe = introducer.mock_calls[0]