From 50e21a90347a8a4bcc88487245c93f0379811dde Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 1 Dec 2021 09:55:44 -0500 Subject: [PATCH 1/7] Split StorageServer into generic part and Foolscap part. --- src/allmydata/storage/server.py | 127 ++++++++++++++++++++------------ 1 file changed, 78 insertions(+), 49 deletions(-) diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 7dc277e39..9d3ac4012 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -12,7 +12,7 @@ if PY2: # strings. Omit bytes so we don't leak future's custom bytes. from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401 else: - from typing import Dict + from typing import Dict, Tuple import os, re import six @@ -56,12 +56,11 @@ NUM_RE=re.compile("^[0-9]+$") DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60 -@implementer(RIStorageServer, IStatsProducer) -class StorageServer(service.MultiService, Referenceable): +@implementer(IStatsProducer) +class StorageServer(service.MultiService): """ - A filesystem-based implementation of ``RIStorageServer``. + Implement the business logic for the storage server. """ - name = 'storage' LeaseCheckerClass = LeaseCheckingCrawler def __init__(self, storedir, nodeid, reserved_space=0, @@ -125,16 +124,8 @@ class StorageServer(service.MultiService, Referenceable): self.lease_checker.setServiceParent(self) self._clock = clock - # Currently being-written Bucketwriters. For Foolscap, lifetime is tied - # to connection: when disconnection happens, the BucketWriters are - # removed. For HTTP, this makes no sense, so there will be - # timeout-based cleanup; see - # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807. - # Map in-progress filesystem path -> BucketWriter: self._bucket_writers = {} # type: Dict[str,BucketWriter] - # Canaries and disconnect markers for BucketWriters created via Foolscap: - self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)] def stopService(self): # Cancel any in-progress uploads: @@ -263,7 +254,7 @@ class StorageServer(service.MultiService, Referenceable): space += bw.allocated_size() return space - def remote_get_version(self): + def get_version(self): remaining_space = self.get_available_space() if remaining_space is None: # We're on a platform that has no API to get disk stats. @@ -284,7 +275,7 @@ class StorageServer(service.MultiService, Referenceable): } return version - def _allocate_buckets(self, storage_index, + def allocate_buckets(self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size, owner_num=0, renew_leases=True): @@ -371,21 +362,6 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("allocate", self._clock.seconds() - start) return alreadygot, bucketwriters - def remote_allocate_buckets(self, storage_index, - renew_secret, cancel_secret, - sharenums, allocated_size, - canary, owner_num=0): - """Foolscap-specific ``allocate_buckets()`` API.""" - alreadygot, bucketwriters = self._allocate_buckets( - storage_index, renew_secret, cancel_secret, sharenums, allocated_size, - owner_num=owner_num, renew_leases=True, - ) - # Abort BucketWriters if disconnection happens. 
- for bw in bucketwriters.values(): - disconnect_marker = canary.notifyOnDisconnect(bw.disconnected) - self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker) - return alreadygot, bucketwriters - def _iter_share_files(self, storage_index): for shnum, filename in self._get_bucket_shares(storage_index): with open(filename, 'rb') as f: @@ -401,8 +377,7 @@ class StorageServer(service.MultiService, Referenceable): continue # non-sharefile yield sf - def remote_add_lease(self, storage_index, renew_secret, cancel_secret, - owner_num=1): + def add_lease(self, storage_index, renew_secret, cancel_secret, owner_num=1): start = self._clock.seconds() self.count("add-lease") new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME @@ -414,7 +389,7 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("add-lease", self._clock.seconds() - start) return None - def remote_renew_lease(self, storage_index, renew_secret): + def renew_lease(self, storage_index, renew_secret): start = self._clock.seconds() self.count("renew") new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME @@ -448,7 +423,7 @@ class StorageServer(service.MultiService, Referenceable): # Commonly caused by there being no buckets at all. pass - def remote_get_buckets(self, storage_index): + def get_buckets(self, storage_index): start = self._clock.seconds() self.count("get") si_s = si_b2a(storage_index) @@ -698,18 +673,6 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("writev", self._clock.seconds() - start) return (testv_is_good, read_data) - def remote_slot_testv_and_readv_and_writev(self, storage_index, - secrets, - test_and_write_vectors, - read_vector): - return self.slot_testv_and_readv_and_writev( - storage_index, - secrets, - test_and_write_vectors, - read_vector, - renew_leases=True, - ) - def _allocate_slot_share(self, bucketdir, secrets, sharenum, owner_num=0): (write_enabler, renew_secret, cancel_secret) = secrets @@ -720,7 +683,7 @@ class StorageServer(service.MultiService, Referenceable): self) return share - def remote_slot_readv(self, storage_index, shares, readv): + def slot_readv(self, storage_index, shares, readv): start = self._clock.seconds() self.count("readv") si_s = si_b2a(storage_index) @@ -747,8 +710,8 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("readv", self._clock.seconds() - start) return datavs - def remote_advise_corrupt_share(self, share_type, storage_index, shnum, - reason): + def advise_corrupt_share(self, share_type, storage_index, shnum, + reason): # This is a remote API, I believe, so this has to be bytes for legacy # protocol backwards compatibility reasons. assert isinstance(share_type, bytes) @@ -774,3 +737,69 @@ class StorageServer(service.MultiService, Referenceable): share_type=share_type, si=si_s, shnum=shnum, reason=reason, level=log.SCARY, umid="SGx2fA") return None + + +@implementer(RIStorageServer) +class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78 + """ + A filesystem-based implementation of ``RIStorageServer``. + + For Foolscap, BucketWriter lifetime is tied to connection: when + disconnection happens, the BucketWriters are removed. 
+    """
+    name = 'storage'
+
+    def __init__(self, storage_server): # type: (StorageServer) -> None
+        self._server = storage_server
+
+        # Canaries and disconnect markers for BucketWriters created via Foolscap:
+        self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,Tuple[IRemoteReference, object]]
+
+
+    def remote_get_version(self):
+        return self.get_version()
+
+    def remote_allocate_buckets(self, storage_index,
+                                renew_secret, cancel_secret,
+                                sharenums, allocated_size,
+                                canary, owner_num=0):
+        """Foolscap-specific ``allocate_buckets()`` API."""
+        alreadygot, bucketwriters = self._server.allocate_buckets(
+            storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
+            owner_num=owner_num, renew_leases=True,
+        )
+        # Abort BucketWriters if disconnection happens.
+        for bw in bucketwriters.values():
+            disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
+            self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
+        return alreadygot, bucketwriters
+
+    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
+                         owner_num=1):
+        return self._server.add_lease(storage_index, renew_secret, cancel_secret)
+
+    def remote_renew_lease(self, storage_index, renew_secret):
+        return self._server.renew_lease(storage_index, renew_secret)
+
+    def remote_get_buckets(self, storage_index):
+        return self._server.get_buckets(storage_index)
+
+    def remote_slot_testv_and_readv_and_writev(self, storage_index,
+                                               secrets,
+                                               test_and_write_vectors,
+                                               read_vector):
+        return self._server.slot_testv_and_readv_and_writev(
+            storage_index,
+            secrets,
+            test_and_write_vectors,
+            read_vector,
+            renew_leases=True,
+        )
+
+    def remote_slot_readv(self, storage_index, shares, readv):
+        return self._server.slot_readv(self, storage_index, shares, readv)
+
+    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
+                                    reason):
+        return self._server.advise_corrupt_share(share_type, storage_index, shnum,
+                                                 reason)

From 541b28f4693c900f83fc767c5e012918e98d3b9a Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Thu, 2 Dec 2021 09:36:56 -0500
Subject: [PATCH 2/7] News file.

---
 newsfragments/3849.minor | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 newsfragments/3849.minor

diff --git a/newsfragments/3849.minor b/newsfragments/3849.minor
new file mode 100644
index 000000000..e69de29bb

From f7cb4d5c92e33b2b25286e4adb8c3ba4d32755bd Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Thu, 2 Dec 2021 10:02:46 -0500
Subject: [PATCH 3/7] Hook up the new FoolscapStorageServer and fix enough
 bugs that almost all end-to-end and integration tests pass.
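
The mechanism this patch introduces, sketched in brief. This is
condensed from the hunks below, with unrelated bookkeeping and the real
base classes stripped; the enclosing method name bucket_writer_closed is
reconstructed, since the hunk headers do not show it. The generic
StorageServer grows a callback registry for BucketWriter closes, and the
Foolscap wrapper registers a handler there to tear down its
notifyOnDisconnect canaries, so the generic layer never has to know
about Foolscap connections:

    # Generic layer: no Foolscap imports, just a callback registry.
    class StorageServer(object):
        def __init__(self):
            self._call_on_bucket_writer_close = []

        def register_bucket_writer_close_handler(self, handler):
            self._call_on_bucket_writer_close.append(handler)

        def bucket_writer_closed(self, bw):
            # ... share/stats bookkeeping elided ...
            for handler in self._call_on_bucket_writer_close:
                handler(bw)

    # Foolscap layer: tears down its disconnect canaries on close.
    class FoolscapStorageServer(object):
        def __init__(self, storage_server):
            self._server = storage_server
            self._bucket_writer_disconnect_markers = {}
            self._server.register_bucket_writer_close_handler(
                self._bucket_writer_closed)

        def _bucket_writer_closed(self, bw):
            if bw in self._bucket_writer_disconnect_markers:
                canary, marker = self._bucket_writer_disconnect_markers.pop(bw)
                canary.dontNotifyOnDisconnect(marker)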
--- src/allmydata/client.py | 4 ++-- src/allmydata/storage/immutable.py | 8 ++++---- src/allmydata/storage/server.py | 24 +++++++++++++++++++----- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/src/allmydata/client.py b/src/allmydata/client.py index a2f88ebd6..645e157b6 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -36,7 +36,7 @@ from twisted.python.filepath import FilePath import allmydata from allmydata.crypto import rsa, ed25519 from allmydata.crypto.util import remove_prefix -from allmydata.storage.server import StorageServer +from allmydata.storage.server import StorageServer, FoolscapStorageServer from allmydata import storage_client from allmydata.immutable.upload import Uploader from allmydata.immutable.offloaded import Helper @@ -834,7 +834,7 @@ class _Client(node.Node, pollmixin.PollMixin): if anonymous_storage_enabled(self.config): furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding()) - furl = self.tub.registerReference(ss, furlFile=furl_file) + furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file) announcement["anonymous-storage-FURL"] = furl enabled_storage_servers = self._enable_storage_servers( diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index 08b83cd87..173a43e8e 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -382,7 +382,7 @@ class BucketReader(Referenceable): # type: ignore # warner/foolscap#78 return data def remote_advise_corrupt_share(self, reason): - return self.ss.remote_advise_corrupt_share(b"immutable", - self.storage_index, - self.shnum, - reason) + return self.ss.advise_corrupt_share(b"immutable", + self.storage_index, + self.shnum, + reason) diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 9d3ac4012..5dd8cd0bc 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -127,6 +127,9 @@ class StorageServer(service.MultiService): # Map in-progress filesystem path -> BucketWriter: self._bucket_writers = {} # type: Dict[str,BucketWriter] + # These callables will be called with BucketWriters that closed: + self._call_on_bucket_writer_close = [] + def stopService(self): # Cancel any in-progress uploads: for bw in list(self._bucket_writers.values()): @@ -405,9 +408,14 @@ class StorageServer(service.MultiService): if self.stats_provider: self.stats_provider.count('storage_server.bytes_added', consumed_size) del self._bucket_writers[bw.incominghome] - if bw in self._bucket_writer_disconnect_markers: - canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw) - canary.dontNotifyOnDisconnect(disconnect_marker) + for handler in self._call_on_bucket_writer_close: + handler(bw) + + def register_bucket_writer_close_handler(self, handler): + """ + The handler will be called with any ``BucketWriter`` that closes. 
+ """ + self._call_on_bucket_writer_close.append(handler) def _get_bucket_shares(self, storage_index): """Return a list of (shnum, pathname) tuples for files that hold @@ -755,9 +763,15 @@ class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78 # Canaries and disconnect markers for BucketWriters created via Foolscap: self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,Tuple[IRemoteReference, object]] + self._server.register_bucket_writer_close_handler(self._bucket_writer_closed) + + def _bucket_writer_closed(self, bw): + if bw in self._bucket_writer_disconnect_markers: + canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw) + canary.dontNotifyOnDisconnect(disconnect_marker) def remote_get_version(self): - return self.get_version() + return self._server.get_version() def remote_allocate_buckets(self, storage_index, renew_secret, cancel_secret, @@ -797,7 +811,7 @@ class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78 ) def remote_slot_readv(self, storage_index, shares, readv): - return self._server.slot_readv(self, storage_index, shares, readv) + return self._server.slot_readv(storage_index, shares, readv) def remote_advise_corrupt_share(self, share_type, storage_index, shnum, reason): From 476c41e49ec973db18b13d8f3b4e96a70e7c0933 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 2 Dec 2021 10:29:52 -0500 Subject: [PATCH 4/7] Split out Foolscap code from BucketReader/Writer. --- src/allmydata/storage/immutable.py | 60 +++++++++++++++++++++++------- src/allmydata/storage/server.py | 18 ++++++++- 2 files changed, 62 insertions(+), 16 deletions(-) diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index 173a43e8e..5fea7e2b6 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -230,8 +230,10 @@ class ShareFile(object): return space_freed -@implementer(RIBucketWriter) -class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 +class BucketWriter(object): + """ + Keep track of the process of writing to a ShareFile. 
+ """ def __init__(self, ss, incominghome, finalhome, max_size, lease_info, clock): self.ss = ss @@ -251,7 +253,7 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 def allocated_size(self): return self._max_size - def remote_write(self, offset, data): + def write(self, offset, data): # Delay the timeout, since we received data: self._timeout.reset(30 * 60) start = self._clock.seconds() @@ -275,9 +277,6 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 self.ss.add_latency("write", self._clock.seconds() - start) self.ss.count("write") - def remote_close(self): - self.close() - def close(self): precondition(not self.closed) self._timeout.cancel() @@ -329,13 +328,10 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 facility="tahoe.storage", level=log.UNUSUAL) self.abort() - def remote_abort(self): + def abort(self): log.msg("storage: aborting sharefile %s" % self.incominghome, facility="tahoe.storage", level=log.UNUSUAL) - self.abort() self.ss.count("abort") - - def abort(self): if self.closed: return @@ -358,8 +354,28 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 self._timeout.cancel() -@implementer(RIBucketReader) -class BucketReader(Referenceable): # type: ignore # warner/foolscap#78 +@implementer(RIBucketWriter) +class FoolscapBucketWriter(Referenceable): # type: ignore # warner/foolscap#78 + """ + Foolscap-specific BucketWriter. + """ + def __init__(self, bucket_writer): + self._bucket_writer = bucket_writer + + def remote_write(self, offset, data): + return self._bucket_writer.write(offset, data) + + def remote_close(self): + return self._bucket_writer.close() + + def remote_abort(self): + return self._bucket_writer.abort() + + +class BucketReader(object): + """ + Manage the process for reading from a ``ShareFile``. 
+ """ def __init__(self, ss, sharefname, storage_index=None, shnum=None): self.ss = ss @@ -374,15 +390,31 @@ class BucketReader(Referenceable): # type: ignore # warner/foolscap#78 ), self.shnum) - def remote_read(self, offset, length): + def read(self, offset, length): start = time.time() data = self._share_file.read_share_data(offset, length) self.ss.add_latency("read", time.time() - start) self.ss.count("read") return data - def remote_advise_corrupt_share(self, reason): + def advise_corrupt_share(self, reason): return self.ss.advise_corrupt_share(b"immutable", self.storage_index, self.shnum, reason) + + +@implementer(RIBucketReader) +class FoolscapBucketReader(Referenceable): # type: ignore # warner/foolscap#78 + """ + Foolscap wrapper for ``BucketReader`` + """ + + def __init__(self, bucket_reader): + self._bucket_reader = bucket_reader + + def remote_read(self, offset, length): + return self._bucket_reader.read(offset, length) + + def remote_advise_corrupt_share(self, reason): + return self._bucket_reader.advise_corrupt_share(reason) diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 5dd8cd0bc..8c790b66f 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -33,7 +33,10 @@ from allmydata.storage.lease import LeaseInfo from allmydata.storage.mutable import MutableShareFile, EmptyShare, \ create_mutable_sharefile from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE -from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader +from allmydata.storage.immutable import ( + ShareFile, BucketWriter, BucketReader, FoolscapBucketWriter, + FoolscapBucketReader, +) from allmydata.storage.crawler import BucketCountingCrawler from allmydata.storage.expirer import LeaseCheckingCrawler @@ -782,10 +785,18 @@ class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78 storage_index, renew_secret, cancel_secret, sharenums, allocated_size, owner_num=owner_num, renew_leases=True, ) + # Abort BucketWriters if disconnection happens. for bw in bucketwriters.values(): disconnect_marker = canary.notifyOnDisconnect(bw.disconnected) self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker) + + # Wrap BucketWriters with Foolscap adapter: + bucketwriters = { + k: FoolscapBucketWriter(bw) + for (k, bw) in bucketwriters.items() + } + return alreadygot, bucketwriters def remote_add_lease(self, storage_index, renew_secret, cancel_secret, @@ -796,7 +807,10 @@ class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78 return self._server.renew_lease(storage_index, renew_secret) def remote_get_buckets(self, storage_index): - return self._server.get_buckets(storage_index) + return { + k: FoolscapBucketReader(bucket) + for (k, bucket) in self._server.get_buckets(storage_index).items() + } def remote_slot_testv_and_readv_and_writev(self, storage_index, secrets, From 8c3d61a94e6da2e590831afc86afa85e0b0d6e36 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 2 Dec 2021 10:49:23 -0500 Subject: [PATCH 5/7] Bit more backwards compatible. 
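
Concretely: restore ``renew_leases=True`` as the default for
``StorageServer.slot_testv_and_readv_and_writev``. The reasoning (our
gloss; the diff itself is one line): the Foolscap wrapper always passes
the argument explicitly, but direct callers of the server object,
notably the existing tests, still use the old four-argument call, and
with no default that call raises TypeError. Illustrative calls, with
made-up variable names:

    # The Foolscap layer is explicit, so it is unaffected either way:
    server.slot_testv_and_readv_and_writev(
        storage_index, secrets, test_and_write_vectors, read_vector,
        renew_leases=True,
    )

    # Pre-split callers omit the keyword and rely on the default:
    server.slot_testv_and_readv_and_writev(
        storage_index, secrets, {0: ([], [(0, data)], None)}, [],
    )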
--- src/allmydata/storage/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 8c790b66f..bb116ce8e 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -626,7 +626,7 @@ class StorageServer(service.MultiService): secrets, test_and_write_vectors, read_vector, - renew_leases, + renew_leases=True, ): """ Read data from shares and conditionally write some data to them. From 439e5f2998ac178f7773190842dba0a9855da893 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 2 Dec 2021 10:49:30 -0500 Subject: [PATCH 6/7] Insofar as possible, switch to testing without the Foolscap API. --- src/allmydata/test/test_storage.py | 303 +++++++++++++------------ src/allmydata/test/test_storage_web.py | 24 +- 2 files changed, 165 insertions(+), 162 deletions(-) diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index bc87e168d..bfc1a7cd4 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -31,10 +31,15 @@ from hypothesis import given, strategies import itertools from allmydata import interfaces from allmydata.util import fileutil, hashutil, base32 -from allmydata.storage.server import StorageServer, DEFAULT_RENEWAL_TIME +from allmydata.storage.server import ( + StorageServer, DEFAULT_RENEWAL_TIME, FoolscapStorageServer, +) from allmydata.storage.shares import get_share_file from allmydata.storage.mutable import MutableShareFile -from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile +from allmydata.storage.immutable import ( + BucketWriter, BucketReader, ShareFile, FoolscapBucketWriter, + FoolscapBucketReader, +) from allmydata.storage.common import storage_index_to_dir, \ UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError, \ si_b2a, si_a2b @@ -129,25 +134,25 @@ class Bucket(unittest.TestCase): def test_create(self): incoming, final = self.make_workdir("test_create") bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock()) - bw.remote_write(0, b"a"*25) - bw.remote_write(25, b"b"*25) - bw.remote_write(50, b"c"*25) - bw.remote_write(75, b"d"*7) - bw.remote_close() + bw.write(0, b"a"*25) + bw.write(25, b"b"*25) + bw.write(50, b"c"*25) + bw.write(75, b"d"*7) + bw.close() def test_readwrite(self): incoming, final = self.make_workdir("test_readwrite") bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock()) - bw.remote_write(0, b"a"*25) - bw.remote_write(25, b"b"*25) - bw.remote_write(50, b"c"*7) # last block may be short - bw.remote_close() + bw.write(0, b"a"*25) + bw.write(25, b"b"*25) + bw.write(50, b"c"*7) # last block may be short + bw.close() # now read from it br = BucketReader(self, bw.finalhome) - self.failUnlessEqual(br.remote_read(0, 25), b"a"*25) - self.failUnlessEqual(br.remote_read(25, 25), b"b"*25) - self.failUnlessEqual(br.remote_read(50, 7), b"c"*7) + self.failUnlessEqual(br.read(0, 25), b"a"*25) + self.failUnlessEqual(br.read(25, 25), b"b"*25) + self.failUnlessEqual(br.read(50, 7), b"c"*7) def test_write_past_size_errors(self): """Writing beyond the size of the bucket throws an exception.""" @@ -157,7 +162,7 @@ class Bucket(unittest.TestCase): ) bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock()) with self.assertRaises(DataTooLargeError): - bw.remote_write(offset, b"a" * length) + bw.write(offset, b"a" * length) @given( maybe_overlapping_offset=strategies.integers(min_value=0, 
max_value=98), @@ -177,25 +182,25 @@ class Bucket(unittest.TestCase): self, incoming, final, length, self.make_lease(), Clock() ) # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. - bw.remote_write(10, expected_data[10:20]) - bw.remote_write(30, expected_data[30:40]) - bw.remote_write(50, expected_data[50:60]) + bw.write(10, expected_data[10:20]) + bw.write(30, expected_data[30:40]) + bw.write(50, expected_data[50:60]) # Then, an overlapping write but with matching data: - bw.remote_write( + bw.write( maybe_overlapping_offset, expected_data[ maybe_overlapping_offset:maybe_overlapping_offset + maybe_overlapping_length ] ) # Now fill in the holes: - bw.remote_write(0, expected_data[0:10]) - bw.remote_write(20, expected_data[20:30]) - bw.remote_write(40, expected_data[40:50]) - bw.remote_write(60, expected_data[60:]) - bw.remote_close() + bw.write(0, expected_data[0:10]) + bw.write(20, expected_data[20:30]) + bw.write(40, expected_data[40:50]) + bw.write(60, expected_data[60:]) + bw.close() br = BucketReader(self, bw.finalhome) - self.assertEqual(br.remote_read(0, length), expected_data) + self.assertEqual(br.read(0, length), expected_data) @given( @@ -215,21 +220,21 @@ class Bucket(unittest.TestCase): self, incoming, final, length, self.make_lease(), Clock() ) # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. - bw.remote_write(10, b"1" * 10) - bw.remote_write(30, b"1" * 10) - bw.remote_write(50, b"1" * 10) + bw.write(10, b"1" * 10) + bw.write(30, b"1" * 10) + bw.write(50, b"1" * 10) # Then, write something that might overlap with some of them, but # conflicts. Then fill in holes left by first three writes. Conflict is # inevitable. with self.assertRaises(ConflictingWriteError): - bw.remote_write( + bw.write( maybe_overlapping_offset, b'X' * min(maybe_overlapping_length, length - maybe_overlapping_offset), ) - bw.remote_write(0, b"1" * 10) - bw.remote_write(20, b"1" * 10) - bw.remote_write(40, b"1" * 10) - bw.remote_write(60, b"1" * 40) + bw.write(0, b"1" * 10) + bw.write(20, b"1" * 10) + bw.write(40, b"1" * 10) + bw.write(60, b"1" * 40) def test_read_past_end_of_share_data(self): # test vector for immutable files (hard-coded contents of an immutable share @@ -274,15 +279,15 @@ class Bucket(unittest.TestCase): # Now read from it. br = BucketReader(mockstorageserver, final) - self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data) + self.failUnlessEqual(br.read(0, len(share_data)), share_data) # Read past the end of share data to get the cancel secret. read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret) - result_of_read = br.remote_read(0, read_length) + result_of_read = br.read(0, read_length) self.failUnlessEqual(result_of_read, share_data) - result_of_read = br.remote_read(0, len(share_data)+1) + result_of_read = br.read(0, len(share_data)+1) self.failUnlessEqual(result_of_read, share_data) def _assert_timeout_only_after_30_minutes(self, clock, bw): @@ -320,7 +325,7 @@ class Bucket(unittest.TestCase): clock.advance(29 * 60) # .. but we receive a write! So that should delay the timeout again to # another 30 minutes. 
- bw.remote_write(0, b"hello") + bw.write(0, b"hello") self._assert_timeout_only_after_30_minutes(clock, bw) def test_bucket_closing_cancels_timeout(self): @@ -374,7 +379,7 @@ class BucketProxy(unittest.TestCase): fileutil.make_dirs(basedir) fileutil.make_dirs(os.path.join(basedir, "tmp")) bw = BucketWriter(self, incoming, final, size, self.make_lease(), Clock()) - rb = RemoteBucket(bw) + rb = RemoteBucket(FoolscapBucketWriter(bw)) return bw, rb, final def make_lease(self): @@ -446,7 +451,7 @@ class BucketProxy(unittest.TestCase): # now read everything back def _start_reading(res): br = BucketReader(self, sharefname) - rb = RemoteBucket(br) + rb = RemoteBucket(FoolscapBucketReader(br)) server = NoNetworkServer(b"abc", None) rbp = rbp_class(rb, server, storage_index=b"") self.failUnlessIn("to peer", repr(rbp)) @@ -514,20 +519,20 @@ class Server(unittest.TestCase): def test_declares_fixed_1528(self): ss = self.create("test_declares_fixed_1528") - ver = ss.remote_get_version() + ver = ss.get_version() sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1'] self.failUnless(sv1.get(b'prevents-read-past-end-of-share-data'), sv1) def test_declares_maximum_share_sizes(self): ss = self.create("test_declares_maximum_share_sizes") - ver = ss.remote_get_version() + ver = ss.get_version() sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1'] self.failUnlessIn(b'maximum-immutable-share-size', sv1) self.failUnlessIn(b'maximum-mutable-share-size', sv1) def test_declares_available_space(self): ss = self.create("test_declares_available_space") - ver = ss.remote_get_version() + ver = ss.get_version() sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1'] self.failUnlessIn(b'available-space', sv1) @@ -538,7 +543,9 @@ class Server(unittest.TestCase): """ renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)) cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)) - return ss._allocate_buckets( + if isinstance(ss, FoolscapStorageServer): + ss = ss._server + return ss.allocate_buckets( storage_index, renew_secret, cancel_secret, sharenums, size, @@ -562,12 +569,12 @@ class Server(unittest.TestCase): shnum, bucket = list(writers.items())[0] # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-( - bucket.remote_write(2**32, b"ab") - bucket.remote_close() + bucket.write(2**32, b"ab") + bucket.close() - readers = ss.remote_get_buckets(b"allocate") + readers = ss.get_buckets(b"allocate") reader = readers[shnum] - self.failUnlessEqual(reader.remote_read(2**32, 2), b"ab") + self.failUnlessEqual(reader.read(2**32, 2), b"ab") def test_dont_overfill_dirs(self): """ @@ -578,8 +585,8 @@ class Server(unittest.TestCase): ss = self.create("test_dont_overfill_dirs") already, writers = self.allocate(ss, b"storageindex", [0], 10) for i, wb in writers.items(): - wb.remote_write(0, b"%10d" % i) - wb.remote_close() + wb.write(0, b"%10d" % i) + wb.close() storedir = os.path.join(self.workdir("test_dont_overfill_dirs"), "shares") children_of_storedir = set(os.listdir(storedir)) @@ -588,8 +595,8 @@ class Server(unittest.TestCase): # chars the same as the first storageindex. 
already, writers = self.allocate(ss, b"storageindey", [0], 10) for i, wb in writers.items(): - wb.remote_write(0, b"%10d" % i) - wb.remote_close() + wb.write(0, b"%10d" % i) + wb.close() storedir = os.path.join(self.workdir("test_dont_overfill_dirs"), "shares") new_children_of_storedir = set(os.listdir(storedir)) @@ -599,8 +606,8 @@ class Server(unittest.TestCase): ss = self.create("test_remove_incoming") already, writers = self.allocate(ss, b"vid", list(range(3)), 10) for i,wb in writers.items(): - wb.remote_write(0, b"%10d" % i) - wb.remote_close() + wb.write(0, b"%10d" % i) + wb.close() incoming_share_dir = wb.incominghome incoming_bucket_dir = os.path.dirname(incoming_share_dir) incoming_prefix_dir = os.path.dirname(incoming_bucket_dir) @@ -619,32 +626,32 @@ class Server(unittest.TestCase): # Now abort the writers. for writer in writers.values(): - writer.remote_abort() + writer.abort() self.failUnlessEqual(ss.allocated_size(), 0) def test_allocate(self): ss = self.create("test_allocate") - self.failUnlessEqual(ss.remote_get_buckets(b"allocate"), {}) + self.failUnlessEqual(ss.get_buckets(b"allocate"), {}) already,writers = self.allocate(ss, b"allocate", [0,1,2], 75) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0,1,2])) # while the buckets are open, they should not count as readable - self.failUnlessEqual(ss.remote_get_buckets(b"allocate"), {}) + self.failUnlessEqual(ss.get_buckets(b"allocate"), {}) # close the buckets for i,wb in writers.items(): - wb.remote_write(0, b"%25d" % i) - wb.remote_close() + wb.write(0, b"%25d" % i) + wb.close() # aborting a bucket that was already closed is a no-op - wb.remote_abort() + wb.abort() # now they should be readable - b = ss.remote_get_buckets(b"allocate") + b = ss.get_buckets(b"allocate") self.failUnlessEqual(set(b.keys()), set([0,1,2])) - self.failUnlessEqual(b[0].remote_read(0, 25), b"%25d" % 0) + self.failUnlessEqual(b[0].read(0, 25), b"%25d" % 0) b_str = str(b[0]) self.failUnlessIn("BucketReader", b_str) self.failUnlessIn("mfwgy33dmf2g 0", b_str) @@ -665,15 +672,15 @@ class Server(unittest.TestCase): # aborting the writes should remove the tempfiles for i,wb in writers2.items(): - wb.remote_abort() + wb.abort() already2,writers2 = self.allocate(ss, b"allocate", [2,3,4,5], 75) self.failUnlessEqual(already2, set([0,1,2])) self.failUnlessEqual(set(writers2.keys()), set([5])) for i,wb in writers2.items(): - wb.remote_abort() + wb.abort() for i,wb in writers.items(): - wb.remote_abort() + wb.abort() def test_allocate_without_lease_renewal(self): """ @@ -696,8 +703,8 @@ class Server(unittest.TestCase): ss, storage_index, [0], 1, renew_leases=False, ) (writer,) = writers.values() - writer.remote_write(0, b"x") - writer.remote_close() + writer.write(0, b"x") + writer.close() # It should have a lease granted at the current time. shares = dict(ss._get_bucket_shares(storage_index)) @@ -719,8 +726,8 @@ class Server(unittest.TestCase): ss, storage_index, [1], 1, renew_leases=False, ) (writer,) = writers.values() - writer.remote_write(0, b"x") - writer.remote_close() + writer.write(0, b"x") + writer.close() # The first share's lease expiration time is unchanged. 
shares = dict(ss._get_bucket_shares(storage_index)) @@ -736,8 +743,8 @@ class Server(unittest.TestCase): def test_bad_container_version(self): ss = self.create("test_bad_container_version") a,w = self.allocate(ss, b"si1", [0], 10) - w[0].remote_write(0, b"\xff"*10) - w[0].remote_close() + w[0].write(0, b"\xff"*10) + w[0].close() fn = os.path.join(ss.sharedir, storage_index_to_dir(b"si1"), "0") f = open(fn, "rb+") @@ -745,15 +752,15 @@ class Server(unittest.TestCase): f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1 f.close() - ss.remote_get_buckets(b"allocate") + ss.get_buckets(b"allocate") e = self.failUnlessRaises(UnknownImmutableContainerVersionError, - ss.remote_get_buckets, b"si1") + ss.get_buckets, b"si1") self.failUnlessIn(" had version 0 but we wanted 1", str(e)) def test_disconnect(self): # simulate a disconnection - ss = self.create("test_disconnect") + ss = FoolscapStorageServer(self.create("test_disconnect")) renew_secret = b"r" * 32 cancel_secret = b"c" * 32 canary = FakeCanary() @@ -789,7 +796,7 @@ class Server(unittest.TestCase): } self.patch(fileutil, 'get_disk_stats', call_get_disk_stats) - ss = self.create("test_reserved_space", reserved_space=reserved) + ss = FoolscapStorageServer(self.create("test_reserved_space", reserved_space=reserved)) # 15k available, 10k reserved, leaves 5k for shares # a newly created and filled share incurs this much overhead, beyond @@ -810,28 +817,28 @@ class Server(unittest.TestCase): self.failUnlessEqual(len(writers), 3) # now the StorageServer should have 3000 bytes provisionally # allocated, allowing only 2000 more to be claimed - self.failUnlessEqual(len(ss._bucket_writers), 3) + self.failUnlessEqual(len(ss._server._bucket_writers), 3) # allocating 1001-byte shares only leaves room for one canary2 = FakeCanary() already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2) self.failUnlessEqual(len(writers2), 1) - self.failUnlessEqual(len(ss._bucket_writers), 4) + self.failUnlessEqual(len(ss._server._bucket_writers), 4) # we abandon the first set, so their provisional allocation should be # returned canary.disconnected() - self.failUnlessEqual(len(ss._bucket_writers), 1) + self.failUnlessEqual(len(ss._server._bucket_writers), 1) # now we have a provisional allocation of 1001 bytes # and we close the second set, so their provisional allocation should # become real, long-term allocation, and grows to include the # overhead. for bw in writers2.values(): - bw.remote_write(0, b"a"*25) - bw.remote_close() - self.failUnlessEqual(len(ss._bucket_writers), 0) + bw.write(0, b"a"*25) + bw.close() + self.failUnlessEqual(len(ss._server._bucket_writers), 0) # this also changes the amount reported as available by call_get_disk_stats allocated = 1001 + OVERHEAD + LEASE_SIZE @@ -848,12 +855,12 @@ class Server(unittest.TestCase): canary=canary3, ) self.failUnlessEqual(len(writers3), 39) - self.failUnlessEqual(len(ss._bucket_writers), 39) + self.failUnlessEqual(len(ss._server._bucket_writers), 39) canary3.disconnected() - self.failUnlessEqual(len(ss._bucket_writers), 0) - ss.disownServiceParent() + self.failUnlessEqual(len(ss._server._bucket_writers), 0) + ss._server.disownServiceParent() del ss def test_seek(self): @@ -882,24 +889,22 @@ class Server(unittest.TestCase): Given a StorageServer, create a bucket with 5 shares and return renewal and cancellation secrets. 
""" - canary = FakeCanary() sharenums = list(range(5)) size = 100 # Creating a bucket also creates a lease: rs, cs = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)), hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))) - already, writers = ss.remote_allocate_buckets(storage_index, rs, cs, - sharenums, size, canary) + already, writers = ss.allocate_buckets(storage_index, rs, cs, + sharenums, size) self.failUnlessEqual(len(already), expected_already) self.failUnlessEqual(len(writers), expected_writers) for wb in writers.values(): - wb.remote_close() + wb.close() return rs, cs def test_leases(self): ss = self.create("test_leases") - canary = FakeCanary() sharenums = list(range(5)) size = 100 @@ -919,54 +924,54 @@ class Server(unittest.TestCase): # and a third lease, using add-lease rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)), hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))) - ss.remote_add_lease(b"si1", rs2a, cs2a) + ss.add_lease(b"si1", rs2a, cs2a) (lease1, lease2, lease3) = ss.get_leases(b"si1") self.assertTrue(lease1.is_renew_secret(rs1)) self.assertTrue(lease2.is_renew_secret(rs2)) self.assertTrue(lease3.is_renew_secret(rs2a)) # add-lease on a missing storage index is silently ignored - self.assertIsNone(ss.remote_add_lease(b"si18", b"", b"")) + self.assertIsNone(ss.add_lease(b"si18", b"", b"")) # check that si0 is readable - readers = ss.remote_get_buckets(b"si0") + readers = ss.get_buckets(b"si0") self.failUnlessEqual(len(readers), 5) # renew the first lease. Only the proper renew_secret should work - ss.remote_renew_lease(b"si0", rs0) - self.failUnlessRaises(IndexError, ss.remote_renew_lease, b"si0", cs0) - self.failUnlessRaises(IndexError, ss.remote_renew_lease, b"si0", rs1) + ss.renew_lease(b"si0", rs0) + self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", cs0) + self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", rs1) # check that si0 is still readable - readers = ss.remote_get_buckets(b"si0") + readers = ss.get_buckets(b"si0") self.failUnlessEqual(len(readers), 5) # There is no such method as remote_cancel_lease for now -- see # ticket #1528. 
- self.failIf(hasattr(ss, 'remote_cancel_lease'), \ - "ss should not have a 'remote_cancel_lease' method/attribute") + self.failIf(hasattr(FoolscapStorageServer(ss), 'remote_cancel_lease'), \ + "ss should not have a 'remote_cancel_lease' method/attribute") # test overlapping uploads rs3,cs3 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)), hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))) rs4,cs4 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)), hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))) - already,writers = ss.remote_allocate_buckets(b"si3", rs3, cs3, - sharenums, size, canary) + already,writers = ss.allocate_buckets(b"si3", rs3, cs3, + sharenums, size) self.failUnlessEqual(len(already), 0) self.failUnlessEqual(len(writers), 5) - already2,writers2 = ss.remote_allocate_buckets(b"si3", rs4, cs4, - sharenums, size, canary) + already2,writers2 = ss.allocate_buckets(b"si3", rs4, cs4, + sharenums, size) self.failUnlessEqual(len(already2), 0) self.failUnlessEqual(len(writers2), 0) for wb in writers.values(): - wb.remote_close() + wb.close() leases = list(ss.get_leases(b"si3")) self.failUnlessEqual(len(leases), 1) - already3,writers3 = ss.remote_allocate_buckets(b"si3", rs4, cs4, - sharenums, size, canary) + already3,writers3 = ss.allocate_buckets(b"si3", rs4, cs4, + sharenums, size) self.failUnlessEqual(len(already3), 5) self.failUnlessEqual(len(writers3), 0) @@ -991,7 +996,7 @@ class Server(unittest.TestCase): clock.advance(123456) # Adding a lease with matching renewal secret just renews it: - ss.remote_add_lease(b"si0", renewal_secret, cancel_secret) + ss.add_lease(b"si0", renewal_secret, cancel_secret) [lease] = ss.get_leases(b"si0") self.assertEqual(lease.get_expiration_time(), 123 + 123456 + DEFAULT_RENEWAL_TIME) @@ -1027,14 +1032,14 @@ class Server(unittest.TestCase): self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0,1,2])) for i,wb in writers.items(): - wb.remote_write(0, b"%25d" % i) - wb.remote_close() + wb.write(0, b"%25d" % i) + wb.close() # since we discard the data, the shares should be present but sparse. # Since we write with some seeks, the data we read back will be all # zeros. 
- b = ss.remote_get_buckets(b"vid") + b = ss.get_buckets(b"vid") self.failUnlessEqual(set(b.keys()), set([0,1,2])) - self.failUnlessEqual(b[0].remote_read(0, 25), b"\x00" * 25) + self.failUnlessEqual(b[0].read(0, 25), b"\x00" * 25) def test_advise_corruption(self): workdir = self.workdir("test_advise_corruption") @@ -1042,8 +1047,8 @@ class Server(unittest.TestCase): ss.setServiceParent(self.sparent) si0_s = base32.b2a(b"si0") - ss.remote_advise_corrupt_share(b"immutable", b"si0", 0, - b"This share smells funny.\n") + ss.advise_corrupt_share(b"immutable", b"si0", 0, + b"This share smells funny.\n") reportdir = os.path.join(workdir, "corruption-advisories") reports = os.listdir(reportdir) self.failUnlessEqual(len(reports), 1) @@ -1062,12 +1067,12 @@ class Server(unittest.TestCase): already,writers = self.allocate(ss, b"si1", [1], 75) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([1])) - writers[1].remote_write(0, b"data") - writers[1].remote_close() + writers[1].write(0, b"data") + writers[1].close() - b = ss.remote_get_buckets(b"si1") + b = ss.get_buckets(b"si1") self.failUnlessEqual(set(b.keys()), set([1])) - b[1].remote_advise_corrupt_share(b"This share tastes like dust.\n") + b[1].advise_corrupt_share(b"This share tastes like dust.\n") reports = os.listdir(reportdir) self.failUnlessEqual(len(reports), 2) @@ -1125,7 +1130,7 @@ class MutableServer(unittest.TestCase): write_enabler = self.write_enabler(we_tag) renew_secret = self.renew_secret(lease_tag) cancel_secret = self.cancel_secret(lease_tag) - rstaraw = ss.remote_slot_testv_and_readv_and_writev + rstaraw = ss.slot_testv_and_readv_and_writev testandwritev = dict( [ (shnum, ([], [], None) ) for shnum in sharenums ] ) readv = [] @@ -1146,7 +1151,7 @@ class MutableServer(unittest.TestCase): f.seek(0) f.write(b"BAD MAGIC") f.close() - read = ss.remote_slot_readv + read = ss.slot_readv e = self.failUnlessRaises(UnknownMutableContainerVersionError, read, b"si1", [0], [(0,10)]) self.failUnlessIn(" had magic ", str(e)) @@ -1156,8 +1161,8 @@ class MutableServer(unittest.TestCase): ss = self.create("test_container_size") self.allocate(ss, b"si1", b"we1", next(self._lease_secret), set([0,1,2]), 100) - read = ss.remote_slot_readv - rstaraw = ss.remote_slot_testv_and_readv_and_writev + read = ss.slot_readv + rstaraw = ss.slot_testv_and_readv_and_writev secrets = ( self.write_enabler(b"we1"), self.renew_secret(b"we1"), self.cancel_secret(b"we1") ) @@ -1237,7 +1242,7 @@ class MutableServer(unittest.TestCase): # Also see if the server explicitly declares that it supports this # feature. 
- ver = ss.remote_get_version() + ver = ss.get_version() storage_v1_ver = ver[b"http://allmydata.org/tahoe/protocols/storage/v1"] self.failUnless(storage_v1_ver.get(b"fills-holes-with-zero-bytes")) @@ -1255,7 +1260,7 @@ class MutableServer(unittest.TestCase): self.allocate(ss, b"si1", b"we1", next(self._lease_secret), set([0,1,2]), 100) - read = ss.remote_slot_readv + read = ss.slot_readv self.failUnlessEqual(read(b"si1", [0], [(0, 10)]), {0: [b""]}) self.failUnlessEqual(read(b"si1", [], [(0, 10)]), @@ -1268,7 +1273,7 @@ class MutableServer(unittest.TestCase): self.renew_secret(b"we1"), self.cancel_secret(b"we1") ) data = b"".join([ (b"%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev + write = ss.slot_testv_and_readv_and_writev answer = write(b"si1", secrets, {0: ([], [(0,data)], None)}, []) @@ -1278,7 +1283,7 @@ class MutableServer(unittest.TestCase): {0: [b"00000000001111111111"]}) self.failUnlessEqual(read(b"si1", [0], [(95,10)]), {0: [b"99999"]}) - #self.failUnlessEqual(s0.remote_get_length(), 100) + #self.failUnlessEqual(s0.get_length(), 100) bad_secrets = (b"bad write enabler", secrets[1], secrets[2]) f = self.failUnlessRaises(BadWriteEnablerError, @@ -1312,8 +1317,8 @@ class MutableServer(unittest.TestCase): self.renew_secret(b"we1"), self.cancel_secret(b"we1") ) data = b"".join([ (b"%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev - read = ss.remote_slot_readv + write = ss.slot_testv_and_readv_and_writev + read = ss.slot_readv def reset(): write(b"si1", secrets, @@ -1357,8 +1362,8 @@ class MutableServer(unittest.TestCase): self.renew_secret(b"we1"), self.cancel_secret(b"we1") ) data = b"".join([ (b"%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev - read = ss.remote_slot_readv + write = ss.slot_testv_and_readv_and_writev + read = ss.slot_readv data = [(b"%d" % i) * 100 for i in range(3)] rc = write(b"si1", secrets, {0: ([], [(0,data[0])], None), @@ -1389,8 +1394,8 @@ class MutableServer(unittest.TestCase): self.renew_secret(b"we1-%d" % n), self.cancel_secret(b"we1-%d" % n) ) data = b"".join([ (b"%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev - read = ss.remote_slot_readv + write = ss.slot_testv_and_readv_and_writev + read = ss.slot_readv rc = write(b"si1", secrets(0), {0: ([], [(0,data)], None)}, []) self.failUnlessEqual(rc, (True, {})) @@ -1406,7 +1411,7 @@ class MutableServer(unittest.TestCase): self.failUnlessEqual(len(list(s0.get_leases())), 1) # add-lease on a missing storage index is silently ignored - self.failUnlessEqual(ss.remote_add_lease(b"si18", b"", b""), None) + self.failUnlessEqual(ss.add_lease(b"si18", b"", b""), None) # re-allocate the slots and use the same secrets, that should update # the lease @@ -1414,7 +1419,7 @@ class MutableServer(unittest.TestCase): self.failUnlessEqual(len(list(s0.get_leases())), 1) # renew it directly - ss.remote_renew_lease(b"si1", secrets(0)[1]) + ss.renew_lease(b"si1", secrets(0)[1]) self.failUnlessEqual(len(list(s0.get_leases())), 1) # now allocate them with a bunch of different secrets, to trigger the @@ -1422,7 +1427,7 @@ class MutableServer(unittest.TestCase): write(b"si1", secrets(1), {0: ([], [(0,data)], None)}, []) self.failUnlessEqual(len(list(s0.get_leases())), 2) secrets2 = secrets(2) - ss.remote_add_lease(b"si1", secrets2[1], secrets2[2]) + ss.add_lease(b"si1", secrets2[1], secrets2[2]) self.failUnlessEqual(len(list(s0.get_leases())), 3) write(b"si1", secrets(3), {0: 
([], [(0,data)], None)}, []) write(b"si1", secrets(4), {0: ([], [(0,data)], None)}, []) @@ -1440,11 +1445,11 @@ class MutableServer(unittest.TestCase): # read back the leases, make sure they're still intact. self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) - ss.remote_renew_lease(b"si1", secrets(0)[1]) - ss.remote_renew_lease(b"si1", secrets(1)[1]) - ss.remote_renew_lease(b"si1", secrets(2)[1]) - ss.remote_renew_lease(b"si1", secrets(3)[1]) - ss.remote_renew_lease(b"si1", secrets(4)[1]) + ss.renew_lease(b"si1", secrets(0)[1]) + ss.renew_lease(b"si1", secrets(1)[1]) + ss.renew_lease(b"si1", secrets(2)[1]) + ss.renew_lease(b"si1", secrets(3)[1]) + ss.renew_lease(b"si1", secrets(4)[1]) self.compare_leases_without_timestamps(all_leases, list(s0.get_leases())) # get a new copy of the leases, with the current timestamps. Reading # data and failing to renew/cancel leases should leave the timestamps @@ -1455,7 +1460,7 @@ class MutableServer(unittest.TestCase): # examine the exception thus raised, make sure the old nodeid is # present, to provide for share migration e = self.failUnlessRaises(IndexError, - ss.remote_renew_lease, b"si1", + ss.renew_lease, b"si1", secrets(20)[1]) e_s = str(e) self.failUnlessIn("Unable to renew non-existent lease", e_s) @@ -1490,7 +1495,7 @@ class MutableServer(unittest.TestCase): self.renew_secret(b"we1-%d" % n), self.cancel_secret(b"we1-%d" % n) ) data = b"".join([ (b"%d" % i) * 10 for i in range(10) ]) - write = ss.remote_slot_testv_and_readv_and_writev + write = ss.slot_testv_and_readv_and_writev write_enabler, renew_secret, cancel_secret = secrets(0) rc = write(b"si1", (write_enabler, renew_secret, cancel_secret), {0: ([], [(0,data)], None)}, []) @@ -1506,7 +1511,7 @@ class MutableServer(unittest.TestCase): clock.advance(835) # Adding a lease renews it: - ss.remote_add_lease(b"si1", renew_secret, cancel_secret) + ss.add_lease(b"si1", renew_secret, cancel_secret) [lease] = s0.get_leases() self.assertEqual(lease.get_expiration_time(), 235 + 835 + DEFAULT_RENEWAL_TIME) @@ -1515,8 +1520,8 @@ class MutableServer(unittest.TestCase): ss = self.create("test_remove") self.allocate(ss, b"si1", b"we1", next(self._lease_secret), set([0,1,2]), 100) - readv = ss.remote_slot_readv - writev = ss.remote_slot_testv_and_readv_and_writev + readv = ss.slot_readv + writev = ss.slot_testv_and_readv_and_writev secrets = ( self.write_enabler(b"we1"), self.renew_secret(b"we1"), self.cancel_secret(b"we1") ) @@ -1620,7 +1625,7 @@ class MutableServer(unittest.TestCase): # We don't even need to create any shares to exercise this # functionality. Just go straight to sending a truncate-to-zero # write. - testv_is_good, read_data = ss.remote_slot_testv_and_readv_and_writev( + testv_is_good, read_data = ss.slot_testv_and_readv_and_writev( storage_index=storage_index, secrets=secrets, test_and_write_vectors={ @@ -1638,7 +1643,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): self.sparent = LoggingServiceParent() self._lease_secret = itertools.count() self.ss = self.create("MDMFProxies storage test server") - self.rref = RemoteBucket(self.ss) + self.rref = RemoteBucket(FoolscapStorageServer(self.ss)) self.storage_server = _StorageServer(lambda: self.rref) self.secrets = (self.write_enabler(b"we_secret"), self.renew_secret(b"renew_secret"), @@ -1805,7 +1810,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): If tail_segment=True, then I will write a share that has a smaller tail segment than other segments. 
""" - write = self.ss.remote_slot_testv_and_readv_and_writev + write = self.ss.slot_testv_and_readv_and_writev data = self.build_test_mdmf_share(tail_segment, empty) # Finally, we write the whole thing to the storage server in one # pass. @@ -1873,7 +1878,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): empty=False): # Some tests need SDMF shares to verify that we can still # read them. This method writes one, which resembles but is not - write = self.ss.remote_slot_testv_and_readv_and_writev + write = self.ss.slot_testv_and_readv_and_writev share = self.build_test_sdmf_share(empty) testvs = [(0, 1, b"eq", b"")] tws = {} @@ -2205,7 +2210,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): # blocks. mw = self._make_new_mw(b"si1", 0) # Test writing some blocks. - read = self.ss.remote_slot_readv + read = self.ss.slot_readv expected_private_key_offset = struct.calcsize(MDMFHEADER) expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \ PRIVATE_KEY_SIZE + \ @@ -2996,7 +3001,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): d = sdmfr.finish_publishing() def _then(ignored): self.failUnlessEqual(self.rref.write_count, 1) - read = self.ss.remote_slot_readv + read = self.ss.slot_readv self.failUnlessEqual(read(b"si1", [0], [(0, len(data))]), {0: [data]}) d.addCallback(_then) @@ -3053,7 +3058,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin): sdmfw.finish_publishing()) def _then_again(results): self.failUnless(results[0]) - read = self.ss.remote_slot_readv + read = self.ss.slot_readv self.failUnlessEqual(read(b"si1", [0], [(1, 8)]), {0: [struct.pack(">Q", 1)]}) self.failUnlessEqual(read(b"si1", [0], [(9, len(data) - 9)]), diff --git a/src/allmydata/test/test_storage_web.py b/src/allmydata/test/test_storage_web.py index 38e380223..5d72fbd82 100644 --- a/src/allmydata/test/test_storage_web.py +++ b/src/allmydata/test/test_storage_web.py @@ -38,7 +38,6 @@ from allmydata.web.storage import ( StorageStatusElement, remove_prefix ) -from .common_util import FakeCanary from .common_web import ( render, @@ -289,28 +288,27 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin): mutable_si_3, rs3, cs3, we3 = make_mutable(b"\x03" * 16) rs3a, cs3a = make_extra_lease(mutable_si_3, 1) sharenums = [0] - canary = FakeCanary() # note: 'tahoe debug dump-share' will not handle this file, since the # inner contents are not a valid CHK share data = b"\xff" * 1000 - a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums, - 1000, canary) - w[0].remote_write(0, data) - w[0].remote_close() + a,w = ss.allocate_buckets(immutable_si_0, rs0, cs0, sharenums, + 1000) + w[0].write(0, data) + w[0].close() - a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums, - 1000, canary) - w[0].remote_write(0, data) - w[0].remote_close() - ss.remote_add_lease(immutable_si_1, rs1a, cs1a) + a,w = ss.allocate_buckets(immutable_si_1, rs1, cs1, sharenums, + 1000) + w[0].write(0, data) + w[0].close() + ss.add_lease(immutable_si_1, rs1a, cs1a) - writev = ss.remote_slot_testv_and_readv_and_writev + writev = ss.slot_testv_and_readv_and_writev writev(mutable_si_2, (we2, rs2, cs2), {0: ([], [(0,data)], len(data))}, []) writev(mutable_si_3, (we3, rs3, cs3), {0: ([], [(0,data)], len(data))}, []) - ss.remote_add_lease(mutable_si_3, rs3a, cs3a) + ss.add_lease(mutable_si_3, rs3a, cs3a) self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a] From 90f8480cf0c2fb97fd5e420c6e9ae029dad203b7 Mon Sep 17 00:00:00 2001 
From: Itamar Turner-Trauring Date: Fri, 3 Dec 2021 13:09:27 -0500 Subject: [PATCH 7/7] Make more of the unittests pass again with the StorageServer factoring. --- src/allmydata/storage/http_server.py | 2 +- src/allmydata/storage/server.py | 1 + src/allmydata/test/no_network.py | 6 ++++-- src/allmydata/test/test_checker.py | 6 +++--- src/allmydata/test/test_client.py | 2 +- src/allmydata/test/test_crawler.py | 14 +++++++------- src/allmydata/test/test_helper.py | 7 ++++--- src/allmydata/test/test_hung_server.py | 4 ++-- src/allmydata/test/test_storage_http.py | 2 +- 9 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py index 327892ecd..6297ef484 100644 --- a/src/allmydata/storage/http_server.py +++ b/src/allmydata/storage/http_server.py @@ -91,4 +91,4 @@ class HTTPServer(object): @_authorized_route(_app, "/v1/version", methods=["GET"]) def version(self, request, authorization): - return self._cbor(request, self._storage_server.remote_get_version()) + return self._cbor(request, self._storage_server.get_version()) diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index bb116ce8e..0df9f23d8 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -64,6 +64,7 @@ class StorageServer(service.MultiService): """ Implement the business logic for the storage server. """ + name = "storage" LeaseCheckerClass = LeaseCheckingCrawler def __init__(self, storedir, nodeid, reserved_space=0, diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index b9fa99005..97cb371e6 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -50,7 +50,9 @@ from allmydata.util.assertutil import _assert from allmydata import uri as tahoe_uri from allmydata.client import _Client -from allmydata.storage.server import StorageServer, storage_index_to_dir +from allmydata.storage.server import ( + StorageServer, storage_index_to_dir, FoolscapStorageServer, +) from allmydata.util import fileutil, idlib, hashutil from allmydata.util.hashutil import permute_server_hash from allmydata.util.fileutil import abspath_expanduser_unicode @@ -417,7 +419,7 @@ class NoNetworkGrid(service.MultiService): ss.setServiceParent(middleman) serverid = ss.my_nodeid self.servers_by_number[i] = ss - wrapper = wrap_storage_server(ss) + wrapper = wrap_storage_server(FoolscapStorageServer(ss)) self.wrappers_by_id[serverid] = wrapper self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper) self.rebuild_serverlist() diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index f56ecd089..3d64d4976 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -773,13 +773,13 @@ class AddLease(GridTestMixin, unittest.TestCase): d.addCallback(_check_cr, "mutable-normal") really_did_break = [] - # now break the server's remote_add_lease call + # now break the server's add_lease call def _break_add_lease(ign): def broken_add_lease(*args, **kwargs): really_did_break.append(1) raise KeyError("intentional failure, should be ignored") - assert self.g.servers_by_number[0].remote_add_lease - self.g.servers_by_number[0].remote_add_lease = broken_add_lease + assert self.g.servers_by_number[0].add_lease + self.g.servers_by_number[0].add_lease = broken_add_lease d.addCallback(_break_add_lease) # and confirm that the files still look healthy diff --git a/src/allmydata/test/test_client.py 
b/src/allmydata/test/test_client.py index a2572e735..c65a2fa2c 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -601,7 +601,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase): "enabled = true\n") c = yield client.create_client(basedir) ss = c.getServiceNamed("storage") - verdict = ss.remote_get_version() + verdict = ss.get_version() self.failUnlessReallyEqual(verdict[b"application-version"], allmydata.__full_version__.encode("ascii")) self.failIfEqual(str(allmydata.__version__), "unknown") diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py index a9be90c43..80d732986 100644 --- a/src/allmydata/test/test_crawler.py +++ b/src/allmydata/test/test_crawler.py @@ -27,7 +27,7 @@ from allmydata.util import fileutil, hashutil, pollmixin from allmydata.storage.server import StorageServer, si_b2a from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded -from allmydata.test.common_util import StallMixin, FakeCanary +from allmydata.test.common_util import StallMixin class BucketEnumeratingCrawler(ShareCrawler): cpu_slice = 500 # make sure it can complete in a single slice @@ -124,12 +124,12 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin): def write(self, i, ss, serverid, tail=0): si = self.si(i) si = si[:-1] + bytes(bytearray((tail,))) - had,made = ss.remote_allocate_buckets(si, - self.rs(i, serverid), - self.cs(i, serverid), - set([0]), 99, FakeCanary()) - made[0].remote_write(0, b"data") - made[0].remote_close() + had,made = ss.allocate_buckets(si, + self.rs(i, serverid), + self.cs(i, serverid), + set([0]), 99) + made[0].write(0, b"data") + made[0].close() return si_b2a(si) def test_immediate(self): diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index 3faffbe0d..933a2b591 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -39,6 +39,7 @@ from allmydata.crypto import aes from allmydata.storage.server import ( si_b2a, StorageServer, + FoolscapStorageServer, ) from allmydata.storage_client import StorageFarmBroker from allmydata.immutable.layout import ( @@ -427,7 +428,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase): """ storage_index = b"a" * 16 serverid = b"b" * 20 - storage = StorageServer(self.mktemp(), serverid) + storage = FoolscapStorageServer(StorageServer(self.mktemp(), serverid)) rref_without_ueb = LocalWrapper(storage, fireNow) yield write_bad_share(rref_without_ueb, storage_index) server_without_ueb = NoNetworkServer(serverid, rref_without_ueb) @@ -451,7 +452,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase): """ storage_index = b"a" * 16 serverid = b"b" * 20 - storage = StorageServer(self.mktemp(), serverid) + storage = FoolscapStorageServer(StorageServer(self.mktemp(), serverid)) rref_with_ueb = LocalWrapper(storage, fireNow) ueb = { "needed_shares": 2, @@ -487,7 +488,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase): in [b"b", b"c"] ) storages = list( - StorageServer(self.mktemp(), serverid) + FoolscapStorageServer(StorageServer(self.mktemp(), serverid)) for serverid in serverids ) diff --git a/src/allmydata/test/test_hung_server.py b/src/allmydata/test/test_hung_server.py index 490315500..162b1d79c 100644 --- a/src/allmydata/test/test_hung_server.py +++ b/src/allmydata/test/test_hung_server.py @@ -73,7 +73,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, def _copy_share(self, share, to_server): (sharenum, sharefile) = share (id, ss) = to_server - shares_dir 
= os.path.join(ss.original.storedir, "shares") + shares_dir = os.path.join(ss.original._server.storedir, "shares") si = uri.from_string(self.uri).get_storage_index() si_dir = os.path.join(shares_dir, storage_index_to_dir(si)) if not os.path.exists(si_dir): @@ -82,7 +82,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, shutil.copy(sharefile, new_sharefile) self.shares = self.find_uri_shares(self.uri) # Make sure that the storage server has the share. - self.failUnless((sharenum, ss.original.my_nodeid, new_sharefile) + self.failUnless((sharenum, ss.original._server.my_nodeid, new_sharefile) in self.shares) def _corrupt_share(self, share, corruptor_func): diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py index 442e154a0..6504ac97f 100644 --- a/src/allmydata/test/test_storage_http.py +++ b/src/allmydata/test/test_storage_http.py @@ -65,5 +65,5 @@ class HTTPTests(TestCase): The client can return the version. """ version = yield self.client.get_version() - expected_version = self.storage_server.remote_get_version() + expected_version = self.storage_server.get_version() self.assertEqual(version, expected_version)
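
Taken together, the series leaves the wiring looking roughly like this
(an illustrative sketch of the end state with made-up variable names,
not part of any diff above):

    ss = StorageServer(storedir, nodeid)  # generic business logic

    # Foolscap edge: only the wrapper is published on the Tub.
    furl = tub.registerReference(FoolscapStorageServer(ss),
                                 furlFile=furl_file)

    # Tests and the HTTP server talk to the generic object directly:
    version = ss.get_version()
    already, writers = ss.allocate_buckets(
        storage_index, renew_secret, cancel_secret, set([0]), 100)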