mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
Merge PR#269: add when-connected-enough hook
This makes it possible for automated-upload tools (like drop-upload and magic-folders) to be told when "enough" servers are connected for uploads to succeed. This should help prevent the pathological case where such a tool attempts to upload files immediately after node startup (or before the user turns on their wifi connection), and the node stores all the shares on itself.

The new notification is single-shot and edge-triggered: when it fires, you know that, at some point in the past, the node *was* connected to at least $threshold servers. However, you might have lost several connections since then; the user might turn off wifi right after it fires, causing all connections to be dropped.

In the long run, this API will change: clients will receive continuous notifications about servers coming and going, and tools like magic-folder should refrain from uploading during periods of insufficient connection. That might be as simple as checking the size of the connected-server list when a periodic timer goes off, or something more responsive like an edge-triggered "upload as soon as you can" observer.
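For illustration only (not part of this change), here is a minimal sketch of how an automated-upload tool might combine the two approaches described above: wait once for the edge-triggered when-connected-enough signal, then re-check the connected-server count on a periodic timer before draining its queue. The PatientUploader class, its enqueue/_upload_one helpers, and the 30-second interval are hypothetical; the only APIs assumed from this patch are ConnectedEnough.when_connected_enough() and StorageFarmBroker.get_connected_servers().

# Hypothetical sketch -- not part of this commit.
from twisted.internet import task

class PatientUploader(object):
    """Queue uploads until enough storage servers are connected."""

    def __init__(self, storage_broker, connected_enough, threshold):
        self._broker = storage_broker      # StorageFarmBroker from this patch
        self._threshold = threshold
        self._queue = []
        # Edge-triggered, one-shot: fires the first time the threshold is met.
        connected_enough.when_connected_enough().addCallback(self._start_polling)

    def enqueue(self, item):
        self._queue.append(item)

    def _start_polling(self, ignored):
        # After the one-shot signal, connections may drop again, so keep
        # re-checking the live server count before uploading anything.
        self._poller = task.LoopingCall(self._maybe_upload)
        self._poller.start(30)  # interval in seconds; arbitrary choice

    def _maybe_upload(self):
        if len(self._broker.get_connected_servers()) < self._threshold:
            return  # not enough servers right now; try again on the next tick
        while self._queue:
            self._upload_one(self._queue.pop(0))

    def _upload_one(self, item):
        # Placeholder: a real tool would hand `item` to the Tahoe uploader here.
        pass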
commit 5d6fe5abdf
@@ -375,6 +375,11 @@ class Client(node.Node, pollmixin.PollMixin):
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True, preferred_peers=preferred_peers)
        self.storage_broker = sb

        connection_threshold = min(self.encoding_params["k"],
                                   self.encoding_params["happy"] + 1)
        helper = storage_client.ConnectedEnough(sb, connection_threshold)
        self.upload_ready_d = helper.when_connected_enough()

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
@@ -528,6 +533,9 @@ class Client(node.Node, pollmixin.PollMixin):
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                s.startService()

                # start processing the upload queue when we've connected to enough servers
                self.upload_ready_d.addCallback(s.upload_ready)
            except Exception, e:
                self.log("couldn't start drop-uploader: %r", args=(e,))
@@ -35,6 +35,8 @@ class DropUploader(service.MultiService):
        self._convergence = client.convergence
        self._local_path = FilePath(local_dir)

        self.is_upload_ready = False

        if inotify is None:
            from twisted.internet import inotify
        self._inotify = inotify
@@ -68,6 +70,12 @@ class DropUploader(service.MultiService):
        self._stats_provider.count('drop_upload.dirs_monitored', 1)
        return d

    def upload_ready(self):
        """upload_ready is used to signal us to start
        processing the upload items...
        """
        self.is_upload_ready = True

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
@@ -31,10 +31,12 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .

import re, time
from zope.interface import implements
from twisted.internet import defer
from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.observer import OneShotObserverList, ObserverList
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import sha1
@@ -54,6 +56,42 @@ from allmydata.util.hashutil import sha1
# look like?
# don't pass signatures: only pass validated blessed-objects


class ConnectedEnough(object):
    def __init__(self, storage_farm_broker, threshold):
        self._broker = storage_farm_broker

        self._threshold = int(threshold)
        if self._threshold <= 0:
            raise ValueError("threshold must be positive")
        self._threshold_passed = False

        self._observers = OneShotObserverList()
        self._broker.on_servers_changed(self._check_enough_connected)

    def when_connected_enough(self):
        """
        :returns: a Deferred that fires if/when our high water mark for
            number of connected servers becomes (or ever was) above
            "threshold".
        """
        if self._threshold_passed:
            return defer.succeed(None)
        return self._observers.when_fired()

    def _check_enough_connected(self):
        """
        internal helper
        """
        if self._threshold_passed:
            return
        num_servers = len(self._broker.get_connected_servers())
        if num_servers >= self._threshold:
            self._threshold_passed = True
            self._observers.fire(None)


class StorageFarmBroker:
    implements(IStorageBroker)
    """I live on the client, and know about storage servers. For each server
@@ -73,6 +111,10 @@ class StorageFarmBroker:
        # them for it.
        self.servers = {}
        self.introducer_client = None
        self._server_listeners = ObserverList()

    def on_servers_changed(self, callback):
        self._server_listeners.subscribe(callback)

    # these two are used in unit tests
    def test_add_rref(self, serverid, rref, ann):
@@ -82,18 +124,24 @@ class StorageFarmBroker:
        self.servers[serverid] = s

    def test_add_server(self, serverid, s):
        s.on_status_changed(lambda _: self._got_connection())
        self.servers[serverid] = s

    def use_introducer(self, introducer_client):
        self.introducer_client = ic = introducer_client
        ic.subscribe_to("storage", self._got_announcement)

    def _got_connection(self):
        # this is called by NativeStorageClient when it is connected
        self._server_listeners.notify()

    def _got_announcement(self, key_s, ann):
        if key_s is not None:
            precondition(isinstance(key_s, str), key_s)
            precondition(key_s.startswith("v0-"), key_s)
        assert ann["service-name"] == "storage"
        s = NativeStorageServer(key_s, ann)
        s.on_status_changed(lambda _: self._got_connection())
        serverid = s.get_serverid()
        old = self.servers.get(serverid)
        if old:
@@ -102,7 +150,7 @@ class StorageFarmBroker:
            # replacement
            del self.servers[serverid]
            old.stop_connecting()
        # now we forget about them and start using the new one
        self.servers[serverid] = s
        s.start_connecting(self.tub, self._trigger_connections)
        # the descriptor will manage their own Reconnector, and each time we
@@ -222,6 +270,14 @@ class NativeStorageServer:
        self._is_connected = False
        self._reconnector = None
        self._trigger_cb = None
        self._on_status_changed = ObserverList()

    def on_status_changed(self, status_changed):
        """
        :param status_changed: a callable taking a single arg (the
            NativeStorageServer) that is notified when we become connected
        """
        return self._on_status_changed.subscribe(status_changed)

    # Special methods used by copy.copy() and copy.deepcopy(). When those are
    # used in allmydata.immutable.filenode to copy CheckResults during
@@ -295,6 +351,7 @@ class NativeStorageServer:
        default = self.VERSION_DEFAULTS
        d = add_version_to_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service, lp)
        d.addCallback(lambda ign: self._on_status_changed.notify(self))
        d.addErrback(log.err, format="storageclient._got_connection",
                     name=self.get_name(), umid="Sdq3pg")
@@ -1,6 +1,11 @@
from mock import Mock
from allmydata.util import base32

from twisted.trial import unittest
from twisted.internet.defer import succeed, inlineCallbacks

from allmydata.storage_client import NativeStorageServer
from allmydata.storage_client import StorageFarmBroker, ConnectedEnough


class NativeStorageServerWithVersion(NativeStorageServer):
@@ -28,3 +33,53 @@ class TestNativeStorageServer(unittest.TestCase):
            })
        self.failUnlessEqual(nss.get_available_space(), 111)


class TestStorageFarmBroker(unittest.TestCase):

    @inlineCallbacks
    def test_threshold_reached(self):
        tub = Mock()
        introducer = Mock()
        broker = StorageFarmBroker(tub, True)
        done = ConnectedEnough(broker, 5).when_connected_enough()
        broker.use_introducer(introducer)
        # subscribes to "storage" to learn of new storage nodes
        subscribe = introducer.mock_calls[0]
        self.assertEqual(subscribe[0], 'subscribe_to')

        data = {
            "service-name": "storage",
            "anonymous-storage-FURL": None,
            "permutation-seed-base32": "aaaaaaaaaaaaaaaaaaaaaaaa",
        }

        def add_one_server(x):
            self.assertEqual(introducer.mock_calls[-1][1][0], 'storage')
            got_announce = introducer.mock_calls[-1][1][1]
            data["anonymous-storage-FURL"] = "pb://{}@nowhere/fake".format(base32.b2a(str(x)))
            got_announce('v0-1234-{}'.format(x), data)
            self.assertEqual(tub.mock_calls[-1][0], 'connectTo')
            got_connection = tub.mock_calls[-1][1][1]
            rref = Mock()
            rref.callRemote = Mock(return_value=succeed(1234))
            got_connection(rref)

        # first 4 shouldn't trigger the connection threshold
        for x in range(4):
            add_one_server(x)
            self.assertFalse(done.called)

        # ...but the 5th *should* trigger the threshold
        add_one_server(42)

        # so: the OneShotObserverList only notifies via
        # foolscap.eventually() -- which forces the Deferred call
        # through the reactor -- so it's no longer synchronous,
        # meaning that we have to do "real reactor stuff" for the
        # Deferred from when_connected_enough() to actually fire. (or
        # @patch() out the reactor in foolscap.eventually to be a
        # Clock() so we can advance time ourselves, but ... luckily
        # eventually() uses 0 as the timeout currently)

        yield done
        self.assertTrue(done.called)
@@ -184,6 +184,8 @@ class FakeDisplayableServer(StubServer):
        self.last_loss_time = last_loss_time
        self.last_rx_time = last_rx_time
        self.last_connect_time = last_connect_time
    def on_status_changed(self, cb):
        cb(self)
    def is_connected(self):
        return self.connected
    def get_permutation_seed(self):
@@ -234,6 +236,8 @@ class FakeStorageServer(service.MultiService):
        self.lease_checker = FakeLeaseChecker()
    def get_stats(self):
        return {"storage_server.accepting_immutable_shares": False}
    def on_status_changed(self, cb):
        cb(self)

class FakeClient(Client):
    def __init__(self):