Merge pull request #1165 from tahoe-lafs/3849-refactor-out-foolscap-in-storage-server

Refactor out Foolscap support from the storage server
Commit bc8889f32f, authored by Itamar Turner-Trauring on 2021-12-16 08:56:11 -05:00, committed by GitHub.
15 changed files with 362 additions and 270 deletions
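
The refactoring below follows one pattern throughout: the storage logic moves into plain Python classes (StorageServer, BucketWriter, BucketReader), and thin Foolscap adapters (FoolscapStorageServer, FoolscapBucketWriter, FoolscapBucketReader) subclass foolscap's Referenceable and forward each remote_* call. A minimal, self-contained sketch of that shape (the class bodies here are stand-ins for illustration, not the project's code):

    class StorageServer(object):
        """Business logic only; no Foolscap base class."""
        def get_version(self):
            return {"application-version": "demo"}

    class FoolscapStorageServer(object):
        # In the real code this subclasses foolscap's Referenceable, so
        # the remote_* methods become callable over the network.
        def __init__(self, storage_server):
            self._server = storage_server
        def remote_get_version(self):
            return self._server.get_version()

    ss = StorageServer()
    assert FoolscapStorageServer(ss).remote_get_version() == ss.get_version()

With this split, an HTTP front end can call the plain methods directly while Foolscap clients keep the same wire API.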

newsfragments/3849.minor (new empty file)


@@ -36,7 +36,7 @@ from twisted.python.filepath import FilePath
import allmydata
from allmydata.crypto import rsa, ed25519
from allmydata.crypto.util import remove_prefix
- from allmydata.storage.server import StorageServer
+ from allmydata.storage.server import StorageServer, FoolscapStorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
@@ -834,7 +834,7 @@ class _Client(node.Node, pollmixin.PollMixin):
if anonymous_storage_enabled(self.config):
furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
- furl = self.tub.registerReference(ss, furlFile=furl_file)
+ furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file)
announcement["anonymous-storage-FURL"] = furl
enabled_storage_servers = self._enable_storage_servers(


@@ -91,4 +91,4 @@ class HTTPServer(object):
@_authorized_route(_app, "/v1/version", methods=["GET"])
def version(self, request, authorization):
- return self._cbor(request, self._storage_server.remote_get_version())
+ return self._cbor(request, self._storage_server.get_version())


@@ -352,8 +352,10 @@ class ShareFile(object):
return space_freed
- @implementer(RIBucketWriter)
- class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
+ class BucketWriter(object):
+ """
+ Keep track of the process of writing to a ShareFile.
+ """
def __init__(self, ss, incominghome, finalhome, max_size, lease_info, clock):
self.ss = ss
@@ -373,7 +375,7 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
def allocated_size(self):
return self._max_size
- def remote_write(self, offset, data):
+ def write(self, offset, data):
# Delay the timeout, since we received data:
self._timeout.reset(30 * 60)
start = self._clock.seconds()
@@ -397,9 +399,6 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self.ss.add_latency("write", self._clock.seconds() - start)
self.ss.count("write")
- def remote_close(self):
- self.close()
def close(self):
precondition(not self.closed)
self._timeout.cancel()
@@ -451,13 +450,10 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
facility="tahoe.storage", level=log.UNUSUAL)
self.abort()
- def remote_abort(self):
+ def abort(self):
log.msg("storage: aborting sharefile %s" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL)
- self.abort()
self.ss.count("abort")
- def abort(self):
if self.closed:
return
@@ -480,8 +476,28 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self._timeout.cancel()
- @implementer(RIBucketReader)
- class BucketReader(Referenceable): # type: ignore # warner/foolscap#78
+ @implementer(RIBucketWriter)
+ class FoolscapBucketWriter(Referenceable): # type: ignore # warner/foolscap#78
+ """
+ Foolscap-specific BucketWriter.
+ """
+ def __init__(self, bucket_writer):
+ self._bucket_writer = bucket_writer
+ def remote_write(self, offset, data):
+ return self._bucket_writer.write(offset, data)
+ def remote_close(self):
+ return self._bucket_writer.close()
+ def remote_abort(self):
+ return self._bucket_writer.abort()
+ class BucketReader(object):
+ """
+ Manage the process for reading from a ``ShareFile``.
+ """
def __init__(self, ss, sharefname, storage_index=None, shnum=None):
self.ss = ss
@@ -496,15 +512,31 @@ class BucketReader(Referenceable): # type: ignore # warner/foolscap#78
),
self.shnum)
- def remote_read(self, offset, length):
+ def read(self, offset, length):
start = time.time()
data = self._share_file.read_share_data(offset, length)
self.ss.add_latency("read", time.time() - start)
self.ss.count("read")
return data
+ def advise_corrupt_share(self, reason):
+ return self.ss.advise_corrupt_share(b"immutable",
+ self.storage_index,
+ self.shnum,
+ reason)
+ @implementer(RIBucketReader)
+ class FoolscapBucketReader(Referenceable): # type: ignore # warner/foolscap#78
+ """
+ Foolscap wrapper for ``BucketReader``
+ """
+ def __init__(self, bucket_reader):
+ self._bucket_reader = bucket_reader
+ def remote_read(self, offset, length):
+ return self._bucket_reader.read(offset, length)
def remote_advise_corrupt_share(self, reason):
- return self.ss.remote_advise_corrupt_share(b"immutable",
- self.storage_index,
- self.shnum,
- reason)
+ return self._bucket_reader.advise_corrupt_share(reason)

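The same adapter shape applies per bucket: one BucketWriter/BucketReader implementation, plus a Foolscap front end that only forwards. A toy, runnable sketch (stand-in bodies, nothing beyond the standard library assumed):

    class BucketWriter(object):
        def __init__(self):
            self.data, self.closed = b"", False
        def write(self, offset, data):
            assert not self.closed
            # Simplified write-at-offset; the real class enforces
            # non-conflicting writes and the share size limit.
            self.data = self.data[:offset] + data
        def close(self):
            self.closed = True

    class FoolscapBucketWriter(object):  # Referenceable in the real code
        def __init__(self, bucket_writer):
            self._bucket_writer = bucket_writer
        def remote_write(self, offset, data):
            return self._bucket_writer.write(offset, data)
        def remote_close(self):
            return self._bucket_writer.close()

    bw = BucketWriter()
    FoolscapBucketWriter(bw).remote_write(0, b"hello")  # Foolscap path
    bw.close()                                          # direct path (e.g. HTTP)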

@@ -12,7 +12,7 @@ if PY2:
# strings. Omit bytes so we don't leak future's custom bytes.
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401
else:
- from typing import Dict
+ from typing import Dict, Tuple
import os, re
@@ -32,7 +32,10 @@ from allmydata.storage.lease import LeaseInfo
from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
create_mutable_sharefile
from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
- from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
+ from allmydata.storage.immutable import (
+ ShareFile, BucketWriter, BucketReader, FoolscapBucketWriter,
+ FoolscapBucketReader,
+ )
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
@@ -55,10 +58,10 @@ NUM_RE=re.compile("^[0-9]+$")
DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
- @implementer(RIStorageServer, IStatsProducer)
- class StorageServer(service.MultiService, Referenceable):
+ @implementer(IStatsProducer)
+ class StorageServer(service.MultiService):
"""
- A filesystem-based implementation of ``RIStorageServer``.
+ Implement the business logic for the storage server.
"""
name = 'storage'
# only the tests change this to anything else
@@ -125,16 +128,11 @@ class StorageServer(service.MultiService, Referenceable):
self.lease_checker.setServiceParent(self)
self._clock = clock
- # Currently being-written Bucketwriters. For Foolscap, lifetime is tied
- # to connection: when disconnection happens, the BucketWriters are
- # removed. For HTTP, this makes no sense, so there will be
- # timeout-based cleanup; see
- # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807.
# Map in-progress filesystem path -> BucketWriter:
self._bucket_writers = {} # type: Dict[str,BucketWriter]
- # Canaries and disconnect markers for BucketWriters created via Foolscap:
- self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)]
+ # These callables will be called with BucketWriters that closed:
+ self._call_on_bucket_writer_close = []
def stopService(self):
# Cancel any in-progress uploads:
@@ -263,7 +261,7 @@ class StorageServer(service.MultiService, Referenceable):
space += bw.allocated_size()
return space
- def remote_get_version(self):
+ def get_version(self):
remaining_space = self.get_available_space()
if remaining_space is None:
# We're on a platform that has no API to get disk stats.
@@ -284,7 +282,7 @@ class StorageServer(service.MultiService, Referenceable):
}
return version
- def _allocate_buckets(self, storage_index,
+ def allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
owner_num=0, renew_leases=True):
@@ -370,21 +368,6 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("allocate", self._clock.seconds() - start)
return set(alreadygot), bucketwriters
- def remote_allocate_buckets(self, storage_index,
- renew_secret, cancel_secret,
- sharenums, allocated_size,
- canary, owner_num=0):
- """Foolscap-specific ``allocate_buckets()`` API."""
- alreadygot, bucketwriters = self._allocate_buckets(
- storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
- owner_num=owner_num, renew_leases=True,
- )
- # Abort BucketWriters if disconnection happens.
- for bw in bucketwriters.values():
- disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
- self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
- return alreadygot, bucketwriters
def _iter_share_files(self, storage_index):
for shnum, filename in self._get_bucket_shares(storage_index):
with open(filename, 'rb') as f:
@@ -400,8 +383,7 @@ class StorageServer(service.MultiService, Referenceable):
continue # non-sharefile
yield sf
- def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
- owner_num=1):
+ def add_lease(self, storage_index, renew_secret, cancel_secret, owner_num=1):
start = self._clock.seconds()
self.count("add-lease")
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
@@ -415,7 +397,7 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("add-lease", self._clock.seconds() - start)
return None
- def remote_renew_lease(self, storage_index, renew_secret):
+ def renew_lease(self, storage_index, renew_secret):
start = self._clock.seconds()
self.count("renew")
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
@@ -431,9 +413,14 @@ class StorageServer(service.MultiService, Referenceable):
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_added', consumed_size)
del self._bucket_writers[bw.incominghome]
- if bw in self._bucket_writer_disconnect_markers:
- canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
- canary.dontNotifyOnDisconnect(disconnect_marker)
+ for handler in self._call_on_bucket_writer_close:
+ handler(bw)
+ def register_bucket_writer_close_handler(self, handler):
+ """
+ The handler will be called with any ``BucketWriter`` that closes.
+ """
+ self._call_on_bucket_writer_close.append(handler)
def _get_bucket_shares(self, storage_index):
"""Return a list of (shnum, pathname) tuples for files that hold
@@ -449,7 +436,7 @@ class StorageServer(service.MultiService, Referenceable):
# Commonly caused by there being no buckets at all.
pass
- def remote_get_buckets(self, storage_index):
+ def get_buckets(self, storage_index):
start = self._clock.seconds()
self.count("get")
si_s = si_b2a(storage_index)
@@ -641,7 +628,7 @@ class StorageServer(service.MultiService, Referenceable):
secrets,
test_and_write_vectors,
read_vector,
- renew_leases,
+ renew_leases=True,
):
"""
Read data from shares and conditionally write some data to them.
@@ -699,18 +686,6 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("writev", self._clock.seconds() - start)
return (testv_is_good, read_data)
- def remote_slot_testv_and_readv_and_writev(self, storage_index,
- secrets,
- test_and_write_vectors,
- read_vector):
- return self.slot_testv_and_readv_and_writev(
- storage_index,
- secrets,
- test_and_write_vectors,
- read_vector,
- renew_leases=True,
- )
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
owner_num=0):
(write_enabler, renew_secret, cancel_secret) = secrets
@@ -721,7 +696,7 @@ class StorageServer(service.MultiService, Referenceable):
self)
return share
- def remote_slot_readv(self, storage_index, shares, readv):
+ def slot_readv(self, storage_index, shares, readv):
start = self._clock.seconds()
self.count("readv")
si_s = si_b2a(storage_index)
@@ -763,8 +738,8 @@ class StorageServer(service.MultiService, Referenceable):
return True
return False
- def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
- reason):
+ def advise_corrupt_share(self, share_type, storage_index, shnum,
+ reason):
# This is a remote API, I believe, so this has to be bytes for legacy
# protocol backwards compatibility reasons.
assert isinstance(share_type, bytes)
@@ -804,6 +779,90 @@ class StorageServer(service.MultiService, Referenceable):
return None
+ @implementer(RIStorageServer)
+ class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78
+ """
+ A filesystem-based implementation of ``RIStorageServer``.
+ For Foolscap, BucketWriter lifetime is tied to connection: when
+ disconnection happens, the BucketWriters are removed.
+ """
+ name = 'storage'
+ def __init__(self, storage_server): # type: (StorageServer) -> None
+ self._server = storage_server
+ # Canaries and disconnect markers for BucketWriters created via Foolscap:
+ self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,Tuple[IRemoteReference, object]]
+ self._server.register_bucket_writer_close_handler(self._bucket_writer_closed)
+ def _bucket_writer_closed(self, bw):
+ if bw in self._bucket_writer_disconnect_markers:
+ canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
+ canary.dontNotifyOnDisconnect(disconnect_marker)
+ def remote_get_version(self):
+ return self._server.get_version()
+ def remote_allocate_buckets(self, storage_index,
+ renew_secret, cancel_secret,
+ sharenums, allocated_size,
+ canary, owner_num=0):
+ """Foolscap-specific ``allocate_buckets()`` API."""
+ alreadygot, bucketwriters = self._server.allocate_buckets(
+ storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
+ owner_num=owner_num, renew_leases=True,
+ )
+ # Abort BucketWriters if disconnection happens.
+ for bw in bucketwriters.values():
+ disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
+ self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
+ # Wrap BucketWriters with Foolscap adapter:
+ bucketwriters = {
+ k: FoolscapBucketWriter(bw)
+ for (k, bw) in bucketwriters.items()
+ }
+ return alreadygot, bucketwriters
+ def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
+ owner_num=1):
+ return self._server.add_lease(storage_index, renew_secret, cancel_secret)
+ def remote_renew_lease(self, storage_index, renew_secret):
+ return self._server.renew_lease(storage_index, renew_secret)
+ def remote_get_buckets(self, storage_index):
+ return {
+ k: FoolscapBucketReader(bucket)
+ for (k, bucket) in self._server.get_buckets(storage_index).items()
+ }
+ def remote_slot_testv_and_readv_and_writev(self, storage_index,
+ secrets,
+ test_and_write_vectors,
+ read_vector):
+ return self._server.slot_testv_and_readv_and_writev(
+ storage_index,
+ secrets,
+ test_and_write_vectors,
+ read_vector,
+ renew_leases=True,
+ )
+ def remote_slot_readv(self, storage_index, shares, readv):
+ return self._server.slot_readv(storage_index, shares, readv)
+ def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
+ reason):
+ return self._server.advise_corrupt_share(share_type, storage_index, shnum,
+ reason)
CORRUPTION_REPORT_FORMAT = """\
report: Share Corruption
type: {type}

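Note how the Foolscap-specific canary bookkeeping leaves StorageServer entirely: instead of tracking disconnect markers itself, the server now exposes register_bucket_writer_close_handler(), and FoolscapStorageServer registers a callback. A self-contained sketch of just that hook (stand-in class, not the project's code):

    class StorageServer(object):
        def __init__(self):
            self._call_on_bucket_writer_close = []
        def register_bucket_writer_close_handler(self, handler):
            self._call_on_bucket_writer_close.append(handler)
        def bucket_writer_closed(self, bw):
            # Called internally when a BucketWriter finishes.
            for handler in self._call_on_bucket_writer_close:
                handler(bw)

    closed = []
    server = StorageServer()
    server.register_bucket_writer_close_handler(closed.append)
    server.bucket_writer_closed("bw-1")
    assert closed == ["bw-1"]

This keeps the plain server usable without any Foolscap connection, while the Foolscap layer can still drop its canary state when a writer closes.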

@@ -1,8 +1,4 @@
- from .common_util import (
- FakeCanary,
- )
def upload_immutable(storage_server, storage_index, renew_secret, cancel_secret, shares):
"""
Synchronously upload some immutable shares to a ``StorageServer``.
@@ -20,17 +16,16 @@ def upload_immutable(storage_server, storage_index, renew_secret, cancel_secret,
:return: ``None``
"""
- already, writers = storage_server.remote_allocate_buckets(
+ already, writers = storage_server.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
shares.keys(),
len(next(iter(shares.values()))),
- canary=FakeCanary(),
)
for shnum, writer in writers.items():
- writer.remote_write(0, shares[shnum])
- writer.remote_close()
+ writer.write(0, shares[shnum])
+ writer.close()
def upload_mutable(storage_server, storage_index, secrets, shares):
@@ -57,7 +52,7 @@ def upload_mutable(storage_server, storage_index, secrets, shares):
}
read_vector = []
- storage_server.remote_slot_testv_and_readv_and_writev(
+ storage_server.slot_testv_and_readv_and_writev(
storage_index,
secrets,
test_and_write_vectors,

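For contrast, a self-contained toy mirroring the updated helper flow above (allocate, write, close through plain methods, no canary; these classes are stand-ins, not Tahoe's):

    class ToyWriter(object):
        def __init__(self):
            self.chunks, self.closed = [], False
        def write(self, offset, data):
            self.chunks.append((offset, data))
        def close(self):
            self.closed = True

    class ToyServer(object):
        def allocate_buckets(self, storage_index, renew_secret,
                             cancel_secret, sharenums, allocated_size):
            # Returns (already-got share numbers, new writers).
            return set(), dict((shnum, ToyWriter()) for shnum in sharenums)

    already, writers = ToyServer().allocate_buckets(
        b"si", b"r" * 32, b"c" * 32, [0, 1], 32)
    for shnum, writer in writers.items():
        writer.write(0, b"x" * 32)
        writer.close()
    assert already == set() and all(w.closed for w in writers.values())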

@@ -50,7 +50,9 @@ from allmydata.util.assertutil import _assert
from allmydata import uri as tahoe_uri
from allmydata.client import _Client
- from allmydata.storage.server import StorageServer, storage_index_to_dir
+ from allmydata.storage.server import (
+ StorageServer, storage_index_to_dir, FoolscapStorageServer,
+ )
from allmydata.util import fileutil, idlib, hashutil
from allmydata.util.hashutil import permute_server_hash
from allmydata.util.fileutil import abspath_expanduser_unicode
@@ -417,7 +419,7 @@ class NoNetworkGrid(service.MultiService):
ss.setServiceParent(middleman)
serverid = ss.my_nodeid
self.servers_by_number[i] = ss
- wrapper = wrap_storage_server(ss)
+ wrapper = wrap_storage_server(FoolscapStorageServer(ss))
self.wrappers_by_id[serverid] = wrapper
self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper)
self.rebuild_serverlist()


@@ -773,13 +773,13 @@ class AddLease(GridTestMixin, unittest.TestCase):
d.addCallback(_check_cr, "mutable-normal")
really_did_break = []
- # now break the server's remote_add_lease call
+ # now break the server's add_lease call
def _break_add_lease(ign):
def broken_add_lease(*args, **kwargs):
really_did_break.append(1)
raise KeyError("intentional failure, should be ignored")
- assert self.g.servers_by_number[0].remote_add_lease
- self.g.servers_by_number[0].remote_add_lease = broken_add_lease
+ assert self.g.servers_by_number[0].add_lease
+ self.g.servers_by_number[0].add_lease = broken_add_lease
d.addCallback(_break_add_lease)
# and confirm that the files still look healthy


@@ -601,7 +601,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
"enabled = true\n")
c = yield client.create_client(basedir)
ss = c.getServiceNamed("storage")
- verdict = ss.remote_get_version()
+ verdict = ss.get_version()
self.failUnlessReallyEqual(verdict[b"application-version"],
allmydata.__full_version__.encode("ascii"))
self.failIfEqual(str(allmydata.__version__), "unknown")


@@ -27,7 +27,7 @@ from allmydata.util import fileutil, hashutil, pollmixin
from allmydata.storage.server import StorageServer, si_b2a
from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded
- from allmydata.test.common_util import StallMixin, FakeCanary
+ from allmydata.test.common_util import StallMixin
class BucketEnumeratingCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice
@@ -124,12 +124,12 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
def write(self, i, ss, serverid, tail=0):
si = self.si(i)
si = si[:-1] + bytes(bytearray((tail,)))
- had,made = ss.remote_allocate_buckets(si,
- self.rs(i, serverid),
- self.cs(i, serverid),
- set([0]), 99, FakeCanary())
- made[0].remote_write(0, b"data")
- made[0].remote_close()
+ had,made = ss.allocate_buckets(si,
+ self.rs(i, serverid),
+ self.cs(i, serverid),
+ set([0]), 99)
+ made[0].write(0, b"data")
+ made[0].close()
return si_b2a(si)
def test_immediate(self):


@@ -39,6 +39,7 @@ from allmydata.crypto import aes
from allmydata.storage.server import (
si_b2a,
StorageServer,
+ FoolscapStorageServer,
)
from allmydata.storage_client import StorageFarmBroker
from allmydata.immutable.layout import (
@@ -427,7 +428,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase):
"""
storage_index = b"a" * 16
serverid = b"b" * 20
- storage = StorageServer(self.mktemp(), serverid)
+ storage = FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
rref_without_ueb = LocalWrapper(storage, fireNow)
yield write_bad_share(rref_without_ueb, storage_index)
server_without_ueb = NoNetworkServer(serverid, rref_without_ueb)
@@ -451,7 +452,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase):
"""
storage_index = b"a" * 16
serverid = b"b" * 20
- storage = StorageServer(self.mktemp(), serverid)
+ storage = FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
rref_with_ueb = LocalWrapper(storage, fireNow)
ueb = {
"needed_shares": 2,
@@ -487,7 +488,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase):
in [b"b", b"c"]
)
storages = list(
- StorageServer(self.mktemp(), serverid)
+ FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
for serverid
in serverids
)


@@ -73,7 +73,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin,
def _copy_share(self, share, to_server):
(sharenum, sharefile) = share
(id, ss) = to_server
- shares_dir = os.path.join(ss.original.storedir, "shares")
+ shares_dir = os.path.join(ss.original._server.storedir, "shares")
si = uri.from_string(self.uri).get_storage_index()
si_dir = os.path.join(shares_dir, storage_index_to_dir(si))
if not os.path.exists(si_dir):
@@ -82,7 +82,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin,
shutil.copy(sharefile, new_sharefile)
self.shares = self.find_uri_shares(self.uri)
# Make sure that the storage server has the share.
- self.failUnless((sharenum, ss.original.my_nodeid, new_sharefile)
+ self.failUnless((sharenum, ss.original._server.my_nodeid, new_sharefile)
in self.shares)
def _corrupt_share(self, share, corruptor_func):


@@ -39,13 +39,18 @@ from hypothesis import given, strategies
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32
- from allmydata.storage.server import StorageServer, DEFAULT_RENEWAL_TIME
+ from allmydata.storage.server import (
+ StorageServer, DEFAULT_RENEWAL_TIME, FoolscapStorageServer,
+ )
from allmydata.storage.shares import get_share_file
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.mutable_schema import (
ALL_SCHEMAS as ALL_MUTABLE_SCHEMAS,
)
- from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile
+ from allmydata.storage.immutable import (
+ BucketWriter, BucketReader, ShareFile, FoolscapBucketWriter,
+ FoolscapBucketReader,
+ )
from allmydata.storage.immutable_schema import (
ALL_SCHEMAS as ALL_IMMUTABLE_SCHEMAS,
)
@@ -157,25 +162,25 @@ class Bucket(unittest.TestCase):
def test_create(self):
incoming, final = self.make_workdir("test_create")
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
- bw.remote_write(0, b"a"*25)
- bw.remote_write(25, b"b"*25)
- bw.remote_write(50, b"c"*25)
- bw.remote_write(75, b"d"*7)
- bw.remote_close()
+ bw.write(0, b"a"*25)
+ bw.write(25, b"b"*25)
+ bw.write(50, b"c"*25)
+ bw.write(75, b"d"*7)
+ bw.close()
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
- bw.remote_write(0, b"a"*25)
- bw.remote_write(25, b"b"*25)
- bw.remote_write(50, b"c"*7) # last block may be short
- bw.remote_close()
+ bw.write(0, b"a"*25)
+ bw.write(25, b"b"*25)
+ bw.write(50, b"c"*7) # last block may be short
+ bw.close()
# now read from it
br = BucketReader(self, bw.finalhome)
- self.failUnlessEqual(br.remote_read(0, 25), b"a"*25)
- self.failUnlessEqual(br.remote_read(25, 25), b"b"*25)
- self.failUnlessEqual(br.remote_read(50, 7), b"c"*7)
+ self.failUnlessEqual(br.read(0, 25), b"a"*25)
+ self.failUnlessEqual(br.read(25, 25), b"b"*25)
+ self.failUnlessEqual(br.read(50, 7), b"c"*7)
def test_write_past_size_errors(self):
"""Writing beyond the size of the bucket throws an exception."""
@@ -185,7 +190,7 @@ class Bucket(unittest.TestCase):
)
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
with self.assertRaises(DataTooLargeError):
- bw.remote_write(offset, b"a" * length)
+ bw.write(offset, b"a" * length)
@given(
maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
@@ -205,25 +210,25 @@ class Bucket(unittest.TestCase):
self, incoming, final, length, self.make_lease(), Clock()
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
- bw.remote_write(10, expected_data[10:20])
- bw.remote_write(30, expected_data[30:40])
- bw.remote_write(50, expected_data[50:60])
+ bw.write(10, expected_data[10:20])
+ bw.write(30, expected_data[30:40])
+ bw.write(50, expected_data[50:60])
# Then, an overlapping write but with matching data:
- bw.remote_write(
+ bw.write(
maybe_overlapping_offset,
expected_data[
maybe_overlapping_offset:maybe_overlapping_offset + maybe_overlapping_length
]
)
# Now fill in the holes:
- bw.remote_write(0, expected_data[0:10])
- bw.remote_write(20, expected_data[20:30])
- bw.remote_write(40, expected_data[40:50])
- bw.remote_write(60, expected_data[60:])
- bw.remote_close()
+ bw.write(0, expected_data[0:10])
+ bw.write(20, expected_data[20:30])
+ bw.write(40, expected_data[40:50])
+ bw.write(60, expected_data[60:])
+ bw.close()
br = BucketReader(self, bw.finalhome)
- self.assertEqual(br.remote_read(0, length), expected_data)
+ self.assertEqual(br.read(0, length), expected_data)
@given(
@@ -243,21 +248,21 @@ class Bucket(unittest.TestCase):
self, incoming, final, length, self.make_lease(), Clock()
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
- bw.remote_write(10, b"1" * 10)
- bw.remote_write(30, b"1" * 10)
- bw.remote_write(50, b"1" * 10)
+ bw.write(10, b"1" * 10)
+ bw.write(30, b"1" * 10)
+ bw.write(50, b"1" * 10)
# Then, write something that might overlap with some of them, but
# conflicts. Then fill in holes left by first three writes. Conflict is
# inevitable.
with self.assertRaises(ConflictingWriteError):
- bw.remote_write(
+ bw.write(
maybe_overlapping_offset,
b'X' * min(maybe_overlapping_length, length - maybe_overlapping_offset),
)
- bw.remote_write(0, b"1" * 10)
- bw.remote_write(20, b"1" * 10)
- bw.remote_write(40, b"1" * 10)
- bw.remote_write(60, b"1" * 40)
+ bw.write(0, b"1" * 10)
+ bw.write(20, b"1" * 10)
+ bw.write(40, b"1" * 10)
+ bw.write(60, b"1" * 40)
def test_read_past_end_of_share_data(self):
# test vector for immutable files (hard-coded contents of an immutable share
@@ -302,15 +307,15 @@ class Bucket(unittest.TestCase):
# Now read from it.
br = BucketReader(mockstorageserver, final)
- self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)
+ self.failUnlessEqual(br.read(0, len(share_data)), share_data)
# Read past the end of share data to get the cancel secret.
read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)
- result_of_read = br.remote_read(0, read_length)
+ result_of_read = br.read(0, read_length)
self.failUnlessEqual(result_of_read, share_data)
- result_of_read = br.remote_read(0, len(share_data)+1)
+ result_of_read = br.read(0, len(share_data)+1)
self.failUnlessEqual(result_of_read, share_data)
def _assert_timeout_only_after_30_minutes(self, clock, bw):
@@ -348,7 +353,7 @@ class Bucket(unittest.TestCase):
clock.advance(29 * 60)
# .. but we receive a write! So that should delay the timeout again to
# another 30 minutes.
- bw.remote_write(0, b"hello")
+ bw.write(0, b"hello")
self._assert_timeout_only_after_30_minutes(clock, bw)
def test_bucket_closing_cancels_timeout(self):
@@ -402,7 +407,7 @@ class BucketProxy(unittest.TestCase):
fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp"))
bw = BucketWriter(self, incoming, final, size, self.make_lease(), Clock())
- rb = RemoteBucket(bw)
+ rb = RemoteBucket(FoolscapBucketWriter(bw))
return bw, rb, final
def make_lease(self):
@@ -474,7 +479,7 @@ class BucketProxy(unittest.TestCase):
# now read everything back
def _start_reading(res):
br = BucketReader(self, sharefname)
- rb = RemoteBucket(br)
+ rb = RemoteBucket(FoolscapBucketReader(br))
server = NoNetworkServer(b"abc", None)
rbp = rbp_class(rb, server, storage_index=b"")
self.failUnlessIn("to peer", repr(rbp))
@@ -542,20 +547,20 @@ class Server(unittest.TestCase):
def test_declares_fixed_1528(self):
ss = self.create("test_declares_fixed_1528")
- ver = ss.remote_get_version()
+ ver = ss.get_version()
sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
self.failUnless(sv1.get(b'prevents-read-past-end-of-share-data'), sv1)
def test_declares_maximum_share_sizes(self):
ss = self.create("test_declares_maximum_share_sizes")
- ver = ss.remote_get_version()
+ ver = ss.get_version()
sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
self.failUnlessIn(b'maximum-immutable-share-size', sv1)
self.failUnlessIn(b'maximum-mutable-share-size', sv1)
def test_declares_available_space(self):
ss = self.create("test_declares_available_space")
- ver = ss.remote_get_version()
+ ver = ss.get_version()
sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
self.failUnlessIn(b'available-space', sv1)
@@ -566,7 +571,9 @@ class Server(unittest.TestCase):
"""
renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret))
cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))
- return ss._allocate_buckets(
+ if isinstance(ss, FoolscapStorageServer):
+ ss = ss._server
+ return ss.allocate_buckets(
storage_index,
renew_secret, cancel_secret,
sharenums, size,
@@ -590,12 +597,12 @@ class Server(unittest.TestCase):
shnum, bucket = list(writers.items())[0]
# This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
- bucket.remote_write(2**32, b"ab")
- bucket.remote_close()
+ bucket.write(2**32, b"ab")
+ bucket.close()
- readers = ss.remote_get_buckets(b"allocate")
+ readers = ss.get_buckets(b"allocate")
reader = readers[shnum]
- self.failUnlessEqual(reader.remote_read(2**32, 2), b"ab")
+ self.failUnlessEqual(reader.read(2**32, 2), b"ab")
def test_dont_overfill_dirs(self):
"""
@@ -606,8 +613,8 @@ class Server(unittest.TestCase):
ss = self.create("test_dont_overfill_dirs")
already, writers = self.allocate(ss, b"storageindex", [0], 10)
for i, wb in writers.items():
- wb.remote_write(0, b"%10d" % i)
- wb.remote_close()
+ wb.write(0, b"%10d" % i)
+ wb.close()
storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
"shares")
children_of_storedir = set(os.listdir(storedir))
@@ -616,8 +623,8 @@ class Server(unittest.TestCase):
# chars the same as the first storageindex.
already, writers = self.allocate(ss, b"storageindey", [0], 10)
for i, wb in writers.items():
- wb.remote_write(0, b"%10d" % i)
- wb.remote_close()
+ wb.write(0, b"%10d" % i)
+ wb.close()
storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
"shares")
new_children_of_storedir = set(os.listdir(storedir))
@@ -627,8 +634,8 @@ class Server(unittest.TestCase):
ss = self.create("test_remove_incoming")
already, writers = self.allocate(ss, b"vid", list(range(3)), 10)
for i,wb in writers.items():
- wb.remote_write(0, b"%10d" % i)
- wb.remote_close()
+ wb.write(0, b"%10d" % i)
+ wb.close()
incoming_share_dir = wb.incominghome
incoming_bucket_dir = os.path.dirname(incoming_share_dir)
incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
@@ -647,32 +654,32 @@ class Server(unittest.TestCase):
# Now abort the writers.
for writer in writers.values():
- writer.remote_abort()
+ writer.abort()
self.failUnlessEqual(ss.allocated_size(), 0)
def test_allocate(self):
ss = self.create("test_allocate")
- self.failUnlessEqual(ss.remote_get_buckets(b"allocate"), {})
+ self.failUnlessEqual(ss.get_buckets(b"allocate"), {})
already,writers = self.allocate(ss, b"allocate", [0,1,2], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
# while the buckets are open, they should not count as readable
- self.failUnlessEqual(ss.remote_get_buckets(b"allocate"), {})
+ self.failUnlessEqual(ss.get_buckets(b"allocate"), {})
# close the buckets
for i,wb in writers.items():
- wb.remote_write(0, b"%25d" % i)
- wb.remote_close()
+ wb.write(0, b"%25d" % i)
+ wb.close()
# aborting a bucket that was already closed is a no-op
- wb.remote_abort()
+ wb.abort()
# now they should be readable
- b = ss.remote_get_buckets(b"allocate")
+ b = ss.get_buckets(b"allocate")
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
- self.failUnlessEqual(b[0].remote_read(0, 25), b"%25d" % 0)
+ self.failUnlessEqual(b[0].read(0, 25), b"%25d" % 0)
b_str = str(b[0])
self.failUnlessIn("BucketReader", b_str)
self.failUnlessIn("mfwgy33dmf2g 0", b_str)
@@ -693,15 +700,15 @@ class Server(unittest.TestCase):
# aborting the writes should remove the tempfiles
for i,wb in writers2.items():
- wb.remote_abort()
+ wb.abort()
already2,writers2 = self.allocate(ss, b"allocate", [2,3,4,5], 75)
self.failUnlessEqual(already2, set([0,1,2]))
self.failUnlessEqual(set(writers2.keys()), set([5]))
for i,wb in writers2.items():
- wb.remote_abort()
+ wb.abort()
for i,wb in writers.items():
- wb.remote_abort()
+ wb.abort()
def test_allocate_without_lease_renewal(self):
"""
@@ -724,8 +731,8 @@ class Server(unittest.TestCase):
ss, storage_index, [0], 1, renew_leases=False,
)
(writer,) = writers.values()
- writer.remote_write(0, b"x")
- writer.remote_close()
+ writer.write(0, b"x")
+ writer.close()
# It should have a lease granted at the current time.
shares = dict(ss._get_bucket_shares(storage_index))
@@ -747,8 +754,8 @@ class Server(unittest.TestCase):
ss, storage_index, [1], 1, renew_leases=False,
)
(writer,) = writers.values()
- writer.remote_write(0, b"x")
- writer.remote_close()
+ writer.write(0, b"x")
+ writer.close()
# The first share's lease expiration time is unchanged.
shares = dict(ss._get_bucket_shares(storage_index))
@@ -764,8 +771,8 @@ class Server(unittest.TestCase):
def test_bad_container_version(self):
ss = self.create("test_bad_container_version")
a,w = self.allocate(ss, b"si1", [0], 10)
- w[0].remote_write(0, b"\xff"*10)
- w[0].remote_close()
+ w[0].write(0, b"\xff"*10)
+ w[0].close()
fn = os.path.join(ss.sharedir, storage_index_to_dir(b"si1"), "0")
f = open(fn, "rb+")
@@ -773,17 +780,17 @@ class Server(unittest.TestCase):
f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
f.close()
- ss.remote_get_buckets(b"allocate")
+ ss.get_buckets(b"allocate")
e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
- ss.remote_get_buckets, b"si1")
+ ss.get_buckets, b"si1")
self.assertEqual(e.filename, fn)
self.assertEqual(e.version, 0)
self.assertIn("had unexpected version 0", str(e))
def test_disconnect(self):
# simulate a disconnection
- ss = self.create("test_disconnect")
+ ss = FoolscapStorageServer(self.create("test_disconnect"))
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
canary = FakeCanary()
@@ -831,7 +838,7 @@ class Server(unittest.TestCase):
renew_secret = b"R" * 32
cancel_secret = b"C" * 32
with self.assertRaises(interfaces.NoSpace):
- ss.remote_add_lease(storage_index, renew_secret, cancel_secret)
+ ss.add_lease(storage_index, renew_secret, cancel_secret)
def test_reserved_space_mutable_lease(self):
"""
@@ -864,13 +871,13 @@ class Server(unittest.TestCase):
# in the share header. Even if we're out of disk space, on a boring
# enough filesystem we can write these.
for i in range(3):
- ss.remote_add_lease(storage_index, next(renew_secrets), cancel_secret)
+ ss.add_lease(storage_index, next(renew_secrets), cancel_secret)
# Having used all of the space for leases in the header, we would have
# to allocate storage for the next lease. Since there is no space
# available, this must fail instead.
with self.assertRaises(interfaces.NoSpace):
- ss.remote_add_lease(storage_index, next(renew_secrets), cancel_secret)
+ ss.add_lease(storage_index, next(renew_secrets), cancel_secret)
def test_reserved_space(self):
@@ -885,7 +892,7 @@ class Server(unittest.TestCase):
}
self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
- ss = self.create("test_reserved_space", reserved_space=reserved)
+ ss = FoolscapStorageServer(self.create("test_reserved_space", reserved_space=reserved))
# 15k available, 10k reserved, leaves 5k for shares
# a newly created and filled share incurs this much overhead, beyond
@@ -906,28 +913,28 @@ class Server(unittest.TestCase):
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 3000 bytes provisionally
# allocated, allowing only 2000 more to be claimed
- self.failUnlessEqual(len(ss._bucket_writers), 3)
+ self.failUnlessEqual(len(ss._server._bucket_writers), 3)
# allocating 1001-byte shares only leaves room for one
canary2 = FakeCanary()
already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2)
self.failUnlessEqual(len(writers2), 1)
- self.failUnlessEqual(len(ss._bucket_writers), 4)
+ self.failUnlessEqual(len(ss._server._bucket_writers), 4)
# we abandon the first set, so their provisional allocation should be
# returned
canary.disconnected()
- self.failUnlessEqual(len(ss._bucket_writers), 1)
+ self.failUnlessEqual(len(ss._server._bucket_writers), 1)
# now we have a provisional allocation of 1001 bytes
# and we close the second set, so their provisional allocation should
# become real, long-term allocation, and grows to include the
# overhead.
for bw in writers2.values():
- bw.remote_write(0, b"a"*25)
- bw.remote_close()
- self.failUnlessEqual(len(ss._bucket_writers), 0)
+ bw.write(0, b"a"*25)
+ bw.close()
+ self.failUnlessEqual(len(ss._server._bucket_writers), 0)
# this also changes the amount reported as available by call_get_disk_stats
allocated = 1001 + OVERHEAD + LEASE_SIZE
@@ -944,12 +951,12 @@ class Server(unittest.TestCase):
canary=canary3,
)
self.failUnlessEqual(len(writers3), 39)
- self.failUnlessEqual(len(ss._bucket_writers), 39)
+ self.failUnlessEqual(len(ss._server._bucket_writers), 39)
canary3.disconnected()
- self.failUnlessEqual(len(ss._bucket_writers), 0)
- ss.disownServiceParent()
+ self.failUnlessEqual(len(ss._server._bucket_writers), 0)
+ ss._server.disownServiceParent()
del ss
def test_seek(self):
@@ -978,24 +985,22 @@ class Server(unittest.TestCase):
Given a StorageServer, create a bucket with 5 shares and return renewal
and cancellation secrets.
"""
- canary = FakeCanary()
sharenums = list(range(5))
size = 100
# Creating a bucket also creates a lease:
rs, cs = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
- already, writers = ss.remote_allocate_buckets(storage_index, rs, cs,
- sharenums, size, canary)
+ already, writers = ss.allocate_buckets(storage_index, rs, cs,
+ sharenums, size)
self.failUnlessEqual(len(already), expected_already)
self.failUnlessEqual(len(writers), expected_writers)
for wb in writers.values():
- wb.remote_close()
+ wb.close()
return rs, cs
def test_leases(self):
ss = self.create("test_leases")
- canary = FakeCanary()
sharenums = list(range(5))
size = 100
@@ -1018,54 +1023,54 @@ class Server(unittest.TestCase):
# and a third lease, using add-lease
rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
- ss.remote_add_lease(b"si1", rs2a, cs2a)
+ ss.add_lease(b"si1", rs2a, cs2a)
(lease1, lease2, lease3) = ss.get_leases(b"si1")
self.assertTrue(lease1.is_renew_secret(rs1))
self.assertTrue(lease2.is_renew_secret(rs2))
self.assertTrue(lease3.is_renew_secret(rs2a))
# add-lease on a missing storage index is silently ignored
- self.assertIsNone(ss.remote_add_lease(b"si18", b"", b""))
+ self.assertIsNone(ss.add_lease(b"si18", b"", b""))
# check that si0 is readable
- readers = ss.remote_get_buckets(b"si0")
+ readers = ss.get_buckets(b"si0")
self.failUnlessEqual(len(readers), 5)
# renew the first lease. Only the proper renew_secret should work
- ss.remote_renew_lease(b"si0", rs0)
- self.failUnlessRaises(IndexError, ss.remote_renew_lease, b"si0", cs0)
- self.failUnlessRaises(IndexError, ss.remote_renew_lease, b"si0", rs1)
+ ss.renew_lease(b"si0", rs0)
+ self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", cs0)
+ self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", rs1)
# check that si0 is still readable
- readers = ss.remote_get_buckets(b"si0")
+ readers = ss.get_buckets(b"si0")
self.failUnlessEqual(len(readers), 5)
# There is no such method as remote_cancel_lease for now -- see
# ticket #1528.
- self.failIf(hasattr(ss, 'remote_cancel_lease'), \
- "ss should not have a 'remote_cancel_lease' method/attribute")
+ self.failIf(hasattr(FoolscapStorageServer(ss), 'remote_cancel_lease'), \
+ "ss should not have a 'remote_cancel_lease' method/attribute")
# test overlapping uploads
rs3,cs3 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
rs4,cs4 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
- already,writers = ss.remote_allocate_buckets(b"si3", rs3, cs3,
- sharenums, size, canary)
+ already,writers = ss.allocate_buckets(b"si3", rs3, cs3,
+ sharenums, size)
self.failUnlessEqual(len(already), 0)
self.failUnlessEqual(len(writers), 5)
- already2,writers2 = ss.remote_allocate_buckets(b"si3", rs4, cs4,
- sharenums, size, canary)
+ already2,writers2 = ss.allocate_buckets(b"si3", rs4, cs4,
+ sharenums, size)
self.failUnlessEqual(len(already2), 0)
self.failUnlessEqual(len(writers2), 0)
for wb in writers.values():
- wb.remote_close()
+ wb.close()
leases = list(ss.get_leases(b"si3"))
self.failUnlessEqual(len(leases), 1)
- already3,writers3 = ss.remote_allocate_buckets(b"si3", rs4, cs4,
- sharenums, size, canary)
+ already3,writers3 = ss.allocate_buckets(b"si3", rs4, cs4,
+ sharenums, size)
self.failUnlessEqual(len(already3), 5)
self.failUnlessEqual(len(writers3), 0)
@@ -1090,7 +1095,7 @@ class Server(unittest.TestCase):
clock.advance(123456)
# Adding a lease with matching renewal secret just renews it:
- ss.remote_add_lease(b"si0", renewal_secret, cancel_secret)
+ ss.add_lease(b"si0", renewal_secret, cancel_secret)
[lease] = ss.get_leases(b"si0")
self.assertEqual(lease.get_expiration_time(), 123 + 123456 + DEFAULT_RENEWAL_TIME)
@@ -1126,14 +1131,14 @@ class Server(unittest.TestCase):
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
for i,wb in writers.items():
- wb.remote_write(0, b"%25d" % i)
- wb.remote_close()
+ wb.write(0, b"%25d" % i)
+ wb.close()
# since we discard the data, the shares should be present but sparse.
# Since we write with some seeks, the data we read back will be all
# zeros.
- b = ss.remote_get_buckets(b"vid")
+ b = ss.get_buckets(b"vid")
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
- self.failUnlessEqual(b[0].remote_read(0, 25), b"\x00" * 25)
+ self.failUnlessEqual(b[0].read(0, 25), b"\x00" * 25)
def test_reserved_space_advise_corruption(self):
"""
@@ -1148,8 +1153,8 @@ class Server(unittest.TestCase):
ss.setServiceParent(self.sparent)
upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})
- ss.remote_advise_corrupt_share(b"immutable", b"si0", 0,
- b"This share smells funny.\n")
+ ss.advise_corrupt_share(b"immutable", b"si0", 0,
+ b"This share smells funny.\n")
self.assertEqual(
[],
@@ -1163,8 +1168,8 @@ class Server(unittest.TestCase):
si0_s = base32.b2a(b"si0")
upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})
- ss.remote_advise_corrupt_share(b"immutable", b"si0", 0,
- b"This share smells funny.\n")
+ ss.advise_corrupt_share(b"immutable", b"si0", 0,
+ b"This share smells funny.\n")
reportdir = os.path.join(workdir, "corruption-advisories")
reports = os.listdir(reportdir)
self.failUnlessEqual(len(reports), 1)
@@ -1183,12 +1188,12 @@ class Server(unittest.TestCase):
already,writers = self.allocate(ss, b"si1", [1], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([1]))
- writers[1].remote_write(0, b"data")
- writers[1].remote_close()
+ writers[1].write(0, b"data")
+ writers[1].close()
- b = ss.remote_get_buckets(b"si1")
+ b = ss.get_buckets(b"si1")
self.failUnlessEqual(set(b.keys()), set([1]))
- b[1].remote_advise_corrupt_share(b"This share tastes like dust.\n")
+ b[1].advise_corrupt_share(b"This share tastes like dust.\n")
reports = os.listdir(reportdir)
self.failUnlessEqual(len(reports), 2)
@@ -1214,8 +1219,8 @@ class Server(unittest.TestCase):
upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})
# And try to submit a corruption advisory about a different share
- ss.remote_advise_corrupt_share(b"immutable", b"si0", 1,
- b"This share smells funny.\n")
+ ss.advise_corrupt_share(b"immutable", b"si0", 1,
+ b"This share smells funny.\n")
self.assertEqual(
[],
@@ -1266,7 +1271,7 @@ class MutableServer(unittest.TestCase):
write_enabler = self.write_enabler(we_tag)
renew_secret = self.renew_secret(lease_tag)
cancel_secret = self.cancel_secret(lease_tag)
- rstaraw = ss.remote_slot_testv_and_readv_and_writev
+ rstaraw = ss.slot_testv_and_readv_and_writev
testandwritev = dict( [ (shnum, ([], [], None) )
for shnum in sharenums ] )
readv = []
@@ -1287,7 +1292,7 @@ class MutableServer(unittest.TestCase):
f.seek(0)
f.write(b"BAD MAGIC")
f.close()
- read = ss.remote_slot_readv
+ read = ss.slot_readv
e = self.failUnlessRaises(UnknownMutableContainerVersionError,
read, b"si1", [0], [(0,10)])
self.assertEqual(e.filename, fn)
@@ -1299,8 +1304,8 @@ class MutableServer(unittest.TestCase):
ss = self.create("test_container_size")
self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
set([0,1,2]), 100)
- read = ss.remote_slot_readv
- rstaraw = ss.remote_slot_testv_and_readv_and_writev
+ read = ss.slot_readv
+ rstaraw = ss.slot_testv_and_readv_and_writev
secrets = ( self.write_enabler(b"we1"),
self.renew_secret(b"we1"),
self.cancel_secret(b"we1") )
@@ -1380,7 +1385,7 @@ class MutableServer(unittest.TestCase):
# Also see if the server explicitly declares that it supports this
# feature.
- ver = ss.remote_get_version()
+ ver = ss.get_version()
storage_v1_ver = ver[b"http://allmydata.org/tahoe/protocols/storage/v1"]
self.failUnless(storage_v1_ver.get(b"fills-holes-with-zero-bytes"))
@@ -1398,7 +1403,7 @@ class MutableServer(unittest.TestCase):
self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
set([0,1,2]), 100)
- read = ss.remote_slot_readv
+ read = ss.slot_readv
self.failUnlessEqual(read(b"si1", [0], [(0, 10)]),
{0: [b""]})
self.failUnlessEqual(read(b"si1", [], [(0, 10)]),
@@ -1411,7 +1416,7 @@ class MutableServer(unittest.TestCase):
self.renew_secret(b"we1"),
self.cancel_secret(b"we1") )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
- write = ss.remote_slot_testv_and_readv_and_writev
+ write = ss.slot_testv_and_readv_and_writev
answer = write(b"si1", secrets,
{0: ([], [(0,data)], None)},
[])
@@ -1421,7 +1426,7 @@ class MutableServer(unittest.TestCase):
{0: [b"00000000001111111111"]})
self.failUnlessEqual(read(b"si1", [0], [(95,10)]),
{0: [b"99999"]})
- #self.failUnlessEqual(s0.remote_get_length(), 100)
+ #self.failUnlessEqual(s0.get_length(), 100)
bad_secrets = (b"bad write enabler", secrets[1], secrets[2])
f = self.failUnlessRaises(BadWriteEnablerError,
@@ -1455,8 +1460,8 @@ class MutableServer(unittest.TestCase):
self.renew_secret(b"we1"),
self.cancel_secret(b"we1") )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
- write = ss.remote_slot_testv_and_readv_and_writev
- read = ss.remote_slot_readv
+ write = ss.slot_testv_and_readv_and_writev
+ read = ss.slot_readv
def reset():
write(b"si1", secrets,
@@ -1500,8 +1505,8 @@ class MutableServer(unittest.TestCase):
self.renew_secret(b"we1"),
self.cancel_secret(b"we1") )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
- write = ss.remote_slot_testv_and_readv_and_writev
- read = ss.remote_slot_readv
+ write = ss.slot_testv_and_readv_and_writev
+ read = ss.slot_readv
data = [(b"%d" % i) * 100 for i in range(3)]
rc = write(b"si1", secrets,
{0: ([], [(0,data[0])], None),
@@ -1543,8 +1548,8 @@ class MutableServer(unittest.TestCase):
self.renew_secret(b"we1-%d" % n),
self.cancel_secret(b"we1-%d" % n) )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
- write = ss.remote_slot_testv_and_readv_and_writev
- read = ss.remote_slot_readv
+ write = ss.slot_testv_and_readv_and_writev
+ read = ss.slot_readv
rc = write(b"si1", secrets(0), {0: ([], [(0,data)], None)}, [])
self.failUnlessEqual(rc, (True, {}))
@@ -1560,7 +1565,7 @@ class MutableServer(unittest.TestCase):
self.failUnlessEqual(len(list(s0.get_leases())), 1)
# add-lease on a missing storage index is silently ignored
- self.failUnlessEqual(ss.remote_add_lease(b"si18", b"", b""), None)
+ self.failUnlessEqual(ss.add_lease(b"si18", b"", b""), None)
# re-allocate the slots and use the same secrets, that should update
# the lease
@@ -1568,7 +1573,7 @@ class MutableServer(unittest.TestCase):
self.failUnlessEqual(len(list(s0.get_leases())), 1)
# renew it directly
- ss.remote_renew_lease(b"si1", secrets(0)[1])
+ ss.renew_lease(b"si1", secrets(0)[1])
self.failUnlessEqual(len(list(s0.get_leases())), 1)
# now allocate them with a bunch of different secrets, to trigger the
@@ -1576,7 +1581,7 @@ class MutableServer(unittest.TestCase):
write(b"si1", secrets(1), {0: ([], [(0,data)], None)}, [])
self.failUnlessEqual(len(list(s0.get_leases())), 2)
secrets2 = secrets(2)
- ss.remote_add_lease(b"si1", secrets2[1], secrets2[2])
+ ss.add_lease(b"si1", secrets2[1], secrets2[2])
self.failUnlessEqual(len(list(s0.get_leases())), 3)
write(b"si1", secrets(3), {0: ([], [(0,data)], None)}, [])
write(b"si1", secrets(4), {0: ([], [(0,data)], None)}, [])
@@ -1594,11 +1599,11 @@ class MutableServer(unittest.TestCase):
# read back the leases, make sure they're still intact.
self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
- ss.remote_renew_lease(b"si1", secrets(0)[1])
- ss.remote_renew_lease(b"si1", secrets(1)[1])
- ss.remote_renew_lease(b"si1", secrets(2)[1])
- ss.remote_renew_lease(b"si1", secrets(3)[1])
- ss.remote_renew_lease(b"si1", secrets(4)[1])
+ ss.renew_lease(b"si1", secrets(0)[1])
+ ss.renew_lease(b"si1", secrets(1)[1])
+ ss.renew_lease(b"si1", secrets(2)[1])
+ ss.renew_lease(b"si1", secrets(3)[1])
+ ss.renew_lease(b"si1", secrets(4)[1])
self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
# get a new copy of the leases, with the current timestamps. Reading
# data and failing to renew/cancel leases should leave the timestamps
@@ -1609,7 +1614,7 @@ class MutableServer(unittest.TestCase):
# examine the exception thus raised, make sure the old nodeid is
# present, to provide for share migration
e = self.failUnlessRaises(IndexError,
- ss.remote_renew_lease, b"si1",
+ ss.renew_lease, b"si1",
secrets(20)[1])
e_s = str(e)
self.failUnlessIn("Unable to renew non-existent lease", e_s)
@@ -1644,7 +1649,7 @@ class MutableServer(unittest.TestCase):
self.renew_secret(b"we1-%d" % n),
self.cancel_secret(b"we1-%d" % n) )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
- write = ss.remote_slot_testv_and_readv_and_writev
+ write = ss.slot_testv_and_readv_and_writev
write_enabler, renew_secret, cancel_secret = secrets(0)
rc = write(b"si1", (write_enabler, renew_secret, cancel_secret),
{0: ([], [(0,data)], None)}, [])
@@ -1660,7 +1665,7 @@ class MutableServer(unittest.TestCase):
clock.advance(835)
# Adding a lease renews it:
- ss.remote_add_lease(b"si1", renew_secret, cancel_secret)
+ ss.add_lease(b"si1", renew_secret, cancel_secret)
[lease] = s0.get_leases()
self.assertEqual(lease.get_expiration_time(),
235 + 835 + DEFAULT_RENEWAL_TIME)
@@ -1669,8 +1674,8 @@ class MutableServer(unittest.TestCase):
ss = self.create("test_remove")
self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
set([0,1,2]), 100)
- readv = ss.remote_slot_readv
- writev = ss.remote_slot_testv_and_readv_and_writev
+ readv = ss.slot_readv
+ writev = ss.slot_testv_and_readv_and_writev
secrets = ( self.write_enabler(b"we1"),
self.renew_secret(b"we1"),
self.cancel_secret(b"we1") )
@@ -1774,7 +1779,7 @@ class MutableServer(unittest.TestCase):
# We don't even need to create any shares to exercise this
# functionality. Just go straight to sending a truncate-to-zero
# write.
- testv_is_good, read_data = ss.remote_slot_testv_and_readv_and_writev(
+ testv_is_good, read_data = ss.slot_testv_and_readv_and_writev(
storage_index=storage_index,
secrets=secrets,
test_and_write_vectors={
@@ -1792,7 +1797,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
self.sparent = LoggingServiceParent()
self._lease_secret = itertools.count()
self.ss = self.create("MDMFProxies storage test server")
- self.rref = RemoteBucket(self.ss)
+ self.rref = RemoteBucket(FoolscapStorageServer(self.ss))
self.storage_server = _StorageServer(lambda: self.rref)
self.secrets = (self.write_enabler(b"we_secret"),
self.renew_secret(b"renew_secret"),
@@ -1959,7 +1964,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
If tail_segment=True, then I will write a share that has a
smaller tail segment than other segments.
"""
- write = self.ss.remote_slot_testv_and_readv_and_writev
+ write = self.ss.slot_testv_and_readv_and_writev
data = self.build_test_mdmf_share(tail_segment, empty)
# Finally, we write the whole thing to the storage server in one
# pass.
@@ -2027,7 +2032,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
empty=False):
# Some tests need SDMF shares to verify that we can still
# read them. This method writes one, which resembles but is not
- write = self.ss.remote_slot_testv_and_readv_and_writev
+ write = self.ss.slot_testv_and_readv_and_writev
share = self.build_test_sdmf_share(empty)
testvs = [(0, 1, b"eq", b"")]
tws = {}
@@ -2359,7 +2364,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
# blocks.
mw = self._make_new_mw(b"si1", 0)
# Test writing some blocks.
- read = self.ss.remote_slot_readv
+ read = self.ss.slot_readv
expected_private_key_offset = struct.calcsize(MDMFHEADER)
expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
PRIVATE_KEY_SIZE + \
@@ -3150,7 +3155,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
d = sdmfr.finish_publishing()
def _then(ignored):
self.failUnlessEqual(self.rref.write_count, 1)
- read = self.ss.remote_slot_readv
+ read = self.ss.slot_readv
self.failUnlessEqual(read(b"si1", [0], [(0, len(data))]),
{0: [data]})
d.addCallback(_then)
@@ -3207,7 +3212,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
sdmfw.finish_publishing())
def _then_again(results):
self.failUnless(results[0])
- read = self.ss.remote_slot_readv
+ read = self.ss.slot_readv
self.failUnlessEqual(read(b"si1", [0], [(1, 8)]),
{0: [struct.pack(">Q", 1)]})
self.failUnlessEqual(read(b"si1", [0], [(9, len(data) - 9)]),


@@ -74,7 +74,7 @@ class HTTPTests(TestCase):
version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
b"maximum-immutable-share-size"
)
- expected_version = self.storage_server.remote_get_version()
+ expected_version = self.storage_server.get_version()
expected_version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
b"available-space"
)


@@ -53,7 +53,6 @@ from allmydata.scripts.admin import (
from allmydata.scripts.runner import (
Options,
)
- from .common_util import FakeCanary
from .common_web import (
render,
@@ -304,28 +303,27 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin):
mutable_si_3, rs3, cs3, we3 = make_mutable(b"\x03" * 16)
rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
sharenums = [0]
- canary = FakeCanary()
# note: 'tahoe debug dump-share' will not handle this file, since the
# inner contents are not a valid CHK share
data = b"\xff" * 1000
- a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
- 1000, canary)
- w[0].remote_write(0, data)
- w[0].remote_close()
+ a,w = ss.allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
+ 1000)
+ w[0].write(0, data)
+ w[0].close()
- a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
- 1000, canary)
- w[0].remote_write(0, data)
- w[0].remote_close()
- ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
+ a,w = ss.allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
+ 1000)
+ w[0].write(0, data)
+ w[0].close()
+ ss.add_lease(immutable_si_1, rs1a, cs1a)
- writev = ss.remote_slot_testv_and_readv_and_writev
+ writev = ss.slot_testv_and_readv_and_writev
writev(mutable_si_2, (we2, rs2, cs2),
{0: ([], [(0,data)], len(data))}, [])
writev(mutable_si_3, (we3, rs3, cs3),
{0: ([], [(0,data)], len(data))}, [])
- ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
+ ss.add_lease(mutable_si_3, rs3a, cs3a)
self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]