From b65a8fe1426076df1d7aab9a329da34c2d29ff87 Mon Sep 17 00:00:00 2001 From: meejah Date: Tue, 26 Apr 2016 00:32:21 -0600 Subject: [PATCH] Switch to when_connected_enough() - instead of passing in a Deferred(), we use an observer - fix up the tests - TODO: fix magic-folder --- src/allmydata/storage_client.py | 24 +++++++++++++++++------ src/allmydata/test/test_storage_client.py | 18 ++++++++++++++--- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 8b6cb4e2c..ba154accb 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -35,6 +35,7 @@ from foolscap.api import eventually from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer from allmydata.util import log, base32 from allmydata.util.assertutil import precondition +from allmydata.util.observer import OneShotObserverList from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.hashutil import sha1 @@ -62,13 +63,12 @@ class StorageFarmBroker: I'm also responsible for subscribing to the IntroducerClient to find out about new servers as they are announced by the Introducer. """ - def __init__(self, tub, permute_peers, connected_threshold, connected_d, + def __init__(self, tub, permute_peers, connected_threshold, preferred_peers=()): self.tub = tub assert permute_peers # False not implemented yet self.permute_peers = permute_peers self.connected_threshold = connected_threshold - self.connected_d = connected_d self.preferred_peers = preferred_peers # self.servers maps serverid -> IServer, and keeps track of all the # storage servers that we've heard about. Each descriptor manages its @@ -76,6 +76,19 @@ class StorageFarmBroker: # them for it. self.servers = {} self.introducer_client = None + # becomes True if we've ever gone past connected_threshold + self._threshold_passed = False + self._connected_observer = OneShotObserverList() + + def when_connected_enough(self): + """ + :returns: a Deferred that fires when we have connected to enough + servers to cross the "connected_threshold". Once this happens + -- even if the number of connected servers goes back below the + threshold -- all calls to this method will immediately succeed + (i.e. return an already-fired Deferred). + """ + return self._connected_observer.when_fired() # these two are used in unit tests def test_add_rref(self, serverid, rref, ann): @@ -123,11 +136,10 @@ class StorageFarmBroker: dsc.try_to_connect() def check_enough_connected(self): - if (self.connected_d is not None and + if (not self._threshold_passed and len(self.get_connected_servers()) >= self.connected_threshold): - d = self.connected_d - self.connected_d = None - d.callback(None) + self._threshold_passed = True + self._connected_observer.fire(None) def get_servers_for_psi(self, peer_selection_index): # return a list of server objects (IServers) diff --git a/src/allmydata/test/test_storage_client.py b/src/allmydata/test/test_storage_client.py index d70240d01..b898622db 100644 --- a/src/allmydata/test/test_storage_client.py +++ b/src/allmydata/test/test_storage_client.py @@ -3,7 +3,7 @@ from mock import Mock, patch from allmydata.util import base32 from twisted.trial import unittest -from twisted.internet.defer import Deferred, succeed +from twisted.internet.defer import Deferred, succeed, inlineCallbacks from allmydata.storage_client import NativeStorageServer from allmydata.storage_client import StorageFarmBroker @@ -37,11 +37,12 @@ class TestNativeStorageServer(unittest.TestCase): class TestStorageFarmBroker(unittest.TestCase): + @inlineCallbacks def test_threshold_reached(self): tub = Mock() introducer = Mock() - done = Deferred() - broker = StorageFarmBroker(tub, True, 5, done) + broker = StorageFarmBroker(tub, True, 5) + done = broker.when_connected_enough() broker.use_introducer(introducer) # subscribes to "storage" to learn of new storage nodes subscribe = introducer.mock_calls[0] @@ -71,4 +72,15 @@ class TestStorageFarmBroker(unittest.TestCase): # ...but the 5th *should* trigger the threshold add_one_server(42) + + # so: the OneShotObserverList only notifies via + # foolscap.eventually() -- which forces the Deferred call + # through the reactor -- so it's no longer synchronous, + # meaning that we have to do "real reactor stuff" for the + # Deferred from when_connected_enough() to actually fire. (or + # @patch() out the reactor in foolscap.eventually to be a + # Clock() so we can advance time ourselves, but ... luckily + # eventually() uses 0 as the timeout currently) + + yield done self.assertTrue(done.called)