From 7783f31c1b0580211f7c05f8b8a08c40c4e4c976 Mon Sep 17 00:00:00 2001 From: meejah Date: Mon, 4 Jan 2021 21:49:40 -0700 Subject: [PATCH] tests for 'tahoe admin add-grid-manager-cert' --- src/allmydata/scripts/admin.py | 4 +- src/allmydata/test/cli/test_admin.py | 178 ++++++++++++++++++++++++ src/allmydata/test/test_grid_manager.py | 10 +- 3 files changed, 185 insertions(+), 7 deletions(-) create mode 100644 src/allmydata/test/cli/test_admin.py diff --git a/src/allmydata/scripts/admin.py b/src/allmydata/scripts/admin.py index 8710641ec..a0014a24e 100644 --- a/src/allmydata/scripts/admin.py +++ b/src/allmydata/scripts/admin.py @@ -115,8 +115,6 @@ def add_grid_manager_cert(options): """ Add a new Grid Manager certificate to our config """ - if options.certificate_data is None: - return 1 # XXX is there really not already a function for this? if options.parent.parent['node-directory']: nd = argv_to_abspath(options.parent.parent['node-directory']) @@ -132,7 +130,7 @@ def add_grid_manager_cert(options): if cert_path.exists(): msg = "Already have certificate for '{}' (at {})".format( options['name'], - cert_path, + cert_path.path, ) print(msg, file=options.parent.parent.stderr) return 1 diff --git a/src/allmydata/test/cli/test_admin.py b/src/allmydata/test/cli/test_admin.py new file mode 100644 index 000000000..0d825945c --- /dev/null +++ b/src/allmydata/test/cli/test_admin.py @@ -0,0 +1,178 @@ +import json +from io import ( + BytesIO, + StringIO, +) + +from twisted.python.usage import ( + UsageError, +) +from twisted.python.filepath import ( + FilePath, +) + +from allmydata.scripts.admin import ( + AdminCommand, + AddGridManagerCertOptions, + add_grid_manager_cert, +) +from allmydata.scripts.runner import ( + Options, +) +from ..common import ( + SyncTestCase, +) + + +fake_cert = { + "certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":1}", + "signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda" +} + + +class AddCertificateOptions(SyncTestCase): + """ + Tests for 'tahoe admin add-grid-manager-cert' option validation + """ + + def setUp(self): + self.tahoe = Options() + return super(AddCertificateOptions, self).setUp() + + def test_parse_no_data(self): + """ + When no data is passed to stdin an error is produced + """ + self.tahoe.stdin = BytesIO(b"") + self.tahoe.stderr = BytesIO() # suppress message + + with self.assertRaises(UsageError) as ctx: + self.tahoe.parseOptions( + [ + "admin", "add-grid-manager-cert", + "--name", "random-name", + "--filename", "-", + ] + ) + + self.assertIn( + "Reading certificate from stdin failed", + str(ctx.exception) + ) + + def test_read_cert_file(self): + """ + A certificate can be read from a file + """ + tmp = self.mktemp() + with open(tmp, "w") as f: + json.dump(fake_cert, f) + + # certificate should be loaded + o = self.tahoe.parseOptions( + [ + "admin", "add-grid-manager-cert", + "--name", "random-name", + "--filename", tmp, + ] + ) + opts = self.tahoe.subOptions.subOptions + self.assertEqual( + fake_cert, + opts.certificate_data + ) + + def test_bad_certificate(self): + """ + Unparseable data produces an error + """ + self.tahoe.stdin = BytesIO(b"{}") + self.tahoe.stderr = BytesIO() # suppress message + + with self.assertRaises(UsageError) as ctx: + self.tahoe.parseOptions( + [ + "admin", "add-grid-manager-cert", + "--name", "random-name", + "--filename", "-", + ] + ) + + self.assertIn( + "Grid Manager certificate must contain", + str(ctx.exception) + ) + + +class AddCertificateCommand(SyncTestCase): + """ + Tests for 'tahoe admin add-grid-manager-cert' operation + """ + + def setUp(self): + self.tahoe = Options() + self.node_path = FilePath(self.mktemp()) + self.node_path.makedirs() + with self.node_path.child("tahoe.cfg").open("w") as f: + f.write("# minimal test config\n") + return super(AddCertificateCommand, self).setUp() + + def test_add_one(self): + """ + Adding a certificate succeeds + """ + self.tahoe.stdin = BytesIO(json.dumps(fake_cert)) + self.tahoe.stderr = BytesIO() + self.tahoe.parseOptions( + [ + "--node-directory", self.node_path.path, + "admin", "add-grid-manager-cert", + "--name", "zero", + "--filename", "-", + ] + ) + rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions) + + self.assertEqual(rc, 0) + self.assertEqual( + ["zero.cert", "tahoe.cfg"], + self.node_path.listdir() + ) + self.assertIn( + "There are now 1 certificates", + self.tahoe.stderr.getvalue() + ) + + def test_add_two(self): + """ + An error message is produced when adding a certificate with a + duplicate name. + """ + self.tahoe.stdin = BytesIO(json.dumps(fake_cert)) + self.tahoe.stderr = BytesIO() + self.tahoe.parseOptions( + [ + "--node-directory", self.node_path.path, + "admin", "add-grid-manager-cert", + "--name", "zero", + "--filename", "-", + ] + ) + rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions) + self.assertEqual(rc, 0) + + self.tahoe.stdin = BytesIO(json.dumps(fake_cert)) + self.tahoe.parseOptions( + [ + "--node-directory", self.node_path.path, + "admin", "add-grid-manager-cert", + "--name", "zero", + "--filename", "-", + ] + ) + rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions) + self.assertEqual(rc, 1) + self.assertIn( + "Already have certificate for 'zero'", + self.tahoe.stderr.getvalue() + ) diff --git a/src/allmydata/test/test_grid_manager.py b/src/allmydata/test/test_grid_manager.py index c28afd5fd..01cb50ccf 100644 --- a/src/allmydata/test/test_grid_manager.py +++ b/src/allmydata/test/test_grid_manager.py @@ -154,18 +154,20 @@ class GridManagerVerifier(SyncTestCase): """ priv, pub = ed25519.create_signing_keypair() self.gm.add_storage_server("test", pub) - cert = self.gm.sign("test", timedelta(seconds=86400)) + cert0 = self.gm.sign("test", timedelta(seconds=86400)) + cert1 = self.gm.sign("test", timedelta(seconds=86400)) + self.assertNotEqual(cert0, cert1) self.assertEqual( - set(cert.keys()), + set(cert0.keys()), {"certificate", "signature"}, ) gm_key = ed25519.verifying_key_from_string(self.gm.public_identity()) self.assertEqual( ed25519.verify_signature( gm_key, - base32.a2b(cert["signature"]), - cert["certificate"], + base32.a2b(cert0["signature"]), + cert0["certificate"], ), None )