tahoe-lafs/src/allmydata/test/cli/test_cli.py
meejah 50f8397c99 Merge branch 'invalid-pidfile' into delete-invalid-pidfile--lpirl
Conflicts:
	src/allmydata/scripts/startstop_node.py
	src/allmydata/test/cli/test_cli.py
2017-11-08 14:49:29 -07:00

1344 lines
61 KiB
Python

import os.path
from cStringIO import StringIO
import urllib, sys
import re
from mock import patch
from twisted.trial import unittest
from twisted.python.monkey import MonkeyPatcher
from twisted.internet import task
from twisted.python.filepath import FilePath
import allmydata
from allmydata.util import fileutil, hashutil, base32, keyutil
from allmydata.util.namespace import Namespace
from allmydata import uri
from allmydata.immutable import upload
from allmydata.dirnode import normalize
from allmydata.scripts.common_http import socket_error
import allmydata.scripts.common_http
from pycryptopp.publickey import ed25519
# Test that the scripts can be imported.
from allmydata.scripts import create_node, debug, tahoe_start, tahoe_restart, \
tahoe_add_alias, tahoe_backup, tahoe_check, tahoe_cp, tahoe_get, tahoe_ls, \
tahoe_manifest, tahoe_mkdir, tahoe_mv, tahoe_put, tahoe_unlink, tahoe_webopen, \
tahoe_stop, tahoe_daemonize, tahoe_run
_hush_pyflakes = [create_node, debug, tahoe_start, tahoe_restart, tahoe_stop,
tahoe_add_alias, tahoe_backup, tahoe_check, tahoe_cp, tahoe_get, tahoe_ls,
tahoe_manifest, tahoe_mkdir, tahoe_mv, tahoe_put, tahoe_unlink, tahoe_webopen,
tahoe_daemonize, tahoe_run]
from allmydata.scripts import common
from allmydata.scripts.common import DEFAULT_ALIAS, get_aliases, get_alias, \
DefaultAliasMarker
from allmydata.scripts import cli, debug, runner
from ..common_util import (ReallyEqualMixin, skip_if_cannot_represent_filename,
run_cli)
from ..no_network import GridTestMixin
from .common import CLITestMixin, parse_options
from twisted.python import usage
from allmydata.util.encodingutil import listdir_unicode, get_io_encoding
timeout = 480 # deep_check takes 360s on Zandr's linksys box, others take > 240s
class CLI(CLITestMixin, unittest.TestCase):
def _dump_cap(self, *args):
config = debug.DumpCapOptions()
config.stdout,config.stderr = StringIO(), StringIO()
config.parseOptions(args)
debug.dump_cap(config)
self.failIf(config.stderr.getvalue())
output = config.stdout.getvalue()
return output
def test_dump_cap_chk(self):
key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
uri_extension_hash = hashutil.uri_extension_hash("stuff")
needed_shares = 25
total_shares = 100
size = 1234
u = uri.CHKFileURI(key=key,
uri_extension_hash=uri_extension_hash,
needed_shares=needed_shares,
total_shares=total_shares,
size=size)
output = self._dump_cap(u.to_string())
self.failUnless("CHK File:" in output, output)
self.failUnless("key: aaaqeayeaudaocajbifqydiob4" in output, output)
self.failUnless("UEB hash: nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa" in output, output)
self.failUnless("size: 1234" in output, output)
self.failUnless("k/N: 25/100" in output, output)
self.failUnless("storage index: hdis5iaveku6lnlaiccydyid7q" in output, output)
output = self._dump_cap("--client-secret", "5s33nk3qpvnj2fw3z4mnm2y6fa",
u.to_string())
self.failUnless("client renewal secret: znxmki5zdibb5qlt46xbdvk2t55j7hibejq3i5ijyurkr6m6jkhq" in output, output)
output = self._dump_cap(u.get_verify_cap().to_string())
self.failIf("key: " in output, output)
self.failUnless("UEB hash: nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa" in output, output)
self.failUnless("size: 1234" in output, output)
self.failUnless("k/N: 25/100" in output, output)
self.failUnless("storage index: hdis5iaveku6lnlaiccydyid7q" in output, output)
prefixed_u = "http://127.0.0.1/uri/%s" % urllib.quote(u.to_string())
output = self._dump_cap(prefixed_u)
self.failUnless("CHK File:" in output, output)
self.failUnless("key: aaaqeayeaudaocajbifqydiob4" in output, output)
self.failUnless("UEB hash: nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa" in output, output)
self.failUnless("size: 1234" in output, output)
self.failUnless("k/N: 25/100" in output, output)
self.failUnless("storage index: hdis5iaveku6lnlaiccydyid7q" in output, output)
def test_dump_cap_lit(self):
u = uri.LiteralFileURI("this is some data")
output = self._dump_cap(u.to_string())
self.failUnless("Literal File URI:" in output, output)
self.failUnless("data: 'this is some data'" in output, output)
def test_dump_cap_sdmf(self):
writekey = "\x01" * 16
fingerprint = "\xfe" * 32
u = uri.WriteableSSKFileURI(writekey, fingerprint)
output = self._dump_cap(u.to_string())
self.failUnless("SDMF Writeable URI:" in output, output)
self.failUnless("writekey: aeaqcaibaeaqcaibaeaqcaibae" in output, output)
self.failUnless("readkey: nvgh5vj2ekzzkim5fgtb4gey5y" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
output = self._dump_cap("--client-secret", "5s33nk3qpvnj2fw3z4mnm2y6fa",
u.to_string())
self.failUnless("file renewal secret: arpszxzc2t6kb4okkg7sp765xgkni5z7caavj7lta73vmtymjlxq" in output, output)
fileutil.make_dirs("cli/test_dump_cap/private")
fileutil.write("cli/test_dump_cap/private/secret", "5s33nk3qpvnj2fw3z4mnm2y6fa\n")
output = self._dump_cap("--client-dir", "cli/test_dump_cap",
u.to_string())
self.failUnless("file renewal secret: arpszxzc2t6kb4okkg7sp765xgkni5z7caavj7lta73vmtymjlxq" in output, output)
output = self._dump_cap("--client-dir", "cli/test_dump_cap_BOGUS",
u.to_string())
self.failIf("file renewal secret:" in output, output)
output = self._dump_cap("--nodeid", "tqc35esocrvejvg4mablt6aowg6tl43j",
u.to_string())
self.failUnless("write_enabler: mgcavriox2wlb5eer26unwy5cw56elh3sjweffckkmivvsxtaknq" in output, output)
self.failIf("file renewal secret:" in output, output)
output = self._dump_cap("--nodeid", "tqc35esocrvejvg4mablt6aowg6tl43j",
"--client-secret", "5s33nk3qpvnj2fw3z4mnm2y6fa",
u.to_string())
self.failUnless("write_enabler: mgcavriox2wlb5eer26unwy5cw56elh3sjweffckkmivvsxtaknq" in output, output)
self.failUnless("file renewal secret: arpszxzc2t6kb4okkg7sp765xgkni5z7caavj7lta73vmtymjlxq" in output, output)
self.failUnless("lease renewal secret: 7pjtaumrb7znzkkbvekkmuwpqfjyfyamznfz4bwwvmh4nw33lorq" in output, output)
u = u.get_readonly()
output = self._dump_cap(u.to_string())
self.failUnless("SDMF Read-only URI:" in output, output)
self.failUnless("readkey: nvgh5vj2ekzzkim5fgtb4gey5y" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
u = u.get_verify_cap()
output = self._dump_cap(u.to_string())
self.failUnless("SDMF Verifier URI:" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
def test_dump_cap_mdmf(self):
writekey = "\x01" * 16
fingerprint = "\xfe" * 32
u = uri.WriteableMDMFFileURI(writekey, fingerprint)
output = self._dump_cap(u.to_string())
self.failUnless("MDMF Writeable URI:" in output, output)
self.failUnless("writekey: aeaqcaibaeaqcaibaeaqcaibae" in output, output)
self.failUnless("readkey: nvgh5vj2ekzzkim5fgtb4gey5y" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
output = self._dump_cap("--client-secret", "5s33nk3qpvnj2fw3z4mnm2y6fa",
u.to_string())
self.failUnless("file renewal secret: arpszxzc2t6kb4okkg7sp765xgkni5z7caavj7lta73vmtymjlxq" in output, output)
fileutil.make_dirs("cli/test_dump_cap/private")
fileutil.write("cli/test_dump_cap/private/secret", "5s33nk3qpvnj2fw3z4mnm2y6fa\n")
output = self._dump_cap("--client-dir", "cli/test_dump_cap",
u.to_string())
self.failUnless("file renewal secret: arpszxzc2t6kb4okkg7sp765xgkni5z7caavj7lta73vmtymjlxq" in output, output)
output = self._dump_cap("--client-dir", "cli/test_dump_cap_BOGUS",
u.to_string())
self.failIf("file renewal secret:" in output, output)
output = self._dump_cap("--nodeid", "tqc35esocrvejvg4mablt6aowg6tl43j",
u.to_string())
self.failUnless("write_enabler: mgcavriox2wlb5eer26unwy5cw56elh3sjweffckkmivvsxtaknq" in output, output)
self.failIf("file renewal secret:" in output, output)
output = self._dump_cap("--nodeid", "tqc35esocrvejvg4mablt6aowg6tl43j",
"--client-secret", "5s33nk3qpvnj2fw3z4mnm2y6fa",
u.to_string())
self.failUnless("write_enabler: mgcavriox2wlb5eer26unwy5cw56elh3sjweffckkmivvsxtaknq" in output, output)
self.failUnless("file renewal secret: arpszxzc2t6kb4okkg7sp765xgkni5z7caavj7lta73vmtymjlxq" in output, output)
self.failUnless("lease renewal secret: 7pjtaumrb7znzkkbvekkmuwpqfjyfyamznfz4bwwvmh4nw33lorq" in output, output)
u = u.get_readonly()
output = self._dump_cap(u.to_string())
self.failUnless("MDMF Read-only URI:" in output, output)
self.failUnless("readkey: nvgh5vj2ekzzkim5fgtb4gey5y" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
u = u.get_verify_cap()
output = self._dump_cap(u.to_string())
self.failUnless("MDMF Verifier URI:" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
def test_dump_cap_chk_directory(self):
key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
uri_extension_hash = hashutil.uri_extension_hash("stuff")
needed_shares = 25
total_shares = 100
size = 1234
u1 = uri.CHKFileURI(key=key,
uri_extension_hash=uri_extension_hash,
needed_shares=needed_shares,
total_shares=total_shares,
size=size)
u = uri.ImmutableDirectoryURI(u1)
output = self._dump_cap(u.to_string())
self.failUnless("CHK Directory URI:" in output, output)
self.failUnless("key: aaaqeayeaudaocajbifqydiob4" in output, output)
self.failUnless("UEB hash: nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa" in output, output)
self.failUnless("size: 1234" in output, output)
self.failUnless("k/N: 25/100" in output, output)
self.failUnless("storage index: hdis5iaveku6lnlaiccydyid7q" in output, output)
output = self._dump_cap("--client-secret", "5s33nk3qpvnj2fw3z4mnm2y6fa",
u.to_string())
self.failUnless("file renewal secret: csrvkjgomkyyyil5yo4yk5np37p6oa2ve2hg6xmk2dy7kaxsu6xq" in output, output)
u = u.get_verify_cap()
output = self._dump_cap(u.to_string())
self.failUnless("CHK Directory Verifier URI:" in output, output)
self.failIf("key: " in output, output)
self.failUnless("UEB hash: nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa" in output, output)
self.failUnless("size: 1234" in output, output)
self.failUnless("k/N: 25/100" in output, output)
self.failUnless("storage index: hdis5iaveku6lnlaiccydyid7q" in output, output)
def test_dump_cap_sdmf_directory(self):
writekey = "\x01" * 16
fingerprint = "\xfe" * 32
u1 = uri.WriteableSSKFileURI(writekey, fingerprint)
u = uri.DirectoryURI(u1)
output = self._dump_cap(u.to_string())
self.failUnless("Directory Writeable URI:" in output, output)
self.failUnless("writekey: aeaqcaibaeaqcaibaeaqcaibae" in output,
output)
self.failUnless("readkey: nvgh5vj2ekzzkim5fgtb4gey5y" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output,
output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
output = self._dump_cap("--client-secret", "5s33nk3qpvnj2fw3z4mnm2y6fa",
u.to_string())
self.failUnless("file renewal secret: arpszxzc2t6kb4okkg7sp765xgkni5z7caavj7lta73vmtymjlxq" in output, output)
output = self._dump_cap("--nodeid", "tqc35esocrvejvg4mablt6aowg6tl43j",
u.to_string())
self.failUnless("write_enabler: mgcavriox2wlb5eer26unwy5cw56elh3sjweffckkmivvsxtaknq" in output, output)
self.failIf("file renewal secret:" in output, output)
output = self._dump_cap("--nodeid", "tqc35esocrvejvg4mablt6aowg6tl43j",
"--client-secret", "5s33nk3qpvnj2fw3z4mnm2y6fa",
u.to_string())
self.failUnless("write_enabler: mgcavriox2wlb5eer26unwy5cw56elh3sjweffckkmivvsxtaknq" in output, output)
self.failUnless("file renewal secret: arpszxzc2t6kb4okkg7sp765xgkni5z7caavj7lta73vmtymjlxq" in output, output)
self.failUnless("lease renewal secret: 7pjtaumrb7znzkkbvekkmuwpqfjyfyamznfz4bwwvmh4nw33lorq" in output, output)
u = u.get_readonly()
output = self._dump_cap(u.to_string())
self.failUnless("Directory Read-only URI:" in output, output)
self.failUnless("readkey: nvgh5vj2ekzzkim5fgtb4gey5y" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
u = u.get_verify_cap()
output = self._dump_cap(u.to_string())
self.failUnless("Directory Verifier URI:" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
def test_dump_cap_mdmf_directory(self):
writekey = "\x01" * 16
fingerprint = "\xfe" * 32
u1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
u = uri.MDMFDirectoryURI(u1)
output = self._dump_cap(u.to_string())
self.failUnless("Directory Writeable URI:" in output, output)
self.failUnless("writekey: aeaqcaibaeaqcaibaeaqcaibae" in output,
output)
self.failUnless("readkey: nvgh5vj2ekzzkim5fgtb4gey5y" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output,
output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
output = self._dump_cap("--client-secret", "5s33nk3qpvnj2fw3z4mnm2y6fa",
u.to_string())
self.failUnless("file renewal secret: arpszxzc2t6kb4okkg7sp765xgkni5z7caavj7lta73vmtymjlxq" in output, output)
output = self._dump_cap("--nodeid", "tqc35esocrvejvg4mablt6aowg6tl43j",
u.to_string())
self.failUnless("write_enabler: mgcavriox2wlb5eer26unwy5cw56elh3sjweffckkmivvsxtaknq" in output, output)
self.failIf("file renewal secret:" in output, output)
output = self._dump_cap("--nodeid", "tqc35esocrvejvg4mablt6aowg6tl43j",
"--client-secret", "5s33nk3qpvnj2fw3z4mnm2y6fa",
u.to_string())
self.failUnless("write_enabler: mgcavriox2wlb5eer26unwy5cw56elh3sjweffckkmivvsxtaknq" in output, output)
self.failUnless("file renewal secret: arpszxzc2t6kb4okkg7sp765xgkni5z7caavj7lta73vmtymjlxq" in output, output)
self.failUnless("lease renewal secret: 7pjtaumrb7znzkkbvekkmuwpqfjyfyamznfz4bwwvmh4nw33lorq" in output, output)
u = u.get_readonly()
output = self._dump_cap(u.to_string())
self.failUnless("Directory Read-only URI:" in output, output)
self.failUnless("readkey: nvgh5vj2ekzzkim5fgtb4gey5y" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
u = u.get_verify_cap()
output = self._dump_cap(u.to_string())
self.failUnless("Directory Verifier URI:" in output, output)
self.failUnless("storage index: nt4fwemuw7flestsezvo2eveke" in output, output)
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
def _catalog_shares(self, *basedirs):
o = debug.CatalogSharesOptions()
o.stdout,o.stderr = StringIO(), StringIO()
args = list(basedirs)
o.parseOptions(args)
debug.catalog_shares(o)
out = o.stdout.getvalue()
err = o.stderr.getvalue()
return out, err
def test_catalog_shares_error(self):
nodedir1 = "cli/test_catalog_shares/node1"
sharedir = os.path.join(nodedir1, "storage", "shares", "mq", "mqfblse6m5a6dh45isu2cg7oji")
fileutil.make_dirs(sharedir)
fileutil.write("cli/test_catalog_shares/node1/storage/shares/mq/not-a-dir", "")
# write a bogus share that looks a little bit like CHK
fileutil.write(os.path.join(sharedir, "8"),
"\x00\x00\x00\x01" + "\xff" * 200) # this triggers an assert
nodedir2 = "cli/test_catalog_shares/node2"
fileutil.make_dirs(nodedir2)
fileutil.write("cli/test_catalog_shares/node1/storage/shares/not-a-dir", "")
# now make sure that the 'catalog-shares' commands survives the error
out, err = self._catalog_shares(nodedir1, nodedir2)
self.failUnlessReallyEqual(out, "", out)
self.failUnless("Error processing " in err,
"didn't see 'error processing' in '%s'" % err)
#self.failUnless(nodedir1 in err,
# "didn't see '%s' in '%s'" % (nodedir1, err))
# windows mangles the path, and os.path.join isn't enough to make
# up for it, so just look for individual strings
self.failUnless("node1" in err,
"didn't see 'node1' in '%s'" % err)
self.failUnless("mqfblse6m5a6dh45isu2cg7oji" in err,
"didn't see 'mqfblse6m5a6dh45isu2cg7oji' in '%s'" % err)
def test_alias(self):
def s128(c): return base32.b2a(c*(128/8))
def s256(c): return base32.b2a(c*(256/8))
TA = "URI:DIR2:%s:%s" % (s128("T"), s256("T"))
WA = "URI:DIR2:%s:%s" % (s128("W"), s256("W"))
CA = "URI:DIR2:%s:%s" % (s128("C"), s256("C"))
aliases = {"tahoe": TA,
"work": WA,
"c": CA}
def ga1(path):
return get_alias(aliases, path, u"tahoe")
uses_lettercolon = common.platform_uses_lettercolon_drivename()
self.failUnlessReallyEqual(ga1(u"bare"), (TA, "bare"))
self.failUnlessReallyEqual(ga1(u"baredir/file"), (TA, "baredir/file"))
self.failUnlessReallyEqual(ga1(u"baredir/file:7"), (TA, "baredir/file:7"))
self.failUnlessReallyEqual(ga1(u"tahoe:"), (TA, ""))
self.failUnlessReallyEqual(ga1(u"tahoe:file"), (TA, "file"))
self.failUnlessReallyEqual(ga1(u"tahoe:dir/file"), (TA, "dir/file"))
self.failUnlessReallyEqual(ga1(u"work:"), (WA, ""))
self.failUnlessReallyEqual(ga1(u"work:file"), (WA, "file"))
self.failUnlessReallyEqual(ga1(u"work:dir/file"), (WA, "dir/file"))
# default != None means we really expect a tahoe path, regardless of
# whether we're on windows or not. This is what 'tahoe get' uses.
self.failUnlessReallyEqual(ga1(u"c:"), (CA, ""))
self.failUnlessReallyEqual(ga1(u"c:file"), (CA, "file"))
self.failUnlessReallyEqual(ga1(u"c:dir/file"), (CA, "dir/file"))
self.failUnlessReallyEqual(ga1(u"URI:stuff"), ("URI:stuff", ""))
self.failUnlessReallyEqual(ga1(u"URI:stuff/file"), ("URI:stuff", "file"))
self.failUnlessReallyEqual(ga1(u"URI:stuff:./file"), ("URI:stuff", "file"))
self.failUnlessReallyEqual(ga1(u"URI:stuff/dir/file"), ("URI:stuff", "dir/file"))
self.failUnlessReallyEqual(ga1(u"URI:stuff:./dir/file"), ("URI:stuff", "dir/file"))
self.failUnlessRaises(common.UnknownAliasError, ga1, u"missing:")
self.failUnlessRaises(common.UnknownAliasError, ga1, u"missing:dir")
self.failUnlessRaises(common.UnknownAliasError, ga1, u"missing:dir/file")
def ga2(path):
return get_alias(aliases, path, None)
self.failUnlessReallyEqual(ga2(u"bare"), (DefaultAliasMarker, "bare"))
self.failUnlessReallyEqual(ga2(u"baredir/file"),
(DefaultAliasMarker, "baredir/file"))
self.failUnlessReallyEqual(ga2(u"baredir/file:7"),
(DefaultAliasMarker, "baredir/file:7"))
self.failUnlessReallyEqual(ga2(u"baredir/sub:1/file:7"),
(DefaultAliasMarker, "baredir/sub:1/file:7"))
self.failUnlessReallyEqual(ga2(u"tahoe:"), (TA, ""))
self.failUnlessReallyEqual(ga2(u"tahoe:file"), (TA, "file"))
self.failUnlessReallyEqual(ga2(u"tahoe:dir/file"), (TA, "dir/file"))
# on windows, we really want c:foo to indicate a local file.
# default==None is what 'tahoe cp' uses.
if uses_lettercolon:
self.failUnlessReallyEqual(ga2(u"c:"), (DefaultAliasMarker, "c:"))
self.failUnlessReallyEqual(ga2(u"c:file"), (DefaultAliasMarker, "c:file"))
self.failUnlessReallyEqual(ga2(u"c:dir/file"),
(DefaultAliasMarker, "c:dir/file"))
else:
self.failUnlessReallyEqual(ga2(u"c:"), (CA, ""))
self.failUnlessReallyEqual(ga2(u"c:file"), (CA, "file"))
self.failUnlessReallyEqual(ga2(u"c:dir/file"), (CA, "dir/file"))
self.failUnlessReallyEqual(ga2(u"work:"), (WA, ""))
self.failUnlessReallyEqual(ga2(u"work:file"), (WA, "file"))
self.failUnlessReallyEqual(ga2(u"work:dir/file"), (WA, "dir/file"))
self.failUnlessReallyEqual(ga2(u"URI:stuff"), ("URI:stuff", ""))
self.failUnlessReallyEqual(ga2(u"URI:stuff/file"), ("URI:stuff", "file"))
self.failUnlessReallyEqual(ga2(u"URI:stuff:./file"), ("URI:stuff", "file"))
self.failUnlessReallyEqual(ga2(u"URI:stuff/dir/file"), ("URI:stuff", "dir/file"))
self.failUnlessReallyEqual(ga2(u"URI:stuff:./dir/file"), ("URI:stuff", "dir/file"))
self.failUnlessRaises(common.UnknownAliasError, ga2, u"missing:")
self.failUnlessRaises(common.UnknownAliasError, ga2, u"missing:dir")
self.failUnlessRaises(common.UnknownAliasError, ga2, u"missing:dir/file")
def ga3(path):
old = common.pretend_platform_uses_lettercolon
try:
common.pretend_platform_uses_lettercolon = True
retval = get_alias(aliases, path, None)
finally:
common.pretend_platform_uses_lettercolon = old
return retval
self.failUnlessReallyEqual(ga3(u"bare"), (DefaultAliasMarker, "bare"))
self.failUnlessReallyEqual(ga3(u"baredir/file"),
(DefaultAliasMarker, "baredir/file"))
self.failUnlessReallyEqual(ga3(u"baredir/file:7"),
(DefaultAliasMarker, "baredir/file:7"))
self.failUnlessReallyEqual(ga3(u"baredir/sub:1/file:7"),
(DefaultAliasMarker, "baredir/sub:1/file:7"))
self.failUnlessReallyEqual(ga3(u"tahoe:"), (TA, ""))
self.failUnlessReallyEqual(ga3(u"tahoe:file"), (TA, "file"))
self.failUnlessReallyEqual(ga3(u"tahoe:dir/file"), (TA, "dir/file"))
self.failUnlessReallyEqual(ga3(u"c:"), (DefaultAliasMarker, "c:"))
self.failUnlessReallyEqual(ga3(u"c:file"), (DefaultAliasMarker, "c:file"))
self.failUnlessReallyEqual(ga3(u"c:dir/file"),
(DefaultAliasMarker, "c:dir/file"))
self.failUnlessReallyEqual(ga3(u"work:"), (WA, ""))
self.failUnlessReallyEqual(ga3(u"work:file"), (WA, "file"))
self.failUnlessReallyEqual(ga3(u"work:dir/file"), (WA, "dir/file"))
self.failUnlessReallyEqual(ga3(u"URI:stuff"), ("URI:stuff", ""))
self.failUnlessReallyEqual(ga3(u"URI:stuff:./file"), ("URI:stuff", "file"))
self.failUnlessReallyEqual(ga3(u"URI:stuff:./dir/file"), ("URI:stuff", "dir/file"))
self.failUnlessRaises(common.UnknownAliasError, ga3, u"missing:")
self.failUnlessRaises(common.UnknownAliasError, ga3, u"missing:dir")
self.failUnlessRaises(common.UnknownAliasError, ga3, u"missing:dir/file")
# calling get_alias with a path that doesn't include an alias and
# default set to something that isn't in the aliases argument should
# raise an UnknownAliasError.
def ga4(path):
return get_alias(aliases, path, u"badddefault:")
self.failUnlessRaises(common.UnknownAliasError, ga4, u"afile")
self.failUnlessRaises(common.UnknownAliasError, ga4, u"a/dir/path/")
def ga5(path):
old = common.pretend_platform_uses_lettercolon
try:
common.pretend_platform_uses_lettercolon = True
retval = get_alias(aliases, path, u"baddefault:")
finally:
common.pretend_platform_uses_lettercolon = old
return retval
self.failUnlessRaises(common.UnknownAliasError, ga5, u"C:\\Windows")
def test_alias_tolerance(self):
def s128(c): return base32.b2a(c*(128/8))
def s256(c): return base32.b2a(c*(256/8))
TA = "URI:DIR2:%s:%s" % (s128("T"), s256("T"))
aliases = {"present": TA,
"future": "URI-FROM-FUTURE:ooh:aah"}
def ga1(path):
return get_alias(aliases, path, u"tahoe")
self.failUnlessReallyEqual(ga1(u"present:file"), (TA, "file"))
# this throws, via assert IDirnodeURI.providedBy(), since get_alias()
# wants a dirnode, and the future cap gives us UnknownURI instead.
self.failUnlessRaises(AssertionError, ga1, u"future:stuff")
def test_listdir_unicode_good(self):
filenames = [u'L\u00F4zane', u'Bern', u'Gen\u00E8ve'] # must be NFC
for name in filenames:
skip_if_cannot_represent_filename(name)
basedir = "cli/common/listdir_unicode_good"
fileutil.make_dirs(basedir)
for name in filenames:
open(os.path.join(unicode(basedir), name), "wb").close()
for file in listdir_unicode(unicode(basedir)):
self.failUnlessIn(normalize(file), filenames)
def test_exception_catcher(self):
self.basedir = "cli/exception_catcher"
stderr = StringIO()
exc = Exception("canary")
ns = Namespace()
ns.parse_called = False
def call_parse_or_exit(args):
ns.parse_called = True
raise exc
ns.sys_exit_called = False
def call_sys_exit(exitcode):
ns.sys_exit_called = True
self.failUnlessEqual(exitcode, 1)
def fake_react(f):
d = f("reactor")
# normally this Deferred would be errbacked with SystemExit, but
# since we mocked out sys.exit, it will be fired with None. So
# it's safe to drop it on the floor.
del d
patcher = MonkeyPatcher((runner, 'parse_or_exit_with_explanation',
call_parse_or_exit),
(sys, 'argv', ["tahoe"]),
(sys, 'exit', call_sys_exit),
(sys, 'stderr', stderr),
(task, 'react', fake_react),
)
patcher.runWithPatches(runner.run)
self.failUnless(ns.parse_called)
self.failUnless(ns.sys_exit_called)
self.failUnlessIn(str(exc), stderr.getvalue())
class Help(unittest.TestCase):
def failUnlessInNormalized(self, x, y):
# helper function to deal with the --help output being wrapped to
# various widths, depending on the $COLUMNS environment variable
self.failUnlessIn(x.replace("\n", " "), y.replace("\n", " "))
def test_get(self):
help = str(cli.GetOptions())
self.failUnlessIn("[options] REMOTE_FILE LOCAL_FILE", help)
self.failUnlessIn("% tahoe get FOO |less", help)
def test_put(self):
help = str(cli.PutOptions())
self.failUnlessIn("[options] LOCAL_FILE REMOTE_FILE", help)
self.failUnlessIn("% cat FILE | tahoe put", help)
def test_ls(self):
help = str(cli.ListOptions())
self.failUnlessIn("[options] [PATH]", help)
def test_unlink(self):
help = str(cli.UnlinkOptions())
self.failUnlessIn("[options] REMOTE_FILE", help)
def test_rm(self):
help = str(cli.RmOptions())
self.failUnlessIn("[options] REMOTE_FILE", help)
def test_mv(self):
help = str(cli.MvOptions())
self.failUnlessIn("[options] FROM TO", help)
self.failUnlessInNormalized("Use 'tahoe mv' to move files", help)
def test_cp(self):
help = str(cli.CpOptions())
self.failUnlessIn("[options] FROM.. TO", help)
self.failUnlessInNormalized("Use 'tahoe cp' to copy files", help)
def test_ln(self):
help = str(cli.LnOptions())
self.failUnlessIn("[options] FROM_LINK TO_LINK", help)
self.failUnlessInNormalized("Use 'tahoe ln' to duplicate a link", help)
def test_mkdir(self):
help = str(cli.MakeDirectoryOptions())
self.failUnlessIn("[options] [REMOTE_DIR]", help)
self.failUnlessInNormalized("Create a new directory", help)
def test_backup(self):
help = str(cli.BackupOptions())
self.failUnlessIn("[options] FROM ALIAS:TO", help)
def test_webopen(self):
help = str(cli.WebopenOptions())
self.failUnlessIn("[options] [ALIAS:PATH]", help)
def test_manifest(self):
help = str(cli.ManifestOptions())
self.failUnlessIn("[options] [ALIAS:PATH]", help)
def test_stats(self):
help = str(cli.StatsOptions())
self.failUnlessIn("[options] [ALIAS:PATH]", help)
def test_check(self):
help = str(cli.CheckOptions())
self.failUnlessIn("[options] [ALIAS:PATH]", help)
def test_deep_check(self):
help = str(cli.DeepCheckOptions())
self.failUnlessIn("[options] [ALIAS:PATH]", help)
def test_create_alias(self):
help = str(cli.CreateAliasOptions())
self.failUnlessIn("[options] ALIAS[:]", help)
def test_add_alias(self):
help = str(cli.AddAliasOptions())
self.failUnlessIn("[options] ALIAS[:] DIRCAP", help)
def test_list_aliases(self):
help = str(cli.ListAliasesOptions())
self.failUnlessIn("[options]", help)
def test_start(self):
help = str(tahoe_start.StartOptions())
self.failUnlessIn("[options] [NODEDIR [twistd-options]]", help)
def test_stop(self):
help = str(tahoe_stop.StopOptions())
self.failUnlessIn("[options] [NODEDIR]", help)
def test_restart(self):
help = str(tahoe_restart.RestartOptions())
self.failUnlessIn("[options] [NODEDIR [twistd-options]]", help)
def test_run(self):
help = str(tahoe_run.RunOptions())
self.failUnlessIn("[options] [NODEDIR [twistd-options]]", help)
def test_create_client(self):
help = str(create_node.CreateClientOptions())
self.failUnlessIn("[options] [NODEDIR]", help)
def test_create_node(self):
help = str(create_node.CreateNodeOptions())
self.failUnlessIn("[options] [NODEDIR]", help)
def test_create_introducer(self):
help = str(create_node.CreateIntroducerOptions())
self.failUnlessIn("[options] NODEDIR", help)
def test_debug_flogtool(self):
options = debug.FlogtoolOptions()
help = str(options)
self.failUnlessIn(" [global-options] debug flogtool ", help)
self.failUnlessInNormalized("The 'tahoe debug flogtool' command uses the correct imports", help)
for (option, shortcut, oClass, desc) in options.subCommands:
subhelp = str(oClass())
self.failUnlessIn(" [global-options] debug flogtool %s " % (option,), subhelp)
class Ln(GridTestMixin, CLITestMixin, unittest.TestCase):
def _create_test_file(self):
data = "puppies" * 1000
path = os.path.join(self.basedir, "datafile")
fileutil.write(path, data)
self.datafile = path
def test_ln_without_alias(self):
# if invoked without an alias when the 'tahoe' alias doesn't
# exist, 'tahoe ln' should output a useful error message and not
# a stack trace
self.basedir = "cli/Ln/ln_without_alias"
self.set_up_grid(oneshare=True)
d = self.do_cli("ln", "from", "to")
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
# Make sure that validation extends to the "to" parameter
d.addCallback(lambda ign: self.do_cli("create-alias", "havasu"))
d.addCallback(lambda ign: self._create_test_file())
d.addCallback(lambda ign: self.do_cli("put", self.datafile,
"havasu:from"))
d.addCallback(lambda ign: self.do_cli("ln", "havasu:from", "to"))
d.addCallback(_check)
return d
def test_ln_with_nonexistent_alias(self):
# If invoked with aliases that don't exist, 'tahoe ln' should
# output a useful error message and not a stack trace.
self.basedir = "cli/Ln/ln_with_nonexistent_alias"
self.set_up_grid(oneshare=True)
d = self.do_cli("ln", "havasu:from", "havasu:to")
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
d.addCallback(_check)
# Make sure that validation occurs on the to parameter if the
# from parameter passes.
d.addCallback(lambda ign: self.do_cli("create-alias", "havasu"))
d.addCallback(lambda ign: self._create_test_file())
d.addCallback(lambda ign: self.do_cli("put", self.datafile,
"havasu:from"))
d.addCallback(lambda ign: self.do_cli("ln", "havasu:from", "huron:to"))
d.addCallback(_check)
return d
class Admin(unittest.TestCase):
def test_generate_keypair(self):
d = run_cli("admin", "generate-keypair")
def _done( (rc, stdout, stderr) ):
lines = [line.strip() for line in stdout.splitlines()]
privkey_bits = lines[0].split()
pubkey_bits = lines[1].split()
sk_header = "private:"
vk_header = "public:"
self.failUnlessEqual(privkey_bits[0], sk_header, lines[0])
self.failUnlessEqual(pubkey_bits[0], vk_header, lines[1])
self.failUnless(privkey_bits[1].startswith("priv-v0-"), lines[0])
self.failUnless(pubkey_bits[1].startswith("pub-v0-"), lines[1])
sk_bytes = base32.a2b(keyutil.remove_prefix(privkey_bits[1], "priv-v0-"))
sk = ed25519.SigningKey(sk_bytes)
vk_bytes = base32.a2b(keyutil.remove_prefix(pubkey_bits[1], "pub-v0-"))
self.failUnlessEqual(sk.get_verifying_key_bytes(), vk_bytes)
d.addCallback(_done)
return d
def test_derive_pubkey(self):
priv1,pub1 = keyutil.make_keypair()
d = run_cli("admin", "derive-pubkey", priv1)
def _done( (rc, stdout, stderr) ):
lines = stdout.split("\n")
privkey_line = lines[0].strip()
pubkey_line = lines[1].strip()
sk_header = "private: priv-v0-"
vk_header = "public: pub-v0-"
self.failUnless(privkey_line.startswith(sk_header), privkey_line)
self.failUnless(pubkey_line.startswith(vk_header), pubkey_line)
pub2 = pubkey_line[len(vk_header):]
self.failUnlessEqual("pub-v0-"+pub2, pub1)
d.addCallback(_done)
return d
class Errors(GridTestMixin, CLITestMixin, unittest.TestCase):
def test_get(self):
self.basedir = "cli/Errors/get"
self.set_up_grid()
c0 = self.g.clients[0]
self.fileurls = {}
DATA = "data" * 100
d = c0.upload(upload.Data(DATA, convergence=""))
def _stash_bad(ur):
self.uri_1share = ur.get_uri()
self.delete_shares_numbered(ur.get_uri(), range(1,10))
d.addCallback(_stash_bad)
# the download is abandoned as soon as it's clear that we won't get
# enough shares. The one remaining share might be in either the
# COMPLETE or the PENDING state.
in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3"
in_pending_msg_regex = "ran out of shares: complete= pending=Share\(.+\) overdue= unused= need 3"
d.addCallback(lambda ign: self.do_cli("get", self.uri_1share))
def _check1((rc, out, err)):
self.failIfEqual(rc, 0)
self.failUnless("410 Gone" in err, err)
self.failUnlessIn("NotEnoughSharesError: ", err)
self.failUnless(in_complete_msg in err or re.search(in_pending_msg_regex, err))
d.addCallback(_check1)
targetf = os.path.join(self.basedir, "output")
d.addCallback(lambda ign: self.do_cli("get", self.uri_1share, targetf))
def _check2((rc, out, err)):
self.failIfEqual(rc, 0)
self.failUnless("410 Gone" in err, err)
self.failUnlessIn("NotEnoughSharesError: ", err)
self.failUnless(in_complete_msg in err or re.search(in_pending_msg_regex, err))
self.failIf(os.path.exists(targetf))
d.addCallback(_check2)
return d
def test_broken_socket(self):
# When the http connection breaks (such as when node.url is overwritten
# by a confused user), a user friendly error message should be printed.
self.basedir = "cli/Errors/test_broken_socket"
self.set_up_grid(oneshare=True)
# Simulate a connection error
def _socket_error(*args, **kwargs):
raise socket_error('test error')
self.patch(allmydata.scripts.common_http.httplib.HTTPConnection,
"endheaders", _socket_error)
d = self.do_cli("mkdir")
def _check_invalid((rc,stdout,stderr)):
self.failIfEqual(rc, 0)
self.failUnlessIn("Error trying to connect to http://127.0.0.1", stderr)
d.addCallback(_check_invalid)
return d
class Get(GridTestMixin, CLITestMixin, unittest.TestCase):
def test_get_without_alias(self):
# 'tahoe get' should output a useful error message when invoked
# without an explicit alias and when the default 'tahoe' alias
# hasn't been created yet.
self.basedir = "cli/Get/get_without_alias"
self.set_up_grid(oneshare=True)
d = self.do_cli('get', 'file')
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
return d
def test_get_with_nonexistent_alias(self):
# 'tahoe get' should output a useful error message when invoked with
# an explicit alias that doesn't exist.
self.basedir = "cli/Get/get_with_nonexistent_alias"
self.set_up_grid(oneshare=True)
d = self.do_cli("get", "nonexistent:file")
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
self.failUnlessIn("nonexistent", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
return d
class Manifest(GridTestMixin, CLITestMixin, unittest.TestCase):
def test_manifest_without_alias(self):
# 'tahoe manifest' should output a useful error message when invoked
# without an explicit alias when the default 'tahoe' alias is
# missing.
self.basedir = "cli/Manifest/manifest_without_alias"
self.set_up_grid(oneshare=True)
d = self.do_cli("manifest")
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
return d
def test_manifest_with_nonexistent_alias(self):
# 'tahoe manifest' should output a useful error message when invoked
# with an explicit alias that doesn't exist.
self.basedir = "cli/Manifest/manifest_with_nonexistent_alias"
self.set_up_grid(oneshare=True)
d = self.do_cli("manifest", "nonexistent:")
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
self.failUnlessIn("nonexistent", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
return d
class Mkdir(GridTestMixin, CLITestMixin, unittest.TestCase):
def test_mkdir(self):
self.basedir = os.path.dirname(self.mktemp())
self.set_up_grid(oneshare=True)
d = self.do_cli("create-alias", "tahoe")
d.addCallback(lambda res: self.do_cli("mkdir", "test"))
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 0)
self.failUnlessReallyEqual(err, "")
self.failUnlessIn("URI:", out)
d.addCallback(_check)
return d
def test_mkdir_mutable_type(self):
self.basedir = os.path.dirname(self.mktemp())
self.set_up_grid(oneshare=True)
d = self.do_cli("create-alias", "tahoe")
def _check((rc, out, err), st):
self.failUnlessReallyEqual(rc, 0)
self.failUnlessReallyEqual(err, "")
self.failUnlessIn(st, out)
return out
def _mkdir(ign, mutable_type, uri_prefix, dirname):
d2 = self.do_cli("mkdir", "--format="+mutable_type, dirname)
d2.addCallback(_check, uri_prefix)
def _stash_filecap(cap):
u = uri.from_string(cap)
fn_uri = u.get_filenode_cap()
self._filecap = fn_uri.to_string()
d2.addCallback(_stash_filecap)
d2.addCallback(lambda ign: self.do_cli("ls", "--json", dirname))
d2.addCallback(_check, uri_prefix)
d2.addCallback(lambda ign: self.do_cli("ls", "--json", self._filecap))
d2.addCallback(_check, '"format": "%s"' % (mutable_type.upper(),))
return d2
d.addCallback(_mkdir, "sdmf", "URI:DIR2", "tahoe:foo")
d.addCallback(_mkdir, "SDMF", "URI:DIR2", "tahoe:foo2")
d.addCallback(_mkdir, "mdmf", "URI:DIR2-MDMF", "tahoe:bar")
d.addCallback(_mkdir, "MDMF", "URI:DIR2-MDMF", "tahoe:bar2")
return d
def test_mkdir_mutable_type_unlinked(self):
self.basedir = os.path.dirname(self.mktemp())
self.set_up_grid(oneshare=True)
d = self.do_cli("mkdir", "--format=SDMF")
def _check((rc, out, err), st):
self.failUnlessReallyEqual(rc, 0)
self.failUnlessReallyEqual(err, "")
self.failUnlessIn(st, out)
return out
d.addCallback(_check, "URI:DIR2")
def _stash_dircap(cap):
self._dircap = cap
# Now we're going to feed the cap into uri.from_string...
u = uri.from_string(cap)
# ...grab the underlying filenode uri.
fn_uri = u.get_filenode_cap()
# ...and stash that.
self._filecap = fn_uri.to_string()
d.addCallback(_stash_dircap)
d.addCallback(lambda res: self.do_cli("ls", "--json",
self._filecap))
d.addCallback(_check, '"format": "SDMF"')
d.addCallback(lambda res: self.do_cli("mkdir", "--format=MDMF"))
d.addCallback(_check, "URI:DIR2-MDMF")
d.addCallback(_stash_dircap)
d.addCallback(lambda res: self.do_cli("ls", "--json",
self._filecap))
d.addCallback(_check, '"format": "MDMF"')
return d
def test_mkdir_bad_mutable_type(self):
o = cli.MakeDirectoryOptions()
self.failUnlessRaises(usage.UsageError,
o.parseOptions,
["--format=LDMF"])
def test_mkdir_unicode(self):
self.basedir = os.path.dirname(self.mktemp())
self.set_up_grid(oneshare=True)
try:
motorhead_arg = u"tahoe:Mot\u00F6rhead".encode(get_io_encoding())
except UnicodeEncodeError:
raise unittest.SkipTest("A non-ASCII command argument could not be encoded on this platform.")
d = self.do_cli("create-alias", "tahoe")
d.addCallback(lambda res: self.do_cli("mkdir", motorhead_arg))
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 0)
self.failUnlessReallyEqual(err, "")
self.failUnlessIn("URI:", out)
d.addCallback(_check)
return d
def test_mkdir_with_nonexistent_alias(self):
# when invoked with an alias that doesn't exist, 'tahoe mkdir' should
# output a sensible error message rather than a stack trace.
self.basedir = "cli/Mkdir/mkdir_with_nonexistent_alias"
self.set_up_grid(oneshare=True)
d = self.do_cli("mkdir", "havasu:")
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
return d
class Unlink(GridTestMixin, CLITestMixin, unittest.TestCase):
command = "unlink"
def _create_test_file(self):
data = "puppies" * 1000
path = os.path.join(self.basedir, "datafile")
fileutil.write(path, data)
self.datafile = path
def test_unlink_without_alias(self):
# 'tahoe unlink' should behave sensibly when invoked without an explicit
# alias before the default 'tahoe' alias has been created.
self.basedir = "cli/Unlink/%s_without_alias" % (self.command,)
self.set_up_grid(oneshare=True)
d = self.do_cli(self.command, "afile")
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
d.addCallback(lambda ign: self.do_cli(self.command, "afile"))
d.addCallback(_check)
return d
def test_unlink_with_nonexistent_alias(self):
# 'tahoe unlink' should behave sensibly when invoked with an explicit
# alias that doesn't exist.
self.basedir = "cli/Unlink/%s_with_nonexistent_alias" % (self.command,)
self.set_up_grid(oneshare=True)
d = self.do_cli(self.command, "nonexistent:afile")
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
self.failUnlessIn("nonexistent", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
d.addCallback(lambda ign: self.do_cli(self.command, "nonexistent:afile"))
d.addCallback(_check)
return d
def test_unlink_without_path(self):
# 'tahoe unlink' should give a sensible error message when invoked without a path.
self.basedir = "cli/Unlink/%s_without_path" % (self.command,)
self.set_up_grid(oneshare=True)
self._create_test_file()
d = self.do_cli("create-alias", "tahoe")
d.addCallback(lambda ign: self.do_cli("put", self.datafile, "tahoe:test"))
def _do_unlink((rc, out, err)):
self.failUnlessReallyEqual(rc, 0)
self.failUnless(out.startswith("URI:"), out)
return self.do_cli(self.command, out.strip('\n'))
d.addCallback(_do_unlink)
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("'tahoe %s'" % (self.command,), err)
self.failUnlessIn("path must be given", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
return d
class Rm(Unlink):
"""Test that 'tahoe rm' behaves in the same way as 'tahoe unlink'."""
command = "rm"
class Stats(GridTestMixin, CLITestMixin, unittest.TestCase):
def test_empty_directory(self):
self.basedir = "cli/Stats/empty_directory"
self.set_up_grid(oneshare=True)
c0 = self.g.clients[0]
self.fileurls = {}
d = c0.create_dirnode()
def _stash_root(n):
self.rootnode = n
self.rooturi = n.get_uri()
d.addCallback(_stash_root)
# make sure we can get stats on an empty directory too
d.addCallback(lambda ign: self.do_cli("stats", self.rooturi))
def _check_stats((rc, out, err)):
self.failUnlessReallyEqual(err, "")
self.failUnlessReallyEqual(rc, 0)
lines = out.splitlines()
self.failUnlessIn(" count-immutable-files: 0", lines)
self.failUnlessIn(" count-mutable-files: 0", lines)
self.failUnlessIn(" count-literal-files: 0", lines)
self.failUnlessIn(" count-directories: 1", lines)
self.failUnlessIn(" size-immutable-files: 0", lines)
self.failIfIn("Size Histogram:", lines)
d.addCallback(_check_stats)
return d
def test_stats_without_alias(self):
# when invoked with no explicit alias and before the default 'tahoe'
# alias is created, 'tahoe stats' should output an informative error
# message, not a stack trace.
self.basedir = "cli/Stats/stats_without_alias"
self.set_up_grid(oneshare=True)
d = self.do_cli("stats")
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
return d
def test_stats_with_nonexistent_alias(self):
# when invoked with an explicit alias that doesn't exist,
# 'tahoe stats' should output a useful error message.
self.basedir = "cli/Stats/stats_with_nonexistent_alias"
self.set_up_grid(oneshare=True)
d = self.do_cli("stats", "havasu:")
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
return d
class Webopen(GridTestMixin, CLITestMixin, unittest.TestCase):
def test_webopen_with_nonexistent_alias(self):
# when invoked with an alias that doesn't exist, 'tahoe webopen'
# should output an informative error message instead of a stack
# trace.
self.basedir = "cli/Webopen/webopen_with_nonexistent_alias"
self.set_up_grid(oneshare=True)
d = self.do_cli("webopen", "fake:")
def _check((rc, out, err)):
self.failUnlessReallyEqual(rc, 1)
self.failUnlessIn("error:", err)
self.failUnlessReallyEqual(out, "")
d.addCallback(_check)
return d
def test_webopen(self):
# TODO: replace with @patch that supports Deferreds.
import webbrowser
def call_webbrowser_open(url):
self.failUnlessIn(self.alias_uri.replace(':', '%3A'), url)
self.webbrowser_open_called = True
def _cleanup(res):
webbrowser.open = self.old_webbrowser_open
return res
self.old_webbrowser_open = webbrowser.open
try:
webbrowser.open = call_webbrowser_open
self.basedir = "cli/Webopen/webopen"
self.set_up_grid(oneshare=True)
d = self.do_cli("create-alias", "alias:")
def _check_alias((rc, out, err)):
self.failUnlessReallyEqual(rc, 0, repr((rc, out, err)))
self.failUnlessIn("Alias 'alias' created", out)
self.failUnlessReallyEqual(err, "")
self.alias_uri = get_aliases(self.get_clientdir())["alias"]
d.addCallback(_check_alias)
d.addCallback(lambda res: self.do_cli("webopen", "alias:"))
def _check_webopen((rc, out, err)):
self.failUnlessReallyEqual(rc, 0, repr((rc, out, err)))
self.failUnlessReallyEqual(out, "")
self.failUnlessReallyEqual(err, "")
self.failUnless(self.webbrowser_open_called)
d.addCallback(_check_webopen)
d.addBoth(_cleanup)
except:
_cleanup(None)
raise
return d
class Options(ReallyEqualMixin, unittest.TestCase):
# this test case only looks at argument-processing and simple stuff.
def parse(self, args, stdout=None):
o = runner.Options()
if stdout is not None:
o.stdout = stdout
o.parseOptions(args)
while hasattr(o, "subOptions"):
o = o.subOptions
return o
def test_list(self):
fileutil.rm_dir("cli/test_options")
fileutil.make_dirs("cli/test_options")
fileutil.make_dirs("cli/test_options/private")
fileutil.write("cli/test_options/node.url", "http://localhost:8080/\n")
filenode_uri = uri.WriteableSSKFileURI(writekey="\x00"*16,
fingerprint="\x00"*32)
private_uri = uri.DirectoryURI(filenode_uri).to_string()
fileutil.write("cli/test_options/private/root_dir.cap", private_uri + "\n")
def parse2(args): return parse_options("cli/test_options", "ls", args)
o = parse2([])
self.failUnlessEqual(o['node-url'], "http://localhost:8080/")
self.failUnlessEqual(o.aliases[DEFAULT_ALIAS], private_uri)
self.failUnlessEqual(o.where, u"")
o = parse2(["--node-url", "http://example.org:8111/"])
self.failUnlessEqual(o['node-url'], "http://example.org:8111/")
self.failUnlessEqual(o.aliases[DEFAULT_ALIAS], private_uri)
self.failUnlessEqual(o.where, u"")
# -u for --node-url used to clash with -u for --uri (tickets #1949 and #2137).
o = parse2(["-u", "http://example.org:8111/"])
self.failUnlessEqual(o['node-url'], "http://example.org:8111/")
self.failUnlessEqual(o.aliases[DEFAULT_ALIAS], private_uri)
self.failUnlessEqual(o.where, u"")
self.failIf(o["uri"])
o = parse2(["-u", "http://example.org:8111/", "--uri"])
self.failUnlessEqual(o['node-url'], "http://example.org:8111/")
self.failUnlessEqual(o.aliases[DEFAULT_ALIAS], private_uri)
self.failUnlessEqual(o.where, u"")
self.failUnless(o["uri"])
o = parse2(["--dir-cap", "root"])
self.failUnlessEqual(o['node-url'], "http://localhost:8080/")
self.failUnlessEqual(o.aliases[DEFAULT_ALIAS], "root")
self.failUnlessEqual(o.where, u"")
other_filenode_uri = uri.WriteableSSKFileURI(writekey="\x11"*16,
fingerprint="\x11"*32)
other_uri = uri.DirectoryURI(other_filenode_uri).to_string()
o = parse2(["--dir-cap", other_uri])
self.failUnlessEqual(o['node-url'], "http://localhost:8080/")
self.failUnlessEqual(o.aliases[DEFAULT_ALIAS], other_uri)
self.failUnlessEqual(o.where, u"")
o = parse2(["--dir-cap", other_uri, "subdir"])
self.failUnlessEqual(o['node-url'], "http://localhost:8080/")
self.failUnlessEqual(o.aliases[DEFAULT_ALIAS], other_uri)
self.failUnlessEqual(o.where, u"subdir")
self.failUnlessRaises(usage.UsageError, parse2,
["--node-url", "NOT-A-URL"])
o = parse2(["--node-url", "http://localhost:8080"])
self.failUnlessEqual(o["node-url"], "http://localhost:8080/")
o = parse2(["--node-url", "https://localhost/"])
self.failUnlessEqual(o["node-url"], "https://localhost/")
def test_version(self):
# "tahoe --version" dumps text to stdout and exits
stdout = StringIO()
self.failUnlessRaises(SystemExit, self.parse, ["--version"], stdout)
self.failUnlessIn(allmydata.__appname__ + ":", stdout.getvalue())
# but "tahoe SUBCOMMAND --version" should be rejected
self.failUnlessRaises(usage.UsageError, self.parse,
["start", "--version"])
self.failUnlessRaises(usage.UsageError, self.parse,
["start", "--version-and-path"])
def test_quiet(self):
# accepted as an overall option, but not on subcommands
o = self.parse(["--quiet", "start"])
self.failUnless(o.parent["quiet"])
self.failUnlessRaises(usage.UsageError, self.parse,
["start", "--quiet"])
def test_basedir(self):
# accept a --node-directory option before the verb, or a --basedir
# option after, or a basedir argument after, but none in the wrong
# place, and not more than one of the three.
o = self.parse(["start"])
self.failUnlessReallyEqual(o["basedir"], os.path.join(fileutil.abspath_expanduser_unicode(u"~"),
u".tahoe"))
o = self.parse(["start", "here"])
self.failUnlessReallyEqual(o["basedir"], fileutil.abspath_expanduser_unicode(u"here"))
o = self.parse(["start", "--basedir", "there"])
self.failUnlessReallyEqual(o["basedir"], fileutil.abspath_expanduser_unicode(u"there"))
o = self.parse(["--node-directory", "there", "start"])
self.failUnlessReallyEqual(o["basedir"], fileutil.abspath_expanduser_unicode(u"there"))
o = self.parse(["start", "here", "--nodaemon"])
self.failUnlessReallyEqual(o["basedir"], fileutil.abspath_expanduser_unicode(u"here"))
self.failUnlessRaises(usage.UsageError, self.parse,
["--basedir", "there", "start"])
self.failUnlessRaises(usage.UsageError, self.parse,
["start", "--node-directory", "there"])
self.failUnlessRaises(usage.UsageError, self.parse,
["--node-directory=there",
"start", "--basedir=here"])
self.failUnlessRaises(usage.UsageError, self.parse,
["start", "--basedir=here", "anywhere"])
self.failUnlessRaises(usage.UsageError, self.parse,
["--node-directory=there",
"start", "anywhere"])
self.failUnlessRaises(usage.UsageError, self.parse,
["--node-directory=there",
"start", "--basedir=here", "anywhere"])
self.failUnlessRaises(usage.UsageError, self.parse,
["--node-directory=there", "start", "--nodaemon"])
self.failUnlessRaises(usage.UsageError, self.parse,
["start", "--basedir=here", "--nodaemon"])
class Stop(unittest.TestCase):
def test_non_numeric_pid(self):
"""
If the pidfile exists but does not contain a numeric value, a complaint to
this effect is written to stderr and the non-success result is
returned.
"""
basedir = FilePath(self.mktemp().decode("ascii"))
basedir.makedirs()
basedir.child(u"twistd.pid").setContent(b"foo")
config = tahoe_stop.StopOptions()
config.stdout = StringIO()
config.stderr = StringIO()
config['basedir'] = basedir.path
result_code = tahoe_stop.stop(config)
self.assertEqual(2, result_code)
self.assertIn("invalid PID file", config.stderr.getvalue())
class Start(unittest.TestCase):
@patch('allmydata.scripts.tahoe_daemonize.os.chdir')
@patch('allmydata.scripts.tahoe_daemonize.twistd')
def test_non_numeric_pid(self, mock_twistd, chdir):
"""
If the pidfile exists but does not contain a numeric value, a complaint to
this effect is written to stderr.
"""
basedir = FilePath(self.mktemp().decode("ascii"))
basedir.makedirs()
basedir.child(u"twistd.pid").setContent(b"foo")
basedir.child(u"tahoe-client.tac").setContent(b"")
config = tahoe_daemonize.DaemonizeOptions()
config.stdout = StringIO()
config.stderr = StringIO()
config['basedir'] = basedir.path
config.twistd_args = []
result_code = tahoe_daemonize.daemonize(config)
self.assertIn("invalid PID file", config.stderr.getvalue())
self.assertTrue(len(mock_twistd.mock_calls), 1)
self.assertEqual(mock_twistd.mock_calls[0][0], 'runApp')
self.assertEqual(0, result_code)