A (so far failing) test for SSH public key authentication.

This commit is contained in:
Itamar Turner-Trauring 2021-01-07 13:59:57 -05:00
parent b8879916b2
commit 3764e3b6b1
3 changed files with 60 additions and 25 deletions

View File

@ -8,7 +8,6 @@ from os.path import join, exists
from tempfile import mkdtemp, mktemp
from functools import partial
from json import loads
from subprocess import check_call
from foolscap.furl import (
decode_furl,
@ -41,6 +40,7 @@ from util import (
TahoeProcess,
cli,
_run_node,
generate_ssh_key
)
@ -343,12 +343,6 @@ def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer,
return nodes
def generate_ssh_key(path):
"""Create a new SSH private/public key pair."""
check_call(["ckeygen", "--type", "rsa", "--no-passphrase", "--bits", "512",
"--file", path])
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:alice", include_args=[], include_result=False)
def alice(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, request):
@ -366,7 +360,7 @@ def alice(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, requ
rwcap = loads(cli(process, "list-aliases", "--json"))["test"]["readwrite"]
# 2. Enable SFTP on the node:
ssh_key_path = join(process.node_dir, "private", "ssh_host_rsa_key")
host_ssh_key_path = join(process.node_dir, "private", "ssh_host_rsa_key")
accounts_path = join(process.node_dir, "private", "accounts")
with open(join(process.node_dir, "tahoe.cfg"), "a") as f:
f.write("""\
@ -376,15 +370,23 @@ port = tcp:8022:interface=127.0.0.1
host_pubkey_file = {ssh_key_path}.pub
host_privkey_file = {ssh_key_path}
accounts.file = {accounts_path}
""".format(ssh_key_path=ssh_key_path, accounts_path=accounts_path))
generate_ssh_key(ssh_key_path)
""".format(ssh_key_path=host_ssh_key_path, accounts_path=accounts_path))
generate_ssh_key(host_ssh_key_path)
# 3. Add a SFTP access file with username, password, and rwcap.
# 3. Add a SFTP access file with username/password and SSH key auth.
# The client SSH key path is typically going to be somewhere else (~/.ssh,
# typically), but for convenience sake for testing we'll put it inside node.
client_ssh_key_path = join(process.node_dir, "private", "ssh_client_rsa_key")
generate_ssh_key(client_ssh_key_path)
# Pub key format is "ssh-rsa <thekey> <username>". We want the key.
ssh_public_key = open(client_ssh_key_path + ".pub").read().strip().split()[1]
with open(accounts_path, "w") as f:
f.write("""\
alice password {}
""".format(rwcap))
# TODO add sftp access with public key
alice password {rwcap}
alice2 ssh-rsa {ssh_public_key} {rwcap}
""".format(rwcap=rwcap, ssh_public_key=ssh_public_key))
# 4. Restart the node with new SFTP config.
process.kill()

View File

@ -23,14 +23,11 @@ from paramiko.ssh_exception import AuthenticationException
import pytest
def connect_sftp(alice, username="alice", password="password"):
def connect_sftp(connect_args={"username": "alice", "password": "password"}):
"""Create an SFTP client."""
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy)
client.connect(
"localhost", username=username, password=password, port=8022,
look_for_keys=False
)
client.connect("localhost", port=8022, look_for_keys=False, **connect_args)
sftp = SFTPClient.from_transport(client.get_transport())
def rmdir(path, delete_root=True):
@ -49,16 +46,30 @@ def connect_sftp(alice, username="alice", password="password"):
return sftp
def test_bad_account_password(alice):
"""Can't login with unknown username or wrong password."""
def test_bad_account_password_ssh_key(alice):
"""
Can't login with unknown username, wrong password, or wrong SSH pub key.
"""
for u, p in [("alice", "wrong"), ("someuser", "password")]:
with pytest.raises(AuthenticationException):
connect_sftp(alice, u, p)
connect_sftp(connect_args={
"username": u, "password": p,
})
# TODO bad pubkey
def test_ssh_key_auth(alice):
"""It's possible to login authenticating with SSH public key."""
key_filename = join(alice.node_dir, "private", "ssh_client_rsa_key")
sftp = connect_sftp(connect_args={
"username": "alice2", "key_filename": key_filename
})
assert sftp.listdir() == []
def test_read_write_files(alice):
"""It's possible to upload and download files."""
sftp = connect_sftp(alice)
sftp = connect_sftp()
f = sftp.file("myfile", "wb")
f.write(b"abc")
f.write(b"def")
@ -75,7 +86,7 @@ def test_directories(alice):
It's possible to create, list directories, and create and remove files in
them.
"""
sftp = connect_sftp(alice)
sftp = connect_sftp()
assert sftp.listdir() == []
sftp.mkdir("childdir")
@ -101,3 +112,19 @@ def test_directories(alice):
sftp.rmdir("childdir")
assert sftp.listdir() == []
def test_rename(alice):
"""Directories and files can be renamed."""
sftp = connect_sftp()
sftp.mkdir("dir")
filepath = join("dir", "file")
with sftp.file(filepath, "wb") as f:
f.write(b"abc")
sftp.rename(filepath, join("dir", "file2"))
sftp.rename("dir", "dir2")
with sftp.file(join("dir2", "file2"), "rb") as f:
assert f.read() == b"abc"

View File

@ -5,7 +5,7 @@ from os import mkdir, environ
from os.path import exists, join
from six.moves import StringIO
from functools import partial
from subprocess import check_output
from subprocess import check_output, check_call
from twisted.python.filepath import (
FilePath,
@ -506,3 +506,9 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2):
tahoe,
)
)
def generate_ssh_key(path):
"""Create a new SSH private/public key pair."""
check_call(["ckeygen", "--type", "rsa", "--no-passphrase", "--bits", "512",
"--file", path])