mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-26 03:28:17 +00:00
297 lines
12 KiB
Python
297 lines
12 KiB
Python
"""
|
|
Ported to Python 3.
|
|
"""
|
|
|
|
from typing import Literal
|
|
|
|
from hypothesis import (
|
|
given,
|
|
)
|
|
from hypothesis.strategies import (
|
|
text,
|
|
characters,
|
|
lists,
|
|
)
|
|
|
|
from twisted.trial import unittest
|
|
from twisted.python import filepath
|
|
from twisted.cred import error, credentials
|
|
from twisted.conch import error as conch_error
|
|
from twisted.conch.ssh import keys
|
|
|
|
from allmydata.frontends import auth
|
|
from allmydata.util.fileutil import abspath_expanduser_unicode
|
|
|
|
|
|
DUMMY_KEY = keys.Key.fromString("""\
|
|
-----BEGIN RSA PRIVATE KEY-----
|
|
MIICXQIBAAKBgQDEP3DYiukOu+NrUlBZeLL9JoHkK5nSvINYfeOQWYVW9J5NG485
|
|
pZFVUQKzvvht34Ihj4ucrrvj7vOp+FFvzxI+zHKBpDxyJwV96dvWDAZMjxTxL7iV
|
|
8HcO7hqgtQ/Xk1Kjde5lH3EOEDs3IhFHA+sox9y6i4A5NUr2AJZSHiOEVwIDAQAB
|
|
AoGASrrNwefDr7SkeS2zIx7vKa8ML1LbFIBsk7n8ee9c8yvbTAl+lLkTiqV6ne/O
|
|
sig2aYk75MI1Eirf5o2ElUsI6u36i6AeKL2u/W7tLBVijmBB8dTiWZ5gMOARWt8w
|
|
daF2An2826YdcU+iNZ7Yi0q4xtlxHQn3JcNNWxicphLvt0ECQQDtajJ/bK+Nqd9j
|
|
/WGvqYcMzkkorQq/0+MQYhcIwDlpf2Xoi45tP4HeoBubeJmU5+jXpXmdP5epWpBv
|
|
k3ZCwV7pAkEA05xBP2HTdwRFTJov5I/w7uKOrn7mj7DCvSjQFCufyPOoCJJMeBSq
|
|
tfCQlHFtwlkyNfiSbhtgZ0Pp6ovL+1RBPwJBAOlFRBKxrpgpxcXQK5BWqMwrT/S4
|
|
eWxb+6mYR3ugq4h91Zq0rJ+pG6irdhS/XV/SsZRZEXIxDoom4u3OXQ9gQikCQErM
|
|
ywuaiuNhMRXY0uEaOHJYx1LLLLjSJKQ0zwiyOvMPnfAZtsojlAxoEtNGHSQ731HQ
|
|
ogIlzzfxe7ga3mni6IUCQQCwNK9zwARovcQ8nByqotGQzohpl+1b568+iw8GXP2u
|
|
dBSD8940XU3YW+oeq8e+p3yQ2GinHfeJ3BYQyNQLuMAJ
|
|
-----END RSA PRIVATE KEY-----
|
|
""")
|
|
|
|
DUMMY_KEY_DSA = keys.Key.fromString("""\
|
|
-----BEGIN OPENSSH PRIVATE KEY-----
|
|
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABsQAAAAdzc2gtZH
|
|
NzAAAAgQDKMh/ELaiP21LYRBuPbUy7dUhv/XZwV7aS1LzxSP+KaJvtDOei8X76XEAfkqX+
|
|
aGh9eup+BLkezrV6LlpO9uPzhY8ChlKpkvw5PZKv/2agSrVxZyG7yEzHNtSBQXE6qNMwIk
|
|
N/ycXLGCqyAhQSzRhLz9ETNaslRDLo7YyVWkiuAQAAABUA5nTatFKux5EqZS4EarMWFRBU
|
|
i1UAAACAFpkkK+JsPixSTPyn0DNMoGKA0Klqy8h61Ds6pws+4+aJQptUBshpwNw1ypo7MO
|
|
+goDZy3wwdWtURTPGMgesNdEfxp8L2/kqE4vpMK0myoczCqOiWMeNB/x1AStbSkBI8WmHW
|
|
2htgsC01xbaix/FrA3edK8WEyv+oIxlbV1FkrPkAAACANb0EpCc8uoR4/32rO2JLsbcLBw
|
|
H5wc2khe7AKkIa9kUknRIRvoCZUtXF5XuXXdRmnpVEm2KcsLdtZjip43asQcqgt0Kz3nuF
|
|
kAf7bI98G1waFUimcCSPsal4kCmW2HC11sg/BWOt5qczX/0/3xVxpo6juUeBq9ncnFTvPX
|
|
5fOlEAAAHoJkFqHiZBah4AAAAHc3NoLWRzcwAAAIEAyjIfxC2oj9tS2EQbj21Mu3VIb/12
|
|
cFe2ktS88Uj/imib7QznovF++lxAH5Kl/mhofXrqfgS5Hs61ei5aTvbj84WPAoZSqZL8OT
|
|
2Sr/9moEq1cWchu8hMxzbUgUFxOqjTMCJDf8nFyxgqsgIUEs0YS8/REzWrJUQy6O2MlVpI
|
|
rgEAAAAVAOZ02rRSrseRKmUuBGqzFhUQVItVAAAAgBaZJCvibD4sUkz8p9AzTKBigNCpas
|
|
vIetQ7OqcLPuPmiUKbVAbIacDcNcqaOzDvoKA2ct8MHVrVEUzxjIHrDXRH8afC9v5KhOL6
|
|
TCtJsqHMwqjoljHjQf8dQErW0pASPFph1tobYLAtNcW2osfxawN3nSvFhMr/qCMZW1dRZK
|
|
z5AAAAgDW9BKQnPLqEeP99qztiS7G3CwcB+cHNpIXuwCpCGvZFJJ0SEb6AmVLVxeV7l13U
|
|
Zp6VRJtinLC3bWY4qeN2rEHKoLdCs957hZAH+2yPfBtcGhVIpnAkj7GpeJAplthwtdbIPw
|
|
VjreanM1/9P98VcaaOo7lHgavZ3JxU7z1+XzpRAAAAFQC7360pZLbv7PFt4BPFJ8zAHxAe
|
|
QwAAAA5leGFya3VuQGJhcnlvbgECAwQ=
|
|
-----END OPENSSH PRIVATE KEY-----
|
|
""")
|
|
|
|
ACCOUNTS = u"""\
|
|
# dennis {key} URI:DIR2:aaaaaaaaaaaaaaaaaaaaaaaaaa:1111111111111111111111111111111111111111111111111111
|
|
carol {key} URI:DIR2:cccccccccccccccccccccccccc:3333333333333333333333333333333333333333333333333333
|
|
""".format(key=str(DUMMY_KEY.public().toString("openssh"), "ascii")).encode("ascii")
|
|
|
|
# Python str.splitlines considers NEXT LINE, LINE SEPARATOR, and PARAGRAPH
|
|
# separator to be line separators, too. However, file.readlines() does not...
|
|
LINE_SEPARATORS = (
|
|
'\x0a', # line feed
|
|
'\x0b', # vertical tab
|
|
'\x0c', # form feed
|
|
'\x0d', # carriage return
|
|
)
|
|
|
|
SURROGATES: Literal["Cs"] = "Cs"
|
|
|
|
|
|
class AccountFileParserTests(unittest.TestCase):
|
|
"""
|
|
Tests for ``load_account_file`` and its helper functions.
|
|
"""
|
|
@given(lists(
|
|
text(alphabet=characters(
|
|
blacklist_categories=(
|
|
# Surrogates are an encoding trick to help out UTF-16.
|
|
# They're not necessary to represent any non-surrogate code
|
|
# point in unicode. They're also not legal individually but
|
|
# only in pairs.
|
|
SURROGATES,
|
|
),
|
|
# Exclude all our line separators too.
|
|
blacklist_characters=("\n", "\r"),
|
|
)),
|
|
))
|
|
def test_ignore_comments(self, lines):
|
|
"""
|
|
``auth.content_lines`` filters out lines beginning with `#` and empty
|
|
lines.
|
|
"""
|
|
expected = set()
|
|
|
|
# It's not clear that real files and StringIO behave sufficiently
|
|
# similarly to use the latter instead of the former here. In
|
|
# particular, they seem to have distinct and incompatible
|
|
# line-splitting rules.
|
|
bufpath = self.mktemp()
|
|
with open(bufpath, "wt", encoding="utf-8") as buf:
|
|
for line in lines:
|
|
stripped = line.strip()
|
|
is_content = stripped and not stripped.startswith("#")
|
|
if is_content:
|
|
expected.add(stripped)
|
|
buf.write(line + "\n")
|
|
|
|
with auth.open_account_file(bufpath) as buf:
|
|
actual = set(auth.content_lines(buf))
|
|
|
|
self.assertEqual(expected, actual)
|
|
|
|
def test_parse_accounts(self):
|
|
"""
|
|
``auth.parse_accounts`` accepts an iterator of account lines and returns
|
|
an iterator of structured account data.
|
|
"""
|
|
alice_key = DUMMY_KEY.public().toString("openssh").decode("utf-8")
|
|
alice_cap = "URI:DIR2:aaaa:1111"
|
|
|
|
bob_key = DUMMY_KEY_DSA.public().toString("openssh").decode("utf-8")
|
|
bob_cap = "URI:DIR2:aaaa:2222"
|
|
self.assertEqual(
|
|
list(auth.parse_accounts([
|
|
"alice {} {}".format(alice_key, alice_cap),
|
|
"bob {} {}".format(bob_key, bob_cap),
|
|
])),
|
|
[
|
|
("alice", DUMMY_KEY.public(), alice_cap),
|
|
("bob", DUMMY_KEY_DSA.public(), bob_cap),
|
|
],
|
|
)
|
|
|
|
def test_parse_accounts_rejects_passwords(self):
|
|
"""
|
|
The iterator returned by ``auth.parse_accounts`` raises ``ValueError``
|
|
when processing reaches a line that has what looks like a password
|
|
instead of an ssh key.
|
|
"""
|
|
with self.assertRaises(ValueError):
|
|
list(auth.parse_accounts(["alice apassword URI:DIR2:aaaa:1111"]))
|
|
|
|
def test_create_account_maps(self):
|
|
"""
|
|
``auth.create_account_maps`` accepts an iterator of structured account
|
|
data and returns two mappings: one from account name to rootcap, the
|
|
other from account name to public keys.
|
|
"""
|
|
alice_cap = "URI:DIR2:aaaa:1111"
|
|
alice_key = DUMMY_KEY.public()
|
|
bob_cap = "URI:DIR2:aaaa:2222"
|
|
bob_key = DUMMY_KEY_DSA.public()
|
|
accounts = [
|
|
("alice", alice_key, alice_cap),
|
|
("bob", bob_key, bob_cap),
|
|
]
|
|
self.assertEqual(
|
|
auth.create_account_maps(accounts),
|
|
({
|
|
b"alice": alice_cap.encode("utf-8"),
|
|
b"bob": bob_cap.encode("utf-8"),
|
|
},
|
|
{
|
|
b"alice": [alice_key],
|
|
b"bob": [bob_key],
|
|
}),
|
|
)
|
|
|
|
def test_load_account_file(self):
|
|
"""
|
|
``auth.load_account_file`` accepts an iterator of serialized account lines
|
|
and returns two mappings: one from account name to rootcap, the other
|
|
from account name to public keys.
|
|
"""
|
|
alice_key = DUMMY_KEY.public().toString("openssh").decode("utf-8")
|
|
alice_cap = "URI:DIR2:aaaa:1111"
|
|
|
|
bob_key = DUMMY_KEY_DSA.public().toString("openssh").decode("utf-8")
|
|
bob_cap = "URI:DIR2:aaaa:2222"
|
|
|
|
accounts = [
|
|
"alice {} {}".format(alice_key, alice_cap),
|
|
"bob {} {}".format(bob_key, bob_cap),
|
|
"# carol {} {}".format(alice_key, alice_cap),
|
|
]
|
|
|
|
self.assertEqual(
|
|
auth.load_account_file(accounts),
|
|
({
|
|
b"alice": alice_cap.encode("utf-8"),
|
|
b"bob": bob_cap.encode("utf-8"),
|
|
},
|
|
{
|
|
b"alice": [DUMMY_KEY.public()],
|
|
b"bob": [DUMMY_KEY_DSA.public()],
|
|
}),
|
|
)
|
|
|
|
|
|
class AccountFileCheckerKeyTests(unittest.TestCase):
|
|
"""
|
|
Tests for key handling done by allmydata.frontends.auth.AccountFileChecker.
|
|
"""
|
|
def setUp(self):
|
|
self.account_file = filepath.FilePath(self.mktemp())
|
|
self.account_file.setContent(ACCOUNTS)
|
|
abspath = abspath_expanduser_unicode(str(self.account_file.path))
|
|
self.checker = auth.AccountFileChecker(None, abspath)
|
|
|
|
def test_unknown_user(self):
|
|
"""
|
|
AccountFileChecker.requestAvatarId returns a Deferred that fires with
|
|
UnauthorizedLogin if called with an SSHPrivateKey object with a
|
|
username not present in the account file.
|
|
"""
|
|
key_credentials = credentials.SSHPrivateKey(
|
|
b"dennis", b"md5", None, None, None)
|
|
avatarId = self.checker.requestAvatarId(key_credentials)
|
|
return self.assertFailure(avatarId, error.UnauthorizedLogin)
|
|
|
|
def test_unrecognized_key(self):
|
|
"""
|
|
AccountFileChecker.requestAvatarId returns a Deferred that fires with
|
|
UnauthorizedLogin if called with an SSHPrivateKey object with a public
|
|
key other than the one indicated in the account file for the indicated
|
|
user.
|
|
"""
|
|
wrong_key_blob = b"""\
|
|
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAYQDJGMWlPXh2M3pYzTiamjcBIMqctt4VvLVW2QZgEFc86XhGjPXq5QAiRTKv9yVZJR9HW70CfBI7GHun8+v4Wb6aicWBoxgI3OB5NN+OUywdme2HSaif5yenFdQr0ME71Xs=
|
|
"""
|
|
key_credentials = credentials.SSHPrivateKey(
|
|
b"carol", b"md5", wrong_key_blob, None, None)
|
|
avatarId = self.checker.requestAvatarId(key_credentials)
|
|
return self.assertFailure(avatarId, error.UnauthorizedLogin)
|
|
|
|
def test_missing_signature(self):
|
|
"""
|
|
AccountFileChecker.requestAvatarId returns a Deferred that fires with
|
|
ValidPublicKey if called with an SSHPrivateKey object with an
|
|
authorized key for the indicated user but with no signature.
|
|
"""
|
|
right_key_blob = DUMMY_KEY.public().toString("openssh")
|
|
key_credentials = credentials.SSHPrivateKey(
|
|
b"carol", b"md5", right_key_blob, None, None)
|
|
avatarId = self.checker.requestAvatarId(key_credentials)
|
|
return self.assertFailure(avatarId, conch_error.ValidPublicKey)
|
|
|
|
def test_wrong_signature(self):
|
|
"""
|
|
AccountFileChecker.requestAvatarId returns a Deferred that fires with
|
|
UnauthorizedLogin if called with an SSHPrivateKey object with a public
|
|
key matching that on the user's line in the account file but with the
|
|
wrong signature.
|
|
"""
|
|
right_key_blob = DUMMY_KEY.public().toString("openssh")
|
|
key_credentials = credentials.SSHPrivateKey(
|
|
b"carol", b"md5", right_key_blob, b"signed data", b"wrong sig")
|
|
avatarId = self.checker.requestAvatarId(key_credentials)
|
|
return self.assertFailure(avatarId, error.UnauthorizedLogin)
|
|
|
|
def test_authenticated(self):
|
|
"""
|
|
If called with an SSHPrivateKey object with a username and public key
|
|
found in the account file and a signature that proves possession of the
|
|
corresponding private key, AccountFileChecker.requestAvatarId returns a
|
|
Deferred that fires with an FTPAvatarID giving the username and root
|
|
capability for that user.
|
|
"""
|
|
username = b"carol"
|
|
signed_data = b"signed data"
|
|
signature = DUMMY_KEY.sign(signed_data)
|
|
right_key_blob = DUMMY_KEY.public().toString("openssh")
|
|
key_credentials = credentials.SSHPrivateKey(
|
|
username, b"md5", right_key_blob, signed_data, signature)
|
|
avatarId = self.checker.requestAvatarId(key_credentials)
|
|
def authenticated(avatarId):
|
|
self.assertEqual(
|
|
(username,
|
|
b"URI:DIR2:cccccccccccccccccccccccccc:3333333333333333333333333333333333333333333333333333"),
|
|
(avatarId.username, avatarId.rootcap))
|
|
avatarId.addCallback(authenticated)
|
|
return avatarId
|