Merge pull request #1176 from tahoe-lafs/3868-istorageserver-tests-http

Start hooking up HTTP protocol as IStorageServer provider

Fixes ticket:3868
This commit is contained in:
Itamar Turner-Trauring 2022-02-04 09:19:13 -05:00 committed by GitHub
commit 665537a76f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 223 additions and 11 deletions

View File

@ -217,7 +217,7 @@ jobs:
fetch-depth: 0
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v1
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
@ -277,7 +277,7 @@ jobs:
fetch-depth: 0
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v1
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}

0
newsfragments/3868.minor Normal file
View File

View File

@ -40,7 +40,7 @@ if PY2:
from six import ensure_text
import re, time, hashlib
from os import urandom
# On Python 2 this will be the backport.
from configparser import NoSectionError
@ -75,6 +75,7 @@ from allmydata.util.observer import ObserverList
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import permute_server_hash
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
from allmydata.storage.http_client import StorageClient, StorageClientImmutables
# who is responsible for de-duplication?
@ -1024,3 +1025,100 @@ class _StorageServer(object):
shnum,
reason,
).addErrback(log.err, "Error from remote call to advise_corrupt_share")
@attr.s
class _FakeRemoteReference(object):
"""
Emulate a Foolscap RemoteReference, calling a local object instead.
"""
local_object = attr.ib(type=object)
def callRemote(self, action, *args, **kwargs):
return getattr(self.local_object, action)(*args, **kwargs)
@attr.s
class _HTTPBucketWriter(object):
"""
Emulate a ``RIBucketWriter``.
"""
client = attr.ib(type=StorageClientImmutables)
storage_index = attr.ib(type=bytes)
share_number = attr.ib(type=int)
upload_secret = attr.ib(type=bytes)
finished = attr.ib(type=bool, default=False)
def abort(self):
pass # TODO in later ticket
@defer.inlineCallbacks
def write(self, offset, data):
result = yield self.client.write_share_chunk(
self.storage_index, self.share_number, self.upload_secret, offset, data
)
if result.finished:
self.finished = True
defer.returnValue(None)
def close(self):
# A no-op in HTTP protocol.
if not self.finished:
return defer.fail(RuntimeError("You didn't finish writing?!"))
return defer.succeed(None)
# WORK IN PROGRESS, for now it doesn't actually implement whole thing.
@implementer(IStorageServer) # type: ignore
@attr.s
class _HTTPStorageServer(object):
"""
Talk to remote storage server over HTTP.
"""
_http_client = attr.ib(type=StorageClient)
@staticmethod
def from_http_client(http_client): # type: (StorageClient) -> _HTTPStorageServer
"""
Create an ``IStorageServer`` from a HTTP ``StorageClient``.
"""
return _HTTPStorageServer(http_client=http_client)
def get_version(self):
return self._http_client.get_version()
@defer.inlineCallbacks
def allocate_buckets(
self,
storage_index,
renew_secret,
cancel_secret,
sharenums,
allocated_size,
canary,
):
upload_secret = urandom(20)
immutable_client = StorageClientImmutables(self._http_client)
result = immutable_client.create(
storage_index, sharenums, allocated_size, upload_secret, renew_secret,
cancel_secret
)
result = yield result
defer.returnValue(
(result.already_have, {
share_num: _FakeRemoteReference(_HTTPBucketWriter(
client=immutable_client,
storage_index=storage_index,
share_number=share_num,
upload_secret=upload_secret
))
for share_num in result.allocated
})
)
def get_buckets(
self,
storage_index,
):
pass

View File

@ -1,6 +1,8 @@
"""
Tests for the ``IStorageServer`` interface.
Keep in mind that ``IStorageServer`` is actually the storage _client_ interface.
Note that for performance, in the future we might want the same node to be
reused across tests, so each test should be careful to generate unique storage
indexes.
@ -17,18 +19,31 @@ if PY2:
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
else:
from typing import Set
from random import Random
from unittest import SkipTest
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from twisted.internet.task import Clock
from twisted.internet import reactor
from twisted.internet.endpoints import serverFromString
from twisted.web.server import Site
from twisted.web.client import Agent, HTTPConnectionPool
from hyperlink import DecodedURL
from treq.client import HTTPClient
from foolscap.api import Referenceable, RemoteException
from allmydata.interfaces import IStorageServer # really, IStorageClient
from .common_system import SystemTestMixin
from .common import AsyncTestCase
from .common import AsyncTestCase, SameProcessStreamEndpointAssigner
from allmydata.storage.server import StorageServer # not a IStorageServer!!
from allmydata.storage.http_server import HTTPServer
from allmydata.storage.http_client import StorageClient
from allmydata.storage_client import _HTTPStorageServer
# Use random generator with known seed, so results are reproducible if tests
# are run in the same order.
@ -998,20 +1013,30 @@ class IStorageServerMutableAPIsTestsMixin(object):
self.assertEqual(lease2.get_expiration_time() - initial_expiration_time, 167)
class _FoolscapMixin(SystemTestMixin):
"""Run tests on Foolscap version of ``IStorageServer."""
class _SharedMixin(SystemTestMixin):
"""Base class for Foolscap and HTTP mixins."""
def _get_native_server(self):
return next(iter(self.clients[0].storage_broker.get_known_servers()))
SKIP_TESTS = set() # type: Set[str]
def _get_istorage_server(self):
raise NotImplementedError("implement in subclass")
@inlineCallbacks
def setUp(self):
if self._testMethodName in self.SKIP_TESTS:
raise SkipTest(
"Test {} is still not supported".format(self._testMethodName)
)
AsyncTestCase.setUp(self)
self._port_assigner = SameProcessStreamEndpointAssigner()
self._port_assigner.setUp()
self.addCleanup(self._port_assigner.tearDown)
self.basedir = "test_istorageserver/" + self.id()
yield SystemTestMixin.setUp(self)
yield self.set_up_nodes(1)
self.storage_client = self._get_native_server().get_storage_server()
self.assertTrue(IStorageServer.providedBy(self.storage_client))
self.server = None
for s in self.clients[0].services:
if isinstance(s, StorageServer):
@ -1021,6 +1046,7 @@ class _FoolscapMixin(SystemTestMixin):
self._clock = Clock()
self._clock.advance(123456)
self.server._clock = self._clock
self.storage_client = yield self._get_istorage_server()
def fake_time(self):
"""Return the current fake, test-controlled, time."""
@ -1035,6 +1061,25 @@ class _FoolscapMixin(SystemTestMixin):
AsyncTestCase.tearDown(self)
yield SystemTestMixin.tearDown(self)
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
raise NotImplementedError("implement in subclass")
class _FoolscapMixin(_SharedMixin):
"""Run tests on Foolscap version of ``IStorageServer``."""
def _get_native_server(self):
return next(iter(self.clients[0].storage_broker.get_known_servers()))
def _get_istorage_server(self):
client = self._get_native_server().get_storage_server()
self.assertTrue(IStorageServer.providedBy(client))
return succeed(client)
@inlineCallbacks
def disconnect(self):
"""
@ -1046,18 +1091,87 @@ class _FoolscapMixin(SystemTestMixin):
assert self.storage_client is not current
class _HTTPMixin(_SharedMixin):
"""Run tests on the HTTP version of ``IStorageServer``."""
def setUp(self):
if PY2:
self.skipTest("Not going to bother supporting Python 2")
return _SharedMixin.setUp(self)
@inlineCallbacks
def _get_istorage_server(self):
swissnum = b"1234"
http_storage_server = HTTPServer(self.server, swissnum)
# Listen on randomly assigned port:
tcp_address, endpoint_string = self._port_assigner.assign(reactor)
_, host, port = tcp_address.split(":")
port = int(port)
endpoint = serverFromString(reactor, endpoint_string)
listening_port = yield endpoint.listen(Site(http_storage_server.get_resource()))
self.addCleanup(listening_port.stopListening)
# Create HTTP client with non-persistent connections, so we don't leak
# state across tests:
treq_client = HTTPClient(
Agent(reactor, HTTPConnectionPool(reactor, persistent=False))
)
returnValue(
_HTTPStorageServer.from_http_client(
StorageClient(
DecodedURL().replace(scheme="http", host=host, port=port),
swissnum,
treq=treq_client,
)
)
)
# Eventually should also:
# self.assertTrue(IStorageServer.providedBy(client))
class FoolscapSharedAPIsTests(
_FoolscapMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for shared ``IStorageServer`` APIs."""
class HTTPSharedAPIsTests(
_HTTPMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for shared ``IStorageServer`` APIs."""
class FoolscapImmutableAPIsTests(
_FoolscapMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
class HTTPImmutableAPIsTests(
_HTTPMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for immutable ``IStorageServer`` APIs."""
# These will start passing in future PRs as HTTP protocol is implemented.
SKIP_TESTS = {
"test_abort",
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
"test_allocate_buckets_repeat",
"test_bucket_advise_corrupt_share",
"test_disconnection",
"test_get_buckets_skips_unfinished_buckets",
"test_matching_overlapping_writes",
"test_non_matching_overlapping_writes",
"test_read_bucket_at_offset",
"test_written_shares_are_readable",
"test_written_shares_are_allocated",
}
class FoolscapMutableAPIsTests(
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
):