Get basic generation working, apparently

Jean-Paul Calderone 2022-12-22 10:51:59 -05:00
parent 29a5f7a076
commit 49b513fefc
5 changed files with 149 additions and 55 deletions

integration/_vectors_chk.yaml Normal file

@@ -0,0 +1,4 @@
? 1/1,0c0beacef8877bbf2416eb00f2b5dc96354e26dd1df5517320459b1236860f8c,2edc986847e209b4016e141a6dc8716d3207350f416969382d431539bf292e4a
: URI:CHK:mri732rh3meyh4drikau3a24ba:6tsj5wvcp6szdhmrbu5bea57wduoza64y6nd2lm7aleqpsxjm5la:1:1:1024
? 1/3,0c0beacef8877bbf2416eb00f2b5dc96354e26dd1df5517320459b1236860f8c,2edc986847e209b4016e141a6dc8716d3207350f416969382d431539bf292e4a
: URI:CHK:ycvogzi6wllnq2bkx3t6zdtwju:um42l4yen7jiwdfgirvedtty3tt3xuhjiyxzqoourvughtxjar3q:1:3:1024
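The `? `/`: ` lines are YAML's explicit-key syntax: each key encodes `needed/total,sha256(secret),sha256(data)` (matching `key()` in test_vectors.py below) and each value is the expected CHK capability string. A minimal sketch of loading and querying the file; the secret/data pair shown is illustrative and not verified against the hashes above:

from hashlib import sha256
from yaml import safe_load

# Load the generated vector file into a plain dict.
with open("integration/_vectors_chk.yaml") as f:
    chk = safe_load(f)

secret = b"aaaaaaaaaaaaaaaa"   # illustrative; see CONVERGENCE_SECRETS
data = b"a" * 1024             # illustrative; see OBJECT_DATA
key = f"1/1,{sha256(secret).hexdigest()},{sha256(data).hexdigest()}"
print(chk.get(key))            # "URI:CHK:..." if this pair produced an entry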

integration/conftest.py

@@ -49,7 +49,6 @@ from .util import (
await_client_ready,
TahoeProcess,
cli,
_run_node,
generate_ssh_key,
block_with_timeout,
)
@@ -359,7 +358,7 @@ def alice_sftp_client_key_path(temp_dir):
# typically), but for convenience sake for testing we'll put it inside node.
return join(temp_dir, "alice", "private", "ssh_client_rsa_key")
@pytest.fixture(scope='session')
@pytest.fixture(scope='function')
@log_call(action_type=u"integration:alice", include_args=[], include_result=False)
def alice(
reactor,
@@ -410,10 +409,9 @@ alice-key ssh-rsa {ssh_public_key} {rwcap}
""".format(rwcap=rwcap, ssh_public_key=ssh_public_key))
# 4. Restart the node with new SFTP config.
process.kill()
pytest_twisted.blockon(_run_node(reactor, process.node_dir, request, None))
pytest_twisted.blockon(process.restart_async(reactor, request))
await_client_ready(process)
print(f"Alice pid: {process.transport.pid}")
return process
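Two changes above interact: the `alice` fixture drops from session to function scope, so each test gets its own node process, and the restart path now goes through `restart_async`, bridged into the synchronous fixture body with `pytest_twisted.blockon`. A hedged sketch of that pattern, with fixture names invented for illustration:

import pytest
import pytest_twisted

@pytest.fixture(scope='function')   # re-created for every test, not per session
def restarted_node(reactor, request, node):
    # blockon waits synchronously on the Deferred from restart_async,
    # so the fixture returns only once the replacement process is up.
    pytest_twisted.blockon(node.restart_async(reactor, request))
    return node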

integration/test_vectors.py

@@ -2,83 +2,142 @@
Verify certain results against test vectors with well-known results.
"""
from typing import TypeVar, AsyncIterator, Callable
from tempfile import NamedTemporaryFile
from hashlib import sha256
from itertools import product
from yaml import safe_dump
import vectors
from pytest import mark
from pytest_twisted import ensureDeferred
from . import vectors
from .util import cli, await_client_ready
from allmydata.client import read_config
from allmydata.util import base32
CONVERGENCE_SECRETS = [
b"aaaaaaaaaaaaaaaa",
b"bbbbbbbbbbbbbbbb",
b"abcdefghijklmnop",
b"hello world stuf",
b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f",
sha256(b"Hello world").digest()[:16],
# b"bbbbbbbbbbbbbbbb",
# b"abcdefghijklmnop",
# b"hello world stuf",
# b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f",
# sha256(b"Hello world").digest()[:16],
]
ONE_KB = sha256(b"Hello world").digest() * 32
assert length(ONE_KB) == 1024
assert len(ONE_KB) == 1024
OBJECT_DATA = [
b"a" * 1024,
b"b" * 2048,
b"c" * 4096,
(ONE_KB * 8)[:-1],
(ONE_KB * 8) + b"z",
(ONE_KB * 128)[:-1],
(ONE_KB * 128) + b"z",
# b"b" * 2048,
# b"c" * 4096,
# (ONE_KB * 8)[:-1],
# (ONE_KB * 8) + b"z",
# (ONE_KB * 128)[:-1],
# (ONE_KB * 128) + b"z",
]
ZFEC_PARAMS = [
(1, 1),
(1, 3),
(2, 3),
(3, 10),
(71, 255),
(101, 256),
# (2, 3),
# (3, 10),
# (71, 255),
# (101, 256),
]
@parametrize('convergence', CONVERGENCE_SECRETS)
@mark.parametrize('convergence', CONVERGENCE_SECRETS)
def test_convergence(convergence):
assert isinstance(convergence, bytes), "Convergence secret must be bytes"
assert len(convergence) == 16, "Convergence secret must be 16 bytes"
@parametrize('daata', OBJECT_DATA)
@mark.parametrize('data', OBJECT_DATA)
def test_data(data):
assert isinstance(data, bytes), "Object data must be bytes."
@parametrize('params', ZFEC_PARAMS)
@parametrize('convergence', CONVERGENCE_SECRETS)
@parametrize('data', OBJECT_DATA)
def test_chk_capability(alice, params, convergence, data):
@mark.parametrize('params', ZFEC_PARAMS)
@mark.parametrize('convergence', CONVERGENCE_SECRETS)
@mark.parametrize('data', OBJECT_DATA)
@ensureDeferred
async def test_chk_capability(alice, params, convergence, data):
# rewrite alice's config to match params and convergence
needed, total = params
config = read_config(alice.path, "tub.port")
config.set_config("client", "shares.happy", 1)
config.set_config("client", "shares.needed", str(needed))
config.set_config("client", "shares.happy", str(total))
# restart alice
alice.kill()
yield util._run_node(reactor, alice.path, request, None)
await reconfigure(alice, params, convergence)
# upload data as a CHK
actual = upload(alice, data)
actual = upload_immutable(alice, data)
# compare the resulting cap to the expected result
expected = vectors.immutable[params, convergence, digest(data)]
expected = vectors.chk[key(params, convergence, data)]
assert actual == expected
def test_generate(alice):
caps = {}
for params, secret, data in product(ZFEC_PARAMS, CONVERGENCE_SECRETS, OBJECT_DATA):
caps[fec, secret, sha256(data).hexdigest()] = create_immutable(params, secret, data)
print(dump(caps))
def create_immutable(alice, params, secret, data):
tempfile = str(tmpdir.join("file"))
with tempfile.open("wb") as f:
α = TypeVar("α")
β = TypeVar("β")
async def asyncfoldr(
i: AsyncIterator[α],
f: Callable[[α, β], β],
initial: β,
) -> β:
result = initial
async for a in i:
result = f(a, result)
return result
def insert(item: tuple[α, β], d: dict[α, β]) -> dict[α, β]:
d[item[0]] = item[1]
return d
@ensureDeferred
async def test_generate(reactor, request, alice):
results = await asyncfoldr(
generate(reactor, request, alice),
insert,
{},
)
with vectors.CHK_PATH.open("w") as f:
f.write(safe_dump(results))
async def reconfigure(reactor, request, alice, params, convergence):
needed, total = params
config = read_config(alice.node_dir, "tub.port")
config.set_config("client", "shares.happy", str(1))
config.set_config("client", "shares.needed", str(needed))
config.set_config("client", "shares.total", str(total))
config.write_private_config("convergence", base32.b2a(convergence))
# restart alice
print(f"Restarting {alice.node_dir} for ZFEC reconfiguration")
await alice.restart_async(reactor, request)
print("Restarted. Waiting for ready state.")
await_client_ready(alice)
print("Ready.")
async def generate(reactor, request, alice):
node_key = (None, None)
for params, secret, data in product(ZFEC_PARAMS, CONVERGENCE_SECRETS, OBJECT_DATA):
if node_key != (params, secret):
await reconfigure(reactor, request, alice, params, secret)
node_key = (params, secret)
yield key(params, secret, data), upload_immutable(alice, data)
def key(params, secret, data):
return f"{params[0]}/{params[1]},{digest(secret)},{digest(data)}"
def upload_immutable(alice, data):
with NamedTemporaryFile() as f:
f.write(data)
actual = cli(alice, "put", str(datafile))
f.flush()
return cli(alice, "put", "--format=chk", f.name).decode("utf-8").strip()
def digest(bs):
return sha256(bs).hexdigest()
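The `(needed, total)` pairs in ZFEC_PARAMS are erasure-coding parameters: any `needed` of the `total` shares suffice to reconstruct the input. A standalone sketch using the zfec library; the API calls are believed correct but should be checked against the zfec documentation:

from zfec import Encoder, Decoder

k, m = 1, 3                     # "needed" and "total", as in ZFEC_PARAMS
block = b"a" * 1024
# Encoder(k, m).encode takes k equal-length blocks and returns m share
# blocks; any k shares plus their indices are enough to decode.
shares = Encoder(k, m).encode([block])
recovered = Decoder(k, m).decode([shares[2]], [2])
assert b"".join(recovered) == block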

integration/util.py

@@ -142,7 +142,18 @@ class _MagicTextProtocol(ProcessProtocol):
sys.stdout.write(data)
def _cleanup_tahoe_process(tahoe_transport, exited):
def _cleanup_tahoe_process_async(tahoe_transport, allow_missing):
if tahoe_transport.pid is None:
if allow_missing:
print("Process already cleaned up and that's okay.")
return
else:
raise ValueError("Process is not running")
print("signaling {} with TERM".format(tahoe_transport.pid))
tahoe_transport.signalProcess('TERM')
def _cleanup_tahoe_process(tahoe_transport, exited, allow_missing=False):
"""
Terminate the given process with a kill signal (SIGKILL on POSIX,
TerminateProcess on Windows).
@@ -154,13 +165,11 @@ def _cleanup_tahoe_process(tahoe_transport, exited):
"""
from twisted.internet import reactor
try:
print("signaling {} with TERM".format(tahoe_transport.pid))
tahoe_transport.signalProcess('TERM')
_cleanup_tahoe_process_async(tahoe_transport, allow_missing=allow_missing)
except ProcessExitedAlready:
print("signaled, blocking on exit")
block_with_timeout(exited, reactor)
print("exited, goodbye")
except ProcessExitedAlready:
pass
def _tahoe_runner_optional_coverage(proto, reactor, request, other_args):
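The refactor above splits "send TERM" from "block until exited" so the new async restart path can reuse the signaling half. The Twisted behavior it leans on, shown in isolation (a sketch, not util.py itself):

from twisted.internet.error import ProcessExitedAlready

def send_term(transport):
    # signalProcess raises ProcessExitedAlready if the child is already
    # gone; callers that just want the process dead treat that as
    # success rather than an error.
    try:
        transport.signalProcess('TERM')
    except ProcessExitedAlready:
        pass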
@@ -207,8 +216,25 @@ class TahoeProcess(object):
def kill(self):
"""Kill the process, block until it's done."""
print(f"TahoeProcess.kill({self.transport.pid} / {self.node_dir})")
_cleanup_tahoe_process(self.transport, self.transport.exited)
def kill_async(self):
"""
Kill the process, return a Deferred that fires when it's done.
"""
print(f"TahoeProcess.kill_async({self.transport.pid} / {self.node_dir})")
_cleanup_tahoe_process_async(self.transport, allow_missing=False)
return self.transport.exited
def restart_async(self, reactor, request):
d = self.kill_async()
d.addCallback(lambda ignored: _run_node(reactor, self.node_dir, request, None))
def got_new_process(proc):
self._process_transport = proc.transport
d.addCallback(got_new_process)
return d
def __str__(self):
return "<TahoeProcess in '{}'>".format(self._node_dir)
@@ -238,7 +264,7 @@ def _run_node(reactor, node_dir, request, magic_text, finalize=True):
transport.exited = protocol.exited
if finalize:
request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited))
request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited, allow_missing=True))
# XXX abusing the Deferred; should use .when_magic_seen() pattern
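The finalizer is registered with allow_missing=True because by teardown time the test (or a restart_async call) may already have killed the process; a missing pid is then expected rather than an error. Roughly, the registration pattern looks like this (a sketch of the call above, not the full _run_node):

from functools import partial

def register_cleanup(request, transport, exited):
    # At teardown a missing pid is fine: the test may have restarted
    # or killed the node itself during its run.
    request.addfinalizer(
        partial(_cleanup_tahoe_process, transport, exited, allow_missing=True)
    )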

integration/vectors.py Normal file

@@ -0,0 +1,7 @@
from yaml import safe_load
from pathlib import Path
CHK_PATH = Path(__file__).parent / "_vectors_chk.yaml"
with CHK_PATH.open() as f:
chk = safe_load(f)
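Because the YAML is parsed once at import time, consumers can index `vectors.chk` directly using the key format from test_vectors.py. A small lookup helper mirroring `key()`:

from hashlib import sha256
from . import vectors

def expected_chk_cap(params, secret, data):
    # Key format: "needed/total,sha256(secret),sha256(data)"
    needed, total = params
    k = f"{needed}/{total},{sha256(secret).hexdigest()},{sha256(data).hexdigest()}"
    return vectors.chk[k]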