diff --git a/newsfragments/3849.minor b/newsfragments/3849.minor new file mode 100644 index 000000000..e69de29bb diff --git a/src/allmydata/client.py b/src/allmydata/client.py index a2f88ebd6..645e157b6 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -36,7 +36,7 @@ from twisted.python.filepath import FilePath import allmydata from allmydata.crypto import rsa, ed25519 from allmydata.crypto.util import remove_prefix -from allmydata.storage.server import StorageServer +from allmydata.storage.server import StorageServer, FoolscapStorageServer from allmydata import storage_client from allmydata.immutable.upload import Uploader from allmydata.immutable.offloaded import Helper @@ -834,7 +834,7 @@ class _Client(node.Node, pollmixin.PollMixin): if anonymous_storage_enabled(self.config): furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding()) - furl = self.tub.registerReference(ss, furlFile=furl_file) + furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file) announcement["anonymous-storage-FURL"] = furl enabled_storage_servers = self._enable_storage_servers( diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index 327892ecd..6297ef484 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -91,4 +91,4 @@ class HTTPServer(object): @_authorized_route(_app, "/v1/version", methods=["GET"]) def version(self, request, authorization): - return self._cbor(request, self._storage_server.remote_get_version()) + return self._cbor(request, self._storage_server.get_version()) diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index b80d4648d..da9aa473f 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -352,8 +352,10 @@ class ShareFile(object): return space_freed -@implementer(RIBucketWriter) -class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 +class BucketWriter(object): + """ + Keep track of the process of writing to a ShareFile. + """ def __init__(self, ss, incominghome, finalhome, max_size, lease_info, clock): self.ss = ss @@ -373,7 +375,7 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 def allocated_size(self): return self._max_size - def remote_write(self, offset, data): + def write(self, offset, data): # Delay the timeout, since we received data: self._timeout.reset(30 * 60) start = self._clock.seconds() @@ -397,9 +399,6 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 self.ss.add_latency("write", self._clock.seconds() - start) self.ss.count("write") - def remote_close(self): - self.close() - def close(self): precondition(not self.closed) self._timeout.cancel() @@ -451,13 +450,10 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 facility="tahoe.storage", level=log.UNUSUAL) self.abort() - def remote_abort(self): + def abort(self): log.msg("storage: aborting sharefile %s" % self.incominghome, facility="tahoe.storage", level=log.UNUSUAL) - self.abort() self.ss.count("abort") - - def abort(self): if self.closed: return @@ -480,8 +476,28 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 self._timeout.cancel() -@implementer(RIBucketReader) -class BucketReader(Referenceable): # type: ignore # warner/foolscap#78 +@implementer(RIBucketWriter) +class FoolscapBucketWriter(Referenceable): # type: ignore # warner/foolscap#78 + """ + Foolscap-specific BucketWriter. + """ + def __init__(self, bucket_writer): + self._bucket_writer = bucket_writer + + def remote_write(self, offset, data): + return self._bucket_writer.write(offset, data) + + def remote_close(self): + return self._bucket_writer.close() + + def remote_abort(self): + return self._bucket_writer.abort() + + +class BucketReader(object): + """ + Manage the process for reading from a ``ShareFile``. + """ def __init__(self, ss, sharefname, storage_index=None, shnum=None): self.ss = ss @@ -496,15 +512,31 @@ class BucketReader(Referenceable): # type: ignore # warner/foolscap#78 ), self.shnum) - def remote_read(self, offset, length): + def read(self, offset, length): start = time.time() data = self._share_file.read_share_data(offset, length) self.ss.add_latency("read", time.time() - start) self.ss.count("read") return data + def advise_corrupt_share(self, reason): + return self.ss.advise_corrupt_share(b"immutable", + self.storage_index, + self.shnum, + reason) + + +@implementer(RIBucketReader) +class FoolscapBucketReader(Referenceable): # type: ignore # warner/foolscap#78 + """ + Foolscap wrapper for ``BucketReader`` + """ + + def __init__(self, bucket_reader): + self._bucket_reader = bucket_reader + + def remote_read(self, offset, length): + return self._bucket_reader.read(offset, length) + def remote_advise_corrupt_share(self, reason): - return self.ss.remote_advise_corrupt_share(b"immutable", - self.storage_index, - self.shnum, - reason) + return self._bucket_reader.advise_corrupt_share(reason) diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 80b337d36..2daf081e4 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -12,7 +12,7 @@ if PY2: # strings. Omit bytes so we don't leak future's custom bytes. from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401 else: - from typing import Dict + from typing import Dict, Tuple import os, re @@ -32,7 +32,10 @@ from allmydata.storage.lease import LeaseInfo from allmydata.storage.mutable import MutableShareFile, EmptyShare, \ create_mutable_sharefile from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE -from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader +from allmydata.storage.immutable import ( + ShareFile, BucketWriter, BucketReader, FoolscapBucketWriter, + FoolscapBucketReader, +) from allmydata.storage.crawler import BucketCountingCrawler from allmydata.storage.expirer import LeaseCheckingCrawler @@ -55,10 +58,10 @@ NUM_RE=re.compile("^[0-9]+$") DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60 -@implementer(RIStorageServer, IStatsProducer) -class StorageServer(service.MultiService, Referenceable): +@implementer(IStatsProducer) +class StorageServer(service.MultiService): """ - A filesystem-based implementation of ``RIStorageServer``. + Implement the business logic for the storage server. """ name = 'storage' # only the tests change this to anything else @@ -125,16 +128,11 @@ class StorageServer(service.MultiService, Referenceable): self.lease_checker.setServiceParent(self) self._clock = clock - # Currently being-written Bucketwriters. For Foolscap, lifetime is tied - # to connection: when disconnection happens, the BucketWriters are - # removed. For HTTP, this makes no sense, so there will be - # timeout-based cleanup; see - # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807. - # Map in-progress filesystem path -> BucketWriter: self._bucket_writers = {} # type: Dict[str,BucketWriter] - # Canaries and disconnect markers for BucketWriters created via Foolscap: - self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)] + + # These callables will be called with BucketWriters that closed: + self._call_on_bucket_writer_close = [] def stopService(self): # Cancel any in-progress uploads: @@ -263,7 +261,7 @@ class StorageServer(service.MultiService, Referenceable): space += bw.allocated_size() return space - def remote_get_version(self): + def get_version(self): remaining_space = self.get_available_space() if remaining_space is None: # We're on a platform that has no API to get disk stats. @@ -284,7 +282,7 @@ class StorageServer(service.MultiService, Referenceable): } return version - def _allocate_buckets(self, storage_index, + def allocate_buckets(self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size, owner_num=0, renew_leases=True): @@ -370,21 +368,6 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("allocate", self._clock.seconds() - start) return set(alreadygot), bucketwriters - def remote_allocate_buckets(self, storage_index, - renew_secret, cancel_secret, - sharenums, allocated_size, - canary, owner_num=0): - """Foolscap-specific ``allocate_buckets()`` API.""" - alreadygot, bucketwriters = self._allocate_buckets( - storage_index, renew_secret, cancel_secret, sharenums, allocated_size, - owner_num=owner_num, renew_leases=True, - ) - # Abort BucketWriters if disconnection happens. - for bw in bucketwriters.values(): - disconnect_marker = canary.notifyOnDisconnect(bw.disconnected) - self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker) - return alreadygot, bucketwriters - def _iter_share_files(self, storage_index): for shnum, filename in self._get_bucket_shares(storage_index): with open(filename, 'rb') as f: @@ -400,8 +383,7 @@ class StorageServer(service.MultiService, Referenceable): continue # non-sharefile yield sf - def remote_add_lease(self, storage_index, renew_secret, cancel_secret, - owner_num=1): + def add_lease(self, storage_index, renew_secret, cancel_secret, owner_num=1): start = self._clock.seconds() self.count("add-lease") new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME @@ -415,7 +397,7 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("add-lease", self._clock.seconds() - start) return None - def remote_renew_lease(self, storage_index, renew_secret): + def renew_lease(self, storage_index, renew_secret): start = self._clock.seconds() self.count("renew") new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME @@ -431,9 +413,14 @@ class StorageServer(service.MultiService, Referenceable): if self.stats_provider: self.stats_provider.count('storage_server.bytes_added', consumed_size) del self._bucket_writers[bw.incominghome] - if bw in self._bucket_writer_disconnect_markers: - canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw) - canary.dontNotifyOnDisconnect(disconnect_marker) + for handler in self._call_on_bucket_writer_close: + handler(bw) + + def register_bucket_writer_close_handler(self, handler): + """ + The handler will be called with any ``BucketWriter`` that closes. + """ + self._call_on_bucket_writer_close.append(handler) def _get_bucket_shares(self, storage_index): """Return a list of (shnum, pathname) tuples for files that hold @@ -449,7 +436,7 @@ class StorageServer(service.MultiService, Referenceable): # Commonly caused by there being no buckets at all. pass - def remote_get_buckets(self, storage_index): + def get_buckets(self, storage_index): start = self._clock.seconds() self.count("get") si_s = si_b2a(storage_index) @@ -641,7 +628,7 @@ class StorageServer(service.MultiService, Referenceable): secrets, test_and_write_vectors, read_vector, - renew_leases, + renew_leases=True, ): """ Read data from shares and conditionally write some data to them. @@ -699,18 +686,6 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("writev", self._clock.seconds() - start) return (testv_is_good, read_data) - def remote_slot_testv_and_readv_and_writev(self, storage_index, - secrets, - test_and_write_vectors, - read_vector): - return self.slot_testv_and_readv_and_writev( - storage_index, - secrets, - test_and_write_vectors, - read_vector, - renew_leases=True, - ) - def _allocate_slot_share(self, bucketdir, secrets, sharenum, owner_num=0): (write_enabler, renew_secret, cancel_secret) = secrets @@ -721,7 +696,7 @@ class StorageServer(service.MultiService, Referenceable): self) return share - def remote_slot_readv(self, storage_index, shares, readv): + def slot_readv(self, storage_index, shares, readv): start = self._clock.seconds() self.count("readv") si_s = si_b2a(storage_index) @@ -763,8 +738,8 @@ class StorageServer(service.MultiService, Referenceable): return True return False - def remote_advise_corrupt_share(self, share_type, storage_index, shnum, - reason): + def advise_corrupt_share(self, share_type, storage_index, shnum, + reason): # This is a remote API, I believe, so this has to be bytes for legacy # protocol backwards compatibility reasons. assert isinstance(share_type, bytes) @@ -804,6 +779,90 @@ class StorageServer(service.MultiService, Referenceable): return None + +@implementer(RIStorageServer) +class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78 + """ + A filesystem-based implementation of ``RIStorageServer``. + + For Foolscap, BucketWriter lifetime is tied to connection: when + disconnection happens, the BucketWriters are removed. + """ + name = 'storage' + + def __init__(self, storage_server): # type: (StorageServer) -> None + self._server = storage_server + + # Canaries and disconnect markers for BucketWriters created via Foolscap: + self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,Tuple[IRemoteReference, object]] + + self._server.register_bucket_writer_close_handler(self._bucket_writer_closed) + + def _bucket_writer_closed(self, bw): + if bw in self._bucket_writer_disconnect_markers: + canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw) + canary.dontNotifyOnDisconnect(disconnect_marker) + + def remote_get_version(self): + return self._server.get_version() + + def remote_allocate_buckets(self, storage_index, + renew_secret, cancel_secret, + sharenums, allocated_size, + canary, owner_num=0): + """Foolscap-specific ``allocate_buckets()`` API.""" + alreadygot, bucketwriters = self._server.allocate_buckets( + storage_index, renew_secret, cancel_secret, sharenums, allocated_size, + owner_num=owner_num, renew_leases=True, + ) + + # Abort BucketWriters if disconnection happens. + for bw in bucketwriters.values(): + disconnect_marker = canary.notifyOnDisconnect(bw.disconnected) + self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker) + + # Wrap BucketWriters with Foolscap adapter: + bucketwriters = { + k: FoolscapBucketWriter(bw) + for (k, bw) in bucketwriters.items() + } + + return alreadygot, bucketwriters + + def remote_add_lease(self, storage_index, renew_secret, cancel_secret, + owner_num=1): + return self._server.add_lease(storage_index, renew_secret, cancel_secret) + + def remote_renew_lease(self, storage_index, renew_secret): + return self._server.renew_lease(storage_index, renew_secret) + + def remote_get_buckets(self, storage_index): + return { + k: FoolscapBucketReader(bucket) + for (k, bucket) in self._server.get_buckets(storage_index).items() + } + + def remote_slot_testv_and_readv_and_writev(self, storage_index, + secrets, + test_and_write_vectors, + read_vector): + return self._server.slot_testv_and_readv_and_writev( + storage_index, + secrets, + test_and_write_vectors, + read_vector, + renew_leases=True, + ) + + def remote_slot_readv(self, storage_index, shares, readv): + return self._server.slot_readv(storage_index, shares, readv) + + def remote_advise_corrupt_share(self, share_type, storage_index, shnum, + reason): + return self._server.advise_corrupt_share(share_type, storage_index, shnum, + reason) + + CORRUPTION_REPORT_FORMAT = """\ report: Share Corruption type: {type} diff --git a/src/allmydata/test/common_storage.py b/src/allmydata/test/common_storage.py index 529ebe586..7adcafa43 100644 --- a/src/allmydata/test/common_storage.py +++ b/src/allmydata/test/common_storage.py @@ -1,8 +1,4 @@ -from .common_util import ( - FakeCanary, -) - def upload_immutable(storage_server, storage_index, renew_secret, cancel_secret, shares): """ Synchronously upload some immutable shares to a ``StorageServer``. @@ -20,17 +16,16 @@ def upload_immutable(storage_server, storage_index, renew_secret, cancel_secret, :return: ``None`` """ - already, writers = storage_server.remote_allocate_buckets( + already, writers = storage_server.allocate_buckets( storage_index, renew_secret, cancel_secret, shares.keys(), len(next(iter(shares.values()))), - canary=FakeCanary(), ) for shnum, writer in writers.items(): - writer.remote_write(0, shares[shnum]) - writer.remote_close() + writer.write(0, shares[shnum]) + writer.close() def upload_mutable(storage_server, storage_index, secrets, shares): @@ -57,7 +52,7 @@ def upload_mutable(storage_server, storage_index, secrets, shares): } read_vector = [] - storage_server.remote_slot_testv_and_readv_and_writev( + storage_server.slot_testv_and_readv_and_writev( storage_index, secrets, test_and_write_vectors, diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index b9fa99005..97cb371e6 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -50,7 +50,9 @@ from allmydata.util.assertutil import _assert from allmydata import uri as tahoe_uri from allmydata.client import _Client -from allmydata.storage.server import StorageServer, storage_index_to_dir +from allmydata.storage.server import ( + StorageServer, storage_index_to_dir, FoolscapStorageServer, +) from allmydata.util import fileutil, idlib, hashutil from allmydata.util.hashutil import permute_server_hash from allmydata.util.fileutil import abspath_expanduser_unicode @@ -417,7 +419,7 @@ class NoNetworkGrid(service.MultiService): ss.setServiceParent(middleman) serverid = ss.my_nodeid self.servers_by_number[i] = ss - wrapper = wrap_storage_server(ss) + wrapper = wrap_storage_server(FoolscapStorageServer(ss)) self.wrappers_by_id[serverid] = wrapper self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper) self.rebuild_serverlist() diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index f56ecd089..3d64d4976 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -773,13 +773,13 @@ class AddLease(GridTestMixin, unittest.TestCase): d.addCallback(_check_cr, "mutable-normal") really_did_break = [] - # now break the server's remote_add_lease call + # now break the server's add_lease call def _break_add_lease(ign): def broken_add_lease(*args, **kwargs): really_did_break.append(1) raise KeyError("intentional failure, should be ignored") - assert self.g.servers_by_number[0].remote_add_lease - self.g.servers_by_number[0].remote_add_lease = broken_add_lease + assert self.g.servers_by_number[0].add_lease + self.g.servers_by_number[0].add_lease = broken_add_lease d.addCallback(_break_add_lease) # and confirm that the files still look healthy diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index a2572e735..c65a2fa2c 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -601,7 +601,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase): "enabled = true\n") c = yield client.create_client(basedir) ss = c.getServiceNamed("storage") - verdict = ss.remote_get_version() + verdict = ss.get_version() self.failUnlessReallyEqual(verdict[b"application-version"], allmydata.__full_version__.encode("ascii")) self.failIfEqual(str(allmydata.__version__), "unknown") diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py index a9be90c43..80d732986 100644 --- a/src/allmydata/test/test_crawler.py +++ b/src/allmydata/test/test_crawler.py @@ -27,7 +27,7 @@ from allmydata.util import fileutil, hashutil, pollmixin from allmydata.storage.server import StorageServer, si_b2a from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded -from allmydata.test.common_util import StallMixin, FakeCanary +from allmydata.test.common_util import StallMixin class BucketEnumeratingCrawler(ShareCrawler): cpu_slice = 500 # make sure it can complete in a single slice @@ -124,12 +124,12 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin): def write(self, i, ss, serverid, tail=0): si = self.si(i) si = si[:-1] + bytes(bytearray((tail,))) - had,made = ss.remote_allocate_buckets(si, - self.rs(i, serverid), - self.cs(i, serverid), - set([0]), 99, FakeCanary()) - made[0].remote_write(0, b"data") - made[0].remote_close() + had,made = ss.allocate_buckets(si, + self.rs(i, serverid), + self.cs(i, serverid), + set([0]), 99) + made[0].write(0, b"data") + made[0].close() return si_b2a(si) def test_immediate(self): diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index 3faffbe0d..933a2b591 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -39,6 +39,7 @@ from allmydata.crypto import aes from allmydata.storage.server import ( si_b2a, StorageServer, + FoolscapStorageServer, ) from allmydata.storage_client import StorageFarmBroker from allmydata.immutable.layout import ( @@ -427,7 +428,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase): """ storage_index = b"a" * 16 serverid = b"b" * 20 - storage = StorageServer(self.mktemp(), serverid) + storage = FoolscapStorageServer(StorageServer(self.mktemp(), serverid)) rref_without_ueb = LocalWrapper(storage, fireNow) yield write_bad_share(rref_without_ueb, storage_index) server_without_ueb = NoNetworkServer(serverid, rref_without_ueb) @@ -451,7 +452,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase): """ storage_index = b"a" * 16 serverid = b"b" * 20 - storage = StorageServer(self.mktemp(), serverid) + storage = FoolscapStorageServer(StorageServer(self.mktemp(), serverid)) rref_with_ueb = LocalWrapper(storage, fireNow) ueb = { "needed_shares": 2, @@ -487,7 +488,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase): in [b"b", b"c"] ) storages = list( - StorageServer(self.mktemp(), serverid) + FoolscapStorageServer(StorageServer(self.mktemp(), serverid)) for serverid in serverids ) diff --git a/src/allmydata/test/test_hung_server.py b/src/allmydata/test/test_hung_server.py index 490315500..162b1d79c 100644 --- a/src/allmydata/test/test_hung_server.py +++ b/src/allmydata/test/test_hung_server.py @@ -73,7 +73,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, def _copy_share(self, share, to_server): (sharenum, sharefile) = share (id, ss) = to_server - shares_dir = os.path.join(ss.original.storedir, "shares") + shares_dir = os.path.join(ss.original._server.storedir, "shares") si = uri.from_string(self.uri).get_storage_index() si_dir = os.path.join(shares_dir, storage_index_to_dir(si)) if not os.path.exists(si_dir): @@ -82,7 +82,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, shutil.copy(sharefile, new_sharefile) self.shares = self.find_uri_shares(self.uri) # Make sure that the storage server has the share. - self.failUnless((sharenum, ss.original.my_nodeid, new_sharefile) + self.failUnless((sharenum, ss.original._server.my_nodeid, new_sharefile) in self.shares) def _corrupt_share(self, share, corruptor_func): diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index db62ad06f..bd74a1052 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -39,13 +39,18 @@ from hypothesis import given, strategies import itertools from allmydata import interfaces from allmydata.util import fileutil, hashutil, base32 -from allmydata.storage.server import StorageServer, DEFAULT_RENEWAL_TIME +from allmydata.storage.server import ( + StorageServer, DEFAULT_RENEWAL_TIME, FoolscapStorageServer, +) from allmydata.storage.shares import get_share_file from allmydata.storage.mutable import MutableShareFile from allmydata.storage.mutable_schema import ( ALL_SCHEMAS as ALL_MUTABLE_SCHEMAS, ) -from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile +from allmydata.storage.immutable import ( + BucketWriter, BucketReader, ShareFile, FoolscapBucketWriter, + FoolscapBucketReader, +) from allmydata.storage.immutable_schema import ( ALL_SCHEMAS as ALL_IMMUTABLE_SCHEMAS, ) @@ -157,25 +162,25 @@ class Bucket(unittest.TestCase): def test_create(self): incoming, final = self.make_workdir("test_create") bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock()) - bw.remote_write(0, b"a"*25) - bw.remote_write(25, b"b"*25) - bw.remote_write(50, b"c"*25) - bw.remote_write(75, b"d"*7) - bw.remote_close() + bw.write(0, b"a"*25) + bw.write(25, b"b"*25) + bw.write(50, b"c"*25) + bw.write(75, b"d"*7) + bw.close() def test_readwrite(self): incoming, final = self.make_workdir("test_readwrite") bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock()) - bw.remote_write(0, b"a"*25) - bw.remote_write(25, b"b"*25) - bw.remote_write(50, b"c"*7) # last block may be short - bw.remote_close() + bw.write(0, b"a"*25) + bw.write(25, b"b"*25) + bw.write(50, b"c"*7) # last block may be short + bw.close() # now read from it br = BucketReader(self, bw.finalhome) - self.failUnlessEqual(br.remote_read(0, 25), b"a"*25) - self.failUnlessEqual(br.remote_read(25, 25), b"b"*25) - self.failUnlessEqual(br.remote_read(50, 7), b"c"*7) + self.failUnlessEqual(br.read(0, 25), b"a"*25) + self.failUnlessEqual(br.read(25, 25), b"b"*25) + self.failUnlessEqual(br.read(50, 7), b"c"*7) def test_write_past_size_errors(self): """Writing beyond the size of the bucket throws an exception.""" @@ -185,7 +190,7 @@ class Bucket(unittest.TestCase): ) bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock()) with self.assertRaises(DataTooLargeError): - bw.remote_write(offset, b"a" * length) + bw.write(offset, b"a" * length) @given( maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98), @@ -205,25 +210,25 @@ class Bucket(unittest.TestCase): self, incoming, final, length, self.make_lease(), Clock() ) # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. - bw.remote_write(10, expected_data[10:20]) - bw.remote_write(30, expected_data[30:40]) - bw.remote_write(50, expected_data[50:60]) + bw.write(10, expected_data[10:20]) + bw.write(30, expected_data[30:40]) + bw.write(50, expected_data[50:60]) # Then, an overlapping write but with matching data: - bw.remote_write( + bw.write( maybe_overlapping_offset, expected_data[ maybe_overlapping_offset:maybe_overlapping_offset + maybe_overlapping_length ] ) # Now fill in the holes: - bw.remote_write(0, expected_data[0:10]) - bw.remote_write(20, expected_data[20:30]) - bw.remote_write(40, expected_data[40:50]) - bw.remote_write(60, expected_data[60:]) - bw.remote_close() + bw.write(0, expected_data[0:10]) + bw.write(20, expected_data[20:30]) + bw.write(40, expected_data[40:50]) + bw.write(60, expected_data[60:]) + bw.close() br = BucketReader(self, bw.finalhome) - self.assertEqual(br.remote_read(0, length), expected_data) + self.assertEqual(br.read(0, length), expected_data) @given( @@ -243,21 +248,21 @@ class Bucket(unittest.TestCase): self, incoming, final, length, self.make_lease(), Clock() ) # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. - bw.remote_write(10, b"1" * 10) - bw.remote_write(30, b"1" * 10) - bw.remote_write(50, b"1" * 10) + bw.write(10, b"1" * 10) + bw.write(30, b"1" * 10) + bw.write(50, b"1" * 10) # Then, write something that might overlap with some of them, but # conflicts. Then fill in holes left by first three writes. Conflict is # inevitable. with self.assertRaises(ConflictingWriteError): - bw.remote_write( + bw.write( maybe_overlapping_offset, b'X' * min(maybe_overlapping_length, length - maybe_overlapping_offset), ) - bw.remote_write(0, b"1" * 10) - bw.remote_write(20, b"1" * 10) - bw.remote_write(40, b"1" * 10) - bw.remote_write(60, b"1" * 40) + bw.write(0, b"1" * 10) + bw.write(20, b"1" * 10) + bw.write(40, b"1" * 10) + bw.write(60, b"1" * 40) def test_read_past_end_of_share_data(self): # test vector for immutable files (hard-coded contents of an immutable share @@ -302,15 +307,15 @@ class Bucket(unittest.TestCase): # Now read from it. br = BucketReader(mockstorageserver, final) - self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data) + self.failUnlessEqual(br.read(0, len(share_data)), share_data) # Read past the end of share data to get the cancel secret. read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret) - result_of_read = br.remote_read(0, read_length) + result_of_read = br.read(0, read_length) self.failUnlessEqual(result_of_read, share_data) - result_of_read = br.remote_read(0, len(share_data)+1) + result_of_read = br.read(0, len(share_data)+1) self.failUnlessEqual(result_of_read, share_data) def _assert_timeout_only_after_30_minutes(self, clock, bw): @@ -348,7 +353,7 @@ class Bucket(unittest.TestCase): clock.advance(29 * 60) # .. but we receive a write! So that should delay the timeout again to # another 30 minutes. - bw.remote_write(0, b"hello") + bw.write(0, b"hello") self._assert_timeout_only_after_30_minutes(clock, bw) def test_bucket_closing_cancels_timeout(self): @@ -402,7 +407,7 @@ class BucketProxy(unittest.TestCase): fileutil.make_dirs(basedir) fileutil.make_dirs(os.path.join(basedir, "tmp")) bw = BucketWriter(self, incoming, final, size, self.make_lease(), Clock()) - rb = RemoteBucket(bw) + rb = RemoteBucket(FoolscapBucketWriter(bw)) return bw, rb, final def make_lease(self): @@ -474,7 +479,7 @@ class BucketProxy(unittest.TestCase): # now read everything back def _start_reading(res): br = BucketReader(self, sharefname) - rb = RemoteBucket(br) + rb = RemoteBucket(FoolscapBucketReader(br)) server = NoNetworkServer(b"abc", None) rbp = rbp_class(rb, server, storage_index=b"") self.failUnlessIn("to peer", repr(rbp)) @@ -542,20 +547,20 @@ class Server(unittest.TestCase): def test_declares_fixed_1528(self): ss = self.create("test_declares_fixed_1528") - ver = ss.remote_get_version() + ver = ss.get_version() sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1'] self.failUnless(sv1.get(b'prevents-read-past-end-of-share-data'), sv1) def test_declares_maximum_share_sizes(self): ss = self.create("test_declares_maximum_share_sizes") - ver = ss.remote_get_version() + ver = ss.get_version() sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1'] self.failUnlessIn(b'maximum-immutable-share-size', sv1) self.failUnlessIn(b'maximum-mutable-share-size', sv1) def test_declares_available_space(self): ss = self.create("test_declares_available_space") - ver = ss.remote_get_version() + ver = ss.get_version() sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1'] self.failUnlessIn(b'available-space', sv1) @@ -566,7 +571,9 @@ class Server(unittest.TestCase): """ renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)) cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)) - return ss._allocate_buckets( + if isinstance(ss, FoolscapStorageServer): + ss = ss._server + return ss.allocate_buckets( storage_index, renew_secret, cancel_secret, sharenums, size, @@ -590,12 +597,12 @@ class Server(unittest.TestCase): shnum, bucket = list(writers.items())[0] # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-( - bucket.remote_write(2**32, b"ab") - bucket.remote_close() + bucket.write(2**32, b"ab") + bucket.close() - readers = ss.remote_get_buckets(b"allocate") + readers = ss.get_buckets(b"allocate") reader = readers[shnum] - self.failUnlessEqual(reader.remote_read(2**32, 2), b"ab") + self.failUnlessEqual(reader.read(2**32, 2), b"ab") def test_dont_overfill_dirs(self): """ @@ -606,8 +613,8 @@ class Server(unittest.TestCase): ss = self.create("test_dont_overfill_dirs") already, writers = self.allocate(ss, b"storageindex", [0], 10) for i, wb in writers.items(): - wb.remote_write(0, b"%10d" % i) - wb.remote_close() + wb.write(0, b"%10d" % i) + wb.close() storedir = os.path.join(self.workdir("test_dont_overfill_dirs"), "shares") children_of_storedir = set(os.listdir(storedir)) @@ -616,8 +623,8 @@ class Server(unittest.TestCase): # chars the same as the first storageindex. already, writers = self.allocate(ss, b"storageindey", [0], 10) for i, wb in writers.items(): - wb.remote_write(0, b"%10d" % i) - wb.remote_close() + wb.write(0, b"%10d" % i) + wb.close() storedir = os.path.join(self.workdir("test_dont_overfill_dirs"), "shares") new_children_of_storedir = set(os.listdir(storedir)) @@ -627,8 +634,8 @@ class Server(unittest.TestCase): ss = self.create("test_remove_incoming") already, writers = self.allocate(ss, b"vid", list(range(3)), 10) for i,wb in writers.items(): - wb.remote_write(0, b"%10d" % i) - wb.remote_close() + wb.write(0, b"%10d" % i) + wb.close() incoming_share_dir = wb.incominghome incoming_bucket_dir = os.path.dirname(incoming_share_dir) incoming_prefix_dir = os.path.dirname(incoming_bucket_dir) @@ -647,32 +654,32 @@ class Server(unittest.TestCase): # Now abort the writers. for writer in writers.values(): - writer.remote_abort() + writer.abort() self.failUnlessEqual(ss.allocated_size(), 0) def test_allocate(self): ss = self.create("test_allocate") - self.failUnlessEqual(ss.remote_get_buckets(b"allocate"), {}) + self.failUnlessEqual(ss.get_buckets(b"allocate"), {}) already,writers = self.allocate(ss, b"allocate", [0,1,2], 75) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0,1,2])) # while the buckets are open, they should not count as readable - self.failUnlessEqual(ss.remote_get_buckets(b"allocate"), {}) + self.failUnlessEqual(ss.get_buckets(b"allocate"), {}) # close the buckets for i,wb in writers.items(): - wb.remote_write(0, b"%25d" % i) - wb.remote_close() + wb.write(0, b"%25d" % i) + wb.close() # aborting a bucket that was already closed is a no-op - wb.remote_abort() + wb.abort() # now they should be readable - b = ss.remote_get_buckets(b"allocate") + b = ss.get_buckets(b"allocate") self.failUnlessEqual(set(b.keys()), set([0,1,2])) - self.failUnlessEqual(b[0].remote_read(0, 25), b"%25d" % 0) + self.failUnlessEqual(b[0].read(0, 25), b"%25d" % 0) b_str = str(b[0]) self.failUnlessIn("BucketReader", b_str) self.failUnlessIn("mfwgy33dmf2g 0", b_str) @@ -693,15 +700,15 @@ class Server(unittest.TestCase): # aborting the writes should remove the tempfiles for i,wb in writers2.items(): - wb.remote_abort() + wb.abort() already2,writers2 = self.allocate(ss, b"allocate", [2,3,4,5], 75) self.failUnlessEqual(already2, set([0,1,2])) self.failUnlessEqual(set(writers2.keys()), set([5])) for i,wb in writers2.items(): - wb.remote_abort() + wb.abort() for i,wb in writers.items(): - wb.remote_abort() + wb.abort() def test_allocate_without_lease_renewal(self): """ @@ -724,8 +731,8 @@ class Server(unittest.TestCase): ss, storage_index, [0], 1, renew_leases=False, ) (writer,) = writers.values() - writer.remote_write(0, b"x") - writer.remote_close() + writer.write(0, b"x") + writer.close() # It should have a lease granted at the current time. shares = dict(ss._get_bucket_shares(storage_index)) @@ -747,8 +754,8 @@ class Server(unittest.TestCase): ss, storage_index, [1], 1, renew_leases=False, ) (writer,) = writers.values() - writer.remote_write(0, b"x") - writer.remote_close() + writer.write(0, b"x") + writer.close() # The first share's lease expiration time is unchanged. shares = dict(ss._get_bucket_shares(storage_index)) @@ -764,8 +771,8 @@ class Server(unittest.TestCase): def test_bad_container_version(self): ss = self.create("test_bad_container_version") a,w = self.allocate(ss, b"si1", [0], 10) - w[0].remote_write(0, b"\xff"*10) - w[0].remote_close() + w[0].write(0, b"\xff"*10) + w[0].close() fn = os.path.join(ss.sharedir, storage_index_to_dir(b"si1"), "0") f = open(fn, "rb+") @@ -773,17 +780,17 @@ class Server(unittest.TestCase): f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1 f.close() - ss.remote_get_buckets(b"allocate") + ss.get_buckets(b"allocate") e = self.failUnlessRaises(UnknownImmutableContainerVersionError, - ss.remote_get_buckets, b"si1") + ss.get_buckets, b"si1") self.assertEqual(e.filename, fn) self.assertEqual(e.version, 0) self.assertIn("had unexpected version 0", str(e)) def test_disconnect(self): # simulate a disconnection - ss = self.create("test_disconnect") + ss = FoolscapStorageServer(self.create("test_disconnect")) renew_secret = b"r" * 32 cancel_secret = b"c" * 32 canary = FakeCanary() @@ -831,7 +838,7 @@ class Server(unittest.TestCase): renew_secret = b"R" * 32 cancel_secret = b"C" * 32 with self.assertRaises(interfaces.NoSpace): - ss.remote_add_lease(storage_index, renew_secret, cancel_secret) + ss.add_lease(storage_index, renew_secret, cancel_secret) def test_reserved_space_mutable_lease(self): """ @@ -864,13 +871,13 @@ class Server(unittest.TestCase): # in the share header. Even if we're out of disk space, on a boring # enough filesystem we can write these. for i in range(3): - ss.remote_add_lease(storage_index, next(renew_secrets), cancel_secret) + ss.add_lease(storage_index, next(renew_secrets), cancel_secret) # Having used all of the space for leases in the header, we would have # to allocate storage for the next lease. Since there is no space # available, this must fail instead. with self.assertRaises(interfaces.NoSpace): - ss.remote_add_lease(storage_index, next(renew_secrets), cancel_secret) + ss.add_lease(storage_index, next(renew_secrets), cancel_secret) def test_reserved_space(self): @@ -885,7 +892,7 @@ class Server(unittest.TestCase): } self.patch(fileutil, 'get_disk_stats', call_get_disk_stats) - ss = self.create("test_reserved_space", reserved_space=reserved) + ss = FoolscapStorageServer(self.create("test_reserved_space", reserved_space=reserved)) # 15k available, 10k reserved, leaves 5k for shares # a newly created and filled share incurs this much overhead, beyond @@ -906,28 +913,28 @@ class Server(unittest.TestCase): self.failUnlessEqual(len(writers), 3) # now the StorageServer should have 3000 bytes provisionally # allocated, allowing only 2000 more to be claimed - self.failUnlessEqual(len(ss._bucket_writers), 3) + self.failUnlessEqual(len(ss._server._bucket_writers), 3) # allocating 1001-byte shares only leaves room for one canary2 = FakeCanary() already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2) self.failUnlessEqual(len(writers2), 1) - self.failUnlessEqual(len(ss._bucket_writers), 4) + self.failUnlessEqual(len(ss._server._bucket_writers), 4) # we abandon the first set, so their provisional allocation should be # returned canary.disconnected() - self.failUnlessEqual(len(ss._bucket_writers), 1) + self.failUnlessEqual(len(ss._server._bucket_writers), 1) # now we have a provisional allocation of 1001 bytes # and we close the second set, so their provisional allocation should # become real, long-term allocation, and grows to include the # overhead. for bw in writers2.values(): - bw.remote_write(0, b"a"*25) - bw.remote_close() - self.failUnlessEqual(len(ss._bucket_writers), 0) + bw.write(0, b"a"*25) + bw.close() + self.failUnlessEqual(len(ss._server._bucket_writers), 0) # this also changes the amount reported as available by call_get_disk_stats allocated = 1001 + OVERHEAD + LEASE_SIZE @@ -944,12 +951,12 @@ class Server(unittest.TestCase): canary=canary3, ) self.failUnlessEqual(len(writers3), 39) - self.failUnlessEqual(len(ss._bucket_writers), 39) + self.failUnlessEqual(len(ss._server._bucket_writers), 39) canary3.disconnected() - self.failUnlessEqual(len(ss._bucket_writers), 0) - ss.disownServiceParent() + self.failUnlessEqual(len(ss._server._bucket_writers), 0) + ss._server.disownServiceParent() del ss def test_seek(self): @@ -978,24 +985,22 @@ class Server(unittest.TestCase): Given a StorageServer, create a bucket with 5 shares and return renewal and cancellation secrets. """ - canary = FakeCanary() sharenums = list(range(5)) size = 100 # Creating a bucket also creates a lease: rs, cs = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)), hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))) - already, writers = ss.remote_allocate_buckets(storage_index, rs, cs, - sharenums, size, canary) + already, writers = ss.allocate_buckets(storage_index, rs, cs, + sharenums, size) self.failUnlessEqual(len(already), expected_already) self.failUnlessEqual(len(writers), expected_writers) for wb in writers.values(): - wb.remote_close() + wb.close() return rs, cs def test_leases(self): ss = self.create("test_leases") - canary = FakeCanary() sharenums = list(range(5)) size = 100 @@ -1018,54 +1023,54 @@ class Server(unittest.TestCase): # and a third lease, using add-lease rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)), hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))) - ss.remote_add_lease(b"si1", rs2a, cs2a) + ss.add_lease(b"si1", rs2a, cs2a) (lease1, lease2, lease3) = ss.get_leases(b"si1") self.assertTrue(lease1.is_renew_secret(rs1)) self.assertTrue(lease2.is_renew_secret(rs2)) self.assertTrue(lease3.is_renew_secret(rs2a)) # add-lease on a missing storage index is silently ignored - self.assertIsNone(ss.remote_add_lease(b"si18", b"", b"")) + self.assertIsNone(ss.add_lease(b"si18", b"", b"")) # check that si0 is readable - readers = ss.remote_get_buckets(b"si0") + readers = ss.get_buckets(b"si0") self.failUnlessEqual(len(readers), 5) # renew the first lease. Only the proper renew_secret should work - ss.remote_renew_lease(b"si0", rs0) - self.failUnlessRaises(IndexError, ss.remote_renew_lease, b"si0", cs0) - self.failUnlessRaises(IndexError, ss.remote_renew_lease, b"si0", rs1) + ss.renew_lease(b"si0", rs0) + self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", cs0) + self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", rs1) # check that si0 is still readable - readers = ss.remote_get_buckets(b"si0") + readers = ss.get_buckets(b"si0") self.failUnlessEqual(len(readers), 5) # There is no such method as remote_cancel_lease for now -- see # ticket #1528. - self.failIf(hasattr(ss, 'remote_cancel_lease'), \ - "ss should not have a 'remote_cancel_lease' method/attribute") + self.failIf(hasattr(FoolscapStorageServer(ss), 'remote_cancel_lease'), \ + "ss should not have a 'remote_cancel_lease' method/attribute") # test overlapping uploads rs3,cs3 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)), hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))) rs4,cs4 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)), hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))) - already,writers = ss.remote_allocate_buckets(b"si3", rs3, cs3, - sharenums, size, canary) + already,writers = ss.allocate_buckets(b"si3", rs3, cs3, + sharenums, size) self.failUnlessEqual(len(already), 0) self.failUnlessEqual(len(writers), 5) - already2,writers2 = ss.remote_allocate_buckets(b"si3", rs4, cs4, - sharenums, size, canary) + already2,writers2 = ss.allocate_buckets(b"si3", rs4, cs4, + sharenums, size) self.failUnlessEqual(len(already2), 0) self.failUnlessEqual(len(writers2), 0) for wb in writers.values(): - wb.remote_close() + wb.close() leases = list(ss.get_leases(b"si3")) self.failUnlessEqual(len(leases), 1) - already3,writers3 = ss.remote_allocate_buckets(b"si3", rs4, cs4, - sharenums, size, canary) + already3,writers3 = ss.allocate_buckets(b"si3", rs4, cs4, + sharenums, size) self.failUnlessEqual(len(already3), 5) self.failUnlessEqual(len(writers3), 0) @@ -1090,7 +1095,7 @@ class Server(unittest.TestCase): clock.advance(123456) # Adding a lease with matching renewal secret just renews it: - ss.remote_add_lease(b"si0", renewal_secret, cancel_secret) + ss.add_lease(b"si0", renewal_secret, cancel_secret) [lease] = ss.get_leases(b"si0") self.assertEqual(lease.get_expiration_time(), 123 + 123456 + DEFAULT_RENEWAL_TIME) @@ -1126,14 +1131,14 @@ class Server(unittest.TestCase): self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0,1,2])) for i,wb in writers.items(): - wb.remote_write(0, b"%25d" % i) - wb.remote_close() + wb.write(0, b"%25d" % i) + wb.close() # since we discard the data, the shares should be present but sparse. # Since we write with some seeks, the data we read back will be all # zeros. - b = ss.remote_get_buckets(b"vid") + b = ss.get_buckets(b"vid") self.failUnlessEqual(set(b.keys()), set([0,1,2])) - self.failUnlessEqual(b[0].remote_read(0, 25), b"\x00" * 25) + self.failUnlessEqual(b[0].read(0, 25), b"\x00" * 25) def test_reserved_space_advise_corruption(self): """ @@ -1148,8 +1153,8 @@ class Server(unittest.TestCase): ss.setServiceParent(self.sparent) upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""}) - ss.remote_advise_corrupt_share(b"immutable", b"si0", 0, - b"This share smells funny.\n") + ss.advise_corrupt_share(b"immutable", b"si0", 0, + b"This share smells funny.\n") self.assertEqual( [], @@ -1163,8 +1168,8 @@ class Server(unittest.TestCase): si0_s = base32.b2a(b"si0") upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""}) - ss.remote_advise_corrupt_share(b"immutable", b"si0", 0, - b"This share smells funny.\n") + ss.advise_corrupt_share(b"immutable", b"si0", 0, + b"This share smells funny.\n") reportdir = os.path.join(workdir, "corruption-advisories") reports = os.listdir(reportdir) self.failUnlessEqual(len(reports), 1) @@ -1183,12 +1188,12 @@ class Server(unittest.TestCase): already,writers = self.allocate(ss, b"si1", [1], 75) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([1])) - writers[1].remote_write(0, b"data") - writers[1].remote_close() + writers[1].write(0, b"data") + writers[1].close() - b = ss.remote_get_buckets(b"si1") + b = ss.get_buckets(b"si1") self.failUnlessEqual(set(b.keys()), set([1])) - b[1].remote_advise_corrupt_share(b"This share tastes like dust.\n") + b[1].advise_corrupt_share(b"This share tastes like dust.\n") reports = os.listdir(reportdir) self.failUnlessEqual(len(reports), 2) @@ -1214,8 +1219,8 @@ class Server(unittest.TestCase): upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""}) # And try to submit a corruption advisory about a different share - ss.remote_advise_corrupt_share(b"immutable", b"si0", 1, - b"This share smells funny.\n") + ss.advise_corrupt_share(b"immutable", b"si0", 1, + b"This share smells funny.\n") self.assertEqual( [], @@ -1266,7 +1271,7 @@ class MutableServer(unittest.TestCase): write_enabler = self.write_enabler(we_tag) renew_secret = self.renew_secret(lease_tag) cancel_secret = self.cancel_secret(lease_tag) - rstaraw = ss.remote_slot_testv_and_readv_and_writev + rstaraw = ss.slot_testv_and_readv_and_writev testandwritev = dict( [ (shnum, ([], [], None) ) for shnum in sharenums ] ) readv = [] @@ -1287,7 +1292,7 @@ class MutableServer(unittest.TestCase): f.seek(0) f.write(b"BAD MAGIC") f.close() - read = ss.remote_slot_readv + read = ss.slot_readv e = self.failUnlessRaises(UnknownMutableContainerVersionError, read, b"si1", [0], [(0,10)]) self.assertEqual(e.filename, fn) @@ -1299,8 +1304,8 @@ class MutableServer(unittest.TestCase): ss = self.create("test_container_size") self.allocate(ss, b"si1", b"we1", next(self._lease_secret), set([0,1,2]), 100) - read = ss.remote_slot_readv - rstaraw = ss.remote_slot_testv_and_readv_and_writev + read = ss.slot_readv + rstaraw = ss.slot_testv_and_readv_and_writev secrets = ( self.write_enabler(b"we1"), self.renew_secret(b"we1"), self.cancel_secret(b"we1") ) @@ -1380,7 +1385,7 @@ class MutableServer(unittest.TestCase): # Also see if the server explicitly declares that it supports this # feature. - ver = ss.remote_get_version() + ver = ss.get_version() storage_v1_ver = ver[b"http://allmydata.org/tahoe/protocols/storage/v1"] self.failUnless(storage_v1_ver.get(b"fills-holes-with-zero-bytes")) @@ -1398,7 +1403,7 @@ class MutableServer(unittest.TestCase): self.allocate(ss, b"si1", b"we1", next(self._lease_secret), set([0,1,2]), 100) - read = ss.remote_slot_readv + read = ss.slot_readv self.failUnlessEqual(read(b"si1", [0], [(0, 10)]), {0: [b""]}) self.failUnlessEqual(read(b"si1", [], [(0, 10)]), @@ -1411,7 +1416,7 @@ class MutableServer(unittest.TestCase): self.renew_secret(b"we1"), self.cancel_secret(b"we1") ) data = b"".join([ (b"%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev + write = ss.slot_testv_and_readv_and_writev answer = write(b"si1", secrets, {0: ([], [(0,data)], None)}, []) @@ -1421,7 +1426,7 @@ class MutableServer(unittest.TestCase): {0: [b"00000000001111111111"]}) self.failUnlessEqual(read(b"si1", [0], [(95,10)]), {0: [b"99999"]}) - #self.failUnlessEqual(s0.remote_get_length(), 100) + #self.failUnlessEqual(s0.get_length(), 100) bad_secrets = (b"bad write enabler", secrets[1], secrets[2]) f = self.failUnlessRaises(BadWriteEnablerError, @@ -1455,8 +1460,8 @@ class MutableServer(unittest.TestCase): self.renew_secret(b"we1"), self.cancel_secret(b"we1") ) data = b"".join([ (b"%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev - read = ss.remote_slot_readv + write = ss.slot_testv_and_readv_and_writev + read = ss.slot_readv def reset(): write(b"si1", secrets, @@ -1500,8 +1505,8 @@ class MutableServer(unittest.TestCase): self.renew_secret(b"we1"), self.cancel_secret(b"we1") ) data = b"".join([ (b"%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev - read = ss.remote_slot_readv + write = ss.slot_testv_and_readv_and_writev + read = ss.slot_readv data = [(b"%d" % i) * 100 for i in range(3)] rc = write(b"si1", secrets, {0: ([], [(0,data[0])], None), @@ -1543,8 +1548,8 @@ class MutableServer(unittest.TestCase): self.renew_secret(b"we1-%d" % n), self.cancel_secret(b"we1-%d" % n) ) data = b"".join([ (b"%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev - read = ss.remote_slot_readv + write = ss.slot_testv_and_readv_and_writev + read = ss.slot_readv rc = write(b"si1", secrets(0), {0: ([], [(0,data)], None)}, []) self.failUnlessEqual(rc, (True, {})) @@ -1560,7 +1565,7 @@ class MutableServer(unittest.TestCase): self.failUnlessEqual(len(list(s0.get_leases())), 1) # add-lease on a missing storage index is silently ignored - self.failUnlessEqual(ss.remote_add_lease(b"si18", b"", b""), None) + self.failUnlessEqual(ss.add_lease(b"si18", b"", b""), None) # re-allocate the slots and use the same secrets, that should update # the lease @@ -1568,7 +1573,7 @@ class MutableServer(unittest.TestCase): self.failUnlessEqual(len(list(s0.get_leases())), 1) # renew it directly - ss.remote_renew_lease(b"si1", secrets(0)[1]) + ss.renew_lease(b"si1", secrets(0)[1]) self.failUnlessEqual(len(list(s0.get_leases())), 1) # now allocate them with a bunch of different secrets, to trigger the @@ -1576,7 +1581,7 @@ class MutableServer(unittest.TestCase): write(b"si1", secrets(1), {0: ([], [(0,data)], None)}, []) self.failUnlessEqual(len(list(s0.get_leases())), 2) secrets2 = secrets(2) - ss.remote_add_lease(b"si1", secrets2[1], secrets2[2]) + ss.add_lease(b"si1", secrets2[1], secrets2[2]) self.failUnlessEqual(len(list(s0.get_leases())), 3) write(b"si1", secrets(3), {0: ([], [(0,data)], None)}, []) write(b"si1", secrets(4), {0: ([], [(0,data)], None)}, []) @@ -1594,11 +1599,11 @@ class MutableServer(unittest.TestCase): # read back the leases, make sure they're still intact. self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) - ss.remote_renew_lease(b"si1", secrets(0)[1]) - ss.remote_renew_lease(b"si1", secrets(1)[1]) - ss.remote_renew_lease(b"si1", secrets(2)[1]) - ss.remote_renew_lease(b"si1", secrets(3)[1]) - ss.remote_renew_lease(b"si1", secrets(4)[1]) + ss.renew_lease(b"si1", secrets(0)[1]) + ss.renew_lease(b"si1", secrets(1)[1]) + ss.renew_lease(b"si1", secrets(2)[1]) + ss.renew_lease(b"si1", secrets(3)[1]) + ss.renew_lease(b"si1", secrets(4)[1]) self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) # get a new copy of the leases, with the current timestamps. Reading # data and failing to renew/cancel leases should leave the timestamps @@ -1609,7 +1614,7 @@ class MutableServer(unittest.TestCase): # examine the exception thus raised, make sure the old nodeid is # present, to provide for share migration e = self.failUnlessRaises(IndexError, - ss.remote_renew_lease, b"si1", + ss.renew_lease, b"si1", secrets(20)[1]) e_s = str(e) self.failUnlessIn("Unable to renew non-existent lease", e_s) @@ -1644,7 +1649,7 @@ class MutableServer(unittest.TestCase): self.renew_secret(b"we1-%d" % n), self.cancel_secret(b"we1-%d" % n) ) data = b"".join([ (b"%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev + write = ss.slot_testv_and_readv_and_writev write_enabler, renew_secret, cancel_secret = secrets(0) rc = write(b"si1", (write_enabler, renew_secret, cancel_secret), {0: ([], [(0,data)], None)}, []) @@ -1660,7 +1665,7 @@ class MutableServer(unittest.TestCase): clock.advance(835) # Adding a lease renews it: - ss.remote_add_lease(b"si1", renew_secret, cancel_secret) + ss.add_lease(b"si1", renew_secret, cancel_secret) [lease] = s0.get_leases() self.assertEqual(lease.get_expiration_time(), 235 + 835 + DEFAULT_RENEWAL_TIME) @@ -1669,8 +1674,8 @@ class MutableServer(unittest.TestCase): ss = self.create("test_remove") self.allocate(ss, b"si1", b"we1", next(self._lease_secret), set([0,1,2]), 100) - readv = ss.remote_slot_readv - writev = ss.remote_slot_testv_and_readv_and_writev + readv = ss.slot_readv + writev = ss.slot_testv_and_readv_and_writev secrets = ( self.write_enabler(b"we1"), self.renew_secret(b"we1"), self.cancel_secret(b"we1") ) @@ -1774,7 +1779,7 @@ class MutableServer(unittest.TestCase): # We don't even need to create any shares to exercise this # functionality. Just go straight to sending a truncate-to-zero # write. - testv_is_good, read_data = ss.remote_slot_testv_and_readv_and_writev( + testv_is_good, read_data = ss.slot_testv_and_readv_and_writev( storage_index=storage_index, secrets=secrets, test_and_write_vectors={ @@ -1792,7 +1797,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): self.sparent = LoggingServiceParent() self._lease_secret = itertools.count() self.ss = self.create("MDMFProxies storage test server") - self.rref = RemoteBucket(self.ss) + self.rref = RemoteBucket(FoolscapStorageServer(self.ss)) self.storage_server = _StorageServer(lambda: self.rref) self.secrets = (self.write_enabler(b"we_secret"), self.renew_secret(b"renew_secret"), @@ -1959,7 +1964,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): If tail_segment=True, then I will write a share that has a smaller tail segment than other segments. """ - write = self.ss.remote_slot_testv_and_readv_and_writev + write = self.ss.slot_testv_and_readv_and_writev data = self.build_test_mdmf_share(tail_segment, empty) # Finally, we write the whole thing to the storage server in one # pass. @@ -2027,7 +2032,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): empty=False): # Some tests need SDMF shares to verify that we can still # read them. This method writes one, which resembles but is not - write = self.ss.remote_slot_testv_and_readv_and_writev + write = self.ss.slot_testv_and_readv_and_writev share = self.build_test_sdmf_share(empty) testvs = [(0, 1, b"eq", b"")] tws = {} @@ -2359,7 +2364,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): # blocks. mw = self._make_new_mw(b"si1", 0) # Test writing some blocks. - read = self.ss.remote_slot_readv + read = self.ss.slot_readv expected_private_key_offset = struct.calcsize(MDMFHEADER) expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \ PRIVATE_KEY_SIZE + \ @@ -3150,7 +3155,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): d = sdmfr.finish_publishing() def _then(ignored): self.failUnlessEqual(self.rref.write_count, 1) - read = self.ss.remote_slot_readv + read = self.ss.slot_readv self.failUnlessEqual(read(b"si1", [0], [(0, len(data))]), {0: [data]}) d.addCallback(_then) @@ -3207,7 +3212,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): sdmfw.finish_publishing()) def _then_again(results): self.failUnless(results[0]) - read = self.ss.remote_slot_readv + read = self.ss.slot_readv self.failUnlessEqual(read(b"si1", [0], [(1, 8)]), {0: [struct.pack(">Q", 1)]}) self.failUnlessEqual(read(b"si1", [0], [(9, len(data) - 9)]), diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index 23a3e3ea6..e413a0624 100644 --- a/src/allmydata/test/test_storage_http.py +++ b/src/allmydata/test/test_storage_http.py @@ -74,7 +74,7 @@ class HTTPTests(TestCase): version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop( b"maximum-immutable-share-size" ) - expected_version = self.storage_server.remote_get_version() + expected_version = self.storage_server.get_version() expected_version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop( b"available-space" ) diff --git a/src/allmydata/test/test_storage_web.py b/src/allmydata/test/test_storage_web.py index 18ea0220c..5984b2892 100644 --- a/src/allmydata/test/test_storage_web.py +++ b/src/allmydata/test/test_storage_web.py @@ -53,7 +53,6 @@ from allmydata.scripts.admin import ( from allmydata.scripts.runner import ( Options, ) -from .common_util import FakeCanary from .common_web import ( render, @@ -304,28 +303,27 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin): mutable_si_3, rs3, cs3, we3 = make_mutable(b"\x03" * 16) rs3a, cs3a = make_extra_lease(mutable_si_3, 1) sharenums = [0] - canary = FakeCanary() # note: 'tahoe debug dump-share' will not handle this file, since the # inner contents are not a valid CHK share data = b"\xff" * 1000 - a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums, - 1000, canary) - w[0].remote_write(0, data) - w[0].remote_close() + a,w = ss.allocate_buckets(immutable_si_0, rs0, cs0, sharenums, + 1000) + w[0].write(0, data) + w[0].close() - a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums, - 1000, canary) - w[0].remote_write(0, data) - w[0].remote_close() - ss.remote_add_lease(immutable_si_1, rs1a, cs1a) + a,w = ss.allocate_buckets(immutable_si_1, rs1, cs1, sharenums, + 1000) + w[0].write(0, data) + w[0].close() + ss.add_lease(immutable_si_1, rs1a, cs1a) - writev = ss.remote_slot_testv_and_readv_and_writev + writev = ss.slot_testv_and_readv_and_writev writev(mutable_si_2, (we2, rs2, cs2), {0: ([], [(0,data)], len(data))}, []) writev(mutable_si_3, (we3, rs3, cs3), {0: ([], [(0,data)], len(data))}, []) - ss.remote_add_lease(mutable_si_3, rs3a, cs3a) + ss.add_lease(mutable_si_3, rs3a, cs3a) self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]