WIP Storage broker client creates one tub per server

i was unable to get all the unit tests working;
this is my work in progress.
This commit is contained in:
David Stainton
2016-05-02 15:23:07 +00:00
parent 22c1031d19
commit 6061b6fc3c
9 changed files with 32 additions and 14 deletions

View File

@ -359,8 +359,10 @@ class Client(node.Node, pollmixin.PollMixin):
# (and everybody else who wants to use storage servers) # (and everybody else who wants to use storage servers)
ps = self.get_config("client", "peers.preferred", "").split(",") ps = self.get_config("client", "peers.preferred", "").split(",")
preferred_peers = tuple([p.strip() for p in ps if p != ""]) preferred_peers = tuple([p.strip() for p in ps if p != ""])
sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True, preferred_peers=preferred_peers) sb = storage_client.StorageFarmBroker(permute_peers=True, preferred_peers=preferred_peers)
self.storage_broker = sb self.storage_broker = sb
sb.setServiceParent(self)
connection_threshold = min(self.encoding_params["k"], connection_threshold = min(self.encoding_params["k"],
self.encoding_params["happy"] + 1) self.encoding_params["happy"] + 1)

View File

@ -32,7 +32,9 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
import re, time import re, time
from zope.interface import implements from zope.interface import implements
from twisted.internet import defer from twisted.internet import defer
from foolscap.api import eventually from twisted.application import service
from foolscap.api import Tub, eventually
from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
from allmydata.util import log, base32 from allmydata.util import log, base32
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
@ -92,7 +94,7 @@ class ConnectedEnough(object):
class StorageFarmBroker: class StorageFarmBroker(service.MultiService):
implements(IStorageBroker) implements(IStorageBroker)
"""I live on the client, and know about storage servers. For each server """I live on the client, and know about storage servers. For each server
that is participating in a grid, I either maintain a connection to it or that is participating in a grid, I either maintain a connection to it or
@ -100,11 +102,13 @@ class StorageFarmBroker:
I'm also responsible for subscribing to the IntroducerClient to find out I'm also responsible for subscribing to the IntroducerClient to find out
about new servers as they are announced by the Introducer. about new servers as they are announced by the Introducer.
""" """
def __init__(self, tub, permute_peers, preferred_peers=()): def __init__(self, permute_peers, preferred_peers=()):
self.tub = tub service.MultiService.__init__(self)
assert permute_peers # False not implemented yet assert permute_peers # False not implemented yet
self.permute_peers = permute_peers self.permute_peers = permute_peers
self.preferred_peers = preferred_peers self.preferred_peers = preferred_peers
self.tubs = {} # self.tubs maps serverid -> Tub
# self.servers maps serverid -> IServer, and keeps track of all the # self.servers maps serverid -> IServer, and keeps track of all the
# storage servers that we've heard about. Each descriptor manages its # storage servers that we've heard about. Each descriptor manages its
# own Reconnector, and will give us a RemoteReference when we ask # own Reconnector, and will give us a RemoteReference when we ask
@ -113,6 +117,9 @@ class StorageFarmBroker:
self.introducer_client = None self.introducer_client = None
self._server_listeners = ObserverList() self._server_listeners = ObserverList()
def startService(self):
service.MultiService.startService(self)
def on_servers_changed(self, callback): def on_servers_changed(self, callback):
self._server_listeners.subscribe(callback) self._server_listeners.subscribe(callback)
@ -131,6 +138,14 @@ class StorageFarmBroker:
self.introducer_client = ic = introducer_client self.introducer_client = ic = introducer_client
ic.subscribe_to("storage", self._got_announcement) ic.subscribe_to("storage", self._got_announcement)
def _ensure_tub_created(self, serverid):
if serverid in self.tubs:
return
self.tubs[serverid] = Tub()
# XXX set options?
self.tubs[serverid].setServiceParent(self)
def _got_connection(self): def _got_connection(self):
# this is called by NativeStorageClient when it is connected # this is called by NativeStorageClient when it is connected
self._server_listeners.notify() self._server_listeners.notify()
@ -152,7 +167,8 @@ class StorageFarmBroker:
old.stop_connecting() old.stop_connecting()
# now we forget about them and start using the new one # now we forget about them and start using the new one
self.servers[serverid] = s self.servers[serverid] = s
s.start_connecting(self.tub, self._trigger_connections) self._ensure_tub_created(serverid)
s.start_connecting(self.tubs[serverid], self._trigger_connections)
# the descriptor will manage their own Reconnector, and each time we # the descriptor will manage their own Reconnector, and each time we
# need servers, we'll ask them if they're connected or not. # need servers, we'll ask them if they're connected or not.

View File

@ -22,7 +22,7 @@ class FakeClient:
class WebResultsRendering(unittest.TestCase, WebRenderingMixin): class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
def create_fake_client(self): def create_fake_client(self):
sb = StorageFarmBroker(None, True) sb = StorageFarmBroker(True)
# s.get_name() (the "short description") will be "v0-00000000". # s.get_name() (the "short description") will be "v0-00000000".
# s.get_longname() will include the -long suffix. # s.get_longname() will include the -long suffix.
# s.get_peerid() (i.e. tubid) will be "aaa.." or "777.." or "ceir.." # s.get_peerid() (i.e. tubid) will be "aaa.." or "777.." or "ceir.."

View File

@ -236,7 +236,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
return [ s.get_longname() for s in sb.get_servers_for_psi(key) ] return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
def test_permute(self): def test_permute(self):
sb = StorageFarmBroker(None, True) sb = StorageFarmBroker(True)
for k in ["%d" % i for i in range(5)]: for k in ["%d" % i for i in range(5)]:
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake", ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"permutation-seed-base32": base32.b2a(k) } "permutation-seed-base32": base32.b2a(k) }
@ -248,7 +248,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
self.failUnlessReallyEqual(self._permute(sb, "one"), []) self.failUnlessReallyEqual(self._permute(sb, "one"), [])
def test_permute_with_preferred(self): def test_permute_with_preferred(self):
sb = StorageFarmBroker(None, True, ['1','4']) sb = StorageFarmBroker(True, ['1','4'])
for k in ["%d" % i for i in range(5)]: for k in ["%d" % i for i in range(5)]:
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake", ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"permutation-seed-base32": base32.b2a(k) } "permutation-seed-base32": base32.b2a(k) }

View File

@ -116,7 +116,7 @@ class AssistedUpload(unittest.TestCase):
timeout = 240 # It takes longer than 120 seconds on Francois's arm box. timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
def setUp(self): def setUp(self):
self.s = FakeClient() self.s = FakeClient()
self.s.storage_broker = StorageFarmBroker(None, True) self.s.storage_broker = StorageFarmBroker(True)
self.s.secret_holder = client.SecretHolder("lease secret", "converge") self.s.secret_holder = client.SecretHolder("lease secret", "converge")
self.s.startService() self.s.startService()

View File

@ -234,7 +234,7 @@ def make_storagebroker(s=None, num_peers=10):
s = FakeStorage() s = FakeStorage()
peerids = [tagged_hash("peerid", "%d" % i)[:20] peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(num_peers)] for i in range(num_peers)]
storage_broker = StorageFarmBroker(None, True) storage_broker = StorageFarmBroker(True)
for peerid in peerids: for peerid in peerids:
fss = FakeStorageServer(peerid, s) fss = FakeStorageServer(peerid, s)
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid), ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),

View File

@ -40,7 +40,7 @@ class TestStorageFarmBroker(unittest.TestCase):
def test_threshold_reached(self): def test_threshold_reached(self):
tub = Mock() tub = Mock()
introducer = Mock() introducer = Mock()
broker = StorageFarmBroker(tub, True) broker = StorageFarmBroker(True)
done = ConnectedEnough(broker, 5).when_connected_enough() done = ConnectedEnough(broker, 5).when_connected_enough()
broker.use_introducer(introducer) broker.use_introducer(introducer)
# subscribes to "storage" to learn of new storage nodes # subscribes to "storage" to learn of new storage nodes

View File

@ -198,7 +198,7 @@ class FakeClient:
mode = dict([i,mode] for i in range(num_servers)) mode = dict([i,mode] for i in range(num_servers))
servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid])) servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
for fakeid in range(self.num_servers) ] for fakeid in range(self.num_servers) ]
self.storage_broker = StorageFarmBroker(None, permute_peers=True) self.storage_broker = StorageFarmBroker(permute_peers=True)
for (serverid, rref) in servers: for (serverid, rref) in servers:
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid), ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
"permutation-seed-base32": base32.b2a(serverid) } "permutation-seed-base32": base32.b2a(serverid) }

View File

@ -252,7 +252,7 @@ class FakeClient(Client):
self._secret_holder = SecretHolder("lease secret", "convergence secret") self._secret_holder = SecretHolder("lease secret", "convergence secret")
self.helper = None self.helper = None
self.convergence = "some random string" self.convergence = "some random string"
self.storage_broker = StorageFarmBroker(None, permute_peers=True) self.storage_broker = StorageFarmBroker(permute_peers=True)
# fake knowledge of another server # fake knowledge of another server
self.storage_broker.test_add_server("other_nodeid", self.storage_broker.test_add_server("other_nodeid",
FakeDisplayableServer( FakeDisplayableServer(