refactor: make sftp tests (etc) work with 'grid' refactoring

meejah 2023-07-24 20:08:41 -06:00
parent 6c5cb02ee5
commit 45898ff8b8
8 changed files with 151 additions and 123 deletions

View File

@@ -162,6 +162,10 @@ def flog_gatherer(reactor, temp_dir, flog_binary, request):
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:grid", include_args=[])
def grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
# XXX think: this creates an "empty" grid (introducer, no nodes);
# do we want to ensure it has some minimum storage-nodes at least?
# (that is, semantically does it make sense that 'a grid' is
# essentially empty, or not?)
g = pytest_twisted.blockon(
create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
)
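One way to resolve the XXX above, as a purely hypothetical sketch (not part of this commit): pre-populate the grid with a baseline of storage nodes before returning it, using the same Grid.add_storage_node helper that the storage_nodes fixture below relies on.

# hypothetical sketch only; MIN_STORAGE_NODES is an invented name
MIN_STORAGE_NODES = 2
for _ in range(MIN_STORAGE_NODES):
    pytest_twisted.blockon(g.add_storage_node())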
@@ -271,64 +275,17 @@ def storage_nodes(grid):
assert ok, "Storage node creation failed: {}".format(value)
return grid.storage_servers
@pytest.fixture(scope="session")
def alice_sftp_client_key_path(temp_dir):
# The client SSH key path is typically going to be somewhere else (~/.ssh,
# typically), but for convenience sake for testing we'll put it inside node.
return join(temp_dir, "alice", "private", "ssh_client_rsa_key")
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:alice", include_args=[], include_result=False)
def alice(
reactor,
temp_dir,
introducer_furl,
flog_gatherer,
storage_nodes,
alice_sftp_client_key_path,
request,
):
process = pytest_twisted.blockon(
_create_node(
reactor, request, temp_dir, introducer_furl, flog_gatherer, "alice",
web_port="tcp:9980:interface=localhost",
storage=False,
)
)
pytest_twisted.blockon(await_client_ready(process))
# 1. Create a new RW directory cap:
cli(process, "create-alias", "test")
rwcap = loads(cli(process, "list-aliases", "--json"))["test"]["readwrite"]
# 2. Enable SFTP on the node:
host_ssh_key_path = join(process.node_dir, "private", "ssh_host_rsa_key")
accounts_path = join(process.node_dir, "private", "accounts")
with open(join(process.node_dir, "tahoe.cfg"), "a") as f:
f.write("""\
[sftpd]
enabled = true
port = tcp:8022:interface=127.0.0.1
host_pubkey_file = {ssh_key_path}.pub
host_privkey_file = {ssh_key_path}
accounts.file = {accounts_path}
""".format(ssh_key_path=host_ssh_key_path, accounts_path=accounts_path))
generate_ssh_key(host_ssh_key_path)
# 3. Add a SFTP access file with an SSH key for auth.
generate_ssh_key(alice_sftp_client_key_path)
# Pub key format is "ssh-rsa <thekey> <username>". We want the key.
ssh_public_key = open(alice_sftp_client_key_path + ".pub").read().strip().split()[1]
with open(accounts_path, "w") as f:
f.write("""\
alice-key ssh-rsa {ssh_public_key} {rwcap}
""".format(rwcap=rwcap, ssh_public_key=ssh_public_key))
# 4. Restart the node with new SFTP config.
pytest_twisted.blockon(process.restart_async(reactor, request))
pytest_twisted.blockon(await_client_ready(process))
print(f"Alice pid: {process.transport.pid}")
return process
def alice(reactor, request, grid, storage_nodes):
"""
:returns grid.Client: the associated instance for Alice
"""
alice = pytest_twisted.blockon(grid.add_client("alice"))
pytest_twisted.blockon(alice.add_sftp(reactor, request))
print(f"Alice pid: {alice.process.transport.pid}")
return alice
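Since alice is now a grid.Client wrapper rather than a bare TahoeProcess, call sites unwrap it explicitly via .process, as the test changes below show. For example:

cli(alice.process, "create-alias", "getput")
data = util.web_get(alice.process, u"", params={u"t": u"json"})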
@pytest.fixture(scope='session')

View File

@@ -10,6 +10,7 @@ rely on 'the' global grid as provided by fixtures like 'alice' or
from os import mkdir, listdir
from os.path import join, exists
from json import loads
from tempfile import mktemp
from time import sleep
@@ -26,6 +27,7 @@ from twisted.internet.defer import (
inlineCallbacks,
returnValue,
maybeDeferred,
Deferred,
)
from twisted.internet.task import (
deferLater,
@@ -54,19 +56,20 @@ from .util import (
_tahoe_runner_optional_coverage,
TahoeProcess,
await_client_ready,
generate_ssh_key,
cli,
reconfigure,
)
import attr
import pytest_twisted
# further directions:
# - "Grid" is unused, basically -- tie into the rest?
# - could make a Grid instance mandatory for create_* calls
# - could instead make create_* calls methods of Grid
# - Bring more 'util' or 'conftest' code into here
# - stop()/start()/restart() methods on StorageServer etc
# - more-complex stuff like config changes (which imply a restart too)?
# currently, we pass a "request" around a bunch but it seems to only
# be for addfinalizer() calls.
# - is "keeping" a request like that okay? What if it's a session-scoped one?
# (i.e. in Grid etc)
# - maybe limit to "a callback to hang your cleanup off of" (instead of request)?
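A sketch of that last bullet (hypothetical, not part of this commit): helpers would accept a plain cleanup hook rather than holding on to the whole pytest request.

from typing import Callable

def make_resource(add_cleanup: Callable[[Callable[[], None]], None]) -> str:
    # the helper registers its own teardown via the hook; the caller
    # chooses the scope, e.g. make_resource(request.addfinalizer)
    resource = "some-resource"
    add_cleanup(lambda: print("cleaning up", resource))
    return resource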
@attr.s
@@ -170,6 +173,8 @@ class StorageServer(object):
Note that self.process and self.protocol will be new instances
after this.
"""
# XXX per review comments, _can_ we make this "return a new
# instance" instead of mutating?
self.process.transport.signalProcess('TERM')
yield self.protocol.exited
self.process = yield _run_node(
@@ -213,6 +218,27 @@ class Client(object):
protocol = attr.ib(
validator=attr.validators.provides(IProcessProtocol)
)
request = attr.ib() # original request, for addfinalizer()
## XXX convenience? or confusion?
# @property
# def node_dir(self):
# return self.process.node_dir
@inlineCallbacks
def reconfigure_zfec(self, reactor, request, zfec_params, convergence=None, max_segment_size=None):
"""
Reconfigure the ZFEC parameters for this node. zfec_params is a
(happiness, required, total) tuple.
"""
# XXX this is a stop-gap to keep tests running "as is"
# -> we should fix the tests so that they create a new client
# in the grid with the required parameters, instead of
# re-configuring Alice (or whomever)
rtn = yield Deferred.fromCoroutine(
reconfigure(reactor, self.request, self.process, zfec_params, convergence, max_segment_size)
)
return rtn
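# Usage, from the test changes in this commit: async tests await the
# returned Deferred directly, e.g.
#     await alice.reconfigure_zfec(reactor, request, (1, 1, 1), None, max_segment_size=segment_size)
# while @run_in_thread tests marshal it onto the reactor via
# blockingCallFromThread (see test_get_put.py below).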
@inlineCallbacks
def restart(self, reactor, request, servers=1):
@@ -226,6 +252,8 @@ class Client(object):
Note that self.process and self.protocol will be new instances
after this.
"""
# XXX similar to above, can we make this return a new instance
# instead of mutating?
self.process.transport.signalProcess('TERM')
yield self.protocol.exited
process = yield _run_node(
@@ -235,8 +263,55 @@ class Client(object):
self.protocol = self.process.transport.proto
yield await_client_ready(self.process, minimum_number_of_servers=servers)
# XXX add stop / start ?
# ...maybe "reconfig" of some kind?
@inlineCallbacks
def add_sftp(self, reactor, request):
"""
"""
# if other things need to add or change configuration, further
# refactoring could be useful here (e.g. moving the reconfigure
# steps into functions of their own)
# XXX why do we need an alias?
# 1. Create a new RW directory cap:
cli(self.process, "create-alias", "test")
rwcap = loads(cli(self.process, "list-aliases", "--json"))["test"]["readwrite"]
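# For reference (illustrative shape only, caps abbreviated),
# `tahoe list-aliases --json` emits a mapping like
#     {"test": {"readwrite": "URI:DIR2:...", "readonly": "URI:DIR2-RO:..."}}
# hence the ["test"]["readwrite"] lookup above.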
# 2. Enable SFTP on the node:
host_ssh_key_path = join(self.process.node_dir, "private", "ssh_host_rsa_key")
sftp_client_key_path = join(self.process.node_dir, "private", "ssh_client_rsa_key")
accounts_path = join(self.process.node_dir, "private", "accounts")
with open(join(self.process.node_dir, "tahoe.cfg"), "a") as f:
f.write(
("\n\n[sftpd]\n"
"enabled = true\n"
"port = tcp:8022:interface=127.0.0.1\n"
"host_pubkey_file = {ssh_key_path}.pub\n"
"host_privkey_file = {ssh_key_path}\n"
"accounts.file = {accounts_path}\n").format(
ssh_key_path=host_ssh_key_path,
accounts_path=accounts_path,
)
)
generate_ssh_key(host_ssh_key_path)
# 3. Add a SFTP access file with an SSH key for auth.
generate_ssh_key(sftp_client_key_path)
# Pub key format is "ssh-rsa <thekey> <username>". We want the key.
with open(sftp_client_key_path + ".pub") as pubkey_file:
ssh_public_key = pubkey_file.read().strip().split()[1]
with open(accounts_path, "w") as f:
f.write(
"alice-key ssh-rsa {ssh_public_key} {rwcap}\n".format(
rwcap=rwcap,
ssh_public_key=ssh_public_key,
)
)
# 4. Restart the node with new SFTP config.
print("restarting for SFTP")
yield self.restart(reactor, request)
print("restart done")
# XXX I think this is broken because we're "waiting for ready" during first bootstrap? or something?
@inlineCallbacks
@@ -254,6 +329,7 @@ def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, w
Client(
process=node_process,
protocol=node_process.transport.proto,
request=request,
)
)
@@ -370,7 +446,7 @@ class Grid(object):
Represents an entire Tahoe Grid setup
A Grid includes an Introducer, Flog Gatherer and some number of
Storage Servers.
Storage Servers. Optionally includes Clients.
"""
_reactor = attr.ib()
@@ -436,7 +512,6 @@ class Grid(object):
returnValue(client)
# XXX THINK can we tie a whole *grid* to a single request? (I think
# that's all that makes sense)
@inlineCallbacks

View File

@@ -8,9 +8,8 @@ from subprocess import Popen, PIPE, check_output, check_call
import pytest
from twisted.internet import reactor
from twisted.internet.threads import blockingCallFromThread
from twisted.internet.defer import Deferred
from .util import run_in_thread, cli, reconfigure
from .util import run_in_thread, cli
DATA = b"abc123 this is not utf-8 decodable \xff\x00\x33 \x11"
try:
@@ -23,7 +22,7 @@ else:
@pytest.fixture(scope="session")
def get_put_alias(alice):
cli(alice, "create-alias", "getput")
cli(alice.process, "create-alias", "getput")
def read_bytes(path):
@@ -39,14 +38,14 @@ def test_put_from_stdin(alice, get_put_alias, tmpdir):
"""
tempfile = str(tmpdir.join("file"))
p = Popen(
["tahoe", "--node-directory", alice.node_dir, "put", "-", "getput:fromstdin"],
["tahoe", "--node-directory", alice.process.node_dir, "put", "-", "getput:fromstdin"],
stdin=PIPE
)
p.stdin.write(DATA)
p.stdin.close()
assert p.wait() == 0
cli(alice, "get", "getput:fromstdin", tempfile)
cli(alice.process, "get", "getput:fromstdin", tempfile)
assert read_bytes(tempfile) == DATA
@@ -58,10 +57,10 @@ def test_get_to_stdout(alice, get_put_alias, tmpdir):
tempfile = tmpdir.join("file")
with tempfile.open("wb") as f:
f.write(DATA)
cli(alice, "put", str(tempfile), "getput:tostdout")
cli(alice.process, "put", str(tempfile), "getput:tostdout")
p = Popen(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:tostdout", "-"],
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:tostdout", "-"],
stdout=PIPE
)
assert p.stdout.read() == DATA
@@ -78,11 +77,11 @@ def test_large_file(alice, get_put_alias, tmp_path):
tempfile = tmp_path / "file"
with tempfile.open("wb") as f:
f.write(DATA * 1_000_000)
cli(alice, "put", str(tempfile), "getput:largefile")
cli(alice.process, "put", str(tempfile), "getput:largefile")
outfile = tmp_path / "out"
check_call(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:largefile", str(outfile)],
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:largefile", str(outfile)],
)
assert outfile.read_bytes() == tempfile.read_bytes()
@@ -104,31 +103,30 @@ def test_upload_download_immutable_different_default_max_segment_size(alice, get
def set_segment_size(segment_size):
return blockingCallFromThread(
reactor,
lambda: Deferred.fromCoroutine(reconfigure(
lambda: alice.reconfigure_zfec(
reactor,
request,
alice,
(1, 1, 1),
None,
max_segment_size=segment_size
))
)
)
# 1. Upload file 1 with default segment size set to 1MB
set_segment_size(1024 * 1024)
cli(alice, "put", str(tempfile), "getput:seg1024kb")
cli(alice.process, "put", str(tempfile), "getput:seg1024kb")
# 2. Download file 1 with default segment size set to 128KB
set_segment_size(128 * 1024)
assert large_data == check_output(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg1024kb", "-"]
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:seg1024kb", "-"]
)
# 3. Upload file 2 with default segment size set to 128KB
cli(alice, "put", str(tempfile), "getput:seg128kb")
cli(alice.process, "put", str(tempfile), "getput:seg128kb")
# 4. Download file 2 with default segment size set to 1MB
set_segment_size(1024 * 1024)
assert large_data == check_output(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg128kb", "-"]
["tahoe", "--node-directory", alice.process.node_dir, "get", "getput:seg128kb", "-"]
)
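Why the mismatched defaults round-trip, which is what this test asserts: for immutable files the segment size in effect at upload time is recorded with the file itself (in the URI extension block), so a later download decodes with the original segment size no matter what the downloader's current default is. The set_segment_size calls therefore only affect new uploads, never the readability of existing caps.

# e.g. an upload under the 1 MiB default records segment_size=1048576 in its
# shares; downloading under the 128 KiB default still reads 1 MiB segments.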

View File

@@ -173,7 +173,7 @@ def test_add_remove_client_file(reactor, request, temp_dir):
@pytest_twisted.inlineCallbacks
def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
def _test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
A client with happiness=2 fails to upload to a Grid when it is
using Grid Manager and there is only 1 storage server with a valid
@@ -252,7 +252,7 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_a
@pytest_twisted.inlineCallbacks
def test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
def _test_accept_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
Successfully upload to a Grid Manager enabled Grid.
"""

View File

@@ -72,7 +72,7 @@ def test_bad_account_password_ssh_key(alice, tmpdir):
another_key = os.path.join(str(tmpdir), "ssh_key")
generate_ssh_key(another_key)
good_key = RSAKey(filename=os.path.join(alice.node_dir, "private", "ssh_client_rsa_key"))
good_key = RSAKey(filename=os.path.join(alice.process.node_dir, "private", "ssh_client_rsa_key"))
bad_key = RSAKey(filename=another_key)
# Wrong key:
@@ -87,17 +87,16 @@ def test_bad_account_password_ssh_key(alice, tmpdir):
"username": "someoneelse", "pkey": good_key,
})
def sftp_client_key(node):
def sftp_client_key(client):
"""
:return RSAKey: the RSA client key associated with this grid.Client
"""
# XXX move to Client / grid.py?
return RSAKey(
filename=os.path.join(node.node_dir, "private", "ssh_client_rsa_key"),
filename=os.path.join(client.process.node_dir, "private", "ssh_client_rsa_key"),
)
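For context, this key is combined with paramiko roughly as follows (a condensed sketch; the "alice-key" username and port 8022 come from the accounts file and [sftpd] stanza that add_sftp writes in grid.py):

import paramiko

def connect_sftp(client, username="alice-key"):
    # open an SSH transport to the node's SFTP port and authenticate
    # with the generated client key
    transport = paramiko.Transport(("localhost", 8022))
    transport.connect(username=username, pkey=sftp_client_key(client))
    return paramiko.SFTPClient.from_transport(transport)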
def test_sftp_client_key_exists(alice, alice_sftp_client_key_path):
"""
Weakly validate the sftp client key fixture by asserting that *something*
exists at the supposed key path.
"""
assert os.path.exists(alice_sftp_client_key_path)
@run_in_thread
def test_ssh_key_auth(alice):

View File

@@ -15,7 +15,8 @@ from pytest_twisted import ensureDeferred
from . import vectors
from .vectors import parameters
from .util import reconfigure, upload, TahoeProcess
from .util import reconfigure, upload
from .grid import Client
@mark.parametrize('convergence', parameters.CONVERGENCE_SECRETS)
def test_convergence(convergence):
@@ -36,11 +37,11 @@ async def test_capability(reactor, request, alice, case, expected):
computed value.
"""
# rewrite alice's config to match params and convergence
await reconfigure(
reactor, request, alice, (1, case.params.required, case.params.total), case.convergence, case.segment_size)
await alice.reconfigure_zfec(
reactor, request, (1, case.params.required, case.params.total), case.convergence, case.segment_size)
# upload data in the correct format
actual = upload(alice, case.fmt, case.data)
actual = upload(alice.process, case.fmt, case.data)
# compare the resulting cap to the expected result
assert actual == expected
@@ -82,7 +83,7 @@ async def skiptest_generate(reactor, request, alice):
async def generate(
reactor,
request,
alice: TahoeProcess,
alice: Client,
cases: Iterator[vectors.Case],
) -> AsyncGenerator[tuple[vectors.Case, str], None]:
"""
@@ -106,10 +107,9 @@ async def generate(
# reliability of this generator, be happy if we can put shares anywhere
happy = 1
for case in cases:
await reconfigure(
await alice.reconfigure_zfec(
reactor,
request,
alice,
(happy, case.params.required, case.params.total),
case.convergence,
case.segment_size
@@ -117,5 +117,5 @@ async def generate(
# Give the format a chance to make an RSA key if it needs it.
case = evolve(case, fmt=case.fmt.customize())
cap = upload(alice, case.fmt, case.data)
cap = upload(alice.process, case.fmt, case.data)
yield case, cap
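A consumer drives this like any other async generator; a hypothetical sketch:

async def collect_vectors(reactor, request, alice, cases):
    # gather (format, capability) pairs for every case
    results = []
    async for case, cap in generate(reactor, request, alice, iter(cases)):
        results.append((case.fmt, cap))
    return results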

View File

@@ -33,7 +33,7 @@ def test_index(alice):
"""
we can download the index file
"""
util.web_get(alice, u"")
util.web_get(alice.process, u"")
@run_in_thread
@@ -41,7 +41,7 @@ def test_index_json(alice):
"""
we can download the index file as json
"""
data = util.web_get(alice, u"", params={u"t": u"json"})
data = util.web_get(alice.process, u"", params={u"t": u"json"})
# it should be valid json
json.loads(data)
@@ -55,7 +55,7 @@ def test_upload_download(alice):
FILE_CONTENTS = u"some contents"
readcap = util.web_post(
alice, u"uri",
alice.process, u"uri",
data={
u"t": u"upload",
u"format": u"mdmf",
@@ -67,7 +67,7 @@
readcap = readcap.strip()
data = util.web_get(
alice, u"uri",
alice.process, u"uri",
params={
u"uri": readcap,
u"filename": u"boom",
@@ -85,11 +85,11 @@ def test_put(alice):
FILE_CONTENTS = b"added via PUT" * 20
resp = requests.put(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
data=FILE_CONTENTS,
)
cap = allmydata.uri.from_string(resp.text.strip().encode('ascii'))
cfg = alice.get_config()
cfg = alice.process.get_config()
assert isinstance(cap, allmydata.uri.CHKFileURI)
assert cap.size == len(FILE_CONTENTS)
assert cap.total_shares == int(cfg.get_config("client", "shares.total"))
@@ -116,7 +116,7 @@ def test_deep_stats(alice):
URIs work
"""
resp = requests.post(
util.node_url(alice.node_dir, "uri"),
util.node_url(alice.process.node_dir, "uri"),
params={
"format": "sdmf",
"t": "mkdir",
@@ -130,7 +130,7 @@ def test_deep_stats(alice):
uri = url_unquote(resp.url)
assert 'URI:DIR2:' in uri
dircap = uri[uri.find("URI:DIR2:"):].rstrip('/')
dircap_uri = util.node_url(alice.node_dir, "uri/{}".format(url_quote(dircap)))
dircap_uri = util.node_url(alice.process.node_dir, "uri/{}".format(url_quote(dircap)))
# POST a file into this directory
FILE_CONTENTS = u"a file in a directory"
@@ -176,7 +176,7 @@ def test_deep_stats(alice):
while tries > 0:
tries -= 1
resp = requests.get(
util.node_url(alice.node_dir, u"operations/something_random"),
util.node_url(alice.process.node_dir, u"operations/something_random"),
)
d = json.loads(resp.content)
if d['size-literal-files'] == len(FILE_CONTENTS):
@@ -201,21 +201,21 @@ def test_status(alice):
FILE_CONTENTS = u"all the Important Data of alice\n" * 1200
resp = requests.put(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
data=FILE_CONTENTS,
)
cap = resp.text.strip()
print("Uploaded data, cap={}".format(cap))
resp = requests.get(
util.node_url(alice.node_dir, u"uri/{}".format(url_quote(cap))),
util.node_url(alice.process.node_dir, u"uri/{}".format(url_quote(cap))),
)
print("Downloaded {} bytes of data".format(len(resp.content)))
assert str(resp.content, "ascii") == FILE_CONTENTS
resp = requests.get(
util.node_url(alice.node_dir, "status"),
util.node_url(alice.process.node_dir, "status"),
)
dom = html5lib.parse(resp.content)
@@ -229,7 +229,7 @@ def test_status(alice):
for href in hrefs:
if href == u"/" or not href:
continue
resp = requests.get(util.node_url(alice.node_dir, href))
resp = requests.get(util.node_url(alice.process.node_dir, href))
if href.startswith(u"/status/up"):
assert b"File Upload Status" in resp.content
if b"Total Size: %d" % (len(FILE_CONTENTS),) in resp.content:
@@ -241,7 +241,7 @@ def test_status(alice):
# download the specialized event information
resp = requests.get(
util.node_url(alice.node_dir, u"{}/event_json".format(href)),
util.node_url(alice.process.node_dir, u"{}/event_json".format(href)),
)
js = json.loads(resp.content)
# there's usually just one "read" operation, but this can handle many ..
@@ -264,14 +264,14 @@ async def test_directory_deep_check(reactor, request, alice):
required = 2
total = 4
await util.reconfigure(reactor, request, alice, (happy, required, total), convergence=None)
await alice.reconfigure_zfec(reactor, request, (happy, required, total), convergence=None)
await deferToThread(_test_directory_deep_check_blocking, alice)
def _test_directory_deep_check_blocking(alice):
# create a directory
resp = requests.post(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
params={
u"t": u"mkdir",
u"redirect_to_result": u"true",
@@ -320,7 +320,7 @@ def _test_directory_deep_check_blocking(alice):
print("Uploaded data1, cap={}".format(cap1))
resp = requests.get(
util.node_url(alice.node_dir, u"uri/{}".format(url_quote(cap0))),
util.node_url(alice.process.node_dir, u"uri/{}".format(url_quote(cap0))),
params={u"t": u"info"},
)
@@ -484,14 +484,14 @@ def test_mkdir_with_children(alice):
# create a file to put in our directory
FILE_CONTENTS = u"some file contents\n" * 500
resp = requests.put(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
data=FILE_CONTENTS,
)
filecap = resp.content.strip()
# create a (sub) directory to put in our directory
resp = requests.post(
util.node_url(alice.node_dir, u"uri"),
util.node_url(alice.process.node_dir, u"uri"),
params={
u"t": u"mkdir",
}
@@ -534,7 +534,7 @@ def test_mkdir_with_children(alice):
# create a new directory with one file and one sub-dir (all-at-once)
resp = util.web_post(
alice, u"uri",
alice.process, u"uri",
params={u"t": "mkdir-with-children"},
data=json.dumps(meta),
)

View File

@@ -741,7 +741,6 @@ class SSK:
def load(cls, params: dict) -> SSK:
assert params.keys() == {"format", "mutable", "key"}
return cls(params["format"], params["key"].encode("ascii"))
def customize(self) -> SSK:
"""
Return an SSK with a newly generated random RSA key.
@ -780,7 +779,7 @@ def upload(alice: TahoeProcess, fmt: CHK | SSK, data: bytes) -> str:
f.write(data)
f.flush()
with fmt.to_argv() as fmt_argv:
argv = [alice, "put"] + fmt_argv + [f.name]
argv = [alice.process, "put"] + fmt_argv + [f.name]
return cli(*argv).decode("utf-8").strip()