mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-01 00:45:52 +00:00
[wip] remove mock from test_alias, along with a bunch of encoding-related changes :/
This commit is contained in:
parent
c4b58fe00b
commit
77bebb9916
@ -22,7 +22,7 @@ def print_keypair(options):
|
||||
|
||||
class DerivePubkeyOptions(BaseOptions):
|
||||
def parseArgs(self, privkey):
|
||||
self.privkey = privkey
|
||||
self.privkey = privkey.encode("ascii")
|
||||
|
||||
def getSynopsis(self):
|
||||
return "Usage: tahoe [global-options] admin derive-pubkey PRIVKEY"
|
||||
|
@ -1,4 +1,5 @@
|
||||
from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import os.path
|
||||
import codecs
|
||||
@ -10,7 +11,7 @@ from allmydata import uri
|
||||
from allmydata.scripts.common_http import do_http, check_http_error
|
||||
from allmydata.scripts.common import get_aliases
|
||||
from allmydata.util.fileutil import move_into_place
|
||||
from allmydata.util.encodingutil import quote_output
|
||||
from allmydata.util.encodingutil import quote_output, quote_output_u
|
||||
|
||||
|
||||
def add_line_to_aliasfile(aliasfile, alias, cap):
|
||||
@ -48,14 +49,13 @@ def add_alias(options):
|
||||
|
||||
old_aliases = get_aliases(nodedir)
|
||||
if alias in old_aliases:
|
||||
print("Alias %s already exists!" % quote_output(alias), file=stderr)
|
||||
print("Alias %s already exists!" % quote_output_u(alias), file=stderr)
|
||||
return 1
|
||||
aliasfile = os.path.join(nodedir, "private", "aliases")
|
||||
cap = uri.from_string_dirnode(cap).to_string()
|
||||
|
||||
add_line_to_aliasfile(aliasfile, alias, cap)
|
||||
|
||||
print("Alias %s added" % quote_output(alias), file=stdout)
|
||||
show_output(stdout, "Alias {alias} added", alias=alias)
|
||||
return 0
|
||||
|
||||
def create_alias(options):
|
||||
@ -93,11 +93,26 @@ def create_alias(options):
|
||||
# probably check for others..
|
||||
|
||||
add_line_to_aliasfile(aliasfile, alias, new_uri)
|
||||
|
||||
print("Alias %s created" % (quote_output(alias),), file=stdout)
|
||||
show_output(stdout, "Alias {alias} created", alias=alias)
|
||||
return 0
|
||||
|
||||
|
||||
def show_output(fp, template, **kwargs):
|
||||
assert isinstance(template, unicode)
|
||||
|
||||
# On Python 2 and Python 3 fp has an encoding attribute under real usage
|
||||
# but the test suite passes StringIO in many places which has no such
|
||||
# attribute. Make allowances for this until the test suite is fixed.
|
||||
encoding = getattr(fp, "encoding", "utf-8")
|
||||
output = template.format(**{
|
||||
k: quote_output_u(v, encoding=encoding)
|
||||
for (k, v)
|
||||
in kwargs.items()
|
||||
})
|
||||
safe_output = output.encode(encoding, "namereplace").decode(encoding)
|
||||
print(safe_output, file=fp)
|
||||
|
||||
|
||||
def _get_alias_details(nodedir):
|
||||
aliases = get_aliases(nodedir)
|
||||
alias_names = sorted(aliases.keys())
|
||||
@ -115,7 +130,7 @@ def list_aliases(options):
|
||||
data = _get_alias_details(options['node-directory'])
|
||||
|
||||
if options['json']:
|
||||
output = json.dumps(data, indent=4)
|
||||
output = json.dumps(data, indent=4).decode("utf-8")
|
||||
else:
|
||||
def dircap(details):
|
||||
return (
|
||||
@ -132,6 +147,9 @@ def list_aliases(options):
|
||||
in data.items()
|
||||
))
|
||||
|
||||
print(output, file=options.stdout)
|
||||
if output:
|
||||
# Show whatever we computed. Skip this if there is no output to avoid
|
||||
# a spurious blank line.
|
||||
print(output, file=options.stdout)
|
||||
|
||||
return 0
|
||||
|
@ -1,6 +1,6 @@
|
||||
from ...util.encodingutil import unicode_to_argv
|
||||
from ...scripts import runner
|
||||
from ..common_util import ReallyEqualMixin, run_cli
|
||||
from ..common_util import ReallyEqualMixin, run_cli, run_cli_ex
|
||||
|
||||
def parse_options(basedir, command, args):
|
||||
o = runner.Options()
|
||||
@ -10,10 +10,18 @@ def parse_options(basedir, command, args):
|
||||
return o
|
||||
|
||||
class CLITestMixin(ReallyEqualMixin):
|
||||
def do_cli_ex(self, verb, argv, client_num=0, **kwargs):
|
||||
# client_num is used to execute client CLI commands on a specific
|
||||
# client.
|
||||
client_dir = self.get_clientdir(i=client_num)
|
||||
nodeargs = [ u"--node-directory", client_dir ]
|
||||
return run_cli_ex(verb, argv, nodeargs=nodeargs, **kwargs)
|
||||
|
||||
|
||||
def do_cli(self, verb, *args, **kwargs):
|
||||
# client_num is used to execute client CLI commands on a specific
|
||||
# client.
|
||||
client_num = kwargs.get("client_num", 0)
|
||||
client_num = kwargs.pop("client_num", 0)
|
||||
client_dir = unicode_to_argv(self.get_clientdir(i=client_num))
|
||||
nodeargs = [ "--node-directory", client_dir ]
|
||||
return run_cli(verb, nodeargs=nodeargs, *args, **kwargs)
|
||||
nodeargs = [ b"--node-directory", client_dir ]
|
||||
return run_cli(verb, *args, nodeargs=nodeargs, **kwargs)
|
||||
|
@ -3,47 +3,60 @@ import json
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet.defer import inlineCallbacks
|
||||
|
||||
from allmydata.util.encodingutil import unicode_to_argv
|
||||
from allmydata.scripts.common import get_aliases
|
||||
from allmydata.test.no_network import GridTestMixin
|
||||
from .common import CLITestMixin
|
||||
from allmydata.util.encodingutil import quote_output
|
||||
|
||||
# see also test_create_alias
|
||||
|
||||
class ListAlias(GridTestMixin, CLITestMixin, unittest.TestCase):
|
||||
|
||||
@inlineCallbacks
|
||||
def _test_list(self, alias):
|
||||
self.basedir = "cli/ListAlias/test_list"
|
||||
def _test_list(self, alias, encoding):
|
||||
self.basedir = self.mktemp()
|
||||
self.set_up_grid(oneshare=True)
|
||||
|
||||
rc, stdout, stderr = yield self.do_cli(
|
||||
"create-alias",
|
||||
unicode_to_argv(alias),
|
||||
rc, stdout, stderr = yield self.do_cli_ex(
|
||||
u"create-alias",
|
||||
[alias],
|
||||
encoding=encoding,
|
||||
)
|
||||
|
||||
self.assertIn(
|
||||
unicode_to_argv(u"Alias '{}' created".format(alias)),
|
||||
stdout,
|
||||
b"Alias {} created".format(quote_output(alias, encoding=encoding)),
|
||||
stdout.encode(encoding),
|
||||
)
|
||||
self.assertEqual("", stderr)
|
||||
aliases = get_aliases(self.get_clientdir())
|
||||
self.assertIn(alias, aliases)
|
||||
self.assertTrue(aliases[alias].startswith("URI:DIR2:"))
|
||||
self.assertTrue(aliases[alias].startswith(u"URI:DIR2:"))
|
||||
|
||||
rc, stdout, stderr = yield self.do_cli("list-aliases", "--json")
|
||||
rc, stdout, stderr = yield self.do_cli_ex(
|
||||
u"list-aliases",
|
||||
[u"--json"],
|
||||
encoding=encoding,
|
||||
)
|
||||
|
||||
self.assertEqual(0, rc)
|
||||
data = json.loads(stdout)
|
||||
self.assertIn(alias, data)
|
||||
data = data[alias]
|
||||
self.assertIn("readwrite", data)
|
||||
self.assertIn("readonly", data)
|
||||
self.assertIn(u"readwrite", data)
|
||||
self.assertIn(u"readonly", data)
|
||||
|
||||
|
||||
def test_list(self):
|
||||
return self._test_list(u"tahoe")
|
||||
def test_list_ascii(self):
|
||||
return self._test_list(u"tahoe", encoding="ascii")
|
||||
|
||||
|
||||
def test_list_unicode(self):
|
||||
return self._test_list(u"tahoe\{SNOWMAN}")
|
||||
def test_list_nonascii_ascii(self):
|
||||
return self._test_list(u"tahoe\N{SNOWMAN}", encoding="ascii")
|
||||
|
||||
|
||||
def test_list_utf_8(self):
|
||||
return self._test_list(u"tahoe", encoding="utf-8")
|
||||
|
||||
|
||||
def test_list_nonascii_utf_8(self):
|
||||
return self._test_list(u"tahoe\N{SNOWMAN}", encoding="utf-8")
|
||||
|
@ -661,7 +661,7 @@ starting copy, 2 files, 1 directories
|
||||
# This test ensures that tahoe will copy a file from the grid to
|
||||
# a local directory without a specified file name.
|
||||
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/2027
|
||||
self.basedir = "cli/Cp/cp_verbose"
|
||||
self.basedir = "cli/Cp/ticket_2027"
|
||||
self.set_up_grid(oneshare=True)
|
||||
|
||||
# Write a test file, which we'll copy to the grid.
|
||||
|
@ -6,7 +6,7 @@ from allmydata.util import fileutil
|
||||
from allmydata.scripts.common import get_aliases
|
||||
from allmydata.scripts import cli, runner
|
||||
from ..no_network import GridTestMixin
|
||||
from allmydata.util.encodingutil import quote_output, get_io_encoding
|
||||
from allmydata.util.encodingutil import quote_output_u, get_io_encoding
|
||||
from .common import CLITestMixin
|
||||
|
||||
class CreateAlias(GridTestMixin, CLITestMixin, unittest.TestCase):
|
||||
@ -171,7 +171,15 @@ class CreateAlias(GridTestMixin, CLITestMixin, unittest.TestCase):
|
||||
(rc, out, err) = args
|
||||
self.failUnlessReallyEqual(rc, 0)
|
||||
self.failUnlessReallyEqual(err, "")
|
||||
self.failUnlessIn("Alias %s created" % quote_output(u"\u00E9tudes"), out)
|
||||
self.assertIn(
|
||||
u"Alias %s created" % (
|
||||
quote_output_u(
|
||||
u"\u00E9tudes",
|
||||
encoding=get_io_encoding(),
|
||||
),
|
||||
),
|
||||
out.decode(get_io_encoding()),
|
||||
)
|
||||
|
||||
aliases = get_aliases(self.get_clientdir())
|
||||
self.failUnless(aliases[u"\u00E9tudes"].startswith("URI:DIR2:"))
|
||||
|
@ -5,6 +5,10 @@ import time
|
||||
import signal
|
||||
from random import randrange
|
||||
from six.moves import StringIO
|
||||
from io import (
|
||||
TextIOWrapper,
|
||||
BytesIO,
|
||||
)
|
||||
|
||||
from twisted.internet import reactor, defer
|
||||
from twisted.python import failure
|
||||
@ -35,24 +39,90 @@ def skip_if_cannot_represent_argv(u):
|
||||
except UnicodeEncodeError:
|
||||
raise unittest.SkipTest("A non-ASCII argv could not be encoded on this platform.")
|
||||
|
||||
|
||||
def _getvalue(io):
|
||||
"""
|
||||
Read out the complete contents of a file-like object.
|
||||
"""
|
||||
io.seek(0)
|
||||
return io.read()
|
||||
|
||||
|
||||
def run_cli(verb, *args, **kwargs):
|
||||
precondition(not [True for arg in args if not isinstance(arg, str)],
|
||||
"arguments to do_cli must be strs -- convert using unicode_to_argv", args=args)
|
||||
nodeargs = kwargs.get("nodeargs", [])
|
||||
argv = nodeargs + [verb] + list(args)
|
||||
stdin = kwargs.get("stdin", "")
|
||||
stdout = StringIO()
|
||||
stderr = StringIO()
|
||||
"""
|
||||
Run some CLI command using Python 2 stdout/stderr semantics.
|
||||
"""
|
||||
nodeargs = kwargs.pop("nodeargs", [])
|
||||
stdin = kwargs.pop("stdin", None)
|
||||
precondition(
|
||||
all(isinstance(arg, bytes) for arg in [verb] + (nodeargs or []) + list(args)),
|
||||
"arguments to run_cli must be bytes -- convert using unicode_to_argv",
|
||||
verb=verb,
|
||||
args=args,
|
||||
nodeargs=nodeargs,
|
||||
)
|
||||
encoding = "utf-8"
|
||||
d = run_cli_ex(
|
||||
verb=verb.decode(encoding),
|
||||
argv=list(arg.decode(encoding) for arg in args),
|
||||
nodeargs=list(nodearg.decode(encoding) for nodearg in nodeargs),
|
||||
stdin=stdin,
|
||||
)
|
||||
def maybe_encode(result):
|
||||
code, stdout, stderr = result
|
||||
# Make sure we produce bytes output since that's what all the code
|
||||
# written to use this interface expects. If you don't like that, use
|
||||
# run_cli_ex instead. We use get_io_encoding here to make sure that
|
||||
# whatever was written can actually be encoded that way, otherwise it
|
||||
# wouldn't really be writeable under real usage.
|
||||
if isinstance(stdout, unicode):
|
||||
stdout = stdout.encode(encoding)
|
||||
if isinstance(stderr, unicode):
|
||||
stderr = stderr.encode(encoding)
|
||||
return code, stdout, stderr
|
||||
d.addCallback(maybe_encode)
|
||||
return d
|
||||
|
||||
|
||||
def run_cli_ex(verb, argv, nodeargs=None, stdin=None, encoding=None):
|
||||
precondition(
|
||||
all(isinstance(arg, unicode) for arg in [verb] + (nodeargs or []) + argv),
|
||||
"arguments to run_cli_ex must be unicode",
|
||||
verb=verb,
|
||||
nodeargs=nodeargs,
|
||||
argv=argv,
|
||||
)
|
||||
if nodeargs is None:
|
||||
nodeargs = []
|
||||
argv = nodeargs + [verb] + list(argv)
|
||||
if stdin is None:
|
||||
stdin = ""
|
||||
if encoding is None:
|
||||
# The original behavior, the Python 2 behavior, is to accept either
|
||||
# bytes or unicode and try to automatically encode or decode as
|
||||
# necessary. This works okay for ASCII and if LANG is set
|
||||
# appropriately. These aren't great constraints so we should move
|
||||
# away from this behavior.
|
||||
stdout = StringIO()
|
||||
stderr = StringIO()
|
||||
else:
|
||||
# The new behavior, the Python 3 behavior, is to accept unicode and
|
||||
# encode it using a specific encoding. For older versions of Python
|
||||
# 3, the encoding is determined from LANG (bad) but for newer Python
|
||||
# 3, the encoding is always utf-8 (good). Tests can pass in different
|
||||
# encodings to exercise different behaviors.
|
||||
stdout = TextIOWrapper(BytesIO(), encoding)
|
||||
stderr = TextIOWrapper(BytesIO(), encoding)
|
||||
d = defer.succeed(argv)
|
||||
d.addCallback(runner.parse_or_exit_with_explanation, stdout=stdout)
|
||||
d.addCallback(runner.dispatch,
|
||||
stdin=StringIO(stdin),
|
||||
stdout=stdout, stderr=stderr)
|
||||
def _done(rc):
|
||||
return 0, stdout.getvalue(), stderr.getvalue()
|
||||
return 0, _getvalue(stdout), _getvalue(stderr)
|
||||
def _err(f):
|
||||
f.trap(SystemExit)
|
||||
return f.value.code, stdout.getvalue(), stderr.getvalue()
|
||||
return f.value.code, _getvalue(stdout), _getvalue(stderr)
|
||||
d.addCallbacks(_done, _err)
|
||||
return d
|
||||
|
||||
|
@ -252,6 +252,13 @@ ESCAPABLE_UNICODE = re.compile(u'([\uD800-\uDBFF][\uDC00-\uDFFF])|' # valid sur
|
||||
|
||||
ESCAPABLE_8BIT = re.compile( br'[^ !#\x25-\x5B\x5D-\x5F\x61-\x7E]', re.DOTALL)
|
||||
|
||||
def quote_output_u(*args, **kwargs):
|
||||
result = quote_output(*args, **kwargs)
|
||||
if isinstance(result, unicode):
|
||||
return result
|
||||
return result.decode("utf-8")
|
||||
|
||||
|
||||
def quote_output(s, quotemarks=True, quote_newlines=None, encoding=None):
|
||||
"""
|
||||
Encode either a Unicode string or a UTF-8-encoded bytestring for representation
|
||||
|
Loading…
x
Reference in New Issue
Block a user