Switch to when_connected_enough()

- instead of passing in a Deferred(), we use an observer
- fix up the tests
- TODO: fix magic-folder
This commit is contained in:
meejah 2016-04-26 00:32:21 -06:00
parent be2576f15d
commit b65a8fe142
2 changed files with 33 additions and 9 deletions

View File

@ -35,6 +35,7 @@ from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.observer import OneShotObserverList
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import sha1
@ -62,13 +63,12 @@ class StorageFarmBroker:
I'm also responsible for subscribing to the IntroducerClient to find out
about new servers as they are announced by the Introducer.
"""
def __init__(self, tub, permute_peers, connected_threshold, connected_d,
def __init__(self, tub, permute_peers, connected_threshold,
preferred_peers=()):
self.tub = tub
assert permute_peers # False not implemented yet
self.permute_peers = permute_peers
self.connected_threshold = connected_threshold
self.connected_d = connected_d
self.preferred_peers = preferred_peers
# self.servers maps serverid -> IServer, and keeps track of all the
# storage servers that we've heard about. Each descriptor manages its
@ -76,6 +76,19 @@ class StorageFarmBroker:
# them for it.
self.servers = {}
self.introducer_client = None
# becomes True if we've ever gone past connected_threshold
self._threshold_passed = False
self._connected_observer = OneShotObserverList()
def when_connected_enough(self):
"""
:returns: a Deferred that fires when we have connected to enough
servers to cross the "connected_threshold". Once this happens
-- even if the number of connected servers goes back below the
threshold -- all calls to this method will immediately succeed
(i.e. return an already-fired Deferred).
"""
return self._connected_observer.when_fired()
# these two are used in unit tests
def test_add_rref(self, serverid, rref, ann):
@ -123,11 +136,10 @@ class StorageFarmBroker:
dsc.try_to_connect()
def check_enough_connected(self):
if (self.connected_d is not None and
if (not self._threshold_passed and
len(self.get_connected_servers()) >= self.connected_threshold):
d = self.connected_d
self.connected_d = None
d.callback(None)
self._threshold_passed = True
self._connected_observer.fire(None)
def get_servers_for_psi(self, peer_selection_index):
# return a list of server objects (IServers)

View File

@ -3,7 +3,7 @@ from mock import Mock, patch
from allmydata.util import base32
from twisted.trial import unittest
from twisted.internet.defer import Deferred, succeed
from twisted.internet.defer import Deferred, succeed, inlineCallbacks
from allmydata.storage_client import NativeStorageServer
from allmydata.storage_client import StorageFarmBroker
@ -37,11 +37,12 @@ class TestNativeStorageServer(unittest.TestCase):
class TestStorageFarmBroker(unittest.TestCase):
@inlineCallbacks
def test_threshold_reached(self):
tub = Mock()
introducer = Mock()
done = Deferred()
broker = StorageFarmBroker(tub, True, 5, done)
broker = StorageFarmBroker(tub, True, 5)
done = broker.when_connected_enough()
broker.use_introducer(introducer)
# subscribes to "storage" to learn of new storage nodes
subscribe = introducer.mock_calls[0]
@ -71,4 +72,15 @@ class TestStorageFarmBroker(unittest.TestCase):
# ...but the 5th *should* trigger the threshold
add_one_server(42)
# so: the OneShotObserverList only notifies via
# foolscap.eventually() -- which forces the Deferred call
# through the reactor -- so it's no longer synchronous,
# meaning that we have to do "real reactor stuff" for the
# Deferred from when_connected_enough() to actually fire. (or
# @patch() out the reactor in foolscap.eventually to be a
# Clock() so we can advance time ourselves, but ... luckily
# eventually() uses 0 as the timeout currently)
yield done
self.assertTrue(done.called)