Teach StorageFarmBroker to fire a deferred when a connection threshold is reached. refs #1449

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
This commit is contained in:
Daira Hopwood
2015-04-28 21:03:45 +01:00
committed by meejah
parent 84a1064b87
commit a56a3adaae
9 changed files with 41 additions and 12 deletions

View File

@ -131,6 +131,7 @@ class Client(node.Node, pollmixin.PollMixin):
def __init__(self, basedir="."): def __init__(self, basedir="."):
node.Node.__init__(self, basedir) node.Node.__init__(self, basedir)
self.upload_ready_d = defer.Deferred()
self.started_timestamp = time.time() self.started_timestamp = time.time()
self.logSource="Client" self.logSource="Client"
self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy() self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
@ -372,7 +373,12 @@ class Client(node.Node, pollmixin.PollMixin):
# (and everybody else who wants to use storage servers) # (and everybody else who wants to use storage servers)
ps = self.get_config("client", "peers.preferred", "").split(",") ps = self.get_config("client", "peers.preferred", "").split(",")
preferred_peers = tuple([p.strip() for p in ps if p != ""]) preferred_peers = tuple([p.strip() for p in ps if p != ""])
sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True, preferred_peers=preferred_peers)
connection_threshold = min(self.encoding_params["k"],
self.encoding_params["happy"] + 1)
sb = storage_client.StorageFarmBroker(self.tub, True, connection_threshold,
self.upload_ready_d, preferred_peers=preferred_peers)
self.storage_broker = sb self.storage_broker = sb
# load static server specifications from tahoe.cfg, if any. # load static server specifications from tahoe.cfg, if any.
@ -528,6 +534,9 @@ class Client(node.Node, pollmixin.PollMixin):
s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8) s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
s.setServiceParent(self) s.setServiceParent(self)
s.startService() s.startService()
# start processing the upload queue when we've connected to enough servers
self.upload_ready_d.addCallback(s.upload_ready)
except Exception, e: except Exception, e:
self.log("couldn't start drop-uploader: %r", args=(e,)) self.log("couldn't start drop-uploader: %r", args=(e,))

View File

@ -35,6 +35,8 @@ class DropUploader(service.MultiService):
self._convergence = client.convergence self._convergence = client.convergence
self._local_path = FilePath(local_dir) self._local_path = FilePath(local_dir)
self.is_upload_ready = False
if inotify is None: if inotify is None:
from twisted.internet import inotify from twisted.internet import inotify
self._inotify = inotify self._inotify = inotify
@ -68,6 +70,12 @@ class DropUploader(service.MultiService):
self._stats_provider.count('drop_upload.dirs_monitored', 1) self._stats_provider.count('drop_upload.dirs_monitored', 1)
return d return d
def upload_ready(self):
"""upload_ready is used to signal us to start
processing the upload items...
"""
self.is_upload_ready = True
def _notify(self, opaque, path, events_mask): def _notify(self, opaque, path, events_mask):
self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

View File

@ -62,10 +62,13 @@ class StorageFarmBroker:
I'm also responsible for subscribing to the IntroducerClient to find out I'm also responsible for subscribing to the IntroducerClient to find out
about new servers as they are announced by the Introducer. about new servers as they are announced by the Introducer.
""" """
def __init__(self, tub, permute_peers, preferred_peers=()): def __init__(self, tub, permute_peers, connected_threshold, connected_d,
preferred_peers=()):
self.tub = tub self.tub = tub
assert permute_peers # False not implemented yet assert permute_peers # False not implemented yet
self.permute_peers = permute_peers self.permute_peers = permute_peers
self.connected_threshold = connected_threshold
self.connected_d = connected_d
self.preferred_peers = preferred_peers self.preferred_peers = preferred_peers
# self.servers maps serverid -> IServer, and keeps track of all the # self.servers maps serverid -> IServer, and keeps track of all the
# storage servers that we've heard about. Each descriptor manages its # storage servers that we've heard about. Each descriptor manages its
@ -76,7 +79,7 @@ class StorageFarmBroker:
# these two are used in unit tests # these two are used in unit tests
def test_add_rref(self, serverid, rref, ann): def test_add_rref(self, serverid, rref, ann):
s = NativeStorageServer(serverid, ann.copy()) s = NativeStorageServer(serverid, ann.copy(), self)
s.rref = rref s.rref = rref
s._is_connected = True s._is_connected = True
self.servers[serverid] = s self.servers[serverid] = s
@ -93,7 +96,7 @@ class StorageFarmBroker:
precondition(isinstance(key_s, str), key_s) precondition(isinstance(key_s, str), key_s)
precondition(key_s.startswith("v0-"), key_s) precondition(key_s.startswith("v0-"), key_s)
assert ann["service-name"] == "storage" assert ann["service-name"] == "storage"
s = NativeStorageServer(key_s, ann) s = NativeStorageServer(key_s, ann, self)
serverid = s.get_serverid() serverid = s.get_serverid()
old = self.servers.get(serverid) old = self.servers.get(serverid)
if old: if old:
@ -119,6 +122,13 @@ class StorageFarmBroker:
for dsc in self.servers.values(): for dsc in self.servers.values():
dsc.try_to_connect() dsc.try_to_connect()
def check_enough_connected(self):
if (self.connected_d is not None and
len(self.get_connected_servers()) >= self.connected_threshold):
d = self.connected_d
self.connected_d = None
d.callback(None)
def get_servers_for_psi(self, peer_selection_index): def get_servers_for_psi(self, peer_selection_index):
# return a list of server objects (IServers) # return a list of server objects (IServers)
assert self.permute_peers == True assert self.permute_peers == True
@ -190,9 +200,10 @@ class NativeStorageServer:
"application-version": "unknown: no get_version()", "application-version": "unknown: no get_version()",
} }
def __init__(self, key_s, ann): def __init__(self, key_s, ann, broker):
self.key_s = key_s self.key_s = key_s
self.announcement = ann self.announcement = ann
self.broker = broker
assert "anonymous-storage-FURL" in ann, ann assert "anonymous-storage-FURL" in ann, ann
furl = str(ann["anonymous-storage-FURL"]) furl = str(ann["anonymous-storage-FURL"])
@ -295,6 +306,7 @@ class NativeStorageServer:
default = self.VERSION_DEFAULTS default = self.VERSION_DEFAULTS
d = add_version_to_remote_reference(rref, default) d = add_version_to_remote_reference(rref, default)
d.addCallback(self._got_versioned_service, lp) d.addCallback(self._got_versioned_service, lp)
d.addCallback(lambda ign: self.broker.check_enough_connected())
d.addErrback(log.err, format="storageclient._got_connection", d.addErrback(log.err, format="storageclient._got_connection",
name=self.get_name(), umid="Sdq3pg") name=self.get_name(), umid="Sdq3pg")

View File

@ -22,7 +22,7 @@ class FakeClient:
class WebResultsRendering(unittest.TestCase, WebRenderingMixin): class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
def create_fake_client(self): def create_fake_client(self):
sb = StorageFarmBroker(None, True) sb = StorageFarmBroker(None, True, 0, None)
# s.get_name() (the "short description") will be "v0-00000000". # s.get_name() (the "short description") will be "v0-00000000".
# s.get_longname() will include the -long suffix. # s.get_longname() will include the -long suffix.
# s.get_peerid() (i.e. tubid) will be "aaa.." or "777.." or "ceir.." # s.get_peerid() (i.e. tubid) will be "aaa.." or "777.." or "ceir.."
@ -41,7 +41,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
"my-version": "ver", "my-version": "ver",
"oldest-supported": "oldest", "oldest-supported": "oldest",
} }
s = NativeStorageServer(key_s, ann) s = NativeStorageServer(key_s, ann, sb)
sb.test_add_server(peerid, s) # XXX: maybe use key_s? sb.test_add_server(peerid, s) # XXX: maybe use key_s?
c = FakeClient() c = FakeClient()
c.storage_broker = sb c.storage_broker = sb

View File

@ -236,7 +236,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
return [ s.get_longname() for s in sb.get_servers_for_psi(key) ] return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
def test_permute(self): def test_permute(self):
sb = StorageFarmBroker(None, True) sb = StorageFarmBroker(None, True, 0, None)
for k in ["%d" % i for i in range(5)]: for k in ["%d" % i for i in range(5)]:
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake", ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"permutation-seed-base32": base32.b2a(k) } "permutation-seed-base32": base32.b2a(k) }

View File

@ -116,7 +116,7 @@ class AssistedUpload(unittest.TestCase):
timeout = 240 # It takes longer than 120 seconds on Francois's arm box. timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
def setUp(self): def setUp(self):
self.s = FakeClient() self.s = FakeClient()
self.s.storage_broker = StorageFarmBroker(None, True) self.s.storage_broker = StorageFarmBroker(None, True, 0, None)
self.s.secret_holder = client.SecretHolder("lease secret", "converge") self.s.secret_holder = client.SecretHolder("lease secret", "converge")
self.s.startService() self.s.startService()

View File

@ -234,7 +234,7 @@ def make_storagebroker(s=None, num_peers=10):
s = FakeStorage() s = FakeStorage()
peerids = [tagged_hash("peerid", "%d" % i)[:20] peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(num_peers)] for i in range(num_peers)]
storage_broker = StorageFarmBroker(None, True) storage_broker = StorageFarmBroker(None, True, 0, None)
for peerid in peerids: for peerid in peerids:
fss = FakeStorageServer(peerid, s) fss = FakeStorageServer(peerid, s)
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid), ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),

View File

@ -198,7 +198,7 @@ class FakeClient:
mode = dict([i,mode] for i in range(num_servers)) mode = dict([i,mode] for i in range(num_servers))
servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid])) servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
for fakeid in range(self.num_servers) ] for fakeid in range(self.num_servers) ]
self.storage_broker = StorageFarmBroker(None, permute_peers=True) self.storage_broker = StorageFarmBroker(None, True, 0, None)
for (serverid, rref) in servers: for (serverid, rref) in servers:
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid), ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
"permutation-seed-base32": base32.b2a(serverid) } "permutation-seed-base32": base32.b2a(serverid) }

View File

@ -248,7 +248,7 @@ class FakeClient(Client):
self._secret_holder = SecretHolder("lease secret", "convergence secret") self._secret_holder = SecretHolder("lease secret", "convergence secret")
self.helper = None self.helper = None
self.convergence = "some random string" self.convergence = "some random string"
self.storage_broker = StorageFarmBroker(None, permute_peers=True) self.storage_broker = StorageFarmBroker(None, True, 0, None)
# fake knowledge of another server # fake knowledge of another server
self.storage_broker.test_add_server("other_nodeid", self.storage_broker.test_add_server("other_nodeid",
FakeDisplayableServer( FakeDisplayableServer(