From 49b513fefc0fafa1686645c36e8be84792fd59bf Mon Sep 17 00:00:00 2001
From: Jean-Paul Calderone
Date: Thu, 22 Dec 2022 10:51:59 -0500
Subject: [PATCH] Get basic generation working, apparently

---
 integration/_vectors_chk.yaml |   4 +
 integration/conftest.py       |   8 +-
 integration/test_vectors.py   | 147 ++++++++++++++++++++++++----------
 integration/util.py           |  38 +++++++--
 integration/vectors.py        |   7 ++
 5 files changed, 149 insertions(+), 55 deletions(-)
 create mode 100644 integration/_vectors_chk.yaml
 create mode 100644 integration/vectors.py

diff --git a/integration/_vectors_chk.yaml b/integration/_vectors_chk.yaml
new file mode 100644
index 000000000..070d9b4be
--- /dev/null
+++ b/integration/_vectors_chk.yaml
@@ -0,0 +1,4 @@
+? 1/1,0c0beacef8877bbf2416eb00f2b5dc96354e26dd1df5517320459b1236860f8c,2edc986847e209b4016e141a6dc8716d3207350f416969382d431539bf292e4a
+: URI:CHK:mri732rh3meyh4drikau3a24ba:6tsj5wvcp6szdhmrbu5bea57wduoza64y6nd2lm7aleqpsxjm5la:1:1:1024
+? 1/3,0c0beacef8877bbf2416eb00f2b5dc96354e26dd1df5517320459b1236860f8c,2edc986847e209b4016e141a6dc8716d3207350f416969382d431539bf292e4a
+: URI:CHK:ycvogzi6wllnq2bkx3t6zdtwju:um42l4yen7jiwdfgirvedtty3tt3xuhjiyxzqoourvughtxjar3q:1:3:1024
diff --git a/integration/conftest.py b/integration/conftest.py
index e284b5cba..1c43aecfa 100644
--- a/integration/conftest.py
+++ b/integration/conftest.py
@@ -49,7 +49,6 @@ from .util import (
     await_client_ready,
     TahoeProcess,
     cli,
-    _run_node,
     generate_ssh_key,
     block_with_timeout,
 )
@@ -359,7 +358,7 @@ def alice_sftp_client_key_path(temp_dir):
     # typically), but for convenience sake for testing we'll put it inside node.
     return join(temp_dir, "alice", "private", "ssh_client_rsa_key")

-@pytest.fixture(scope='session')
+@pytest.fixture(scope='function')
 @log_call(action_type=u"integration:alice", include_args=[], include_result=False)
 def alice(
         reactor,
@@ -410,10 +409,9 @@ alice-key ssh-rsa {ssh_public_key} {rwcap}
 """.format(rwcap=rwcap, ssh_public_key=ssh_public_key))

     # 4. Restart the node with new SFTP config.
-    process.kill()
-    pytest_twisted.blockon(_run_node(reactor, process.node_dir, request, None))
-
+    pytest_twisted.blockon(process.restart_async(reactor, request))
     await_client_ready(process)
+    print(f"Alice pid: {process.transport.pid}")
     return process


diff --git a/integration/test_vectors.py b/integration/test_vectors.py
index d7fe214a1..7753ac18d 100644
--- a/integration/test_vectors.py
+++ b/integration/test_vectors.py
@@ -2,83 +2,142 @@
 Verify certain results against test vectors with well-known results.
 """

+from typing import TypeVar, AsyncIterator, Callable
+
+from tempfile import NamedTemporaryFile
 from hashlib import sha256
 from itertools import product

+from yaml import safe_dump

-import vectors
+from pytest import mark
+from pytest_twisted import ensureDeferred
+
+from . import vectors
+from .util import cli, await_client_ready
+from allmydata.client import read_config
+from allmydata.util import base32

 CONVERGENCE_SECRETS = [
     b"aaaaaaaaaaaaaaaa",
-    b"bbbbbbbbbbbbbbbb",
-    b"abcdefghijklmnop",
-    b"hello world stuf",
-    b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f",
-    sha256(b"Hello world").digest()[:16],
+    # b"bbbbbbbbbbbbbbbb",
+    # b"abcdefghijklmnop",
+    # b"hello world stuf",
+    # b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f",
+    # sha256(b"Hello world").digest()[:16],
 ]

 ONE_KB = sha256(b"Hello world").digest() * 32
-assert length(ONE_KB) == 1024
+assert len(ONE_KB) == 1024

 OBJECT_DATA = [
     b"a" * 1024,
-    b"b" * 2048,
-    b"c" * 4096,
-    (ONE_KB * 8)[:-1],
-    (ONE_KB * 8) + b"z",
-    (ONE_KB * 128)[:-1],
-    (ONE_KB * 128) + b"z",
+    # b"b" * 2048,
+    # b"c" * 4096,
+    # (ONE_KB * 8)[:-1],
+    # (ONE_KB * 8) + b"z",
+    # (ONE_KB * 128)[:-1],
+    # (ONE_KB * 128) + b"z",
 ]

 ZFEC_PARAMS = [
     (1, 1),
     (1, 3),
-    (2, 3),
-    (3, 10),
-    (71, 255),
-    (101, 256),
+    # (2, 3),
+    # (3, 10),
+    # (71, 255),
+    # (101, 256),
 ]

-@parametrize('convergence', CONVERGENCE_SECRETS)
+@mark.parametrize('convergence', CONVERGENCE_SECRETS)
 def test_convergence(convergence):
     assert isinstance(convergence, bytes), "Convergence secret must be bytes"
     assert len(convergence) == 16, "Convergence secret must be 16 bytes"

-@parametrize('daata', OBJECT_DATA)
+@mark.parametrize('data', OBJECT_DATA)
 def test_data(data):
     assert isinstance(data, bytes), "Object data must be bytes."

-@parametrize('params', ZFEC_PARAMS)
-@parametrize('convergence', CONVERGENCE_SECRETS)
-@parametrize('data', OBJECT_DATA)
-def test_chk_capability(alice, params, convergence, data):
+@mark.parametrize('params', ZFEC_PARAMS)
+@mark.parametrize('convergence', CONVERGENCE_SECRETS)
+@mark.parametrize('data', OBJECT_DATA)
+@ensureDeferred
+async def test_chk_capability(reactor, request, alice, params, convergence, data):
     # rewrite alice's config to match params and convergence
-    needed, total = params
-    config = read_config(alice.path, "tub.port")
-    config.set_config("client", "shares.happy", 1)
-    config.set_config("client", "shares.needed", str(needed))
-    config.set_config("client", "shares.happy", str(total))
-
-    # restart alice
-    alice.kill()
-    yield util._run_node(reactor, alice.path, request, None)
+    await reconfigure(reactor, request, alice, params, convergence)

     # upload data as a CHK
-    actual = upload(alice, data)
+    actual = upload_immutable(alice, data)

     # compare the resulting cap to the expected result
-    expected = vectors.immutable[params, convergence, digest(data)]
+    expected = vectors.chk[key(params, convergence, data)]
     assert actual == expected

-def test_generate(alice):
-    caps = {}
-    for params, secret, data in product(ZFEC_PARAMS, CONVERGENCE_SECRETS, OBJECT_DATA):
-        caps[fec, secret, sha256(data).hexdigest()] = create_immutable(params, secret, data)
-    print(dump(caps))

-def create_immutable(alice, params, secret, data):
-    tempfile = str(tmpdir.join("file"))
-    with tempfile.open("wb") as f:
+
+α = TypeVar("α")
+β = TypeVar("β")
+
+async def asyncfoldr(
+        i: AsyncIterator[α],
+        f: Callable[[α, β], β],
+        initial: β,
+) -> β:
+    result = initial
+    async for a in i:
+        result = f(a, result)
+    return result
+
+def insert(item: tuple[α, β], d: dict[α, β]) -> dict[α, β]:
+    d[item[0]] = item[1]
+    return d
+
+@ensureDeferred
+async def test_generate(reactor, request, alice):
+    results = await asyncfoldr(
+        generate(reactor, request, alice),
+        insert,
+        {},
+    )
+    with vectors.CHK_PATH.open("w") as f:
+        f.write(safe_dump(results))
+
+
+async def reconfigure(reactor, request, alice, params, convergence):
+    needed, total = params
+    config = read_config(alice.node_dir, "tub.port")
+    config.set_config("client", "shares.happy", str(1))
+    config.set_config("client", "shares.needed", str(needed))
+    config.set_config("client", "shares.total", str(total))
+    config.write_private_config("convergence", base32.b2a(convergence))
+
+    # restart alice
+    print(f"Restarting {alice.node_dir} for ZFEC reconfiguration")
+    await alice.restart_async(reactor, request)
+    print("Restarted. Waiting for ready state.")
+    await_client_ready(alice)
+    print("Ready.")
+
+
+async def generate(reactor, request, alice):
+    node_key = (None, None)
+    for params, secret, data in product(ZFEC_PARAMS, CONVERGENCE_SECRETS, OBJECT_DATA):
+        if node_key != (params, secret):
+            await reconfigure(reactor, request, alice, params, secret)
+            node_key = (params, secret)
+
+        yield key(params, secret, data), upload_immutable(alice, data)
+
+
+def key(params, secret, data):
+    return f"{params[0]}/{params[1]},{digest(secret)},{digest(data)}"
+
+
+def upload_immutable(alice, data):
+    with NamedTemporaryFile() as f:
         f.write(data)
-    actual = cli(alice, "put", str(datafile))
+        f.flush()
+        return cli(alice, "put", "--format=chk", f.name).decode("utf-8").strip()
+
+
+def digest(bs):
+    return sha256(bs).hexdigest()
diff --git a/integration/util.py b/integration/util.py
index ad9249e45..c2394375a 100644
--- a/integration/util.py
+++ b/integration/util.py
@@ -142,7 +142,18 @@ class _MagicTextProtocol(ProcessProtocol):
         sys.stdout.write(data)


-def _cleanup_tahoe_process(tahoe_transport, exited):
+def _cleanup_tahoe_process_async(tahoe_transport, allow_missing):
+    if tahoe_transport.pid is None:
+        if allow_missing:
+            print("Process already cleaned up and that's okay.")
+            return
+        else:
+            raise ValueError("Process is not running")
+    print("signaling {} with TERM".format(tahoe_transport.pid))
+    tahoe_transport.signalProcess('TERM')
+
+
+def _cleanup_tahoe_process(tahoe_transport, exited, allow_missing=False):
     """
     Terminate the given process with a kill signal (SIGKILL on POSIX,
     TerminateProcess on Windows).
@@ -154,13 +165,11 @@ def _cleanup_tahoe_process(tahoe_transport, exited):
     """
     from twisted.internet import reactor
     try:
-        print("signaling {} with TERM".format(tahoe_transport.pid))
-        tahoe_transport.signalProcess('TERM')
+        _cleanup_tahoe_process_async(tahoe_transport, allow_missing=allow_missing)
         print("signaled, blocking on exit")
         block_with_timeout(exited, reactor)
         print("exited, goodbye")
     except ProcessExitedAlready:
         pass


 def _tahoe_runner_optional_coverage(proto, reactor, request, other_args):
@@ -207,8 +216,25 @@ class TahoeProcess(object):

     def kill(self):
         """Kill the process, block until it's done."""
+        print(f"TahoeProcess.kill({self.transport.pid} / {self.node_dir})")
         _cleanup_tahoe_process(self.transport, self.transport.exited)

+    def kill_async(self):
+        """
+        Kill the process, return a Deferred that fires when it's done.
+        """
+        print(f"TahoeProcess.kill_async({self.transport.pid} / {self.node_dir})")
+        _cleanup_tahoe_process_async(self.transport, allow_missing=False)
+        return self.transport.exited
+
+    def restart_async(self, reactor, request):
+        d = self.kill_async()
+        d.addCallback(lambda ignored: _run_node(reactor, self.node_dir, request, None))
+        def got_new_process(proc):
+            self._process_transport = proc.transport
+        d.addCallback(got_new_process)
+        return d
+
     def __str__(self):
         return "<TahoeProcess in '{}'>".format(self._node_dir)

@@ -238,7 +264,7 @@ def _run_node(reactor, node_dir, request, magic_text, finalize=True):
     transport.exited = protocol.exited

     if finalize:
-        request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited))
+        request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited, allow_missing=True))

     # XXX abusing the Deferred; should use .when_magic_seen() pattern

diff --git a/integration/vectors.py b/integration/vectors.py
new file mode 100644
index 000000000..53e581a1e
--- /dev/null
+++ b/integration/vectors.py
@@ -0,0 +1,7 @@
+from yaml import safe_load
+from pathlib import Path
+
+CHK_PATH = Path(__file__).parent / "_vectors_chk.yaml"
+
+with CHK_PATH.open() as f:
+    chk = safe_load(f)
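
Not part of the patch itself: a minimal standalone sketch of how the generated
vectors are keyed and looked up, mirroring digest() and key() from
integration/test_vectors.py and the YAML loader in integration/vectors.py
above. The repository-root working directory and the standalone framing are
illustrative assumptions, not code from this patch; the inputs are the only
ones left enabled in ZFEC_PARAMS, CONVERGENCE_SECRETS, and OBJECT_DATA.

    from hashlib import sha256
    from pathlib import Path

    from yaml import safe_load

    def digest(bs: bytes) -> str:
        # Secrets and object data are identified by their hex SHA-256
        # digests, matching digest() in test_vectors.py.
        return sha256(bs).hexdigest()

    def key(params: tuple[int, int], secret: bytes, data: bytes) -> str:
        # "needed/total,<digest(secret)>,<digest(data)>", the key format
        # written by generate() into _vectors_chk.yaml.
        return f"{params[0]}/{params[1]},{digest(secret)},{digest(data)}"

    # Load the generated vectors, as integration/vectors.py does at import
    # time; safe_load yields a plain str -> str mapping.
    chk = safe_load(Path("integration/_vectors_chk.yaml").read_text())

    # Look up the expected CHK capability for 1-of-1 encoding, the
    # all-"a" convergence secret, and 1024 bytes of b"a".
    cap = chk[key((1, 1), b"aaaaaaaaaaaaaaaa", b"a" * 1024)]
    print(cap)  # the URI:CHK:...:1:1:1024 entry from _vectors_chk.yaml above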