mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-05 09:59:24 +00:00
tests for 'tahoe admin add-grid-manager-cert'
This commit is contained in:
parent
22239022c7
commit
7783f31c1b
@ -115,8 +115,6 @@ def add_grid_manager_cert(options):
|
||||
"""
|
||||
Add a new Grid Manager certificate to our config
|
||||
"""
|
||||
if options.certificate_data is None:
|
||||
return 1
|
||||
# XXX is there really not already a function for this?
|
||||
if options.parent.parent['node-directory']:
|
||||
nd = argv_to_abspath(options.parent.parent['node-directory'])
|
||||
@ -132,7 +130,7 @@ def add_grid_manager_cert(options):
|
||||
if cert_path.exists():
|
||||
msg = "Already have certificate for '{}' (at {})".format(
|
||||
options['name'],
|
||||
cert_path,
|
||||
cert_path.path,
|
||||
)
|
||||
print(msg, file=options.parent.parent.stderr)
|
||||
return 1
|
||||
|
178
src/allmydata/test/cli/test_admin.py
Normal file
178
src/allmydata/test/cli/test_admin.py
Normal file
@ -0,0 +1,178 @@
|
||||
import json
|
||||
from io import (
|
||||
BytesIO,
|
||||
StringIO,
|
||||
)
|
||||
|
||||
from twisted.python.usage import (
|
||||
UsageError,
|
||||
)
|
||||
from twisted.python.filepath import (
|
||||
FilePath,
|
||||
)
|
||||
|
||||
from allmydata.scripts.admin import (
|
||||
AdminCommand,
|
||||
AddGridManagerCertOptions,
|
||||
add_grid_manager_cert,
|
||||
)
|
||||
from allmydata.scripts.runner import (
|
||||
Options,
|
||||
)
|
||||
from ..common import (
|
||||
SyncTestCase,
|
||||
)
|
||||
|
||||
|
||||
fake_cert = {
|
||||
"certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":1}",
|
||||
"signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda"
|
||||
}
|
||||
|
||||
|
||||
class AddCertificateOptions(SyncTestCase):
|
||||
"""
|
||||
Tests for 'tahoe admin add-grid-manager-cert' option validation
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.tahoe = Options()
|
||||
return super(AddCertificateOptions, self).setUp()
|
||||
|
||||
def test_parse_no_data(self):
|
||||
"""
|
||||
When no data is passed to stdin an error is produced
|
||||
"""
|
||||
self.tahoe.stdin = BytesIO(b"")
|
||||
self.tahoe.stderr = BytesIO() # suppress message
|
||||
|
||||
with self.assertRaises(UsageError) as ctx:
|
||||
self.tahoe.parseOptions(
|
||||
[
|
||||
"admin", "add-grid-manager-cert",
|
||||
"--name", "random-name",
|
||||
"--filename", "-",
|
||||
]
|
||||
)
|
||||
|
||||
self.assertIn(
|
||||
"Reading certificate from stdin failed",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
def test_read_cert_file(self):
|
||||
"""
|
||||
A certificate can be read from a file
|
||||
"""
|
||||
tmp = self.mktemp()
|
||||
with open(tmp, "w") as f:
|
||||
json.dump(fake_cert, f)
|
||||
|
||||
# certificate should be loaded
|
||||
o = self.tahoe.parseOptions(
|
||||
[
|
||||
"admin", "add-grid-manager-cert",
|
||||
"--name", "random-name",
|
||||
"--filename", tmp,
|
||||
]
|
||||
)
|
||||
opts = self.tahoe.subOptions.subOptions
|
||||
self.assertEqual(
|
||||
fake_cert,
|
||||
opts.certificate_data
|
||||
)
|
||||
|
||||
def test_bad_certificate(self):
|
||||
"""
|
||||
Unparseable data produces an error
|
||||
"""
|
||||
self.tahoe.stdin = BytesIO(b"{}")
|
||||
self.tahoe.stderr = BytesIO() # suppress message
|
||||
|
||||
with self.assertRaises(UsageError) as ctx:
|
||||
self.tahoe.parseOptions(
|
||||
[
|
||||
"admin", "add-grid-manager-cert",
|
||||
"--name", "random-name",
|
||||
"--filename", "-",
|
||||
]
|
||||
)
|
||||
|
||||
self.assertIn(
|
||||
"Grid Manager certificate must contain",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
|
||||
class AddCertificateCommand(SyncTestCase):
|
||||
"""
|
||||
Tests for 'tahoe admin add-grid-manager-cert' operation
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.tahoe = Options()
|
||||
self.node_path = FilePath(self.mktemp())
|
||||
self.node_path.makedirs()
|
||||
with self.node_path.child("tahoe.cfg").open("w") as f:
|
||||
f.write("# minimal test config\n")
|
||||
return super(AddCertificateCommand, self).setUp()
|
||||
|
||||
def test_add_one(self):
|
||||
"""
|
||||
Adding a certificate succeeds
|
||||
"""
|
||||
self.tahoe.stdin = BytesIO(json.dumps(fake_cert))
|
||||
self.tahoe.stderr = BytesIO()
|
||||
self.tahoe.parseOptions(
|
||||
[
|
||||
"--node-directory", self.node_path.path,
|
||||
"admin", "add-grid-manager-cert",
|
||||
"--name", "zero",
|
||||
"--filename", "-",
|
||||
]
|
||||
)
|
||||
rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions)
|
||||
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(
|
||||
["zero.cert", "tahoe.cfg"],
|
||||
self.node_path.listdir()
|
||||
)
|
||||
self.assertIn(
|
||||
"There are now 1 certificates",
|
||||
self.tahoe.stderr.getvalue()
|
||||
)
|
||||
|
||||
def test_add_two(self):
|
||||
"""
|
||||
An error message is produced when adding a certificate with a
|
||||
duplicate name.
|
||||
"""
|
||||
self.tahoe.stdin = BytesIO(json.dumps(fake_cert))
|
||||
self.tahoe.stderr = BytesIO()
|
||||
self.tahoe.parseOptions(
|
||||
[
|
||||
"--node-directory", self.node_path.path,
|
||||
"admin", "add-grid-manager-cert",
|
||||
"--name", "zero",
|
||||
"--filename", "-",
|
||||
]
|
||||
)
|
||||
rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions)
|
||||
self.assertEqual(rc, 0)
|
||||
|
||||
self.tahoe.stdin = BytesIO(json.dumps(fake_cert))
|
||||
self.tahoe.parseOptions(
|
||||
[
|
||||
"--node-directory", self.node_path.path,
|
||||
"admin", "add-grid-manager-cert",
|
||||
"--name", "zero",
|
||||
"--filename", "-",
|
||||
]
|
||||
)
|
||||
rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions)
|
||||
self.assertEqual(rc, 1)
|
||||
self.assertIn(
|
||||
"Already have certificate for 'zero'",
|
||||
self.tahoe.stderr.getvalue()
|
||||
)
|
@ -154,18 +154,20 @@ class GridManagerVerifier(SyncTestCase):
|
||||
"""
|
||||
priv, pub = ed25519.create_signing_keypair()
|
||||
self.gm.add_storage_server("test", pub)
|
||||
cert = self.gm.sign("test", timedelta(seconds=86400))
|
||||
cert0 = self.gm.sign("test", timedelta(seconds=86400))
|
||||
cert1 = self.gm.sign("test", timedelta(seconds=86400))
|
||||
self.assertNotEqual(cert0, cert1)
|
||||
|
||||
self.assertEqual(
|
||||
set(cert.keys()),
|
||||
set(cert0.keys()),
|
||||
{"certificate", "signature"},
|
||||
)
|
||||
gm_key = ed25519.verifying_key_from_string(self.gm.public_identity())
|
||||
self.assertEqual(
|
||||
ed25519.verify_signature(
|
||||
gm_key,
|
||||
base32.a2b(cert["signature"]),
|
||||
cert["certificate"],
|
||||
base32.a2b(cert0["signature"]),
|
||||
cert0["certificate"],
|
||||
),
|
||||
None
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user