"""
Tests for the ``IStorageServer`` interface.
Keep in mind that ``IStorageServer`` is actually the storage _client_ interface.
Note that for performance reasons we may eventually want to reuse the same
node across tests, so each test should be careful to generate unique storage
indexes.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2, bchr
if PY2:
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
else:
from typing import Set
from random import Random
from unittest import SkipTest
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import Clock
from twisted.internet import reactor
from twisted.web.server import Site
from twisted.web.client import HTTPConnectionPool
from hyperlink import DecodedURL
from treq.api import set_global_pool as set_treq_pool
from foolscap.api import Referenceable, RemoteException
from allmydata.interfaces import IStorageServer # really, IStorageClient
from .common_system import SystemTestMixin
from .common import AsyncTestCase
from allmydata.storage.server import StorageServer  # not an IStorageServer!
from allmydata.storage.http_server import HTTPServer
from allmydata.storage.http_client import StorageClient
from allmydata.util.iputil import allocate_tcp_port
from allmydata.storage_client import _HTTPStorageServer
# Use random generator with known seed, so results are reproducible if tests
# are run in the same order.
_RANDOM = Random(0)
def _randbytes(length):
# type: (int) -> bytes
"""Return random bytes string of given length."""
return b"".join([bchr(_RANDOM.randrange(0, 256)) for _ in range(length)])
def new_storage_index():
# type: () -> bytes
"""Return a new random storage index."""
return _randbytes(16)
def new_secret():
# type: () -> bytes
"""Return a new random secret (for lease renewal or cancellation)."""
return _randbytes(32)
class IStorageServerSharedAPIsTestsMixin(object):
"""
Tests for ``IStorageServer``'s shared APIs.
``self.storage_client`` is expected to provide ``IStorageServer``.
"""
@inlineCallbacks
def test_version(self):
"""
        ``IStorageServer.get_version()`` returns a dictionary whose keys
        include the expected protocol version.
"""
result = yield self.storage_client.get_version()
self.assertIsInstance(result, dict)
self.assertIn(b"http://allmydata.org/tahoe/protocols/storage/v1", result)
class IStorageServerImmutableAPIsTestsMixin(object):
"""
Tests for ``IStorageServer``'s immutable APIs.
``self.storage_client`` is expected to provide ``IStorageServer``.
``self.disconnect()`` should disconnect and then reconnect, creating a new
``self.storage_client``. Some implementations may wish to skip tests using
this; HTTP has no notion of disconnection.
``self.server`` is expected to be the corresponding
``allmydata.storage.server.StorageServer`` instance. Time should be
instrumented, such that ``self.fake_time()`` and ``self.fake_sleep()``
return and advance the server time, respectively.
"""
@inlineCallbacks
def test_allocate_buckets_new(self):
"""
        ``IStorageServer.allocate_buckets()`` with a new storage index returns
        a bucket writer for each requested share.
"""
(already_got, allocated) = yield self.storage_client.allocate_buckets(
new_storage_index(),
renew_secret=new_secret(),
cancel_secret=new_secret(),
sharenums=set(range(5)),
allocated_size=1024,
canary=Referenceable(),
)
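        # allocate_buckets() returns the set of share numbers that were
        # already fully written, plus a dict mapping newly allocated share
        # numbers to bucket writers: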
self.assertEqual(already_got, set())
self.assertEqual(set(allocated.keys()), set(range(5)))
# We validate the bucket objects' interface in a later test.
@inlineCallbacks
def test_allocate_buckets_repeat(self):
"""
        ``IStorageServer.allocate_buckets()`` with the same storage index does
        not return buckets whose upload is still in progress, but it does
        allocate buckets for any newly requested share numbers.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
new_secret(),
new_secret(),
)
(already_got, allocated) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums=set(range(4)),
allocated_size=1024,
canary=Referenceable(),
)
(already_got2, allocated2) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
set(range(5)),
1024,
Referenceable(),
)
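        # Shares 0 through 3 are still being written from the first call, so
        # the second call only allocates the newly requested share 4: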
self.assertEqual(already_got, already_got2)
self.assertEqual(set(allocated2.keys()), {4})
@inlineCallbacks
def abort_or_disconnect_half_way(self, abort_or_disconnect):
"""
If we disconnect/abort in the middle of writing to a bucket, all data
is wiped, and it's even possible to write different data to the bucket.
(In the real world one shouldn't do that, but writing different data is
a good way to test that the original data really was wiped.)
        ``abort_or_disconnect`` is a callback that takes a bucket and either
        aborts the upload or disconnects the whole connection.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
new_secret(),
new_secret(),
)
(_, allocated) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums={0},
allocated_size=1024,
canary=Referenceable(),
)
        # Bucket 0 is fully written in one go.
yield allocated[0].callRemote("write", 0, b"1" * 1024)
# Disconnect or abort, depending on the test:
yield abort_or_disconnect(allocated[0])
# Write different data with no complaint:
(_, allocated) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums={0},
allocated_size=1024,
canary=Referenceable(),
)
yield allocated[0].callRemote("write", 0, b"2" * 1024)
def test_disconnection(self):
"""
If we disconnect in the middle of writing to a bucket, all data is
wiped, and it's even possible to write different data to the bucket.
(In the real world one shouldn't do that, but writing different data is
a good way to test that the original data really was wiped.)
        The HTTP protocol should skip this test, since disconnection is a
        meaningless concept for it; this is more about testing that the
        implicit contract the Foolscap implementation depends on doesn't
        change as we refactor things.
"""
return self.abort_or_disconnect_half_way(lambda _: self.disconnect())
@inlineCallbacks
def test_written_shares_are_allocated(self):
"""
        Shares that are fully written to show up as allocated in the result
        of ``IStorageServer.allocate_buckets()``. Partially-written or empty
        shares don't.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
new_secret(),
new_secret(),
)
(_, allocated) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums=set(range(5)),
allocated_size=1024,
canary=Referenceable(),
)
# Bucket 1 is fully written in one go.
yield allocated[1].callRemote("write", 0, b"1" * 1024)
yield allocated[1].callRemote("close")
# Bucket 2 is fully written in two steps.
yield allocated[2].callRemote("write", 0, b"1" * 512)
yield allocated[2].callRemote("write", 512, b"2" * 512)
yield allocated[2].callRemote("close")
        # Bucket 0 has a partial write.
yield allocated[0].callRemote("write", 0, b"1" * 512)
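        # Only buckets 1 and 2 were closed, so only they show up as already
        # written: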
(already_got, _) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums=set(range(5)),
allocated_size=1024,
canary=Referenceable(),
)
self.assertEqual(already_got, {1, 2})
@inlineCallbacks
def test_written_shares_are_readable(self):
"""
Shares that are fully written to can be read.
The result is not affected by the order in which writes
happened, only by their offsets.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
new_secret(),
new_secret(),
)
(_, allocated) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums=set(range(5)),
allocated_size=1024,
canary=Referenceable(),
)
        # Bucket 1 is fully written in order.
yield allocated[1].callRemote("write", 0, b"1" * 512)
yield allocated[1].callRemote("write", 512, b"2" * 512)
yield allocated[1].callRemote("close")
# Bucket 2 is fully written in reverse.
yield allocated[2].callRemote("write", 512, b"4" * 512)
yield allocated[2].callRemote("write", 0, b"3" * 512)
yield allocated[2].callRemote("close")
buckets = yield self.storage_client.get_buckets(storage_index)
self.assertEqual(set(buckets.keys()), {1, 2})
self.assertEqual(
(yield buckets[1].callRemote("read", 0, 1024)), b"1" * 512 + b"2" * 512
)
self.assertEqual(
(yield buckets[2].callRemote("read", 0, 1024)), b"3" * 512 + b"4" * 512
)
@inlineCallbacks
def test_non_matching_overlapping_writes(self):
"""
When doing overlapping writes in immutable uploads, non-matching writes
fail.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
new_secret(),
new_secret(),
)
(_, allocated) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums={0},
allocated_size=30,
canary=Referenceable(),
)
yield allocated[0].callRemote("write", 0, b"1" * 25)
# Overlapping write that doesn't match:
with self.assertRaises(RemoteException):
yield allocated[0].callRemote("write", 20, b"2" * 10)
@inlineCallbacks
def test_matching_overlapping_writes(self):
"""
When doing overlapping writes in immutable uploads, matching writes
succeed.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
new_secret(),
new_secret(),
)
(_, allocated) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums={0},
allocated_size=25,
canary=Referenceable(),
)
yield allocated[0].callRemote("write", 0, b"1" * 10)
# Overlapping write that matches:
yield allocated[0].callRemote("write", 5, b"1" * 20)
yield allocated[0].callRemote("close")
buckets = yield self.storage_client.get_buckets(storage_index)
self.assertEqual(set(buckets.keys()), {0})
self.assertEqual((yield buckets[0].callRemote("read", 0, 25)), b"1" * 25)
def test_abort(self):
"""
        If we call ``abort`` on the ``RIBucketWriter`` in the middle of
        writing to a bucket, all data is wiped, and it's even possible to
        write different data to the bucket.
(In the real world one probably wouldn't do that, but writing different
data is a good way to test that the original data really was wiped.)
"""
return self.abort_or_disconnect_half_way(
lambda bucket: bucket.callRemote("abort")
)
@inlineCallbacks
def test_get_buckets_skips_unfinished_buckets(self):
"""
Buckets that are not fully written are not returned by
``IStorageServer.get_buckets()`` implementations.
"""
storage_index = new_storage_index()
(_, allocated) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret=new_secret(),
cancel_secret=new_secret(),
sharenums=set(range(5)),
allocated_size=10,
canary=Referenceable(),
)
# Bucket 1 is fully written
yield allocated[1].callRemote("write", 0, b"1" * 10)
yield allocated[1].callRemote("close")
# Bucket 2 is partially written
yield allocated[2].callRemote("write", 0, b"1" * 5)
buckets = yield self.storage_client.get_buckets(storage_index)
self.assertEqual(set(buckets.keys()), {1})
@inlineCallbacks
def test_read_bucket_at_offset(self):
"""
Given a read bucket returned from ``IStorageServer.get_buckets()``, it
is possible to read at different offsets and lengths, with reads past
the end resulting in empty bytes.
"""
length = 256 * 17
storage_index = new_storage_index()
(_, allocated) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret=new_secret(),
cancel_secret=new_secret(),
sharenums=set(range(1)),
allocated_size=length,
canary=Referenceable(),
)
        total_data = _randbytes(length)
yield allocated[0].callRemote("write", 0, total_data)
yield allocated[0].callRemote("close")
buckets = yield self.storage_client.get_buckets(storage_index)
bucket = buckets[0]
for start, to_read in [
(0, 250), # fraction
(0, length), # whole thing
(100, 1024), # offset fraction
(length + 1, 100), # completely out of bounds
(length - 100, 200), # partially out of bounds
]:
data = yield bucket.callRemote("read", start, to_read)
self.assertEqual(
data,
total_data[start : start + to_read],
"Didn't match for start {}, length {}".format(start, to_read),
)
@inlineCallbacks
def create_share(self):
"""Create a share, return the storage index."""
storage_index = new_storage_index()
renew_secret = new_secret()
cancel_secret = new_secret()
(_, allocated) = yield self.storage_client.allocate_buckets(
storage_index,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
sharenums=set(range(1)),
allocated_size=10,
canary=Referenceable(),
)
yield allocated[0].callRemote("write", 0, b"0123456789")
yield allocated[0].callRemote("close")
returnValue((storage_index, renew_secret, cancel_secret))
@inlineCallbacks
def test_bucket_advise_corrupt_share(self):
"""
Calling ``advise_corrupt_share()`` on a bucket returned by
``IStorageServer.get_buckets()`` does not result in error (other
behavior is opaque at this level of abstraction).
"""
storage_index, _, _ = yield self.create_share()
buckets = yield self.storage_client.get_buckets(storage_index)
yield buckets[0].callRemote("advise_corrupt_share", b"OH NO")
@inlineCallbacks
def test_advise_corrupt_share(self):
"""
Calling ``advise_corrupt_share()`` on an immutable share does not
result in error (other behavior is opaque at this level of
abstraction).
"""
storage_index, _, _ = yield self.create_share()
yield self.storage_client.advise_corrupt_share(
b"immutable", storage_index, 0, b"ono"
)
@inlineCallbacks
def test_allocate_buckets_creates_lease(self):
"""
When buckets are created using ``allocate_buckets()``, a lease is
created once writing is done.
"""
storage_index, _, _ = yield self.create_share()
[lease] = self.server.get_leases(storage_index)
# Lease expires in 31 days.
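        # (31 * 24 * 60 * 60 seconds, minus ten seconds of slack for the
        # time taken to write the share.)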
self.assertTrue(
lease.get_expiration_time() - self.fake_time() > (31 * 24 * 60 * 60 - 10)
)
@inlineCallbacks
def test_add_lease_renewal(self):
"""
If the lease secret is reused, ``add_lease()`` extends the existing
lease.
"""
storage_index, renew_secret, cancel_secret = yield self.create_share()
[lease] = self.server.get_leases(storage_index)
initial_expiration_time = lease.get_expiration_time()
# Time passes:
self.fake_sleep(178)
# We renew the lease:
yield self.storage_client.add_lease(storage_index, renew_secret, cancel_secret)
[lease] = self.server.get_leases(storage_index)
new_expiration_time = lease.get_expiration_time()
self.assertEqual(new_expiration_time - initial_expiration_time, 178)
@inlineCallbacks
def test_add_new_lease(self):
"""
If a new lease secret is used, ``add_lease()`` creates a new lease.
"""
storage_index, _, _ = yield self.create_share()
[lease] = self.server.get_leases(storage_index)
initial_expiration_time = lease.get_expiration_time()
# Time passes:
self.fake_sleep(167)
# We create a new lease:
renew_secret = new_secret()
cancel_secret = new_secret()
yield self.storage_client.add_lease(storage_index, renew_secret, cancel_secret)
[lease1, lease2] = self.server.get_leases(storage_index)
self.assertEqual(lease1.get_expiration_time(), initial_expiration_time)
self.assertEqual(lease2.get_expiration_time() - initial_expiration_time, 167)
class IStorageServerMutableAPIsTestsMixin(object):
"""
Tests for ``IStorageServer``'s mutable APIs.
``self.storage_client`` is expected to provide ``IStorageServer``.
``self.server`` is expected to be the corresponding
``allmydata.storage.server.StorageServer`` instance.
``STARAW`` is short for ``slot_testv_and_readv_and_writev``.
"""
def new_secrets(self):
"""Return a 3-tuple of secrets for STARAW calls."""
return (new_secret(), new_secret(), new_secret())
def staraw(self, *args, **kwargs):
"""Like ``slot_testv_and_readv_and_writev``, but less typing."""
return self.storage_client.slot_testv_and_readv_and_writev(*args, **kwargs)
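    # In the STARAW calls below, tw_vectors maps a share number to a
    # (test vector, write vector, new length) tuple, and r_vector is a list
    # of (offset, length) pairs to read. A sketch of the shapes, inferred
    # from the calls in this file (see the IStorageServer interface for the
    # authoritative definition):
    #
    #     tw_vectors = {
    #         0: (
    #             [(0, 7, b"1" * 7)],  # test: expect b"1" * 7 at offset 0
    #             [(0, b"2" * 7)],     # write b"2" * 7 at offset 0
    #             7,                   # new length; 0 deletes the share
    #         ),
    #     }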
@inlineCallbacks
def test_STARAW_reads_after_write(self):
"""
When data is written with
``IStorageServer.slot_testv_and_readv_and_writev``, it can then be read
by a separate call using that API.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"abcdefg")], 7),
1: ([], [(0, b"0123"), (4, b"456")], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
# Whole thing, partial, going beyond the edge, completely outside
# range:
r_vector=[(0, 7), (2, 3), (6, 8), (100, 10)],
)
self.assertEqual(
reads,
{0: [b"abcdefg", b"cde", b"g", b""], 1: [b"0123456", b"234", b"6", b""]},
)
@inlineCallbacks
    def test_STARAW_reads_happen_before_writes_in_single_query(self):
"""
        If an ``IStorageServer.slot_testv_and_readv_and_writev`` command
contains both reads and writes, the read returns results that precede
the write.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"abcdefg")], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
# Read and write in same command; read happens before write:
(written, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"X" * 7)], 7),
},
r_vector=[(0, 7)],
)
self.assertEqual(written, True)
self.assertEqual(reads, {0: [b"abcdefg"]})
# The write is available in next read:
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 7)],
)
self.assertEqual(reads, {0: [b"X" * 7]})
@inlineCallbacks
    def test_STARAW_writes_happen_only_if_test_matches(self):
"""
        If an ``IStorageServer.slot_testv_and_readv_and_writev`` includes both a
test and a write, the write succeeds if the test matches, and fails if
the test does not match.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"1" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
# Test matches, so write happens:
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: (
[(0, 3, b"1" * 3), (3, 4, b"1" * 4)],
[(0, b"2" * 7)],
7,
),
},
r_vector=[],
)
self.assertEqual(written, True)
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 7)],
)
self.assertEqual(reads, {0: [b"2" * 7]})
# Test does not match, so write does not happen:
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([(0, 7, b"1" * 7)], [(0, b"3" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, False)
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 7)],
)
self.assertEqual(reads, {0: [b"2" * 7]})
@inlineCallbacks
    def test_STARAW_tests_past_end_of_data(self):
"""
        If an ``IStorageServer.slot_testv_and_readv_and_writev`` includes a
        test vector that reads past the end of the data, the result is
        limited to the actual available data.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
        # Since there is no data on the server, the test vector will return
        # an empty string, which matches the expected result, so the write
        # will succeed.
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([(0, 10, b"")], [(0, b"1" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
        # Now the test vector is a 10-byte read of a 7-byte share, but the
        # expected value is still 7 bytes, so the write will again succeed.
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([(0, 10, b"1" * 7)], [(0, b"2" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
@inlineCallbacks
    def test_STARAW_reads_past_end_of_data(self):
"""
        If an ``IStorageServer.slot_testv_and_readv_and_writev`` read goes
        past the end of the data, the result is limited to the actual
        available data.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
# Write some data
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"12345")], 5),
},
r_vector=[],
)
self.assertEqual(written, True)
# Reads past end.
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 100), (2, 50)],
)
self.assertEqual(reads, {0: [b"12345", b"345"]})
@inlineCallbacks
def test_STARAW_write_enabler_must_match(self):
"""
If the write enabler secret passed to
``IStorageServer.slot_testv_and_readv_and_writev`` doesn't match
previous writes, the write fails.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"1" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
# Write enabler secret does not match, so write does not happen:
bad_secrets = (new_secret(),) + secrets[1:]
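        # (The first element of the secrets tuple is the write enabler; the
        # renew and cancel secrets stay the same.)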
with self.assertRaises(RemoteException):
yield self.staraw(
storage_index,
bad_secrets,
tw_vectors={
0: ([], [(0, b"2" * 7)], 7),
},
r_vector=[],
)
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 7)],
)
self.assertEqual(reads, {0: [b"1" * 7]})
@inlineCallbacks
def test_STARAW_zero_new_length_deletes(self):
"""
A zero new length passed to
``IStorageServer.slot_testv_and_readv_and_writev`` deletes the share.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"1" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
# Write with new length of 0:
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"1" * 7)], 0),
},
r_vector=[],
)
self.assertEqual(written, True)
# It's gone!
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 7)],
)
self.assertEqual(reads, {})
@inlineCallbacks
def test_slot_readv(self):
"""
Data written with ``IStorageServer.slot_testv_and_readv_and_writev()``
can be read using ``IStorageServer.slot_readv()``. Reads can't go past
the end of the data.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"abcdefg")], 7),
1: ([], [(0, b"0123"), (4, b"456")], 7),
                # This share will never be read from; it's just here to show
                # that we only read from shares explicitly requested by
                # slot_readv:
2: ([], [(0, b"XYZW")], 4),
},
r_vector=[],
)
self.assertEqual(written, True)
reads = yield self.storage_client.slot_readv(
storage_index,
shares=[0, 1],
# Whole thing, partial, going beyond the edge, completely outside
# range:
readv=[(0, 7), (2, 3), (6, 8), (100, 10)],
)
self.assertEqual(
reads,
{0: [b"abcdefg", b"cde", b"g", b""], 1: [b"0123456", b"234", b"6", b""]},
)
@inlineCallbacks
def test_slot_readv_no_shares(self):
"""
With no shares given, ``IStorageServer.slot_readv()`` reads from all shares.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"abcdefg")], 7),
1: ([], [(0, b"0123456")], 7),
2: ([], [(0, b"9876543")], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
reads = yield self.storage_client.slot_readv(
storage_index,
shares=[],
readv=[(0, 7)],
)
self.assertEqual(
reads,
{0: [b"abcdefg"], 1: [b"0123456"], 2: [b"9876543"]},
)
@inlineCallbacks
def create_slot(self):
"""Create a slot with sharenum 0."""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"abcdefg")], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
returnValue((secrets, storage_index))
@inlineCallbacks
def test_advise_corrupt_share(self):
"""
Calling ``advise_corrupt_share()`` on a mutable share does not
result in error (other behavior is opaque at this level of
abstraction).
"""
secrets, storage_index = yield self.create_slot()
yield self.storage_client.advise_corrupt_share(
b"mutable", storage_index, 0, b"ono"
)
@inlineCallbacks
def test_STARAW_create_lease(self):
"""
When STARAW creates a new slot, it also creates a lease.
"""
_, storage_index = yield self.create_slot()
[lease] = self.server.get_slot_leases(storage_index)
# Lease expires in 31 days.
self.assertTrue(
lease.get_expiration_time() - self.fake_time() > (31 * 24 * 60 * 60 - 10)
)
@inlineCallbacks
def test_STARAW_renews_lease(self):
"""
When STARAW is run on an existing slot with same renewal secret, it
renews the lease.
"""
secrets, storage_index = yield self.create_slot()
[lease] = self.server.get_slot_leases(storage_index)
initial_expire = lease.get_expiration_time()
# Time passes...
self.fake_sleep(17)
# We do another write:
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"1234567")], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
# The lease has been renewed:
[lease] = self.server.get_slot_leases(storage_index)
self.assertEqual(lease.get_expiration_time() - initial_expire, 17)
@inlineCallbacks
def test_STARAW_new_lease(self):
"""
When STARAW is run with a new renewal secret on an existing slot, it
adds a new lease.
"""
secrets, storage_index = yield self.create_slot()
[lease] = self.server.get_slot_leases(storage_index)
initial_expire = lease.get_expiration_time()
# Time passes...
self.fake_sleep(19)
# We do another write:
(written, _) = yield self.staraw(
storage_index,
(secrets[0], new_secret(), new_secret()),
tw_vectors={
0: ([], [(0, b"1234567")], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
# A new lease was added:
[lease1, lease2] = self.server.get_slot_leases(storage_index)
self.assertEqual(lease1.get_expiration_time(), initial_expire)
self.assertEqual(lease2.get_expiration_time() - initial_expire, 19)
@inlineCallbacks
def test_add_lease_renewal(self):
"""
If the lease secret is reused, ``add_lease()`` extends the existing
lease.
"""
secrets, storage_index = yield self.create_slot()
[lease] = self.server.get_slot_leases(storage_index)
initial_expiration_time = lease.get_expiration_time()
# Time passes:
self.fake_sleep(178)
# We renew the lease:
yield self.storage_client.add_lease(storage_index, secrets[1], secrets[2])
[lease] = self.server.get_slot_leases(storage_index)
new_expiration_time = lease.get_expiration_time()
self.assertEqual(new_expiration_time - initial_expiration_time, 178)
@inlineCallbacks
def test_add_new_lease(self):
"""
If a new lease secret is used, ``add_lease()`` creates a new lease.
"""
secrets, storage_index = yield self.create_slot()
[lease] = self.server.get_slot_leases(storage_index)
initial_expiration_time = lease.get_expiration_time()
# Time passes:
self.fake_sleep(167)
# We create a new lease:
renew_secret = new_secret()
cancel_secret = new_secret()
yield self.storage_client.add_lease(storage_index, renew_secret, cancel_secret)
[lease1, lease2] = self.server.get_slot_leases(storage_index)
self.assertEqual(lease1.get_expiration_time(), initial_expiration_time)
self.assertEqual(lease2.get_expiration_time() - initial_expiration_time, 167)
class _SharedMixin(SystemTestMixin):
"""Base class for Foolscap and HTTP mixins."""
SKIP_TESTS = set() # type: Set[str]
def _get_istorage_server(self):
raise NotImplementedError("implement in subclass")
@inlineCallbacks
def setUp(self):
if self._testMethodName in self.SKIP_TESTS:
raise SkipTest(
"Test {} is still not supported".format(self._testMethodName)
)
AsyncTestCase.setUp(self)
self.basedir = "test_istorageserver/" + self.id()
yield SystemTestMixin.setUp(self)
yield self.set_up_nodes(1)
self.server = None
for s in self.clients[0].services:
if isinstance(s, StorageServer):
self.server = s
break
assert self.server is not None, "Couldn't find StorageServer"
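        # Instrument the server with a deterministic clock, so tests can
        # control time via fake_time() and fake_sleep():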
self._clock = Clock()
self._clock.advance(123456)
self.server._clock = self._clock
self.storage_client = self._get_istorage_server()
def fake_time(self):
"""Return the current fake, test-controlled, time."""
return self._clock.seconds()
def fake_sleep(self, seconds):
"""Advance the fake time by the given number of seconds."""
self._clock.advance(seconds)
@inlineCallbacks
def tearDown(self):
AsyncTestCase.tearDown(self)
yield SystemTestMixin.tearDown(self)
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
raise NotImplementedError("implement in subclass")
class _FoolscapMixin(_SharedMixin):
"""Run tests on Foolscap version of ``IStorageServer``."""
def _get_native_server(self):
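        # setUp() created a single node, so the sole known server is ours.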
return next(iter(self.clients[0].storage_broker.get_known_servers()))
def _get_istorage_server(self):
client = self._get_native_server().get_storage_server()
self.assertTrue(IStorageServer.providedBy(client))
return client
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
current = self.storage_client
yield self.bounce_client(0)
self.storage_client = self._get_native_server().get_storage_server()
assert self.storage_client is not current
class _HTTPMixin(_SharedMixin):
"""Run tests on the HTTP version of ``IStorageServer``."""
def setUp(self):
if PY2:
self.skipTest("Not going to bother supporting Python 2")
return _SharedMixin.setUp(self)
def _get_istorage_server(self):
set_treq_pool(HTTPConnectionPool(reactor, persistent=False))
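        # A hardcoded swissnum (the shared secret between the HTTP storage
        # server and its client) is fine for tests: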
swissnum = b"1234"
self._http_storage_server = HTTPServer(self.server, swissnum)
self._port_number = allocate_tcp_port()
self._listening_port = reactor.listenTCP(
self._port_number,
Site(self._http_storage_server.get_resource()),
interface="127.0.0.1",
)
return _HTTPStorageServer.from_http_client(
StorageClient(
DecodedURL.from_text("http://127.0.0.1:{}".format(self._port_number)),
swissnum,
)
)
# Eventually should also:
# self.assertTrue(IStorageServer.providedBy(client))
@inlineCallbacks
def tearDown(self):
yield _SharedMixin.tearDown(self)
yield self._listening_port.stopListening()
class FoolscapSharedAPIsTests(
_FoolscapMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for shared ``IStorageServer`` APIs."""
class HTTPSharedAPIsTests(
_HTTPMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for shared ``IStorageServer`` APIs."""
class FoolscapImmutableAPIsTests(
_FoolscapMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
class HTTPImmutableAPIsTests(
_HTTPMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for immutable ``IStorageServer`` APIs."""
    # These will start passing in future PRs as the HTTP protocol is
    # implemented.
SKIP_TESTS = {
"test_abort",
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
"test_bucket_advise_corrupt_share",
"test_disconnection",
"test_matching_overlapping_writes",
"test_non_matching_overlapping_writes",
}
class FoolscapMutableAPIsTests(
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""