Merge pull request #618 from tahoe-lafs/3048.storage-client-abstraction

Storage client abstraction
This commit is contained in:
meejah 2019-06-11 23:28:23 +00:00 committed by GitHub
commit a07426b2e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 467 additions and 144 deletions

0
newsfragments/3048.minor Normal file
View File

View File

@ -123,9 +123,9 @@ class ControlServer(Referenceable, service.Service):
return results
server = everyone_left.pop(0)
server_name = server.get_longname()
connection = server.get_rref()
storage_server = server.get_storage_server()
start = time.time()
d = connection.callRemote("get_buckets", "\x00"*16)
d = storage_server.get_buckets("\x00" * 16)
def _done(ignored):
stop = time.time()
elapsed = stop - start

View File

@ -496,16 +496,19 @@ class Checker(log.PrefixingLogMixin):
that we want to track and report whether or not each server
responded.)"""
rref = s.get_rref()
storage_server = s.get_storage_server()
lease_seed = s.get_lease_seed()
if self._add_lease:
renew_secret = self._get_renewal_secret(lease_seed)
cancel_secret = self._get_cancel_secret(lease_seed)
d2 = rref.callRemote("add_lease", storageindex,
renew_secret, cancel_secret)
d2 = storage_server.add_lease(
storageindex,
renew_secret,
cancel_secret,
)
d2.addErrback(self._add_lease_failed, s.get_name(), storageindex)
d = rref.callRemote("get_buckets", storageindex)
d = storage_server.get_buckets(storageindex)
def _wrap_results(res):
return (res, True)

View File

@ -139,7 +139,7 @@ class ShareFinder(object):
# TODO: get the timer from a Server object, it knows best
self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
self.overdue, req)
d = server.get_rref().callRemote("get_buckets", self._storage_index)
d = server.get_storage_server().get_buckets(self._storage_index)
d.addBoth(incidentally, self._request_retired, req)
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(server, req, d_ev, time_sent, lp),
@ -221,5 +221,3 @@ class ShareFinder(object):
self.log(format="got error from [%(name)s]",
name=server.get_name(), failure=f,
level=log.UNUSUAL, parent=lp, umid="zUKdCw")

View File

@ -54,7 +54,7 @@ class CHKCheckerAndUEBFetcher(object):
def _get_all_shareholders(self, storage_index):
dl = []
for s in self._peer_getter(storage_index):
d = s.get_rref().callRemote("get_buckets", storage_index)
d = s.get_storage_server().get_buckets(storage_index)
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(s,))
dl.append(d)

View File

@ -261,20 +261,21 @@ class ServerTracker(object):
return self._server.get_name()
def query(self, sharenums):
rref = self._server.get_rref()
d = rref.callRemote("allocate_buckets",
self.storage_index,
self.renew_secret,
self.cancel_secret,
sharenums,
self.allocated_size,
canary=Referenceable())
storage_server = self._server.get_storage_server()
d = storage_server.allocate_buckets(
self.storage_index,
self.renew_secret,
self.cancel_secret,
sharenums,
self.allocated_size,
canary=Referenceable(),
)
d.addCallback(self._buckets_allocated)
return d
def ask_about_existing_shares(self):
rref = self._server.get_rref()
return rref.callRemote("get_buckets", self.storage_index)
storage_server = self._server.get_storage_server()
return storage_server.get_buckets(self.storage_index)
def _buckets_allocated(self, alreadygot_and_buckets):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
@ -415,7 +416,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
# field) from getting large shares (for files larger than about
# 12GiB). See #439 for details.
def _get_maxsize(server):
v0 = server.get_rref().version
v0 = server.get_version()
v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]

View File

@ -291,6 +291,81 @@ class RIStorageServer(RemoteInterface):
"""
class IStorageServer(Interface):
"""
An object capable of storing shares for a storage client.
"""
def get_version():
"""
:see: ``RIStorageServer.get_version``
"""
def allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums,
allocated_size,
canary,
):
"""
:see: ``RIStorageServer.allocate_buckets``
"""
def add_lease(
storage_index,
renew_secret,
cancel_secret,
):
"""
:see: ``RIStorageServer.add_lease``
"""
def renew_lease(
storage_index,
renew_secret,
):
"""
:see: ``RIStorageServer.renew_lease``
"""
def get_buckets(
storage_index,
):
"""
:see: ``RIStorageServer.get_buckets``
"""
def slot_readv(
storage_index,
shares,
readv,
):
"""
:see: ``RIStorageServer.slot_readv``
"""
def slot_testv_and_readv_and_writev(
storage_index,
secrets,
tw_vectors,
r_vector,
):
"""
:see: ``RIStorageServer.slot_testv_readv_and_writev``
"""
def advise_corrupt_share(
share_type,
storage_index,
shnum,
reason,
):
"""
:see: ``RIStorageServer.advise_corrupt_share``
"""
class IStorageBucketWriter(Interface):
"""
Objects of this kind live on the client side.
@ -463,13 +538,26 @@ class IServer(IDisplayableServer):
pass
def get_rref():
"""Once a server is connected, I return a RemoteReference.
"""Obsolete. Use ``get_storage_server`` instead.
Once a server is connected, I return a RemoteReference.
Before a server is connected for the first time, I return None.
Note that the rref I return will start producing DeadReferenceErrors
once the connection is lost.
"""
def get_storage_server():
"""
Once a server is connected, I return an ``IStorageServer``.
Before a server is connected for the first time, I return None.
Note that the ``IStorageServer`` I return will start producing
DeadReferenceErrors once the connection is lost.
"""
class IMutableSlotWriter(Interface):
"""
@ -2935,4 +3023,3 @@ class IConnectionStatus(Interface):
connection hint and the handler it is using) to the status string
(pending, connected, refused, or other errors).
""")

View File

@ -230,7 +230,7 @@ class SDMFSlotWriteProxy(object):
"""
def __init__(self,
shnum,
rref, # a remote reference to a storage server
storage_server, # an IStorageServer
storage_index,
secrets, # (write_enabler, renew_secret, cancel_secret)
seqnum, # the sequence number of the mutable file
@ -239,7 +239,7 @@ class SDMFSlotWriteProxy(object):
segment_size,
data_length): # the length of the original file
self.shnum = shnum
self._rref = rref
self._storage_server = storage_server
self._storage_index = storage_index
self._secrets = secrets
self._seqnum = seqnum
@ -541,12 +541,13 @@ class SDMFSlotWriteProxy(object):
tw_vectors = {}
tw_vectors[self.shnum] = (self._testvs, datavs, None)
return self._rref.callRemote("slot_testv_and_readv_and_writev",
self._storage_index,
self._secrets,
tw_vectors,
# TODO is it useful to read something?
self._readvs)
return self._storage_server.slot_testv_and_readv_and_writev(
self._storage_index,
self._secrets,
tw_vectors,
# TODO is it useful to read something?
self._readvs,
)
MDMFHEADER = ">BQ32sBBQQ QQQQQQQQ"
@ -729,7 +730,7 @@ class MDMFSlotWriteProxy(object):
# disruption.
def __init__(self,
shnum,
rref, # a remote reference to a storage server
storage_server, # a remote reference to a storage server
storage_index,
secrets, # (write_enabler, renew_secret, cancel_secret)
seqnum, # the sequence number of the mutable file
@ -738,7 +739,7 @@ class MDMFSlotWriteProxy(object):
segment_size,
data_length): # the length of the original file
self.shnum = shnum
self._rref = rref
self._storage_server = storage_server
self._storage_index = storage_index
self._seqnum = seqnum
self._required_shares = required_shares
@ -1159,11 +1160,12 @@ class MDMFSlotWriteProxy(object):
self._testvs = [(0, len(new_checkstring), "eq", new_checkstring)]
on_success = _first_write
tw_vectors[self.shnum] = (self._testvs, datavs, None)
d = self._rref.callRemote("slot_testv_and_readv_and_writev",
self._storage_index,
self._secrets,
tw_vectors,
self._readv)
d = self._storage_server.slot_testv_and_readv_and_writev(
self._storage_index,
self._secrets,
tw_vectors,
self._readv,
)
def _result(results):
if isinstance(results, failure.Failure) or not results[0]:
# Do nothing; the write was unsuccessful.
@ -1189,13 +1191,13 @@ class MDMFSlotReadProxy(object):
it is valid) to eliminate some of the need to fetch it from servers.
"""
def __init__(self,
rref,
storage_server,
storage_index,
shnum,
data="",
data_is_everything=False):
# Start the initialization process.
self._rref = rref
self._storage_server = storage_server
self._storage_index = storage_index
self.shnum = shnum
@ -1752,10 +1754,11 @@ class MDMFSlotReadProxy(object):
results = {self.shnum: results}
return defer.succeed(results)
else:
return self._rref.callRemote("slot_readv",
self._storage_index,
[self.shnum],
readvs)
return self._storage_server.slot_readv(
self._storage_index,
[self.shnum],
readvs,
)
def is_sdmf(self):

View File

@ -269,7 +269,7 @@ class Publish(object):
secrets = (write_enabler, renew_secret, cancel_secret)
writer = writer_class(shnum,
server.get_rref(),
server.get_storage_server(),
self._storage_index,
secrets,
self._new_seqnum,
@ -471,7 +471,7 @@ class Publish(object):
secrets = (write_enabler, renew_secret, cancel_secret)
writer = writer_class(shnum,
server.get_rref(),
server.get_storage_server(),
self._storage_index,
secrets,
self._new_seqnum,

View File

@ -309,7 +309,7 @@ class Retrieve(object):
if key in self.servermap.proxies:
reader = self.servermap.proxies[key]
else:
reader = MDMFSlotReadProxy(server.get_rref(),
reader = MDMFSlotReadProxy(server.get_storage_server(),
self._storage_index, shnum, None)
reader.server = server
self.readers[shnum] = reader
@ -906,9 +906,13 @@ class Retrieve(object):
def notify_server_corruption(self, server, shnum, reason):
rref = server.get_rref()
rref.callRemoteOnly("advise_corrupt_share",
"mutable", self._storage_index, shnum, reason)
storage_server = server.get_storage_server()
storage_server.advise_corrupt_share(
"mutable",
self._storage_index,
shnum,
reason,
)
def _try_to_validate_privkey(self, enc_privkey, reader, server):

View File

@ -592,7 +592,7 @@ class ServermapUpdater(object):
return d
def _do_read(self, server, storage_index, shnums, readv):
ss = server.get_rref()
ss = server.get_storage_server()
if self._add_lease:
# send an add-lease message in parallel. The results are handled
# separately. This is sent before the slot_readv() so that we can
@ -601,11 +601,14 @@ class ServermapUpdater(object):
# add_lease is synchronous).
renew_secret = self._node.get_renewal_secret(server)
cancel_secret = self._node.get_cancel_secret(server)
d2 = ss.callRemote("add_lease", storage_index,
renew_secret, cancel_secret)
d2 = ss.add_lease(
storage_index,
renew_secret,
cancel_secret,
)
# we ignore success
d2.addErrback(self._add_lease_failed, server, storage_index)
d = ss.callRemote("slot_readv", storage_index, shnums, readv)
d = ss.slot_readv(storage_index, shnums, readv)
return d
@ -638,7 +641,7 @@ class ServermapUpdater(object):
lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
name=server.get_name(),
numshares=len(datavs))
ss = server.get_rref()
ss = server.get_storage_server()
now = time.time()
elapsed = now - started
def _done_processing(ignored=None):
@ -796,9 +799,13 @@ class ServermapUpdater(object):
def notify_server_corruption(self, server, shnum, reason):
rref = server.get_rref()
rref.callRemoteOnly("advise_corrupt_share",
"mutable", self._storage_index, shnum, reason)
ss = server.get_storage_server()
ss.advise_corrupt_share(
"mutable",
self._storage_index,
shnum,
reason,
)
def _got_signature_one_share(self, results, shnum, server, lp):
@ -1220,5 +1227,3 @@ class ServermapUpdater(object):
def _fatal_error(self, f):
self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
self._done_deferred.errback(f)

View File

@ -30,12 +30,18 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
import re, time, hashlib
import attr
from zope.interface import implementer
from twisted.internet import defer
from twisted.application import service
from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
from allmydata.interfaces import (
IStorageBroker,
IDisplayableServer,
IServer,
IStorageServer,
)
from allmydata.util import log, base32, connection_status
from allmydata.util.assertutil import precondition
from allmydata.util.observer import ObserverList
@ -111,7 +117,7 @@ class StorageFarmBroker(service.MultiService):
# these two are used in unit tests
def test_add_rref(self, serverid, rref, ann):
s = NativeStorageServer(serverid, ann.copy(), self._tub_maker, {})
s.rref = rref
s._rref = rref
s._is_connected = True
self.servers[serverid] = s
@ -315,7 +321,7 @@ class NativeStorageServer(service.MultiService):
self.last_connect_time = None
self.last_loss_time = None
self.remote_host = None
self.rref = None
self._rref = None
self._is_connected = False
self._reconnector = None
self._trigger_cb = None
@ -344,8 +350,8 @@ class NativeStorageServer(service.MultiService):
def get_permutation_seed(self):
return self._permutation_seed
def get_version(self):
if self.rref:
return self.rref.version
if self._rref:
return self._rref.version
return None
def get_name(self): # keep methodname short
# TODO: decide who adds [] in the short description. It should
@ -367,8 +373,8 @@ class NativeStorageServer(service.MultiService):
def get_connection_status(self):
last_received = None
if self.rref:
last_received = self.rref.getDataLastReceivedAt()
if self._rref:
last_received = self._rref.getDataLastReceivedAt()
return connection_status.from_foolscap_reconnector(self._reconnector,
last_received)
@ -414,18 +420,30 @@ class NativeStorageServer(service.MultiService):
self.last_connect_time = time.time()
self.remote_host = rref.getLocationHints()
self.rref = rref
self._rref = rref
self._is_connected = True
rref.notifyOnDisconnect(self._lost)
def get_rref(self):
return self.rref
return self._rref
def get_storage_server(self):
"""
See ``IServer.get_storage_server``.
"""
if self._rref is None:
return None
# Pass in an accessor for our _rref attribute. The value of the
# attribute may change over time as connections are lost and
# re-established. The _StorageServer should always be able to get the
# most up-to-date value.
return _StorageServer(get_rref=self.get_rref)
def _lost(self):
log.msg(format="lost connection to %(name)s", name=self.get_name(),
facility="tahoe.storage_broker", umid="zbRllw")
self.last_loss_time = time.time()
# self.rref is now stale: all callRemote()s will get a
# self._rref is now stale: all callRemote()s will get a
# DeadReferenceError. We leave the stale reference in place so that
# uploader/downloader code (which received this IServer through
# get_connected_servers() or get_servers_for_psi()) can continue to
@ -443,3 +461,117 @@ class NativeStorageServer(service.MultiService):
class UnknownServerTypeError(Exception):
pass
@implementer(IStorageServer)
@attr.s
class _StorageServer(object):
"""
``_StorageServer`` is a direct pass-through to an ``RIStorageServer`` via
a ``RemoteReference``.
"""
_get_rref = attr.ib()
@property
def _rref(self):
return self._get_rref()
def get_version(self):
return self._rref.callRemote(
"get_version",
)
def allocate_buckets(
self,
storage_index,
renew_secret,
cancel_secret,
sharenums,
allocated_size,
canary,
):
return self._rref.callRemote(
"allocate_buckets",
storage_index,
renew_secret,
cancel_secret,
sharenums,
allocated_size,
canary,
)
def add_lease(
self,
storage_index,
renew_secret,
cancel_secret,
):
return self._rref.callRemote(
"add_lease",
storage_index,
renew_secret,
cancel_secret,
)
def renew_lease(
self,
storage_index,
renew_secret,
):
return self._rref.callRemote(
"renew_lease",
storage_index,
renew_secret,
)
def get_buckets(
self,
storage_index,
):
return self._rref.callRemote(
"get_buckets",
storage_index,
)
def slot_readv(
self,
storage_index,
shares,
readv,
):
return self._rref.callRemote(
"slot_readv",
storage_index,
shares,
readv,
)
def slot_testv_and_readv_and_writev(
self,
storage_index,
secrets,
tw_vectors,
r_vector,
):
return self._rref.callRemote(
"slot_testv_and_readv_and_writev",
storage_index,
secrets,
tw_vectors,
r_vector,
)
def advise_corrupt_share(
self,
share_type,
storage_index,
shnum,
reason,
):
return self._rref.callRemoteOnly(
"advise_corrupt_share",
share_type,
storage_index,
shnum,
reason,
)

View File

@ -11,7 +11,11 @@ from allmydata.mutable.publish import MutableData
from ..test_download import PausingConsumer, PausingAndStoppingConsumer, \
StoppingConsumer, ImmediatelyStoppingConsumer
from .. import common_util as testutil
from .util import FakeStorage, make_nodemaker
from .util import (
FakeStorage,
make_nodemaker_with_peers,
make_peer,
)
class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
# this used to be in Publish, but we removed the limit. Some of
@ -19,8 +23,15 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
# larger than the limit.
OLD_MAX_SEGMENT_SIZE = 3500000
def setUp(self):
self._storage = s = FakeStorage()
self.nodemaker = make_nodemaker(s)
self._storage = FakeStorage()
self._peers = list(
make_peer(self._storage, n)
for n
# 10 is the default for N. We're trying to make enough servers
# here so that each only gets one share.
in range(10)
)
self.nodemaker = make_nodemaker_with_peers(self._peers)
def test_create(self):
d = self.nodemaker.create_mutable_file()
@ -352,16 +363,19 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
def test_mdmf_write_count(self):
# Publishing an MDMF file should only cause one write for each
# share that is to be published. Otherwise, we introduce
# undesirable semantics that are a regression from SDMF
"""
Publishing an MDMF file causes exactly one write for each share that is to
be published. Otherwise, we introduce undesirable semantics that are a
regression from SDMF.
"""
upload = MutableData("MDMF" * 100000) # about 400 KiB
d = self.nodemaker.create_mutable_file(upload,
version=MDMF_VERSION)
def _check_server_write_counts(ignored):
sb = self.nodemaker.storage_broker
for server in sb.servers.itervalues():
self.failUnlessEqual(server.get_rref().queries, 1)
for peer in self._peers:
# There were enough servers for each to only get a single
# share.
self.assertEqual(peer.storage_server.queries, 1)
d.addCallback(_check_server_write_counts)
return d

View File

@ -1,4 +1,5 @@
from six.moves import cStringIO as StringIO
import attr
from twisted.internet import defer, reactor
from foolscap.api import eventually, fireEventually
from allmydata import client
@ -199,21 +200,98 @@ def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
dl.addCallback(lambda ignored: res)
return dl
@attr.s
class Peer(object):
peerid = attr.ib()
storage_server = attr.ib()
announcement = attr.ib()
def make_peer(s, i):
"""
Create a "peer" suitable for use with ``make_storagebroker_with_peers`` or
``make_nodemaker_with_peers``.
:param IServer s: The server with which to associate the peers.
:param int i: A unique identifier for this peer within the whole group of
peers to be used. For example, a sequence number. This is used to
generate a unique peer id.
:rtype: ``Peer``
"""
peerid = tagged_hash("peerid", "%d" % i)[:20]
fss = FakeStorageServer(peerid, s)
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
"permutation-seed-base32": base32.b2a(peerid) }
return Peer(peerid=peerid, storage_server=fss, announcement=ann)
def make_storagebroker(s=None, num_peers=10):
"""
Make a ``StorageFarmBroker`` connected to some number of fake storage
servers.
:param IServer s: The server with which to associate the fake storage
servers.
:param int num_peers: The number of fake storage servers to associate with
the broker.
"""
if not s:
s = FakeStorage()
peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(num_peers)]
peers = []
for peer_num in range(num_peers):
peers.append(make_peer(s, peer_num))
return make_storagebroker_with_peers(peers)
def make_storagebroker_with_peers(peers):
"""
Make a ``StorageFarmBroker`` connected to the given storage servers.
:param list peers: The storage servers to associate with the storage
broker.
"""
storage_broker = StorageFarmBroker(True, None)
for peerid in peerids:
fss = FakeStorageServer(peerid, s)
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
"permutation-seed-base32": base32.b2a(peerid) }
storage_broker.test_add_rref(peerid, fss, ann)
for peer in peers:
storage_broker.test_add_rref(
peer.peerid,
peer.storage_server,
peer.announcement,
)
return storage_broker
def make_nodemaker(s=None, num_peers=10, keysize=TEST_RSA_KEY_SIZE):
"""
Make a ``NodeMaker`` connected to some number of fake storage servers.
:param IServer s: The server with which to associate the fake storage
servers.
:param int num_peers: The number of fake storage servers to associate with
the node maker.
"""
storage_broker = make_storagebroker(s, num_peers)
return make_nodemaker_with_storage_broker(storage_broker, keysize)
def make_nodemaker_with_peers(peers, keysize=TEST_RSA_KEY_SIZE):
"""
Make a ``NodeMaker`` connected to the given storage servers.
:param list peers: The storage servers to associate with the node maker.
"""
storage_broker = make_storagebroker_with_peers(peers)
return make_nodemaker_with_storage_broker(storage_broker, keysize)
def make_nodemaker_with_storage_broker(storage_broker, keysize):
"""
Make a ``NodeMaker`` using the given storage broker.
:param StorageFarmBroker peers: The storage broker to use.
"""
sh = client.SecretHolder("lease secret", "convergence secret")
keygen = client.KeyGenerator()
if keysize:
@ -223,6 +301,7 @@ def make_nodemaker(s=None, num_peers=10, keysize=TEST_RSA_KEY_SIZE):
{"k": 3, "n": 10}, SDMF_VERSION, keygen)
return nodemaker
class PublishMixin(object):
def publish_one(self):
# publish a file and create shares, which can then be manipulated
@ -351,4 +430,3 @@ class CheckerMixin(object):
return
self.fail("%s: didn't see expected exception %s in problems %s" %
(where, expected_exception, r.get_share_problems()))

View File

@ -32,6 +32,9 @@ from allmydata.util import fileutil, idlib, hashutil
from allmydata.util.hashutil import permute_server_hash
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.interfaces import IStorageBroker, IServer
from allmydata.storage_client import (
_StorageServer,
)
from .common import (
TEST_RSA_KEY_SIZE,
SameProcessStreamEndpointAssigner,
@ -166,6 +169,10 @@ class NoNetworkServer(object):
return "nickname"
def get_rref(self):
return self.rref
def get_storage_server(self):
if self.rref is None:
return None
return _StorageServer(lambda: self.rref)
def get_version(self):
return self.rref.version

View File

@ -601,8 +601,8 @@ class DownloadTest(_Base, unittest.TestCase):
# that they're old and can't handle reads that overrun the length of
# the share. This exercises a different code path.
for s in self.c0.storage_broker.get_connected_servers():
rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v = s.get_version()
v1 = v["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False
n = self.c0.create_node_from_uri(immutable_uri)
@ -1178,8 +1178,8 @@ class DownloadV2(_Base, unittest.TestCase):
# that they're old and can't handle reads that overrun the length of
# the share. This exercises a different code path.
for s in self.c0.storage_broker.get_connected_servers():
rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v = s.get_version()
v1 = v["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False
# upload a file
@ -1198,8 +1198,8 @@ class DownloadV2(_Base, unittest.TestCase):
self.c0 = self.g.clients[0]
for s in self.c0.storage_broker.get_connected_servers():
rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v = s.get_version()
v1 = v["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False
# upload a file

View File

@ -16,6 +16,9 @@ from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable.upload import Data
from allmydata.immutable.downloader import finder
from .no_network import (
NoNetworkServer,
)
class MockShareHashTree(object):
def needed_hashes(self):
@ -106,19 +109,6 @@ class TestShareFinder(unittest.TestCase):
eventually(_give_buckets_and_hunger_again)
return d
class MockIServer(object):
def __init__(self, serverid, rref):
self.serverid = serverid
self.rref = rref
def get_serverid(self):
return self.serverid
def get_rref(self):
return self.rref
def get_name(self):
return "name-%s" % self.serverid
def get_version(self):
return self.rref.version
class MockStorageBroker(object):
def __init__(self, servers):
self.servers = servers
@ -136,9 +126,9 @@ class TestShareFinder(unittest.TestCase):
mockserver1 = MockServer({1: MockBuckets(), 2: MockBuckets()})
mockserver2 = MockServer({})
mockserver3 = MockServer({3: MockBuckets()})
servers = [ MockIServer("ms1", mockserver1),
MockIServer("ms2", mockserver2),
MockIServer("ms3", mockserver3), ]
servers = [ NoNetworkServer("ms1", mockserver1),
NoNetworkServer("ms2", mockserver2),
NoNetworkServer("ms3", mockserver3), ]
mockstoragebroker = MockStorageBroker(servers)
mockdownloadstatus = MockDownloadStatus()
mocknode = MockNode(check_reneging=True, check_fetch_failed=True)

View File

@ -32,6 +32,9 @@ from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
from allmydata.test.common_web import WebRenderingMixin
from allmydata.test.no_network import NoNetworkServer
from allmydata.web.storage import StorageStatus, remove_prefix
from allmydata.storage_client import (
_StorageServer,
)
class Marker(object):
pass
@ -162,7 +165,8 @@ class Bucket(unittest.TestCase):
class RemoteBucket(object):
def __init__(self):
def __init__(self, target):
self.target = target
self.read_count = 0
self.write_count = 0
@ -188,8 +192,7 @@ class BucketProxy(unittest.TestCase):
fileutil.make_dirs(os.path.join(basedir, "tmp"))
bw = BucketWriter(self, incoming, final, size, self.make_lease(),
FakeCanary())
rb = RemoteBucket()
rb.target = bw
rb = RemoteBucket(bw)
return bw, rb, final
def make_lease(self):
@ -261,8 +264,7 @@ class BucketProxy(unittest.TestCase):
# now read everything back
def _start_reading(res):
br = BucketReader(self, sharefname)
rb = RemoteBucket()
rb.target = br
rb = RemoteBucket(br)
server = NoNetworkServer("abc", None)
rbp = rbp_class(rb, server, storage_index="")
self.failUnlessIn("to peer", repr(rbp))
@ -1374,8 +1376,8 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
self.sparent = LoggingServiceParent()
self._lease_secret = itertools.count()
self.ss = self.create("MDMFProxies storage test server")
self.rref = RemoteBucket()
self.rref.target = self.ss
self.rref = RemoteBucket(self.ss)
self.storage_server = _StorageServer(lambda: self.rref)
self.secrets = (self.write_enabler("we_secret"),
self.renew_secret("renew_secret"),
self.cancel_secret("cancel_secret"))
@ -1605,7 +1607,6 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
empty=False):
# Some tests need SDMF shares to verify that we can still
# read them. This method writes one, which resembles but is not
assert self.rref
write = self.ss.remote_slot_testv_and_readv_and_writev
share = self.build_test_sdmf_share(empty)
testvs = [(0, 1, "eq", "")]
@ -1618,7 +1619,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
def test_read(self):
self.write_test_share_to_server("si1")
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
# Check that every method equals what we expect it to.
d = defer.succeed(None)
def _check_block_and_salt(block_and_salt):
@ -1690,7 +1691,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
def test_read_with_different_tail_segment_size(self):
self.write_test_share_to_server("si1", tail_segment=True)
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
d = mr.get_block_and_salt(5)
def _check_tail_segment(results):
block, salt = results
@ -1702,7 +1703,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
def test_get_block_with_invalid_segnum(self):
self.write_test_share_to_server("si1")
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
d = defer.succeed(None)
d.addCallback(lambda ignored:
self.shouldFail(LayoutInvalid, "test invalid segnum",
@ -1713,7 +1714,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
def test_get_encoding_parameters_first(self):
self.write_test_share_to_server("si1")
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
d = mr.get_encoding_parameters()
def _check_encoding_parameters(args):
(k, n, segment_size, datalen) = args
@ -1727,7 +1728,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
def test_get_seqnum_first(self):
self.write_test_share_to_server("si1")
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
d = mr.get_seqnum()
d.addCallback(lambda seqnum:
self.failUnlessEqual(seqnum, 0))
@ -1736,7 +1737,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
def test_get_root_hash_first(self):
self.write_test_share_to_server("si1")
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
d = mr.get_root_hash()
d.addCallback(lambda root_hash:
self.failUnlessEqual(root_hash, self.root_hash))
@ -1745,7 +1746,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
def test_get_checkstring_first(self):
self.write_test_share_to_server("si1")
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
d = mr.get_checkstring()
d.addCallback(lambda checkstring:
self.failUnlessEqual(checkstring, self.checkstring))
@ -2060,7 +2061,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
# size of 6, we know that it has 6 byte segments, which will
# be split into blocks of 2 bytes because our FEC k
# parameter is 3.
mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
mw = MDMFSlotWriteProxy(share, self.storage_server, si, self.secrets, 0, 3, 10,
6, datalength)
return mw
@ -2263,7 +2264,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
d.addCallback(lambda ignored:
mw.finish_publishing())
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
def _check_block_and_salt(block_and_salt):
(block, salt) = block_and_salt
self.failUnlessEqual(block, self.block)
@ -2331,7 +2332,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
# since it will encounter them on the grid. Callers use the
# is_sdmf method to test this.
self.write_sdmf_share_to_server("si1")
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
d = mr.is_sdmf()
d.addCallback(lambda issdmf:
self.failUnless(issdmf))
@ -2342,7 +2343,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
# The slot read proxy should, naturally, know how to tell us
# about data in the SDMF format
self.write_sdmf_share_to_server("si1")
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
d = defer.succeed(None)
d.addCallback(lambda ignored:
mr.is_sdmf())
@ -2413,7 +2414,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
# read more segments than that. The reader should know this and
# complain if we try to do that.
self.write_sdmf_share_to_server("si1")
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
d = defer.succeed(None)
d.addCallback(lambda ignored:
mr.is_sdmf())
@ -2435,7 +2436,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
mdmf_data = self.build_test_mdmf_share()
self.write_test_share_to_server("si1")
def _make_mr(ignored, length):
mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0, mdmf_data[:length])
return mr
d = defer.succeed(None)
@ -2496,7 +2497,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
sdmf_data = self.build_test_sdmf_share()
self.write_sdmf_share_to_server("si1")
def _make_mr(ignored, length):
mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0, sdmf_data[:length])
return mr
d = defer.succeed(None)
@ -2562,7 +2563,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
# unrelated to the actual handling of the content of the file.
# The reader should behave intelligently in these cases.
self.write_test_share_to_server("si1", empty=True)
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
# We should be able to get the encoding parameters, and they
# should be correct.
d = defer.succeed(None)
@ -2588,7 +2589,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
def test_read_with_empty_sdmf_file(self):
self.write_sdmf_share_to_server("si1", empty=True)
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
# We should be able to get the encoding parameters, and they
# should be correct
d = defer.succeed(None)
@ -2614,7 +2615,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
def test_verinfo_with_sdmf_file(self):
self.write_sdmf_share_to_server("si1")
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
# We should be able to get the version information.
d = defer.succeed(None)
d.addCallback(lambda ignored:
@ -2655,7 +2656,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
def test_verinfo_with_mdmf_file(self):
self.write_test_share_to_server("si1")
mr = MDMFSlotReadProxy(self.rref, "si1", 0)
mr = MDMFSlotReadProxy(self.storage_server, "si1", 0)
d = defer.succeed(None)
d.addCallback(lambda ignored:
mr.get_verinfo())
@ -2701,7 +2702,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
# set the way we want them for the tests below.
data = self.build_test_sdmf_share()
sdmfr = SDMFSlotWriteProxy(0,
self.rref,
self.storage_server,
"si1",
self.secrets,
0, 3, 10, 36, 36)
@ -2744,7 +2745,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
# write, we need to set the checkstring correctly. When we
# don't, no write should occur.
sdmfw = SDMFSlotWriteProxy(0,
self.rref,
self.storage_server,
"si1",
self.secrets,
1, 3, 10, 36, 36)

View File

@ -2511,9 +2511,9 @@ class Connections(SystemTestMixin, unittest.TestCase):
self.failUnlessEqual(len(nonclients), 1)
self.s1 = nonclients[0] # s1 is the server, not c0
self.s1_rref = self.s1.get_rref()
self.failIfEqual(self.s1_rref, None)
self.failUnless(self.s1.is_connected())
self.s1_storage_server = self.s1.get_storage_server()
self.assertIsNot(self.s1_storage_server, None)
self.assertTrue(self.s1.is_connected())
d.addCallback(_start)
# now shut down the server
@ -2524,9 +2524,9 @@ class Connections(SystemTestMixin, unittest.TestCase):
d.addCallback(lambda ign: self.poll(_poll))
def _down(ign):
self.failIf(self.s1.is_connected())
rref = self.s1.get_rref()
self.failUnless(rref)
self.failUnlessIdentical(rref, self.s1_rref)
self.assertFalse(self.s1.is_connected())
storage_server = self.s1.get_storage_server()
self.assertIsNot(storage_server, None)
self.assertEqual(storage_server, self.s1_storage_server)
d.addCallback(_down)
return d