clean up storage_broker interface: should fix #732

This commit is contained in:
Brian Warner
2009-06-21 16:51:19 -07:00
parent f14004eeb3
commit 711c09bc5d
16 changed files with 65 additions and 37 deletions

View File

@@ -326,11 +326,6 @@ class Client(node.Node, pollmixin.PollMixin):
def _lost_key_generator(self): def _lost_key_generator(self):
self._key_generator = None self._key_generator = None
def get_servers(self, service_name):
""" Return frozenset of (peerid, versioned-rref) """
assert isinstance(service_name, str)
return self.introducer_client.get_peers(service_name)
def init_web(self, webport): def init_web(self, webport):
self.log("init_web(webport=%s)", args=(webport,)) self.log("init_web(webport=%s)", args=(webport,))

View File

@@ -9,7 +9,8 @@ from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
from allmydata.util.assertutil import _assert, precondition from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, uri from allmydata import codec, hashtree, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \ from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError, \ IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
IStorageBroker, NotEnoughSharesError, \
UnableToFetchCriticalDownloadDataError UnableToFetchCriticalDownloadDataError
from allmydata.immutable import layout from allmydata.immutable import layout
from allmydata.monitor import Monitor from allmydata.monitor import Monitor
@@ -626,6 +627,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
def __init__(self, storage_broker, v, target, monitor): def __init__(self, storage_broker, v, target, monitor):
precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
precondition(IVerifierURI.providedBy(v), v) precondition(IVerifierURI.providedBy(v), v)
precondition(IDownloadTarget.providedBy(target), target) precondition(IDownloadTarget.providedBy(target), target)
@@ -745,7 +747,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
def _get_all_shareholders(self): def _get_all_shareholders(self):
dl = [] dl = []
sb = self._storage_broker sb = self._storage_broker
for (peerid,ss) in sb.get_servers(self._storage_index): for (peerid,ss) in sb.get_servers_for_index(self._storage_index):
self.log(format="sending DYHB to [%(peerid)s]", self.log(format="sending DYHB to [%(peerid)s]",
peerid=idlib.shortnodeid_b2a(peerid), peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, umid="rT03hg") level=log.NOISY, umid="rT03hg")

View File

@@ -201,7 +201,8 @@ class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
def check_and_repair(self, monitor, verify=False, add_lease=False): def check_and_repair(self, monitor, verify=False, add_lease=False):
verifycap = self.get_verify_cap() verifycap = self.get_verify_cap()
servers = self._client.get_servers("storage") sb = self._client.get_storage_broker()
servers = sb.get_all_servers()
c = Checker(client=self._client, verifycap=verifycap, servers=servers, c = Checker(client=self._client, verifycap=verifycap, servers=servers,
verify=verify, add_lease=add_lease, monitor=monitor) verify=verify, add_lease=add_lease, monitor=monitor)
@@ -253,8 +254,11 @@ class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
return d return d
def check(self, monitor, verify=False, add_lease=False): def check(self, monitor, verify=False, add_lease=False):
v = Checker(client=self._client, verifycap=self.get_verify_cap(), verifycap = self.get_verify_cap()
servers=self._client.get_servers("storage"), sb = self._client.get_storage_broker()
servers = sb.get_all_servers()
v = Checker(client=self._client, verifycap=verifycap, servers=servers,
verify=verify, add_lease=add_lease, monitor=monitor) verify=verify, add_lease=add_lease, monitor=monitor)
return v.start() return v.start()

View File

@@ -619,7 +619,7 @@ class Helper(Referenceable, service.MultiService):
lp2 = self.log("doing a quick check+UEBfetch", lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY) parent=lp, level=log.NOISY)
sb = self.parent.get_storage_broker() sb = self.parent.get_storage_broker()
c = CHKCheckerAndUEBFetcher(sb.get_servers, storage_index, lp2) c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2)
d = c.check() d = c.check()
def _checked(res): def _checked(res):
if res: if res:

View File

@@ -50,7 +50,8 @@ class Repairer(log.PrefixingLogMixin):
def start(self): def start(self):
self.log("starting repair") self.log("starting repair")
duc = DownUpConnector() duc = DownUpConnector()
dl = download.CiphertextDownloader(self._client, self._verifycap, target=duc, monitor=self._monitor) sb = self._client.get_storage_broker()
dl = download.CiphertextDownloader(sb, self._verifycap, target=duc, monitor=self._monitor)
ul = upload.CHKUploader(self._client) ul = upload.CHKUploader(self._client)
d = defer.Deferred() d = defer.Deferred()

View File

@@ -167,7 +167,7 @@ class Tahoe2PeerSelector:
self.preexisting_shares = {} # sharenum -> peerid holding the share self.preexisting_shares = {} # sharenum -> peerid holding the share
sb = client.get_storage_broker() sb = client.get_storage_broker()
peers = list(sb.get_servers(storage_index)) peers = sb.get_servers_for_index(storage_index)
if not peers: if not peers:
raise NoServersError("client gave us zero peers") raise NoServersError("client gave us zero peers")

View File

@@ -349,6 +349,23 @@ class IStorageBucketReader(Interface):
@return: URIExtensionData @return: URIExtensionData
""" """
class IStorageBroker(Interface):
def get_servers_for_index(peer_selection_index):
"""
@return: list of (peerid, versioned-rref) tuples
"""
def get_all_servers():
"""
@return: frozenset of (peerid, versioned-rref) tuples
"""
def get_all_serverids():
"""
@return: iterator of serverid strings
"""
def get_nickname_for_serverid(serverid):
"""
@return: unicode nickname, or None
"""
# hm, we need a solution for forward references in schemas # hm, we need a solution for forward references in schemas

View File

@@ -177,7 +177,7 @@ class Publish:
self._encprivkey = self._node.get_encprivkey() self._encprivkey = self._node.get_encprivkey()
sb = self._node._client.get_storage_broker() sb = self._node._client.get_storage_broker()
full_peerlist = sb.get_servers(self._storage_index) full_peerlist = sb.get_servers_for_index(self._storage_index)
self.full_peerlist = full_peerlist # for use later, immutable self.full_peerlist = full_peerlist # for use later, immutable
self.bad_peers = set() # peerids who have errbacked/refused requests self.bad_peers = set() # peerids who have errbacked/refused requests

View File

@@ -422,7 +422,7 @@ class ServermapUpdater:
self._queries_completed = 0 self._queries_completed = 0
sb = self._node._client.get_storage_broker() sb = self._node._client.get_storage_broker()
full_peerlist = list(sb.get_servers(self._node._storage_index)) full_peerlist = sb.get_servers_for_index(self._node._storage_index)
self.full_peerlist = full_peerlist # for use later, immutable self.full_peerlist = full_peerlist # for use later, immutable
self.extra_peers = full_peerlist[:] # peers are removed as we use them self.extra_peers = full_peerlist[:] # peers are removed as we use them
self._good_peers = set() # peers who had some shares self._good_peers = set() # peers who had some shares

View File

@@ -19,8 +19,11 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
# implement tahoe.cfg scanner, create static NativeStorageClients # implement tahoe.cfg scanner, create static NativeStorageClients
import sha import sha
from zope.interface import implements
from allmydata.interfaces import IStorageBroker
class StorageFarmBroker: class StorageFarmBroker:
implements(IStorageBroker)
"""I live on the client, and know about storage servers. For each server """I live on the client, and know about storage servers. For each server
that is participating in a grid, I either maintain a connection to it or that is participating in a grid, I either maintain a connection to it or
remember enough information to establish a connection to it on demand. remember enough information to establish a connection to it on demand.
@@ -38,20 +41,23 @@ class StorageFarmBroker:
self.introducer_client = ic = introducer_client self.introducer_client = ic = introducer_client
ic.subscribe_to("storage") ic.subscribe_to("storage")
def get_servers(self, peer_selection_index): def get_servers_for_index(self, peer_selection_index):
# first cut: return an iterator of (peerid, versioned-rref) tuples # first cut: return a list of (peerid, versioned-rref) tuples
assert self.permute_peers == True assert self.permute_peers == True
servers = self.get_all_servers()
key = peer_selection_index
return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
def get_all_servers(self):
# return a frozenset of (peerid, versioned-rref) tuples
servers = {} servers = {}
for serverid,server in self.servers.items(): for serverid,server in self.servers.items():
servers[serverid] = server servers[serverid] = server
if self.introducer_client: if self.introducer_client:
ic = self.introducer_client ic = self.introducer_client
for serverid,server in ic.get_permuted_peers("storage", for serverid,server in ic.get_peers("storage"):
peer_selection_index):
servers[serverid] = server servers[serverid] = server
servers = servers.items() return frozenset(servers.items())
key = peer_selection_index
return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
def get_all_serverids(self): def get_all_serverids(self):
for serverid in self.servers: for serverid in self.servers:

View File

@@ -15,6 +15,7 @@
import os.path import os.path
import sha import sha
from zope.interface import implements
from twisted.application import service from twisted.application import service
from twisted.internet import reactor from twisted.internet import reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
@@ -26,6 +27,7 @@ from allmydata.storage.server import StorageServer, storage_index_to_dir
from allmydata.util import fileutil, idlib, hashutil from allmydata.util import fileutil, idlib, hashutil
from allmydata.introducer.client import RemoteServiceConnector from allmydata.introducer.client import RemoteServiceConnector
from allmydata.test.common_web import HTTPClientGETFactory from allmydata.test.common_web import HTTPClientGETFactory
from allmydata.interfaces import IStorageBroker
class IntentionalError(Exception): class IntentionalError(Exception):
pass pass
@@ -105,9 +107,12 @@ def wrap(original, service_name):
return wrapper return wrapper
class NoNetworkStorageBroker: class NoNetworkStorageBroker:
def get_servers(self, key): implements(IStorageBroker)
def get_servers_for_index(self, key):
return sorted(self.client._servers, return sorted(self.client._servers,
key=lambda x: sha.new(key+x[0]).digest()) key=lambda x: sha.new(key+x[0]).digest())
def get_all_servers(self):
return frozenset(self.client._servers)
def get_nickname_for_serverid(self, serverid): def get_nickname_for_serverid(self, serverid):
return None return None
@@ -138,9 +143,7 @@ class NoNetworkClient(Client):
self.storage_broker.client = self self.storage_broker.client = self
def init_stub_client(self): def init_stub_client(self):
pass pass
#._servers will be set by the NoNetworkGrid which creates us
def get_servers(self, service_name):
return self._servers
class SimpleStats: class SimpleStats:
def __init__(self): def __init__(self):

View File

@@ -143,7 +143,7 @@ class Basic(unittest.TestCase):
def _permute(self, sb, key): def _permute(self, sb, key):
return [ peerid return [ peerid
for (peerid,rref) in sb.get_servers(key) ] for (peerid,rref) in sb.get_servers_for_index(key) ]
def test_permute(self): def test_permute(self):
sb = StorageFarmBroker() sb = StorageFarmBroker()

View File

@@ -8,7 +8,8 @@ from allmydata import hashtree, uri
from allmydata.immutable import encode, upload, download from allmydata.immutable import encode, upload, download
from allmydata.util import hashutil from allmydata.util import hashutil
from allmydata.util.assertutil import _assert from allmydata.util.assertutil import _assert
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
NotEnoughSharesError, IStorageBroker
from allmydata.monitor import Monitor from allmydata.monitor import Monitor
import common_util as testutil import common_util as testutil
@@ -18,9 +19,8 @@ class LostPeerError(Exception):
def flip_bit(good): # flips the last bit def flip_bit(good): # flips the last bit
return good[:-1] + chr(ord(good[-1]) ^ 0x01) return good[:-1] + chr(ord(good[-1]) ^ 0x01)
class FakeClient: class FakeStorageBroker:
def log(self, *args, **kwargs): implements(IStorageBroker)
pass
class FakeBucketReaderWriterProxy: class FakeBucketReaderWriterProxy:
implements(IStorageBucketWriter, IStorageBucketReader) implements(IStorageBucketWriter, IStorageBucketReader)
@@ -494,11 +494,11 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
total_shares=verifycap.total_shares, total_shares=verifycap.total_shares,
size=verifycap.size) size=verifycap.size)
client = FakeClient() sb = FakeStorageBroker()
if not target: if not target:
target = download.Data() target = download.Data()
target = download.DecryptingTarget(target, u.key) target = download.DecryptingTarget(target, u.key)
fd = download.CiphertextDownloader(client, u.get_verify_cap(), target, monitor=Monitor()) fd = download.CiphertextDownloader(sb, u.get_verify_cap(), target, monitor=Monitor())
# we manually cycle the CiphertextDownloader through a number of steps that # we manually cycle the CiphertextDownloader through a number of steps that
# would normally be sequenced by a Deferred chain in # would normally be sequenced by a Deferred chain in

View File

@@ -1914,7 +1914,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
d.addCallback(n._generated) d.addCallback(n._generated)
def _break_peer0(res): def _break_peer0(res):
si = n.get_storage_index() si = n.get_storage_index()
peerlist = list(self.client.storage_broker.get_servers(si)) peerlist = self.client.storage_broker.get_servers_for_index(si)
peerid0, connection0 = peerlist[0] peerid0, connection0 = peerlist[0]
peerid1, connection1 = peerlist[1] peerid1, connection1 = peerlist[1]
connection0.broken = True connection0.broken = True

View File

@@ -76,7 +76,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
all_peerids = list(c.get_storage_broker().get_all_serverids()) all_peerids = list(c.get_storage_broker().get_all_serverids())
self.failUnlessEqual(len(all_peerids), self.numclients+1) self.failUnlessEqual(len(all_peerids), self.numclients+1)
sb = c.storage_broker sb = c.storage_broker
permuted_peers = list(sb.get_servers("a")) permuted_peers = list(sb.get_servers_for_index("a"))
self.failUnlessEqual(len(permuted_peers), self.numclients+1) self.failUnlessEqual(len(permuted_peers), self.numclients+1)
d.addCallback(_check) d.addCallback(_check)
@@ -111,7 +111,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
all_peerids = list(c.get_storage_broker().get_all_serverids()) all_peerids = list(c.get_storage_broker().get_all_serverids())
self.failUnlessEqual(len(all_peerids), self.numclients) self.failUnlessEqual(len(all_peerids), self.numclients)
sb = c.storage_broker sb = c.storage_broker
permuted_peers = list(sb.get_servers("a")) permuted_peers = list(sb.get_servers_for_index("a"))
self.failUnlessEqual(len(permuted_peers), self.numclients) self.failUnlessEqual(len(permuted_peers), self.numclients)
d.addCallback(_check_connections) d.addCallback(_check_connections)

View File

@@ -141,7 +141,7 @@ class ResultsBase:
sb = c.get_storage_broker() sb = c.get_storage_broker()
permuted_peer_ids = [peerid permuted_peer_ids = [peerid
for (peerid, rref) for (peerid, rref)
in sb.get_servers(cr.get_storage_index())] in sb.get_servers_for_index(cr.get_storage_index())]
num_shares_left = sum([len(shares) for shares in servers.values()]) num_shares_left = sum([len(shares) for shares in servers.values()])
servermap = [] servermap = []