Remove password-based authentication from the SFTP frontend

This commit is contained in:
Jean-Paul Calderone 2021-10-25 20:50:19 -04:00
parent 0b4e6754a3
commit 5878a64890
3 changed files with 255 additions and 125 deletions

View File

@ -47,8 +47,8 @@ servers must be configured with a way to first authenticate a user (confirm
that a prospective client has a legitimate claim to whatever authorities we
might grant a particular user), and second to decide what directory cap
should be used as the root directory for a log-in by the authenticated user.
A username and password can be used; as of Tahoe-LAFS v1.11, RSA or DSA
public key authentication is also supported.
As of Tahoe-LAFS v1.17,
RSA/DSA public key authentication is the only supported mechanism.
Tahoe-LAFS provides two mechanisms to perform this user-to-cap mapping.
The first (recommended) is a simple flat file with one account per line.
@ -59,20 +59,14 @@ Creating an Account File
To use the first form, create a file (for example ``BASEDIR/private/accounts``)
in which each non-comment/non-blank line is a space-separated line of
(USERNAME, PASSWORD, ROOTCAP), like so::
(USERNAME, KEY-TYPE, PUBLIC-KEY, ROOTCAP), like so::
% cat BASEDIR/private/accounts
# This is a password line: username password cap
alice password URI:DIR2:ioej8xmzrwilg772gzj4fhdg7a:wtiizszzz2rgmczv4wl6bqvbv33ag4kvbr6prz3u6w3geixa6m6a
bob sekrit URI:DIR2:6bdmeitystckbl9yqlw7g56f4e:serp5ioqxnh34mlbmzwvkp3odehsyrr7eytt5f64we3k9hhcrcja
# This is a public key line: username keytype pubkey cap
# (Tahoe-LAFS v1.11 or later)
carol ssh-rsa AAAA... URI:DIR2:ovjy4yhylqlfoqg2vcze36dhde:4d4f47qko2xm5g7osgo2yyidi5m4muyo2vjjy53q4vjju2u55mfa
For public key authentication, the keytype may be either "ssh-rsa" or "ssh-dsa".
To avoid ambiguity between passwords and public key types, a password cannot
start with "ssh-".
The key type may be either "ssh-rsa" or "ssh-dsa".
Now add an ``accounts.file`` directive to your ``tahoe.cfg`` file, as described in
the next sections.

View File

@ -32,65 +32,93 @@ class FTPAvatarID(object):
@implementer(checkers.ICredentialsChecker)
class AccountFileChecker(object):
credentialInterfaces = (credentials.IUsernamePassword,
credentials.IUsernameHashedPassword,
credentials.ISSHPrivateKey)
credentialInterfaces = (credentials.ISSHPrivateKey,)
def __init__(self, client, accountfile):
self.client = client
self.passwords = BytesKeyDict()
pubkeys = BytesKeyDict()
self.rootcaps = BytesKeyDict()
with open(abspath_expanduser_unicode(accountfile), "rb") as f:
for line in f:
line = line.strip()
if line.startswith(b"#") or not line:
continue
name, passwd, rest = line.split(None, 2)
if passwd.startswith(b"ssh-"):
bits = rest.split()
keystring = b" ".join([passwd] + bits[:-1])
key = keys.Key.fromString(keystring)
rootcap = bits[-1]
pubkeys[name] = [key]
else:
self.passwords[name] = passwd
rootcap = rest
self.rootcaps[name] = rootcap
path = abspath_expanduser_unicode(accountfile)
with open_account_file(path) as f:
self.rootcaps, pubkeys = load_account_file(f)
self._pubkeychecker = SSHPublicKeyChecker(InMemorySSHKeyDB(pubkeys))
def _avatarId(self, username):
return FTPAvatarID(username, self.rootcaps[username])
def _cbPasswordMatch(self, matched, username):
if matched:
return self._avatarId(username)
raise error.UnauthorizedLogin
def requestAvatarId(self, creds):
if credentials.ISSHPrivateKey.providedBy(creds):
d = defer.maybeDeferred(self._pubkeychecker.requestAvatarId, creds)
d.addCallback(self._avatarId)
return d
elif credentials.IUsernameHashedPassword.providedBy(creds):
return self._checkPassword(creds)
elif credentials.IUsernamePassword.providedBy(creds):
return self._checkPassword(creds)
else:
raise NotImplementedError()
raise NotImplementedError()
def _checkPassword(self, creds):
"""
Determine whether the password in the given credentials matches the
password in the account file.
def open_account_file(path):
"""
Open and return the accounts file at the given path.
"""
return open(path, "rt", encoding="utf-8")
Returns a Deferred that fires with the username if the password matches
or with an UnauthorizedLogin failure otherwise.
"""
try:
correct = self.passwords[creds.username]
except KeyError:
return defer.fail(error.UnauthorizedLogin())
def load_account_file(lines):
"""
Load credentials from an account file.
d = defer.maybeDeferred(creds.checkPassword, correct)
d.addCallback(self._cbPasswordMatch, creds.username)
return d
:param lines: An iterable of account lines to load.
:return: See ``create_account_maps``.
"""
return create_account_maps(
parse_accounts(
content_lines(
lines,
),
),
)
def content_lines(lines):
"""
Drop empty and commented-out lines (``#``-prefixed) from an iterator of
lines.
:param lines: An iterator of lines to process.
:return: An iterator of lines including only those from ``lines`` that
include content intended to be loaded.
"""
for line in lines:
line = line.strip()
if line and not line.startswith("#"):
yield line
def parse_accounts(lines):
"""
Parse account lines into their components (name, key, rootcap).
"""
for line in lines:
name, passwd, rest = line.split(None, 2)
if not passwd.startswith("ssh-"):
raise ValueError(
"Password-based authentication is not supported; "
"configure key-based authentication instead."
)
bits = rest.split()
keystring = " ".join([passwd] + bits[:-1])
key = keys.Key.fromString(keystring)
rootcap = bits[-1]
yield (name, key, rootcap)
def create_account_maps(accounts):
"""
Build mappings from account names to keys and rootcaps.
:param accounts: An iterator if (name, key, rootcap) tuples.
:return: A tuple of two dicts. The first maps account names to rootcaps.
The second maps account names to public keys.
"""
rootcaps = BytesKeyDict()
pubkeys = BytesKeyDict()
for (name, key, rootcap) in accounts:
name_bytes = name.encode("utf-8")
rootcaps[name_bytes] = rootcap.encode("utf-8")
pubkeys[name_bytes] = [key]
return rootcaps, pubkeys

View File

@ -8,7 +8,17 @@ from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import str # noqa: F401
from future.builtins import str, open # noqa: F401
from hypothesis import (
given,
)
from hypothesis.strategies import (
text,
characters,
tuples,
lists,
)
from twisted.trial import unittest
from twisted.python import filepath
@ -38,25 +48,184 @@ dBSD8940XU3YW+oeq8e+p3yQ2GinHfeJ3BYQyNQLuMAJ
-----END RSA PRIVATE KEY-----
""")
DUMMY_ACCOUNTS = u"""\
alice herpassword URI:DIR2:aaaaaaaaaaaaaaaaaaaaaaaaaa:1111111111111111111111111111111111111111111111111111
bob sekrit URI:DIR2:bbbbbbbbbbbbbbbbbbbbbbbbbb:2222222222222222222222222222222222222222222222222222
DUMMY_KEY_DSA = keys.Key.fromString("""\
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABsQAAAAdzc2gtZH
NzAAAAgQDKMh/ELaiP21LYRBuPbUy7dUhv/XZwV7aS1LzxSP+KaJvtDOei8X76XEAfkqX+
aGh9eup+BLkezrV6LlpO9uPzhY8ChlKpkvw5PZKv/2agSrVxZyG7yEzHNtSBQXE6qNMwIk
N/ycXLGCqyAhQSzRhLz9ETNaslRDLo7YyVWkiuAQAAABUA5nTatFKux5EqZS4EarMWFRBU
i1UAAACAFpkkK+JsPixSTPyn0DNMoGKA0Klqy8h61Ds6pws+4+aJQptUBshpwNw1ypo7MO
+goDZy3wwdWtURTPGMgesNdEfxp8L2/kqE4vpMK0myoczCqOiWMeNB/x1AStbSkBI8WmHW
2htgsC01xbaix/FrA3edK8WEyv+oIxlbV1FkrPkAAACANb0EpCc8uoR4/32rO2JLsbcLBw
H5wc2khe7AKkIa9kUknRIRvoCZUtXF5XuXXdRmnpVEm2KcsLdtZjip43asQcqgt0Kz3nuF
kAf7bI98G1waFUimcCSPsal4kCmW2HC11sg/BWOt5qczX/0/3xVxpo6juUeBq9ncnFTvPX
5fOlEAAAHoJkFqHiZBah4AAAAHc3NoLWRzcwAAAIEAyjIfxC2oj9tS2EQbj21Mu3VIb/12
cFe2ktS88Uj/imib7QznovF++lxAH5Kl/mhofXrqfgS5Hs61ei5aTvbj84WPAoZSqZL8OT
2Sr/9moEq1cWchu8hMxzbUgUFxOqjTMCJDf8nFyxgqsgIUEs0YS8/REzWrJUQy6O2MlVpI
rgEAAAAVAOZ02rRSrseRKmUuBGqzFhUQVItVAAAAgBaZJCvibD4sUkz8p9AzTKBigNCpas
vIetQ7OqcLPuPmiUKbVAbIacDcNcqaOzDvoKA2ct8MHVrVEUzxjIHrDXRH8afC9v5KhOL6
TCtJsqHMwqjoljHjQf8dQErW0pASPFph1tobYLAtNcW2osfxawN3nSvFhMr/qCMZW1dRZK
z5AAAAgDW9BKQnPLqEeP99qztiS7G3CwcB+cHNpIXuwCpCGvZFJJ0SEb6AmVLVxeV7l13U
Zp6VRJtinLC3bWY4qeN2rEHKoLdCs957hZAH+2yPfBtcGhVIpnAkj7GpeJAplthwtdbIPw
VjreanM1/9P98VcaaOo7lHgavZ3JxU7z1+XzpRAAAAFQC7360pZLbv7PFt4BPFJ8zAHxAe
QwAAAA5leGFya3VuQGJhcnlvbgECAwQ=
-----END OPENSSH PRIVATE KEY-----
""")
# dennis password URI:DIR2:aaaaaaaaaaaaaaaaaaaaaaaaaa:1111111111111111111111111111111111111111111111111111
ACCOUNTS = u"""\
# dennis {key} URI:DIR2:aaaaaaaaaaaaaaaaaaaaaaaaaa:1111111111111111111111111111111111111111111111111111
carol {key} URI:DIR2:cccccccccccccccccccccccccc:3333333333333333333333333333333333333333333333333333
""".format(key=str(DUMMY_KEY.public().toString("openssh"), "ascii")).encode("ascii")
# Python str.splitlines considers NEXT LINE, LINE SEPARATOR, and PARAGRAPH
# separator to be line separators, too. However, file.readlines() does not...
LINE_SEPARATORS = (
'\x0a', # line feed
'\x0b', # vertical tab
'\x0c', # form feed
'\x0d', # carriage return
)
class AccountFileParserTests(unittest.TestCase):
"""
Tests for ``load_account_file`` and its helper functions.
"""
@given(lists(
text(alphabet=characters(
blacklist_categories=(
# Surrogates are an encoding trick to help out UTF-16.
# They're not necessary to represent any non-surrogate code
# point in unicode. They're also not legal individually but
# only in pairs.
'Cs',
),
# Exclude all our line separators too.
blacklist_characters=("\n", "\r"),
)),
))
def test_ignore_comments(self, lines):
"""
``auth.content_lines`` filters out lines beginning with `#` and empty
lines.
"""
expected = set()
# It's not clear that real files and StringIO behave sufficiently
# similarly to use the latter instead of the former here. In
# particular, they seem to have distinct and incompatible
# line-splitting rules.
bufpath = self.mktemp()
with open(bufpath, "wt", encoding="utf-8") as buf:
for line in lines:
stripped = line.strip()
is_content = stripped and not stripped.startswith("#")
if is_content:
expected.add(stripped)
buf.write(line + "\n")
with auth.open_account_file(bufpath) as buf:
actual = set(auth.content_lines(buf))
self.assertEqual(expected, actual)
def test_parse_accounts(self):
"""
``auth.parse_accounts`` accepts an iterator of account lines and returns
an iterator of structured account data.
"""
alice_key = DUMMY_KEY.public().toString("openssh").decode("utf-8")
alice_cap = "URI:DIR2:aaaa:1111"
bob_key = DUMMY_KEY_DSA.public().toString("openssh").decode("utf-8")
bob_cap = "URI:DIR2:aaaa:2222"
self.assertEqual(
list(auth.parse_accounts([
"alice {} {}".format(alice_key, alice_cap),
"bob {} {}".format(bob_key, bob_cap),
])),
[
("alice", DUMMY_KEY.public(), alice_cap),
("bob", DUMMY_KEY_DSA.public(), bob_cap),
],
)
def test_parse_accounts_rejects_passwords(self):
"""
The iterator returned by ``auth.parse_accounts`` raises ``ValueError``
when processing reaches a line that has what looks like a password
instead of an ssh key.
"""
with self.assertRaises(ValueError):
list(auth.parse_accounts(["alice apassword URI:DIR2:aaaa:1111"]))
def test_create_account_maps(self):
"""
``auth.create_account_maps`` accepts an iterator of structured account
data and returns two mappings: one from account name to rootcap, the
other from account name to public keys.
"""
alice_cap = "URI:DIR2:aaaa:1111"
alice_key = DUMMY_KEY.public()
bob_cap = "URI:DIR2:aaaa:2222"
bob_key = DUMMY_KEY_DSA.public()
accounts = [
("alice", alice_key, alice_cap),
("bob", bob_key, bob_cap),
]
self.assertEqual(
auth.create_account_maps(accounts),
({
b"alice": alice_cap.encode("utf-8"),
b"bob": bob_cap.encode("utf-8"),
},
{
b"alice": [alice_key],
b"bob": [bob_key],
}),
)
def test_load_account_file(self):
"""
``auth.load_account_file`` accepts an iterator of serialized account lines
and returns two mappings: one from account name to rootcap, the other
from account name to public keys.
"""
alice_key = DUMMY_KEY.public().toString("openssh").decode("utf-8")
alice_cap = "URI:DIR2:aaaa:1111"
bob_key = DUMMY_KEY_DSA.public().toString("openssh").decode("utf-8")
bob_cap = "URI:DIR2:aaaa:2222"
accounts = [
"alice {} {}".format(alice_key, alice_cap),
"bob {} {}".format(bob_key, bob_cap),
"# carol {} {}".format(alice_key, alice_cap),
]
self.assertEqual(
auth.load_account_file(accounts),
({
b"alice": alice_cap.encode("utf-8"),
b"bob": bob_cap.encode("utf-8"),
},
{
b"alice": [DUMMY_KEY.public()],
b"bob": [DUMMY_KEY_DSA.public()],
}),
)
class AccountFileCheckerKeyTests(unittest.TestCase):
"""
Tests for key handling done by allmydata.frontends.auth.AccountFileChecker.
"""
def setUp(self):
self.account_file = filepath.FilePath(self.mktemp())
self.account_file.setContent(DUMMY_ACCOUNTS)
self.account_file.setContent(ACCOUNTS)
abspath = abspath_expanduser_unicode(str(self.account_file.path))
self.checker = auth.AccountFileChecker(None, abspath)
def test_unknown_user_ssh(self):
def test_unknown_user(self):
"""
AccountFileChecker.requestAvatarId returns a Deferred that fires with
UnauthorizedLogin if called with an SSHPrivateKey object with a
@ -67,67 +236,6 @@ class AccountFileCheckerKeyTests(unittest.TestCase):
avatarId = self.checker.requestAvatarId(key_credentials)
return self.assertFailure(avatarId, error.UnauthorizedLogin)
def test_unknown_user_password(self):
"""
AccountFileChecker.requestAvatarId returns a Deferred that fires with
UnauthorizedLogin if called with an SSHPrivateKey object with a
username not present in the account file.
We use a commented out user, so we're also checking that comments are
skipped.
"""
key_credentials = credentials.UsernamePassword(b"dennis", b"password")
d = self.checker.requestAvatarId(key_credentials)
return self.assertFailure(d, error.UnauthorizedLogin)
def test_password_auth_user_with_ssh_key(self):
"""
AccountFileChecker.requestAvatarId returns a Deferred that fires with
UnauthorizedLogin if called with an SSHPrivateKey object for a username
only associated with a password in the account file.
"""
key_credentials = credentials.SSHPrivateKey(
b"alice", b"md5", None, None, None)
avatarId = self.checker.requestAvatarId(key_credentials)
return self.assertFailure(avatarId, error.UnauthorizedLogin)
def test_password_auth_user_with_correct_password(self):
"""
AccountFileChecker.requestAvatarId returns a Deferred that fires with
the user if the correct password is given.
"""
key_credentials = credentials.UsernamePassword(b"alice", b"herpassword")
d = self.checker.requestAvatarId(key_credentials)
def authenticated(avatarId):
self.assertEqual(
(b"alice",
b"URI:DIR2:aaaaaaaaaaaaaaaaaaaaaaaaaa:1111111111111111111111111111111111111111111111111111"),
(avatarId.username, avatarId.rootcap))
return d
def test_password_auth_user_with_correct_hashed_password(self):
"""
AccountFileChecker.requestAvatarId returns a Deferred that fires with
the user if the correct password is given in hashed form.
"""
key_credentials = credentials.UsernameHashedPassword(b"alice", b"herpassword")
d = self.checker.requestAvatarId(key_credentials)
def authenticated(avatarId):
self.assertEqual(
(b"alice",
b"URI:DIR2:aaaaaaaaaaaaaaaaaaaaaaaaaa:1111111111111111111111111111111111111111111111111111"),
(avatarId.username, avatarId.rootcap))
return d
def test_password_auth_user_with_wrong_password(self):
"""
AccountFileChecker.requestAvatarId returns a Deferred that fires with
UnauthorizedLogin if the wrong password is given.
"""
key_credentials = credentials.UsernamePassword(b"alice", b"WRONG")
avatarId = self.checker.requestAvatarId(key_credentials)
return self.assertFailure(avatarId, error.UnauthorizedLogin)
def test_unrecognized_key(self):
"""
AccountFileChecker.requestAvatarId returns a Deferred that fires with