diff --git a/newsfragments/3048.minor b/newsfragments/3048.minor new file mode 100644 index 000000000..e69de29bb diff --git a/src/allmydata/control.py b/src/allmydata/control.py index 91454cbe3..07802efba 100644 --- a/src/allmydata/control.py +++ b/src/allmydata/control.py @@ -123,9 +123,9 @@ class ControlServer(Referenceable, service.Service): return results server = everyone_left.pop(0) server_name = server.get_longname() - connection = server.get_rref() + storage_server = server.get_storage_server() start = time.time() - d = connection.callRemote("get_buckets", "\x00"*16) + d = storage_server.get_buckets("\x00" * 16) def _done(ignored): stop = time.time() elapsed = stop - start diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py index 596bc8225..11f4341d4 100644 --- a/src/allmydata/immutable/checker.py +++ b/src/allmydata/immutable/checker.py @@ -496,16 +496,19 @@ class Checker(log.PrefixingLogMixin): that we want to track and report whether or not each server responded.)""" - rref = s.get_rref() + storage_server = s.get_storage_server() lease_seed = s.get_lease_seed() if self._add_lease: renew_secret = self._get_renewal_secret(lease_seed) cancel_secret = self._get_cancel_secret(lease_seed) - d2 = rref.callRemote("add_lease", storageindex, - renew_secret, cancel_secret) + d2 = storage_server.add_lease( + storageindex, + renew_secret, + cancel_secret, + ) d2.addErrback(self._add_lease_failed, s.get_name(), storageindex) - d = rref.callRemote("get_buckets", storageindex) + d = storage_server.get_buckets(storageindex) def _wrap_results(res): return (res, True) diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py index ad99c4647..3c6788537 100644 --- a/src/allmydata/immutable/downloader/finder.py +++ b/src/allmydata/immutable/downloader/finder.py @@ -139,7 +139,7 @@ class ShareFinder(object): # TODO: get the timer from a Server object, it knows best self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT, self.overdue, req) - d = server.get_rref().callRemote("get_buckets", self._storage_index) + d = server.get_storage_server().get_buckets(self._storage_index) d.addBoth(incidentally, self._request_retired, req) d.addCallbacks(self._got_response, self._got_error, callbackArgs=(server, req, d_ev, time_sent, lp), @@ -221,5 +221,3 @@ class ShareFinder(object): self.log(format="got error from [%(name)s]", name=server.get_name(), failure=f, level=log.UNUSUAL, parent=lp, umid="zUKdCw") - - diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py index c5b402af3..e04e94e8f 100644 --- a/src/allmydata/immutable/offloaded.py +++ b/src/allmydata/immutable/offloaded.py @@ -54,7 +54,7 @@ class CHKCheckerAndUEBFetcher(object): def _get_all_shareholders(self, storage_index): dl = [] for s in self._peer_getter(storage_index): - d = s.get_rref().callRemote("get_buckets", storage_index) + d = s.get_storage_server().get_buckets(storage_index) d.addCallbacks(self._got_response, self._got_error, callbackArgs=(s,)) dl.append(d) diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 838ee85f4..7a683f317 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -261,20 +261,21 @@ class ServerTracker(object): return self._server.get_name() def query(self, sharenums): - rref = self._server.get_rref() - d = rref.callRemote("allocate_buckets", - self.storage_index, - self.renew_secret, - self.cancel_secret, - sharenums, - self.allocated_size, - canary=Referenceable()) + storage_server = self._server.get_storage_server() + d = storage_server.allocate_buckets( + self.storage_index, + self.renew_secret, + self.cancel_secret, + sharenums, + self.allocated_size, + canary=Referenceable(), + ) d.addCallback(self._buckets_allocated) return d def ask_about_existing_shares(self): - rref = self._server.get_rref() - return rref.callRemote("get_buckets", self.storage_index) + storage_server = self._server.get_storage_server() + return storage_server.get_buckets(self.storage_index) def _buckets_allocated(self, alreadygot_and_buckets): #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) @@ -415,7 +416,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): # field) from getting large shares (for files larger than about # 12GiB). See #439 for details. def _get_maxsize(server): - v0 = server.get_rref().version + v0 = server.get_version() v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"] return v1["maximum-immutable-share-size"] diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 36c3622d8..dee197ca2 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -291,6 +291,81 @@ class RIStorageServer(RemoteInterface): """ +class IStorageServer(Interface): + """ + An object capable of storing shares for a storage client. + """ + def get_version(): + """ + :see: ``RIStorageServer.get_version`` + """ + + def allocate_buckets( + storage_index, + renew_secret, + cancel_secret, + sharenums, + allocated_size, + canary, + ): + """ + :see: ``RIStorageServer.allocate_buckets`` + """ + + def add_lease( + storage_index, + renew_secret, + cancel_secret, + ): + """ + :see: ``RIStorageServer.add_lease`` + """ + + def renew_lease( + storage_index, + renew_secret, + ): + """ + :see: ``RIStorageServer.renew_lease`` + """ + + def get_buckets( + storage_index, + ): + """ + :see: ``RIStorageServer.get_buckets`` + """ + + def slot_readv( + storage_index, + shares, + readv, + ): + """ + :see: ``RIStorageServer.slot_readv`` + """ + + def slot_testv_and_readv_and_writev( + storage_index, + secrets, + tw_vectors, + r_vector, + ): + """ + :see: ``RIStorageServer.slot_testv_readv_and_writev`` + """ + + def advise_corrupt_share( + share_type, + storage_index, + shnum, + reason, + ): + """ + :see: ``RIStorageServer.advise_corrupt_share`` + """ + + class IStorageBucketWriter(Interface): """ Objects of this kind live on the client side. @@ -463,13 +538,26 @@ class IServer(IDisplayableServer): pass def get_rref(): - """Once a server is connected, I return a RemoteReference. + """Obsolete. Use ``get_storage_server`` instead. + + Once a server is connected, I return a RemoteReference. Before a server is connected for the first time, I return None. Note that the rref I return will start producing DeadReferenceErrors once the connection is lost. """ + def get_storage_server(): + """ + Once a server is connected, I return an ``IStorageServer``. + Before a server is connected for the first time, I return None. + + Note that the ``IStorageServer`` I return will start producing + DeadReferenceErrors once the connection is lost. + """ + + + class IMutableSlotWriter(Interface): """ @@ -2935,4 +3023,3 @@ class IConnectionStatus(Interface): connection hint and the handler it is using) to the status string (pending, connected, refused, or other errors). """) - diff --git a/src/allmydata/mutable/layout.py b/src/allmydata/mutable/layout.py index d16088994..20f2df3aa 100644 --- a/src/allmydata/mutable/layout.py +++ b/src/allmydata/mutable/layout.py @@ -230,7 +230,7 @@ class SDMFSlotWriteProxy(object): """ def __init__(self, shnum, - rref, # a remote reference to a storage server + storage_server, # an IStorageServer storage_index, secrets, # (write_enabler, renew_secret, cancel_secret) seqnum, # the sequence number of the mutable file @@ -239,7 +239,7 @@ class SDMFSlotWriteProxy(object): segment_size, data_length): # the length of the original file self.shnum = shnum - self._rref = rref + self._storage_server = storage_server self._storage_index = storage_index self._secrets = secrets self._seqnum = seqnum @@ -541,12 +541,13 @@ class SDMFSlotWriteProxy(object): tw_vectors = {} tw_vectors[self.shnum] = (self._testvs, datavs, None) - return self._rref.callRemote("slot_testv_and_readv_and_writev", - self._storage_index, - self._secrets, - tw_vectors, - # TODO is it useful to read something? - self._readvs) + return self._storage_server.slot_testv_and_readv_and_writev( + self._storage_index, + self._secrets, + tw_vectors, + # TODO is it useful to read something? + self._readvs, + ) MDMFHEADER = ">BQ32sBBQQ QQQQQQQQ" @@ -729,7 +730,7 @@ class MDMFSlotWriteProxy(object): # disruption. def __init__(self, shnum, - rref, # a remote reference to a storage server + storage_server, # a remote reference to a storage server storage_index, secrets, # (write_enabler, renew_secret, cancel_secret) seqnum, # the sequence number of the mutable file @@ -738,7 +739,7 @@ class MDMFSlotWriteProxy(object): segment_size, data_length): # the length of the original file self.shnum = shnum - self._rref = rref + self._storage_server = storage_server self._storage_index = storage_index self._seqnum = seqnum self._required_shares = required_shares @@ -1159,11 +1160,12 @@ class MDMFSlotWriteProxy(object): self._testvs = [(0, len(new_checkstring), "eq", new_checkstring)] on_success = _first_write tw_vectors[self.shnum] = (self._testvs, datavs, None) - d = self._rref.callRemote("slot_testv_and_readv_and_writev", - self._storage_index, - self._secrets, - tw_vectors, - self._readv) + d = self._storage_server.slot_testv_and_readv_and_writev( + self._storage_index, + self._secrets, + tw_vectors, + self._readv, + ) def _result(results): if isinstance(results, failure.Failure) or not results[0]: # Do nothing; the write was unsuccessful. @@ -1189,13 +1191,13 @@ class MDMFSlotReadProxy(object): it is valid) to eliminate some of the need to fetch it from servers. """ def __init__(self, - rref, + storage_server, storage_index, shnum, data="", data_is_everything=False): # Start the initialization process. - self._rref = rref + self._storage_server = storage_server self._storage_index = storage_index self.shnum = shnum @@ -1752,10 +1754,11 @@ class MDMFSlotReadProxy(object): results = {self.shnum: results} return defer.succeed(results) else: - return self._rref.callRemote("slot_readv", - self._storage_index, - [self.shnum], - readvs) + return self._storage_server.slot_readv( + self._storage_index, + [self.shnum], + readvs, + ) def is_sdmf(self): diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py index fcd949124..7f0dad180 100644 --- a/src/allmydata/mutable/publish.py +++ b/src/allmydata/mutable/publish.py @@ -269,7 +269,7 @@ class Publish(object): secrets = (write_enabler, renew_secret, cancel_secret) writer = writer_class(shnum, - server.get_rref(), + server.get_storage_server(), self._storage_index, secrets, self._new_seqnum, @@ -471,7 +471,7 @@ class Publish(object): secrets = (write_enabler, renew_secret, cancel_secret) writer = writer_class(shnum, - server.get_rref(), + server.get_storage_server(), self._storage_index, secrets, self._new_seqnum, diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index 56dcfcfb9..1e5d51497 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -309,7 +309,7 @@ class Retrieve(object): if key in self.servermap.proxies: reader = self.servermap.proxies[key] else: - reader = MDMFSlotReadProxy(server.get_rref(), + reader = MDMFSlotReadProxy(server.get_storage_server(), self._storage_index, shnum, None) reader.server = server self.readers[shnum] = reader @@ -906,9 +906,13 @@ class Retrieve(object): def notify_server_corruption(self, server, shnum, reason): - rref = server.get_rref() - rref.callRemoteOnly("advise_corrupt_share", - "mutable", self._storage_index, shnum, reason) + storage_server = server.get_storage_server() + storage_server.advise_corrupt_share( + "mutable", + self._storage_index, + shnum, + reason, + ) def _try_to_validate_privkey(self, enc_privkey, reader, server): diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index 0c7656028..2dd95593f 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -592,7 +592,7 @@ class ServermapUpdater(object): return d def _do_read(self, server, storage_index, shnums, readv): - ss = server.get_rref() + ss = server.get_storage_server() if self._add_lease: # send an add-lease message in parallel. The results are handled # separately. This is sent before the slot_readv() so that we can @@ -601,11 +601,14 @@ class ServermapUpdater(object): # add_lease is synchronous). renew_secret = self._node.get_renewal_secret(server) cancel_secret = self._node.get_cancel_secret(server) - d2 = ss.callRemote("add_lease", storage_index, - renew_secret, cancel_secret) + d2 = ss.add_lease( + storage_index, + renew_secret, + cancel_secret, + ) # we ignore success d2.addErrback(self._add_lease_failed, server, storage_index) - d = ss.callRemote("slot_readv", storage_index, shnums, readv) + d = ss.slot_readv(storage_index, shnums, readv) return d @@ -638,7 +641,7 @@ class ServermapUpdater(object): lp = self.log(format="got result from [%(name)s], %(numshares)d shares", name=server.get_name(), numshares=len(datavs)) - ss = server.get_rref() + ss = server.get_storage_server() now = time.time() elapsed = now - started def _done_processing(ignored=None): @@ -796,9 +799,13 @@ class ServermapUpdater(object): def notify_server_corruption(self, server, shnum, reason): - rref = server.get_rref() - rref.callRemoteOnly("advise_corrupt_share", - "mutable", self._storage_index, shnum, reason) + ss = server.get_storage_server() + ss.advise_corrupt_share( + "mutable", + self._storage_index, + shnum, + reason, + ) def _got_signature_one_share(self, results, shnum, server, lp): @@ -1220,5 +1227,3 @@ class ServermapUpdater(object): def _fatal_error(self, f): self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw") self._done_deferred.errback(f) - - diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 65c65f535..3c761032f 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -30,12 +30,18 @@ the foolscap-based server implemented in src/allmydata/storage/*.py . import re, time, hashlib +import attr from zope.interface import implementer from twisted.internet import defer from twisted.application import service from foolscap.api import eventually -from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer +from allmydata.interfaces import ( + IStorageBroker, + IDisplayableServer, + IServer, + IStorageServer, +) from allmydata.util import log, base32, connection_status from allmydata.util.assertutil import precondition from allmydata.util.observer import ObserverList @@ -111,7 +117,7 @@ class StorageFarmBroker(service.MultiService): # these two are used in unit tests def test_add_rref(self, serverid, rref, ann): s = NativeStorageServer(serverid, ann.copy(), self._tub_maker, {}) - s.rref = rref + s._rref = rref s._is_connected = True self.servers[serverid] = s @@ -315,7 +321,7 @@ class NativeStorageServer(service.MultiService): self.last_connect_time = None self.last_loss_time = None self.remote_host = None - self.rref = None + self._rref = None self._is_connected = False self._reconnector = None self._trigger_cb = None @@ -344,8 +350,8 @@ class NativeStorageServer(service.MultiService): def get_permutation_seed(self): return self._permutation_seed def get_version(self): - if self.rref: - return self.rref.version + if self._rref: + return self._rref.version return None def get_name(self): # keep methodname short # TODO: decide who adds [] in the short description. It should @@ -367,8 +373,8 @@ class NativeStorageServer(service.MultiService): def get_connection_status(self): last_received = None - if self.rref: - last_received = self.rref.getDataLastReceivedAt() + if self._rref: + last_received = self._rref.getDataLastReceivedAt() return connection_status.from_foolscap_reconnector(self._reconnector, last_received) @@ -414,18 +420,30 @@ class NativeStorageServer(service.MultiService): self.last_connect_time = time.time() self.remote_host = rref.getLocationHints() - self.rref = rref + self._rref = rref self._is_connected = True rref.notifyOnDisconnect(self._lost) def get_rref(self): - return self.rref + return self._rref + + def get_storage_server(self): + """ + See ``IServer.get_storage_server``. + """ + if self._rref is None: + return None + # Pass in an accessor for our _rref attribute. The value of the + # attribute may change over time as connections are lost and + # re-established. The _StorageServer should always be able to get the + # most up-to-date value. + return _StorageServer(get_rref=self.get_rref) def _lost(self): log.msg(format="lost connection to %(name)s", name=self.get_name(), facility="tahoe.storage_broker", umid="zbRllw") self.last_loss_time = time.time() - # self.rref is now stale: all callRemote()s will get a + # self._rref is now stale: all callRemote()s will get a # DeadReferenceError. We leave the stale reference in place so that # uploader/downloader code (which received this IServer through # get_connected_servers() or get_servers_for_psi()) can continue to @@ -443,3 +461,117 @@ class NativeStorageServer(service.MultiService): class UnknownServerTypeError(Exception): pass + + +@implementer(IStorageServer) +@attr.s +class _StorageServer(object): + """ + ``_StorageServer`` is a direct pass-through to an ``RIStorageServer`` via + a ``RemoteReference``. + """ + _get_rref = attr.ib() + + @property + def _rref(self): + return self._get_rref() + + def get_version(self): + return self._rref.callRemote( + "get_version", + ) + + def allocate_buckets( + self, + storage_index, + renew_secret, + cancel_secret, + sharenums, + allocated_size, + canary, + ): + return self._rref.callRemote( + "allocate_buckets", + storage_index, + renew_secret, + cancel_secret, + sharenums, + allocated_size, + canary, + ) + + def add_lease( + self, + storage_index, + renew_secret, + cancel_secret, + ): + return self._rref.callRemote( + "add_lease", + storage_index, + renew_secret, + cancel_secret, + ) + + def renew_lease( + self, + storage_index, + renew_secret, + ): + return self._rref.callRemote( + "renew_lease", + storage_index, + renew_secret, + ) + + def get_buckets( + self, + storage_index, + ): + return self._rref.callRemote( + "get_buckets", + storage_index, + ) + + def slot_readv( + self, + storage_index, + shares, + readv, + ): + return self._rref.callRemote( + "slot_readv", + storage_index, + shares, + readv, + ) + + def slot_testv_and_readv_and_writev( + self, + storage_index, + secrets, + tw_vectors, + r_vector, + ): + return self._rref.callRemote( + "slot_testv_and_readv_and_writev", + storage_index, + secrets, + tw_vectors, + r_vector, + ) + + def advise_corrupt_share( + self, + share_type, + storage_index, + shnum, + reason, + ): + return self._rref.callRemoteOnly( + "advise_corrupt_share", + share_type, + storage_index, + shnum, + reason, + ) diff --git a/src/allmydata/test/mutable/test_filenode.py b/src/allmydata/test/mutable/test_filenode.py index 62b720442..fdc19d5cb 100644 --- a/src/allmydata/test/mutable/test_filenode.py +++ b/src/allmydata/test/mutable/test_filenode.py @@ -11,7 +11,11 @@ from allmydata.mutable.publish import MutableData from ..test_download import PausingConsumer, PausingAndStoppingConsumer, \ StoppingConsumer, ImmediatelyStoppingConsumer from .. import common_util as testutil -from .util import FakeStorage, make_nodemaker +from .util import ( + FakeStorage, + make_nodemaker_with_peers, + make_peer, +) class Filenode(unittest.TestCase, testutil.ShouldFailMixin): # this used to be in Publish, but we removed the limit. Some of @@ -19,8 +23,15 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin): # larger than the limit. OLD_MAX_SEGMENT_SIZE = 3500000 def setUp(self): - self._storage = s = FakeStorage() - self.nodemaker = make_nodemaker(s) + self._storage = FakeStorage() + self._peers = list( + make_peer(self._storage, n) + for n + # 10 is the default for N. We're trying to make enough servers + # here so that each only gets one share. + in range(10) + ) + self.nodemaker = make_nodemaker_with_peers(self._peers) def test_create(self): d = self.nodemaker.create_mutable_file() @@ -352,16 +363,19 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin): def test_mdmf_write_count(self): - # Publishing an MDMF file should only cause one write for each - # share that is to be published. Otherwise, we introduce - # undesirable semantics that are a regression from SDMF + """ + Publishing an MDMF file causes exactly one write for each share that is to + be published. Otherwise, we introduce undesirable semantics that are a + regression from SDMF. + """ upload = MutableData("MDMF" * 100000) # about 400 KiB d = self.nodemaker.create_mutable_file(upload, version=MDMF_VERSION) def _check_server_write_counts(ignored): - sb = self.nodemaker.storage_broker - for server in sb.servers.itervalues(): - self.failUnlessEqual(server.get_rref().queries, 1) + for peer in self._peers: + # There were enough servers for each to only get a single + # share. + self.assertEqual(peer.storage_server.queries, 1) d.addCallback(_check_server_write_counts) return d diff --git a/src/allmydata/test/mutable/util.py b/src/allmydata/test/mutable/util.py index 99c039cab..89d2269f3 100644 --- a/src/allmydata/test/mutable/util.py +++ b/src/allmydata/test/mutable/util.py @@ -1,4 +1,5 @@ from six.moves import cStringIO as StringIO +import attr from twisted.internet import defer, reactor from foolscap.api import eventually, fireEventually from allmydata import client @@ -199,21 +200,98 @@ def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0): dl.addCallback(lambda ignored: res) return dl +@attr.s +class Peer(object): + peerid = attr.ib() + storage_server = attr.ib() + announcement = attr.ib() + +def make_peer(s, i): + """ + Create a "peer" suitable for use with ``make_storagebroker_with_peers`` or + ``make_nodemaker_with_peers``. + + :param IServer s: The server with which to associate the peers. + + :param int i: A unique identifier for this peer within the whole group of + peers to be used. For example, a sequence number. This is used to + generate a unique peer id. + + :rtype: ``Peer`` + """ + peerid = tagged_hash("peerid", "%d" % i)[:20] + fss = FakeStorageServer(peerid, s) + ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid), + "permutation-seed-base32": base32.b2a(peerid) } + return Peer(peerid=peerid, storage_server=fss, announcement=ann) + + def make_storagebroker(s=None, num_peers=10): + """ + Make a ``StorageFarmBroker`` connected to some number of fake storage + servers. + + :param IServer s: The server with which to associate the fake storage + servers. + + :param int num_peers: The number of fake storage servers to associate with + the broker. + """ if not s: s = FakeStorage() - peerids = [tagged_hash("peerid", "%d" % i)[:20] - for i in range(num_peers)] + peers = [] + for peer_num in range(num_peers): + peers.append(make_peer(s, peer_num)) + return make_storagebroker_with_peers(peers) + + +def make_storagebroker_with_peers(peers): + """ + Make a ``StorageFarmBroker`` connected to the given storage servers. + + :param list peers: The storage servers to associate with the storage + broker. + """ storage_broker = StorageFarmBroker(True, None) - for peerid in peerids: - fss = FakeStorageServer(peerid, s) - ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid), - "permutation-seed-base32": base32.b2a(peerid) } - storage_broker.test_add_rref(peerid, fss, ann) + for peer in peers: + storage_broker.test_add_rref( + peer.peerid, + peer.storage_server, + peer.announcement, + ) return storage_broker + def make_nodemaker(s=None, num_peers=10, keysize=TEST_RSA_KEY_SIZE): + """ + Make a ``NodeMaker`` connected to some number of fake storage servers. + + :param IServer s: The server with which to associate the fake storage + servers. + + :param int num_peers: The number of fake storage servers to associate with + the node maker. + """ storage_broker = make_storagebroker(s, num_peers) + return make_nodemaker_with_storage_broker(storage_broker, keysize) + + +def make_nodemaker_with_peers(peers, keysize=TEST_RSA_KEY_SIZE): + """ + Make a ``NodeMaker`` connected to the given storage servers. + + :param list peers: The storage servers to associate with the node maker. + """ + storage_broker = make_storagebroker_with_peers(peers) + return make_nodemaker_with_storage_broker(storage_broker, keysize) + + +def make_nodemaker_with_storage_broker(storage_broker, keysize): + """ + Make a ``NodeMaker`` using the given storage broker. + + :param StorageFarmBroker peers: The storage broker to use. + """ sh = client.SecretHolder("lease secret", "convergence secret") keygen = client.KeyGenerator() if keysize: @@ -223,6 +301,7 @@ def make_nodemaker(s=None, num_peers=10, keysize=TEST_RSA_KEY_SIZE): {"k": 3, "n": 10}, SDMF_VERSION, keygen) return nodemaker + class PublishMixin(object): def publish_one(self): # publish a file and create shares, which can then be manipulated @@ -351,4 +430,3 @@ class CheckerMixin(object): return self.fail("%s: didn't see expected exception %s in problems %s" % (where, expected_exception, r.get_share_problems())) - diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index e75238c56..1333dba60 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -32,6 +32,9 @@ from allmydata.util import fileutil, idlib, hashutil from allmydata.util.hashutil import permute_server_hash from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.interfaces import IStorageBroker, IServer +from allmydata.storage_client import ( + _StorageServer, +) from .common import ( TEST_RSA_KEY_SIZE, SameProcessStreamEndpointAssigner, @@ -166,6 +169,10 @@ class NoNetworkServer(object): return "nickname" def get_rref(self): return self.rref + def get_storage_server(self): + if self.rref is None: + return None + return _StorageServer(lambda: self.rref) def get_version(self): return self.rref.version diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py index 03e1ad56a..5c78a9805 100644 --- a/src/allmydata/test/test_download.py +++ b/src/allmydata/test/test_download.py @@ -601,8 +601,8 @@ class DownloadTest(_Base, unittest.TestCase): # that they're old and can't handle reads that overrun the length of # the share. This exercises a different code path. for s in self.c0.storage_broker.get_connected_servers(): - rref = s.get_rref() - v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] + v = s.get_version() + v1 = v["http://allmydata.org/tahoe/protocols/storage/v1"] v1["tolerates-immutable-read-overrun"] = False n = self.c0.create_node_from_uri(immutable_uri) @@ -1178,8 +1178,8 @@ class DownloadV2(_Base, unittest.TestCase): # that they're old and can't handle reads that overrun the length of # the share. This exercises a different code path. for s in self.c0.storage_broker.get_connected_servers(): - rref = s.get_rref() - v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] + v = s.get_version() + v1 = v["http://allmydata.org/tahoe/protocols/storage/v1"] v1["tolerates-immutable-read-overrun"] = False # upload a file @@ -1198,8 +1198,8 @@ class DownloadV2(_Base, unittest.TestCase): self.c0 = self.g.clients[0] for s in self.c0.storage_broker.get_connected_servers(): - rref = s.get_rref() - v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] + v = s.get_version() + v1 = v["http://allmydata.org/tahoe/protocols/storage/v1"] v1["tolerates-immutable-read-overrun"] = False # upload a file diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py index be4c3dc20..0c7a15199 100644 --- a/src/allmydata/test/test_immutable.py +++ b/src/allmydata/test/test_immutable.py @@ -16,6 +16,9 @@ from allmydata.interfaces import NotEnoughSharesError from allmydata.immutable.upload import Data from allmydata.immutable.downloader import finder +from .no_network import ( + NoNetworkServer, +) class MockShareHashTree(object): def needed_hashes(self): @@ -106,19 +109,6 @@ class TestShareFinder(unittest.TestCase): eventually(_give_buckets_and_hunger_again) return d - class MockIServer(object): - def __init__(self, serverid, rref): - self.serverid = serverid - self.rref = rref - def get_serverid(self): - return self.serverid - def get_rref(self): - return self.rref - def get_name(self): - return "name-%s" % self.serverid - def get_version(self): - return self.rref.version - class MockStorageBroker(object): def __init__(self, servers): self.servers = servers @@ -136,9 +126,9 @@ class TestShareFinder(unittest.TestCase): mockserver1 = MockServer({1: MockBuckets(), 2: MockBuckets()}) mockserver2 = MockServer({}) mockserver3 = MockServer({3: MockBuckets()}) - servers = [ MockIServer("ms1", mockserver1), - MockIServer("ms2", mockserver2), - MockIServer("ms3", mockserver3), ] + servers = [ NoNetworkServer("ms1", mockserver1), + NoNetworkServer("ms2", mockserver2), + NoNetworkServer("ms3", mockserver3), ] mockstoragebroker = MockStorageBroker(servers) mockdownloadstatus = MockDownloadStatus() mocknode = MockNode(check_reneging=True, check_fetch_failed=True) diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index ad3fe3580..e9723a2bc 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -32,6 +32,9 @@ from allmydata.test.common import LoggingServiceParent, ShouldFailMixin from allmydata.test.common_web import WebRenderingMixin from allmydata.test.no_network import NoNetworkServer from allmydata.web.storage import StorageStatus, remove_prefix +from allmydata.storage_client import ( + _StorageServer, +) class Marker(object): pass @@ -162,7 +165,8 @@ class Bucket(unittest.TestCase): class RemoteBucket(object): - def __init__(self): + def __init__(self, target): + self.target = target self.read_count = 0 self.write_count = 0 @@ -188,8 +192,7 @@ class BucketProxy(unittest.TestCase): fileutil.make_dirs(os.path.join(basedir, "tmp")) bw = BucketWriter(self, incoming, final, size, self.make_lease(), FakeCanary()) - rb = RemoteBucket() - rb.target = bw + rb = RemoteBucket(bw) return bw, rb, final def make_lease(self): @@ -261,8 +264,7 @@ class BucketProxy(unittest.TestCase): # now read everything back def _start_reading(res): br = BucketReader(self, sharefname) - rb = RemoteBucket() - rb.target = br + rb = RemoteBucket(br) server = NoNetworkServer("abc", None) rbp = rbp_class(rb, server, storage_index="") self.failUnlessIn("to peer", repr(rbp)) @@ -1374,8 +1376,8 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): self.sparent = LoggingServiceParent() self._lease_secret = itertools.count() self.ss = self.create("MDMFProxies storage test server") - self.rref = RemoteBucket() - self.rref.target = self.ss + self.rref = RemoteBucket(self.ss) + self.storage_server = _StorageServer(lambda: self.rref) self.secrets = (self.write_enabler("we_secret"), self.renew_secret("renew_secret"), self.cancel_secret("cancel_secret")) @@ -1605,7 +1607,6 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): empty=False): # Some tests need SDMF shares to verify that we can still # read them. This method writes one, which resembles but is not - assert self.rref write = self.ss.remote_slot_testv_and_readv_and_writev share = self.build_test_sdmf_share(empty) testvs = [(0, 1, "eq", "")] @@ -1618,7 +1619,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): def test_read(self): self.write_test_share_to_server("si1") - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) # Check that every method equals what we expect it to. d = defer.succeed(None) def _check_block_and_salt(block_and_salt): @@ -1690,7 +1691,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): def test_read_with_different_tail_segment_size(self): self.write_test_share_to_server("si1", tail_segment=True) - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) d = mr.get_block_and_salt(5) def _check_tail_segment(results): block, salt = results @@ -1702,7 +1703,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): def test_get_block_with_invalid_segnum(self): self.write_test_share_to_server("si1") - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) d = defer.succeed(None) d.addCallback(lambda ignored: self.shouldFail(LayoutInvalid, "test invalid segnum", @@ -1713,7 +1714,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): def test_get_encoding_parameters_first(self): self.write_test_share_to_server("si1") - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) d = mr.get_encoding_parameters() def _check_encoding_parameters(args): (k, n, segment_size, datalen) = args @@ -1727,7 +1728,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): def test_get_seqnum_first(self): self.write_test_share_to_server("si1") - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) d = mr.get_seqnum() d.addCallback(lambda seqnum: self.failUnlessEqual(seqnum, 0)) @@ -1736,7 +1737,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): def test_get_root_hash_first(self): self.write_test_share_to_server("si1") - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) d = mr.get_root_hash() d.addCallback(lambda root_hash: self.failUnlessEqual(root_hash, self.root_hash)) @@ -1745,7 +1746,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): def test_get_checkstring_first(self): self.write_test_share_to_server("si1") - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) d = mr.get_checkstring() d.addCallback(lambda checkstring: self.failUnlessEqual(checkstring, self.checkstring)) @@ -2060,7 +2061,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): # size of 6, we know that it has 6 byte segments, which will # be split into blocks of 2 bytes because our FEC k # parameter is 3. - mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10, + mw = MDMFSlotWriteProxy(share, self.storage_server, si, self.secrets, 0, 3, 10, 6, datalength) return mw @@ -2263,7 +2264,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): d.addCallback(lambda ignored: mw.finish_publishing()) - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) def _check_block_and_salt(block_and_salt): (block, salt) = block_and_salt self.failUnlessEqual(block, self.block) @@ -2331,7 +2332,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): # since it will encounter them on the grid. Callers use the # is_sdmf method to test this. self.write_sdmf_share_to_server("si1") - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) d = mr.is_sdmf() d.addCallback(lambda issdmf: self.failUnless(issdmf)) @@ -2342,7 +2343,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): # The slot read proxy should, naturally, know how to tell us # about data in the SDMF format self.write_sdmf_share_to_server("si1") - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) d = defer.succeed(None) d.addCallback(lambda ignored: mr.is_sdmf()) @@ -2413,7 +2414,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): # read more segments than that. The reader should know this and # complain if we try to do that. self.write_sdmf_share_to_server("si1") - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) d = defer.succeed(None) d.addCallback(lambda ignored: mr.is_sdmf()) @@ -2435,7 +2436,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): mdmf_data = self.build_test_mdmf_share() self.write_test_share_to_server("si1") def _make_mr(ignored, length): - mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length]) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0, mdmf_data[:length]) return mr d = defer.succeed(None) @@ -2496,7 +2497,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): sdmf_data = self.build_test_sdmf_share() self.write_sdmf_share_to_server("si1") def _make_mr(ignored, length): - mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length]) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0, sdmf_data[:length]) return mr d = defer.succeed(None) @@ -2562,7 +2563,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): # unrelated to the actual handling of the content of the file. # The reader should behave intelligently in these cases. self.write_test_share_to_server("si1", empty=True) - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) # We should be able to get the encoding parameters, and they # should be correct. d = defer.succeed(None) @@ -2588,7 +2589,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): def test_read_with_empty_sdmf_file(self): self.write_sdmf_share_to_server("si1", empty=True) - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) # We should be able to get the encoding parameters, and they # should be correct d = defer.succeed(None) @@ -2614,7 +2615,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): def test_verinfo_with_sdmf_file(self): self.write_sdmf_share_to_server("si1") - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) # We should be able to get the version information. d = defer.succeed(None) d.addCallback(lambda ignored: @@ -2655,7 +2656,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): def test_verinfo_with_mdmf_file(self): self.write_test_share_to_server("si1") - mr = MDMFSlotReadProxy(self.rref, "si1", 0) + mr = MDMFSlotReadProxy(self.storage_server, "si1", 0) d = defer.succeed(None) d.addCallback(lambda ignored: mr.get_verinfo()) @@ -2701,7 +2702,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): # set the way we want them for the tests below. data = self.build_test_sdmf_share() sdmfr = SDMFSlotWriteProxy(0, - self.rref, + self.storage_server, "si1", self.secrets, 0, 3, 10, 36, 36) @@ -2744,7 +2745,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): # write, we need to set the checkstring correctly. When we # don't, no write should occur. sdmfw = SDMFSlotWriteProxy(0, - self.rref, + self.storage_server, "si1", self.secrets, 1, 3, 10, 36, 36) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index ef87517e9..5bbb31d84 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -2511,9 +2511,9 @@ class Connections(SystemTestMixin, unittest.TestCase): self.failUnlessEqual(len(nonclients), 1) self.s1 = nonclients[0] # s1 is the server, not c0 - self.s1_rref = self.s1.get_rref() - self.failIfEqual(self.s1_rref, None) - self.failUnless(self.s1.is_connected()) + self.s1_storage_server = self.s1.get_storage_server() + self.assertIsNot(self.s1_storage_server, None) + self.assertTrue(self.s1.is_connected()) d.addCallback(_start) # now shut down the server @@ -2524,9 +2524,9 @@ class Connections(SystemTestMixin, unittest.TestCase): d.addCallback(lambda ign: self.poll(_poll)) def _down(ign): - self.failIf(self.s1.is_connected()) - rref = self.s1.get_rref() - self.failUnless(rref) - self.failUnlessIdentical(rref, self.s1_rref) + self.assertFalse(self.s1.is_connected()) + storage_server = self.s1.get_storage_server() + self.assertIsNot(storage_server, None) + self.assertEqual(storage_server, self.s1_storage_server) d.addCallback(_down) return d