Put run_cli back largely how it was

Also deal with StringIO better in show_output
This commit is contained in:
Jean-Paul Calderone 2020-12-06 20:37:28 -05:00
parent 613777d166
commit c12b082fa7
5 changed files with 82 additions and 62 deletions

View File

@ -98,6 +98,20 @@ def create_alias(options):
def show_output(fp, template, **kwargs): def show_output(fp, template, **kwargs):
"""
Print to just about anything.
:param fp: A file-like object to which to print. This handles the case
where ``fp`` declares a support encoding with the ``encoding``
attribute (eg sys.stdout on Python 3). It handles the case where
``fp`` declares no supported encoding via ``None`` for its
``encoding`` attribute (eg sys.stdout on Python 2 when stdout is not a
tty). It handles the case where ``fp`` declares an encoding that does
not support all of the characters in the output by forcing the
"namereplace" error handler. It handles the case where there is no
``encoding`` attribute at all (eg StringIO.StringIO) by writing
utf-8-encoded bytes.
"""
assert isinstance(template, unicode) assert isinstance(template, unicode)
# On Python 3 fp has an encoding attribute under all real usage. On # On Python 3 fp has an encoding attribute under all real usage. On
@ -105,13 +119,22 @@ def show_output(fp, template, **kwargs):
# test suite often passes StringIO which has no such attribute. Make # test suite often passes StringIO which has no such attribute. Make
# allowances for this until the test suite is fixed and Python 2 is no # allowances for this until the test suite is fixed and Python 2 is no
# more. # more.
encoding = getattr(fp, "encoding", None) or "utf-8" try:
encoding = fp.encoding or "utf-8"
except AttributeError:
has_encoding = False
encoding = "utf-8"
else:
has_encoding = True
output = template.format(**{ output = template.format(**{
k: quote_output_u(v, encoding=encoding) k: quote_output_u(v, encoding=encoding)
for (k, v) for (k, v)
in kwargs.items() in kwargs.items()
}) })
safe_output = output.encode(encoding, "namereplace").decode(encoding) safe_output = output.encode(encoding, "namereplace")
if has_encoding:
safe_output = safe_output.decode(encoding)
print(safe_output, file=fp) print(safe_output, file=fp)

View File

@ -1,6 +1,6 @@
from ...util.encodingutil import unicode_to_argv from ...util.encodingutil import unicode_to_argv
from ...scripts import runner from ...scripts import runner
from ..common_util import ReallyEqualMixin, run_cli, run_cli_ex from ..common_util import ReallyEqualMixin, run_cli, run_cli_unicode
def parse_options(basedir, command, args): def parse_options(basedir, command, args):
o = runner.Options() o = runner.Options()
@ -10,12 +10,12 @@ def parse_options(basedir, command, args):
return o return o
class CLITestMixin(ReallyEqualMixin): class CLITestMixin(ReallyEqualMixin):
def do_cli_ex(self, verb, argv, client_num=0, **kwargs): def do_cli_unicode(self, verb, argv, client_num=0, **kwargs):
# client_num is used to execute client CLI commands on a specific # client_num is used to execute client CLI commands on a specific
# client. # client.
client_dir = self.get_clientdir(i=client_num) client_dir = self.get_clientdir(i=client_num)
nodeargs = [ u"--node-directory", client_dir ] nodeargs = [ u"--node-directory", client_dir ]
return run_cli_ex(verb, argv, nodeargs=nodeargs, **kwargs) return run_cli_unicode(verb, argv, nodeargs=nodeargs, **kwargs)
def do_cli(self, verb, *args, **kwargs): def do_cli(self, verb, *args, **kwargs):

View File

@ -17,22 +17,24 @@ class ListAlias(GridTestMixin, CLITestMixin, unittest.TestCase):
self.basedir = self.mktemp() self.basedir = self.mktemp()
self.set_up_grid(oneshare=True) self.set_up_grid(oneshare=True)
rc, stdout, stderr = yield self.do_cli_ex( rc, stdout, stderr = yield self.do_cli_unicode(
u"create-alias", u"create-alias",
[alias], [alias],
encoding=encoding, encoding=encoding,
) )
self.assertIn( self.assertEqual(
b"Alias {} created".format(quote_output(alias, encoding=encoding)), b"Alias {} created\n".format(
stdout.encode(encoding), quote_output(alias, encoding=encoding),
),
stdout,
) )
self.assertEqual("", stderr) self.assertEqual("", stderr)
aliases = get_aliases(self.get_clientdir()) aliases = get_aliases(self.get_clientdir())
self.assertIn(alias, aliases) self.assertIn(alias, aliases)
self.assertTrue(aliases[alias].startswith(u"URI:DIR2:")) self.assertTrue(aliases[alias].startswith(u"URI:DIR2:"))
rc, stdout, stderr = yield self.do_cli_ex( rc, stdout, stderr = yield self.do_cli_unicode(
u"list-aliases", u"list-aliases",
[u"--json"], [u"--json"],
encoding=encoding, encoding=encoding,
@ -60,3 +62,11 @@ class ListAlias(GridTestMixin, CLITestMixin, unittest.TestCase):
def test_list_nonascii_utf_8(self): def test_list_nonascii_utf_8(self):
return self._test_list(u"tahoe\N{SNOWMAN}", encoding="utf-8") return self._test_list(u"tahoe\N{SNOWMAN}", encoding="utf-8")
def test_list_none(self):
return self._test_list(u"tahoe", encoding=None)
def test_list_nonascii_none(self):
return self._test_list(u"tahoe\N{SNOWMAN}", encoding=None)

View File

@ -6,7 +6,7 @@ from allmydata.util import fileutil
from allmydata.scripts.common import get_aliases from allmydata.scripts.common import get_aliases
from allmydata.scripts import cli, runner from allmydata.scripts import cli, runner
from ..no_network import GridTestMixin from ..no_network import GridTestMixin
from allmydata.util.encodingutil import quote_output_u, get_io_encoding from allmydata.util.encodingutil import quote_output, get_io_encoding
from .common import CLITestMixin from .common import CLITestMixin
class CreateAlias(GridTestMixin, CLITestMixin, unittest.TestCase): class CreateAlias(GridTestMixin, CLITestMixin, unittest.TestCase):
@ -171,15 +171,7 @@ class CreateAlias(GridTestMixin, CLITestMixin, unittest.TestCase):
(rc, out, err) = args (rc, out, err) = args
self.failUnlessReallyEqual(rc, 0) self.failUnlessReallyEqual(rc, 0)
self.failUnlessReallyEqual(err, "") self.failUnlessReallyEqual(err, "")
self.assertIn( self.failUnlessIn("Alias %s created" % quote_output(u"\u00E9tudes"), out)
u"Alias %s created" % (
quote_output_u(
u"\u00E9tudes",
encoding=get_io_encoding(),
),
),
out.decode(get_io_encoding()),
)
aliases = get_aliases(self.get_clientdir()) aliases = get_aliases(self.get_clientdir())
self.failUnless(aliases[u"\u00E9tudes"].startswith("URI:DIR2:")) self.failUnless(aliases[u"\u00E9tudes"].startswith("URI:DIR2:"))

View File

@ -48,55 +48,18 @@ def _getvalue(io):
return io.read() return io.read()
def run_cli(verb, *args, **kwargs): def run_cli_bytes(verb, *args, **kwargs):
"""
Run some CLI command using Python 2 stdout/stderr semantics.
"""
nodeargs = kwargs.pop("nodeargs", []) nodeargs = kwargs.pop("nodeargs", [])
stdin = kwargs.pop("stdin", None) encoding = kwargs.pop("encoding", None)
precondition( precondition(
all(isinstance(arg, bytes) for arg in [verb] + (nodeargs or []) + list(args)), all(isinstance(arg, bytes) for arg in [verb] + nodeargs + list(args)),
"arguments to run_cli must be bytes -- convert using unicode_to_argv", "arguments to run_cli must be bytes -- convert using unicode_to_argv",
verb=verb, verb=verb,
args=args, args=args,
nodeargs=nodeargs, nodeargs=nodeargs,
) )
encoding = "utf-8" argv = nodeargs + [verb] + list(args)
d = run_cli_ex( stdin = kwargs.get("stdin", "")
verb=verb.decode(encoding),
argv=list(arg.decode(encoding) for arg in args),
nodeargs=list(nodearg.decode(encoding) for nodearg in nodeargs),
stdin=stdin,
)
def maybe_encode(result):
code, stdout, stderr = result
# Make sure we produce bytes output since that's what all the code
# written to use this interface expects. If you don't like that, use
# run_cli_ex instead. We use get_io_encoding here to make sure that
# whatever was written can actually be encoded that way, otherwise it
# wouldn't really be writeable under real usage.
if isinstance(stdout, unicode):
stdout = stdout.encode(encoding)
if isinstance(stderr, unicode):
stderr = stderr.encode(encoding)
return code, stdout, stderr
d.addCallback(maybe_encode)
return d
def run_cli_ex(verb, argv, nodeargs=None, stdin=None, encoding=None):
precondition(
all(isinstance(arg, unicode) for arg in [verb] + (nodeargs or []) + argv),
"arguments to run_cli_ex must be unicode",
verb=verb,
nodeargs=nodeargs,
argv=argv,
)
if nodeargs is None:
nodeargs = []
argv = nodeargs + [verb] + list(argv)
if stdin is None:
stdin = ""
if encoding is None: if encoding is None:
# The original behavior, the Python 2 behavior, is to accept either # The original behavior, the Python 2 behavior, is to accept either
# bytes or unicode and try to automatically encode or decode as # bytes or unicode and try to automatically encode or decode as
@ -126,6 +89,38 @@ def run_cli_ex(verb, argv, nodeargs=None, stdin=None, encoding=None):
d.addCallbacks(_done, _err) d.addCallbacks(_done, _err)
return d return d
def run_cli_unicode(verb, argv, nodeargs=None, stdin=None, encoding=None):
if nodeargs is None:
nodeargs = []
precondition(
all(isinstance(arg, unicode) for arg in [verb] + nodeargs + argv),
"arguments to run_cli_unicode must be unicode",
verb=verb,
nodeargs=nodeargs,
argv=argv,
)
d = run_cli_bytes(
verb.encode("utf-8"),
nodeargs=list(arg.encode("utf-8") for arg in nodeargs),
stdin=stdin,
encoding=encoding,
*list(arg.encode("utf-8") for arg in argv)
)
def maybe_decode(result):
code, stdout, stderr = result
if isinstance(stdout, unicode):
stdout = stdout.encode("utf-8")
if isinstance(stderr, unicode):
stderr = stderr.encode("utf-8")
return code, stdout, stderr
d.addCallback(maybe_decode)
return d
run_cli = run_cli_bytes
def parse_cli(*argv): def parse_cli(*argv):
# This parses the CLI options (synchronously), and returns the Options # This parses the CLI options (synchronously), and returns the Options
# argument, or throws usage.UsageError if something went wrong. # argument, or throws usage.UsageError if something went wrong.