mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-20 13:33:09 +00:00
Directly test CHKCheckerAndUEBFetcher cases
This commit is contained in:
parent
5a51d98479
commit
40e0ef0fad
@ -8,6 +8,9 @@ if PY2:
|
||||
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
|
||||
|
||||
import os
|
||||
from struct import (
|
||||
pack,
|
||||
)
|
||||
from functools import (
|
||||
partial,
|
||||
)
|
||||
@ -24,14 +27,35 @@ from eliot.twisted import (
|
||||
)
|
||||
|
||||
from allmydata.crypto import aes
|
||||
from allmydata.storage.server import si_b2a
|
||||
from allmydata.storage.server import (
|
||||
si_b2a,
|
||||
StorageServer,
|
||||
)
|
||||
from allmydata.storage_client import StorageFarmBroker
|
||||
from allmydata.immutable.layout import (
|
||||
make_write_bucket_proxy,
|
||||
)
|
||||
from allmydata.immutable import offloaded, upload
|
||||
from allmydata import uri, client
|
||||
from allmydata.util import hashutil, fileutil, mathutil, dictutil
|
||||
|
||||
from .no_network import (
|
||||
NoNetworkServer,
|
||||
LocalWrapper,
|
||||
fireNow,
|
||||
)
|
||||
from .common import (
|
||||
EMPTY_CLIENT_CONFIG,
|
||||
SyncTestCase,
|
||||
)
|
||||
|
||||
from testtools.matchers import (
|
||||
Equals,
|
||||
MatchesListwise,
|
||||
IsInstance,
|
||||
)
|
||||
from testtools.twistedsupport import (
|
||||
succeeded,
|
||||
)
|
||||
|
||||
MiB = 1024*1024
|
||||
@ -363,3 +387,203 @@ class AssistedUpload(unittest.TestCase):
|
||||
results.get_pushed_shares(),
|
||||
0,
|
||||
)
|
||||
|
||||
|
||||
class CHKCheckerAndUEBFetcherTests(SyncTestCase):
|
||||
"""
|
||||
Tests for ``CHKCheckerAndUEBFetcher``.
|
||||
"""
|
||||
def test_check_no_peers(self):
|
||||
"""
|
||||
If the supplied "peer getter" returns no peers then
|
||||
``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
|
||||
with ``False``.
|
||||
"""
|
||||
storage_index = b"a" * 16
|
||||
peers = {storage_index: []}
|
||||
caf = offloaded.CHKCheckerAndUEBFetcher(
|
||||
peers.get,
|
||||
storage_index,
|
||||
None,
|
||||
)
|
||||
self.assertThat(
|
||||
caf.check(),
|
||||
succeeded(Equals(False)),
|
||||
)
|
||||
|
||||
@inline_callbacks
|
||||
def test_check_ueb_unavailable(self):
|
||||
"""
|
||||
If the UEB cannot be read from any of the peers supplied by the "peer
|
||||
getter" then ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred``
|
||||
that fires with ``False``.
|
||||
"""
|
||||
storage_index = b"a" * 16
|
||||
serverid = b"b" * 20
|
||||
storage = StorageServer(self.mktemp(), serverid)
|
||||
rref_without_ueb = LocalWrapper(storage, fireNow)
|
||||
yield write_bad_share(rref_without_ueb, storage_index)
|
||||
server_without_ueb = NoNetworkServer(serverid, rref_without_ueb)
|
||||
peers = {storage_index: [server_without_ueb]}
|
||||
caf = offloaded.CHKCheckerAndUEBFetcher(
|
||||
peers.get,
|
||||
storage_index,
|
||||
None,
|
||||
)
|
||||
self.assertThat(
|
||||
caf.check(),
|
||||
succeeded(Equals(False)),
|
||||
)
|
||||
|
||||
@inline_callbacks
|
||||
def test_not_enough_shares(self):
|
||||
"""
|
||||
If fewer shares are found than are required to reassemble the data then
|
||||
``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
|
||||
with ``False``.
|
||||
"""
|
||||
storage_index = b"a" * 16
|
||||
serverid = b"b" * 20
|
||||
storage = StorageServer(self.mktemp(), serverid)
|
||||
rref_with_ueb = LocalWrapper(storage, fireNow)
|
||||
ueb = {
|
||||
"needed_shares": 2,
|
||||
"total_shares": 2,
|
||||
"segment_size": 128 * 1024,
|
||||
"size": 1024,
|
||||
}
|
||||
yield write_good_share(rref_with_ueb, storage_index, ueb, [0])
|
||||
|
||||
server_with_ueb = NoNetworkServer(serverid, rref_with_ueb)
|
||||
peers = {storage_index: [server_with_ueb]}
|
||||
caf = offloaded.CHKCheckerAndUEBFetcher(
|
||||
peers.get,
|
||||
storage_index,
|
||||
None,
|
||||
)
|
||||
self.assertThat(
|
||||
caf.check(),
|
||||
succeeded(Equals(False)),
|
||||
)
|
||||
|
||||
@inline_callbacks
|
||||
def test_enough_shares(self):
|
||||
"""
|
||||
If enough shares are found to reassemble the data then
|
||||
``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
|
||||
with share and share placement information.
|
||||
"""
|
||||
storage_index = b"a" * 16
|
||||
serverids = list(
|
||||
ch * 20
|
||||
for ch
|
||||
in [b"b", b"c"]
|
||||
)
|
||||
storages = list(
|
||||
StorageServer(self.mktemp(), serverid)
|
||||
for serverid
|
||||
in serverids
|
||||
)
|
||||
rrefs_with_ueb = list(
|
||||
LocalWrapper(storage, fireNow)
|
||||
for storage
|
||||
in storages
|
||||
)
|
||||
ueb = {
|
||||
"needed_shares": len(serverids),
|
||||
"total_shares": len(serverids),
|
||||
"segment_size": 128 * 1024,
|
||||
"size": 1024,
|
||||
}
|
||||
for n, rref_with_ueb in enumerate(rrefs_with_ueb):
|
||||
yield write_good_share(rref_with_ueb, storage_index, ueb, [n])
|
||||
|
||||
servers_with_ueb = list(
|
||||
NoNetworkServer(serverid, rref_with_ueb)
|
||||
for (serverid, rref_with_ueb)
|
||||
in zip(serverids, rrefs_with_ueb)
|
||||
)
|
||||
peers = {storage_index: servers_with_ueb}
|
||||
caf = offloaded.CHKCheckerAndUEBFetcher(
|
||||
peers.get,
|
||||
storage_index,
|
||||
None,
|
||||
)
|
||||
self.assertThat(
|
||||
caf.check(),
|
||||
succeeded(MatchesListwise([
|
||||
Equals({
|
||||
n: {serverid}
|
||||
for (n, serverid)
|
||||
in enumerate(serverids)
|
||||
}),
|
||||
Equals(ueb),
|
||||
IsInstance(bytes),
|
||||
])),
|
||||
)
|
||||
|
||||
|
||||
def write_bad_share(storage_rref, storage_index):
|
||||
"""
|
||||
Write a share with a corrupt URI extension block.
|
||||
"""
|
||||
# Write some trash to the right bucket on this storage server. It won't
|
||||
# have a recoverable UEB block.
|
||||
return write_share(storage_rref, storage_index, [0], b"\0" * 1024)
|
||||
|
||||
|
||||
def write_good_share(storage_rref, storage_index, ueb, sharenums):
|
||||
"""
|
||||
Write a valid share with the given URI extension block.
|
||||
"""
|
||||
write_proxy = make_write_bucket_proxy(
|
||||
storage_rref,
|
||||
None,
|
||||
1024,
|
||||
ueb["segment_size"],
|
||||
1,
|
||||
1,
|
||||
ueb["size"],
|
||||
)
|
||||
# See allmydata/immutable/layout.py
|
||||
offset = write_proxy._offsets["uri_extension"]
|
||||
filler = b"\0" * (offset - len(write_proxy._offset_data))
|
||||
ueb_data = uri.pack_extension(ueb)
|
||||
data = (
|
||||
write_proxy._offset_data +
|
||||
filler +
|
||||
pack(write_proxy.fieldstruct, len(ueb_data)) +
|
||||
ueb_data
|
||||
)
|
||||
return write_share(storage_rref, storage_index, sharenums, data)
|
||||
|
||||
|
||||
@inline_callbacks
|
||||
def write_share(storage_rref, storage_index, sharenums, sharedata):
|
||||
"""
|
||||
Write the given share data to the given storage index using the given
|
||||
IStorageServer remote reference.
|
||||
|
||||
:param foolscap.ipb.IRemoteReference storage_rref: A remote reference to
|
||||
an IStorageServer.
|
||||
|
||||
:param bytes storage_index: The storage index to which to write the share
|
||||
data.
|
||||
|
||||
:param [int] sharenums: The share numbers to which to write this sharedata.
|
||||
|
||||
:param bytes sharedata: The ciphertext to write as the share.
|
||||
"""
|
||||
ignored, writers = yield storage_rref.callRemote(
|
||||
"allocate_buckets",
|
||||
storage_index,
|
||||
b"x" * 16,
|
||||
b"x" * 16,
|
||||
sharenums,
|
||||
len(sharedata),
|
||||
LocalWrapper(None),
|
||||
|
||||
)
|
||||
[writer] = writers.values()
|
||||
yield writer.callRemote("write", 0, sharedata)
|
||||
yield writer.callRemote("close")
|
||||
|
Loading…
Reference in New Issue
Block a user