mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-06-23 09:15:32 +00:00
more storage_broker refactoring: downloader gets a broker instead of a client,
use Client.get_storage_broker() accessor instead of direct attribute access.
This commit is contained in:
@ -263,6 +263,9 @@ class Client(node.Node, pollmixin.PollMixin):
|
||||
default=True, boolean=True):
|
||||
sb.use_introducer(self.introducer_client)
|
||||
|
||||
def get_storage_broker(self):
|
||||
return self.storage_broker
|
||||
|
||||
def init_stub_client(self):
|
||||
def _publish(res):
|
||||
# we publish an empty object so that the introducer can count how
|
||||
|
@ -616,21 +616,22 @@ class DownloadStatus:
|
||||
self.results = value
|
||||
|
||||
class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
""" I download shares, check their integrity, then decode them, check the integrity of the
|
||||
resulting ciphertext, then and write it to my target. Before I send any new request to a
|
||||
server, I always ask the "monitor" object that was passed into my constructor whether this
|
||||
task has been cancelled (by invoking its raise_if_cancelled() method). """
|
||||
""" I download shares, check their integrity, then decode them, check the
|
||||
integrity of the resulting ciphertext, then and write it to my target.
|
||||
Before I send any new request to a server, I always ask the 'monitor'
|
||||
object that was passed into my constructor whether this task has been
|
||||
cancelled (by invoking its raise_if_cancelled() method)."""
|
||||
implements(IPushProducer)
|
||||
_status = None
|
||||
|
||||
def __init__(self, client, v, target, monitor):
|
||||
def __init__(self, storage_broker, v, target, monitor):
|
||||
|
||||
precondition(IVerifierURI.providedBy(v), v)
|
||||
precondition(IDownloadTarget.providedBy(target), target)
|
||||
|
||||
prefix=base32.b2a_l(v.storage_index[:8], 60)
|
||||
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
|
||||
self._client = client
|
||||
self._storage_broker = storage_broker
|
||||
|
||||
self._verifycap = v
|
||||
self._storage_index = v.storage_index
|
||||
@ -743,7 +744,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
|
||||
def _get_all_shareholders(self):
|
||||
dl = []
|
||||
sb = self._client.storage_broker
|
||||
sb = self._storage_broker
|
||||
for (peerid,ss) in sb.get_servers(self._storage_index):
|
||||
d = ss.callRemote("get_buckets", self._storage_index)
|
||||
d.addCallbacks(self._got_response, self._got_error,
|
||||
@ -1191,11 +1192,13 @@ class Downloader(service.MultiService):
|
||||
# include LIT files
|
||||
self.stats_provider.count('downloader.files_downloaded', 1)
|
||||
self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
|
||||
storage_broker = self.parent.get_storage_broker()
|
||||
|
||||
target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
|
||||
if not monitor:
|
||||
monitor=Monitor()
|
||||
dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target, monitor=monitor)
|
||||
dl = CiphertextDownloader(storage_broker, u.get_verify_cap(), target,
|
||||
monitor=monitor)
|
||||
self._all_downloads[dl] = None
|
||||
if history:
|
||||
history.add_download(dl.get_download_status())
|
||||
|
@ -618,7 +618,7 @@ class Helper(Referenceable, service.MultiService):
|
||||
# see if this file is already in the grid
|
||||
lp2 = self.log("doing a quick check+UEBfetch",
|
||||
parent=lp, level=log.NOISY)
|
||||
sb = self.parent.storage_broker
|
||||
sb = self.parent.get_storage_broker()
|
||||
c = CHKCheckerAndUEBFetcher(sb.get_servers, storage_index, lp2)
|
||||
d = c.check()
|
||||
def _checked(res):
|
||||
|
@ -166,7 +166,7 @@ class Tahoe2PeerSelector:
|
||||
self.use_peers = set() # PeerTrackers that have shares assigned to them
|
||||
self.preexisting_shares = {} # sharenum -> peerid holding the share
|
||||
|
||||
sb = client.storage_broker
|
||||
sb = client.get_storage_broker()
|
||||
peers = list(sb.get_servers(storage_index))
|
||||
if not peers:
|
||||
raise NoServersError("client gave us zero peers")
|
||||
|
@ -190,7 +190,7 @@ class Publish:
|
||||
assert self._privkey
|
||||
self._encprivkey = self._node.get_encprivkey()
|
||||
|
||||
sb = self._node._client.storage_broker
|
||||
sb = self._node._client.get_storage_broker()
|
||||
full_peerlist = sb.get_servers(self._storage_index)
|
||||
self.full_peerlist = full_peerlist # for use later, immutable
|
||||
self.bad_peers = set() # peerids who have errbacked/refused requests
|
||||
|
@ -421,7 +421,7 @@ class ServermapUpdater:
|
||||
|
||||
self._queries_completed = 0
|
||||
|
||||
sb = self._node._client.storage_broker
|
||||
sb = self._node._client.get_storage_broker()
|
||||
full_peerlist = list(sb.get_servers(self._node._storage_index))
|
||||
self.full_peerlist = full_peerlist # for use later, immutable
|
||||
self.extra_peers = full_peerlist[:] # peers are removed as we use them
|
||||
|
@ -9,6 +9,8 @@ from common_web import WebRenderingMixin
|
||||
class FakeClient:
|
||||
def get_nickname_for_serverid(self, serverid):
|
||||
return self.storage_broker.get_nickname_for_serverid(serverid)
|
||||
def get_storage_broker(self):
|
||||
return self.storage_broker
|
||||
|
||||
class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
|
||||
|
||||
|
@ -68,6 +68,8 @@ class FakeClient(service.MultiService):
|
||||
return log.msg(*args, **kwargs)
|
||||
def get_encoding_parameters(self):
|
||||
return self.DEFAULT_ENCODING_PARAMETERS
|
||||
def get_storage_broker(self):
|
||||
return self.storage_broker
|
||||
|
||||
def flush_but_dont_ignore(res):
|
||||
d = flushEventualQueue()
|
||||
|
@ -181,6 +181,8 @@ class FakeClient:
|
||||
|
||||
def get_all_serverids(self):
|
||||
return self.storage_broker.get_all_serverids()
|
||||
def get_storage_broker(self):
|
||||
return self.storage_broker
|
||||
def debug_break_connection(self, peerid):
|
||||
self.storage_broker.servers[peerid].broken = True
|
||||
def debug_remove_connection(self, peerid):
|
||||
|
@ -177,6 +177,8 @@ class FakeClient:
|
||||
pass
|
||||
def get_encoding_parameters(self):
|
||||
return self.DEFAULT_ENCODING_PARAMETERS
|
||||
def get_storage_broker(self):
|
||||
return self.storage_broker
|
||||
|
||||
def get_renewal_secret(self):
|
||||
return ""
|
||||
|
@ -69,6 +69,8 @@ class FakeClient(service.MultiService):
|
||||
return u"John Doe"
|
||||
|
||||
storage_broker = StorageFarmBroker()
|
||||
def get_storage_broker(self):
|
||||
return self.storage_broker
|
||||
|
||||
def create_node_from_uri(self, auri):
|
||||
precondition(isinstance(auri, str), auri)
|
||||
|
@ -137,7 +137,7 @@ class ResultsBase:
|
||||
add("Unrecoverable Versions", data["count-unrecoverable-versions"])
|
||||
|
||||
# this table is sorted by permuted order
|
||||
sb = c.storage_broker
|
||||
sb = c.get_storage_broker()
|
||||
permuted_peer_ids = [peerid
|
||||
for (peerid, rref)
|
||||
in sb.get_servers(cr.get_storage_index())]
|
||||
|
Reference in New Issue
Block a user