Merge branch 'master' into 3551.more-immutable-python-3

This commit is contained in:
Itamar Turner-Trauring 2020-12-10 10:06:53 -05:00 committed by GitHub
commit e9b0a526bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 342 additions and 120 deletions

0
newsfragments/3522.minor Normal file
View File

View File

@ -1,4 +1,5 @@
from __future__ import print_function
from __future__ import unicode_literals
import os.path
import codecs
@ -10,7 +11,7 @@ from allmydata import uri
from allmydata.scripts.common_http import do_http, check_http_error
from allmydata.scripts.common import get_aliases
from allmydata.util.fileutil import move_into_place
from allmydata.util.encodingutil import unicode_to_output, quote_output
from allmydata.util.encodingutil import quote_output, quote_output_u
def add_line_to_aliasfile(aliasfile, alias, cap):
@ -48,14 +49,13 @@ def add_alias(options):
old_aliases = get_aliases(nodedir)
if alias in old_aliases:
print("Alias %s already exists!" % quote_output(alias), file=stderr)
show_output(stderr, "Alias {alias} already exists!", alias=alias)
return 1
aliasfile = os.path.join(nodedir, "private", "aliases")
cap = uri.from_string_dirnode(cap).to_string()
add_line_to_aliasfile(aliasfile, alias, cap)
print("Alias %s added" % quote_output(alias), file=stdout)
show_output(stdout, "Alias {alias} added", alias=alias)
return 0
def create_alias(options):
@ -75,7 +75,7 @@ def create_alias(options):
old_aliases = get_aliases(nodedir)
if alias in old_aliases:
print("Alias %s already exists!" % quote_output(alias), file=stderr)
show_output(stderr, "Alias {alias} already exists!", alias=alias)
return 1
aliasfile = os.path.join(nodedir, "private", "aliases")
@ -93,11 +93,51 @@ def create_alias(options):
# probably check for others..
add_line_to_aliasfile(aliasfile, alias, new_uri)
print("Alias %s created" % (quote_output(alias),), file=stdout)
show_output(stdout, "Alias {alias} created", alias=alias)
return 0
def show_output(fp, template, **kwargs):
    """
    Render ``template`` with quoted arguments and print the result to ``fp``.

    Every keyword argument is passed through ``quote_output_u`` and then
    substituted into ``template`` using ``format``.  The rendered text is
    encoded with the "namereplace" error handler so that characters the
    target encoding cannot represent do not cause an encoding error.

    :param fp: A file-like object to which to print.  Three situations are
        handled:

        * ``fp.encoding`` names an encoding (eg sys.stdout on Python 3):
          text restricted to that encoding is printed.

        * ``fp.encoding`` is ``None`` (eg sys.stdout on Python 2 when stdout
          is not a tty): utf-8 is assumed and text is printed.

        * ``fp`` has no ``encoding`` attribute at all (eg
          StringIO.StringIO): utf-8-encoded bytes are printed.

    :param unicode template: A ``format``-style template with named fields.

    :param kwargs: The values for the named fields of ``template``.
    """
    assert isinstance(template, unicode)

    # "No encoding attribute" and "encoding attribute is None" are different
    # situations: the former is given bytes, the latter is given text.  The
    # attribute-less case mostly comes from the test suite passing StringIO;
    # keep the allowance until the test suite is fixed and Python 2 is no
    # more.
    try:
        declared = fp.encoding
    except AttributeError:
        has_encoding = False
        encoding = "utf-8"
    else:
        has_encoding = True
        encoding = declared or "utf-8"

    quoted = {}
    for field, value in kwargs.items():
        quoted[field] = quote_output_u(value, encoding=encoding)

    rendered = template.format(**quoted).encode(encoding, "namereplace")
    if has_encoding:
        # Back to text, now limited to what ``encoding`` can represent.
        rendered = rendered.decode(encoding)
    print(rendered, file=fp)
def _get_alias_details(nodedir):
aliases = get_aliases(nodedir)
alias_names = sorted(aliases.keys())
@ -111,34 +151,45 @@ def _get_alias_details(nodedir):
return data
def _escape_format(t):
"""
_escape_format(t).format() == t
:param unicode t: The text to escape.
"""
return t.replace("{", "{{").replace("}", "}}")
def list_aliases(options):
nodedir = options['node-directory']
stdout = options.stdout
stderr = options.stderr
data = _get_alias_details(nodedir)
max_width = max([len(quote_output(name)) for name in data.keys()] + [0])
fmt = "%" + str(max_width) + "s: %s"
rc = 0
"""
Show aliases that exist.
"""
data = _get_alias_details(options['node-directory'])
if options['json']:
try:
# XXX why are we presuming utf-8 output?
print(json.dumps(data, indent=4).decode('utf-8'), file=stdout)
except (UnicodeEncodeError, UnicodeDecodeError):
print(json.dumps(data, indent=4), file=stderr)
rc = 1
output = _escape_format(json.dumps(data, indent=4).decode("ascii"))
else:
for name, details in data.items():
dircap = details['readonly'] if options['readonly-uri'] else details['readwrite']
try:
print(fmt % (unicode_to_output(name), unicode_to_output(dircap.decode('utf-8'))), file=stdout)
except (UnicodeEncodeError, UnicodeDecodeError):
print(fmt % (quote_output(name), quote_output(dircap)), file=stderr)
rc = 1
def dircap(details):
return (
details['readonly']
if options['readonly-uri']
else details['readwrite']
).decode("utf-8")
if rc == 1:
print("\nThis listing included aliases or caps that could not be converted to the terminal" \
"\noutput encoding. These are shown using backslash escapes and in quotes.", file=stderr)
return rc
def format_dircap(name, details):
return fmt % (name, dircap(details))
max_width = max([len(quote_output(name)) for name in data.keys()] + [0])
fmt = "%" + str(max_width) + "s: %s"
output = "\n".join(list(
format_dircap(name, details)
for name, details
in data.items()
))
if output:
# Show whatever we computed. Skip this if there is no output to avoid
# a spurious blank line.
show_output(options.stdout, output)
return 0

View File

@ -1,6 +1,6 @@
from ...util.encodingutil import unicode_to_argv
from ...scripts import runner
from ..common_util import ReallyEqualMixin, run_cli
from ..common_util import ReallyEqualMixin, run_cli, run_cli_unicode
def parse_options(basedir, command, args):
o = runner.Options()
@ -10,10 +10,41 @@ def parse_options(basedir, command, args):
return o
class CLITestMixin(ReallyEqualMixin):
    """
    A mixin for use with ``GridTestMixin`` to execute CLI commands against
    nodes created by methods of that mixin.
    """
    def do_cli_unicode(self, verb, argv, client_num=0, **kwargs):
        """
        Run a Tahoe-LAFS CLI command.

        :param verb: See ``run_cli_unicode``.

        :param argv: See ``run_cli_unicode``.

        :param int client_num: The number of the ``GridTestMixin``-created
            node against which to execute the command.

        :param kwargs: Additional keyword arguments to pass to
            ``run_cli_unicode``.

        :return: Whatever ``run_cli_unicode`` returns.
        """
        # client_num is used to execute client CLI commands on a specific
        # client.  It is a named parameter, so it can never show up in
        # ``kwargs``; use it directly.  Looking it up in ``kwargs`` would
        # always produce the default and ignore the caller's choice.
        client_dir = self.get_clientdir(i=client_num)
        nodeargs = [u"--node-directory", client_dir]
        return run_cli_unicode(verb, argv, nodeargs=nodeargs, **kwargs)

    def do_cli(self, verb, *args, **kwargs):
        """
        Like ``do_cli_unicode`` but work with ``bytes`` everywhere instead of
        ``unicode``.

        Where possible, prefer ``do_cli_unicode``.

        :param verb: The command to run, passed to ``run_cli``.

        :param args: The arguments for the command, passed to ``run_cli``.

        :param kwargs: ``client_num`` (default 0) selects the node against
            which to execute the command; everything else is passed on to
            ``run_cli``.

        :return: Whatever ``run_cli`` returns.
        """
        # client_num is used to execute client CLI commands on a specific
        # client.  It is removed from ``kwargs`` before they are passed on.
        client_num = kwargs.pop("client_num", 0)
        client_dir = unicode_to_argv(self.get_clientdir(i=client_num))
        nodeargs = [b"--node-directory", client_dir]
        return run_cli(verb, *args, nodeargs=nodeargs, **kwargs)

View File

@ -1,105 +1,126 @@
import json
from mock import patch
from twisted.trial import unittest
from twisted.internet.defer import inlineCallbacks
from allmydata.util.encodingutil import unicode_to_argv
from allmydata.scripts.common import get_aliases
from allmydata.test.no_network import GridTestMixin
from .common import CLITestMixin
from ..common_util import skip_if_cannot_represent_argv
from allmydata.util import encodingutil
# see also test_create_alias
class ListAlias(GridTestMixin, CLITestMixin, unittest.TestCase):
@inlineCallbacks
def test_list(self):
self.basedir = "cli/ListAlias/test_list"
def _check_create_alias(self, alias, encoding):
"""
Verify that ``tahoe create-alias`` can be used to create an alias named
``alias`` when argv is encoded using ``encoding``.
:param unicode alias: The alias to try to create.
:param NoneType|str encoding: The name of an encoding to force the
``create-alias`` implementation to use. This simulates the
effects of setting LANG and doing other locale-foolishness without
actually having to mess with this process's global locale state.
If this is ``None`` then the encoding used will be ascii but the
stdio objects given to the code under test will not declare any
encoding (this is like Python 2 when stdio is not a tty).
:return Deferred: A Deferred that fires with success if the alias can
be created and that creation is reported on stdout appropriately
encoded or with failure if something goes wrong.
"""
self.basedir = self.mktemp()
self.set_up_grid(oneshare=True)
rc, stdout, stderr = yield self.do_cli(
"create-alias",
unicode_to_argv(u"tahoe"),
# We can pass an encoding into the test utilities to invoke the code
# under test but we can't pass such a parameter directly to the code
# under test. Instead, that code looks at io_encoding. So,
# monkey-patch that value to our desired value here. This is the code
# that most directly takes the place of messing with LANG or the
# locale module.
self.patch(encodingutil, "io_encoding", encoding or "ascii")
rc, stdout, stderr = yield self.do_cli_unicode(
u"create-alias",
[alias],
encoding=encoding,
)
self.failUnless(unicode_to_argv(u"Alias 'tahoe' created") in stdout)
self.failIf(stderr)
aliases = get_aliases(self.get_clientdir())
self.failUnless(u"tahoe" in aliases)
self.failUnless(aliases[u"tahoe"].startswith("URI:DIR2:"))
# Make sure the result of the create-alias command is as we want it to
# be.
self.assertEqual(u"Alias '{}' created\n".format(alias), stdout)
self.assertEqual("", stderr)
self.assertEqual(0, rc)
rc, stdout, stderr = yield self.do_cli("list-aliases", "--json")
# Make sure it had the intended side-effect, too - an alias created in
# the node filesystem state.
aliases = get_aliases(self.get_clientdir())
self.assertIn(alias, aliases)
self.assertTrue(aliases[alias].startswith(u"URI:DIR2:"))
# And inspect the state via the user interface list-aliases command
# too.
rc, stdout, stderr = yield self.do_cli_unicode(
u"list-aliases",
[u"--json"],
encoding=encoding,
)
self.assertEqual(0, rc)
data = json.loads(stdout)
self.assertIn(u"tahoe", data)
data = data[u"tahoe"]
self.assertIn("readwrite", data)
self.assertIn("readonly", data)
self.assertIn(alias, data)
data = data[alias]
self.assertIn(u"readwrite", data)
self.assertIn(u"readonly", data)
@inlineCallbacks
def test_list_unicode_mismatch_json(self):
"""
pretty hack-y test, but we want to cover the 'except' on Unicode
errors paths and I can't come up with a nicer way to trigger
this
"""
self.basedir = "cli/ListAlias/test_list_unicode_mismatch_json"
skip_if_cannot_represent_argv(u"tahoe\u263A")
self.set_up_grid(oneshare=True)
rc, stdout, stderr = yield self.do_cli(
"create-alias",
unicode_to_argv(u"tahoe\u263A"),
def test_list_none(self):
"""
An alias composed of all ASCII-encodeable code points can be created when
stdio aren't clearly marked with an encoding.
"""
return self._check_create_alias(
u"tahoe",
encoding=None,
)
self.failUnless(unicode_to_argv(u"Alias 'tahoe\u263A' created") in stdout)
self.failIf(stderr)
booms = []
def boom(out, indent=4):
if not len(booms):
booms.append(out)
raise UnicodeEncodeError("foo", u"foo", 3, 5, "foo")
return str(out)
with patch("allmydata.scripts.tahoe_add_alias.json.dumps", boom):
aliases = get_aliases(self.get_clientdir())
self.failUnless(u"tahoe\u263A" in aliases)
self.failUnless(aliases[u"tahoe\u263A"].startswith("URI:DIR2:"))
rc, stdout, stderr = yield self.do_cli("list-aliases", "--json")
self.assertEqual(1, rc)
self.assertIn("could not be converted", stderr)
@inlineCallbacks
def test_list_unicode_mismatch(self):
self.basedir = "cli/ListAlias/test_list_unicode_mismatch"
skip_if_cannot_represent_argv(u"tahoe\u263A")
self.set_up_grid(oneshare=True)
rc, stdout, stderr = yield self.do_cli(
"create-alias",
unicode_to_argv(u"tahoe\u263A"),
def test_list_ascii(self):
"""
An alias composed of all ASCII-encodeable code points can be created when
the active encoding is ASCII.
"""
return self._check_create_alias(
u"tahoe",
encoding="ascii",
)
def boom(out):
print("boom {}".format(out))
return out
raise UnicodeEncodeError("foo", u"foo", 3, 5, "foo")
with patch("allmydata.scripts.tahoe_add_alias.unicode_to_output", boom):
self.failUnless(unicode_to_argv(u"Alias 'tahoe\u263A' created") in stdout)
self.failIf(stderr)
aliases = get_aliases(self.get_clientdir())
self.failUnless(u"tahoe\u263A" in aliases)
self.failUnless(aliases[u"tahoe\u263A"].startswith("URI:DIR2:"))
def test_list_latin_1(self):
"""
An alias composed of all Latin-1-encodeable code points can be created
when the active encoding is Latin-1.
rc, stdout, stderr = yield self.do_cli("list-aliases")
This is very similar to ``test_list_utf_8`` but the assumption of
UTF-8 is nearly ubiquitous and explicitly exercising the codepaths
with a UTF-8-incompatible encoding helps flush out unintentional UTF-8
assumptions.
"""
return self._check_create_alias(
u"taho\N{LATIN SMALL LETTER E WITH ACUTE}",
encoding="latin-1",
)
self.assertEqual(1, rc)
self.assertIn("could not be converted", stderr)
def test_list_utf_8(self):
"""
An alias composed of all UTF-8-encodeable code points can be created when
the active encoding is UTF-8.
"""
return self._check_create_alias(
u"tahoe\N{SNOWMAN}",
encoding="utf-8",
)

View File

@ -661,7 +661,7 @@ starting copy, 2 files, 1 directories
# This test ensures that tahoe will copy a file from the grid to
# a local directory without a specified file name.
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/2027
self.basedir = "cli/Cp/cp_verbose"
self.basedir = "cli/Cp/ticket_2027"
self.set_up_grid(oneshare=True)
# Write a test file, which we'll copy to the grid.

View File

@ -5,6 +5,10 @@ import time
import signal
from random import randrange
from six.moves import StringIO
from io import (
TextIOWrapper,
BytesIO,
)
from twisted.internet import reactor, defer
from twisted.python import failure
@ -35,27 +39,131 @@ def skip_if_cannot_represent_argv(u):
except UnicodeEncodeError:
raise unittest.SkipTest("A non-ASCII argv could not be encoded on this platform.")
def run_cli(verb, *args, **kwargs):
precondition(not [True for arg in args if not isinstance(arg, str)],
"arguments to do_cli must be strs -- convert using unicode_to_argv", args=args)
nodeargs = kwargs.get("nodeargs", [])
def _getvalue(io):
"""
Read out the complete contents of a file-like object.
"""
io.seek(0)
return io.read()
def run_cli_bytes(verb, *args, **kwargs):
    """
    Run a Tahoe-LAFS CLI command specified as bytes.

    Most code should prefer ``run_cli_unicode`` which deals with all the
    necessary encoding considerations.  This helper still exists so that novel
    misconfigurations can be explicitly tested (for example, receiving UTF-8
    bytes when the system encoding claims to be ASCII).

    :param bytes verb: The command to run.  For example, ``b"create-node"``.

    :param [bytes] args: The arguments to pass to the command.  For example,
        ``(b"--hostname=localhost",)``.

    :param [bytes] nodeargs: Extra arguments to pass to the Tahoe executable
        before ``verb``.

    :param bytes stdin: Text to pass to the command via stdin.

    :param NoneType|str encoding: The name of an encoding which stdout and
        stderr will be configured to use.  ``None`` means stdout and stderr
        will accept bytes and unicode and use the default system encoding for
        translating between them.

    :return Deferred: A Deferred that fires with a three-tuple of the exit
        code and the complete contents of stdout and stderr.
    """
    nodeargs = kwargs.pop("nodeargs", [])
    encoding = kwargs.pop("encoding", None)
    precondition(
        all(isinstance(arg, bytes) for arg in [verb] + nodeargs + list(args)),
        "arguments to run_cli must be bytes -- convert using unicode_to_argv",
        verb=verb,
        args=args,
        nodeargs=nodeargs,
    )
    argv = nodeargs + [verb] + list(args)
    stdin = kwargs.get("stdin", "")
    if encoding is None:
        # The original behavior, the Python 2 behavior, is to accept either
        # bytes or unicode and try to automatically encode or decode as
        # necessary.  This works okay for ASCII and if LANG is set
        # appropriately.  These aren't great constraints so we should move
        # away from this behavior.
        stdout = StringIO()
        stderr = StringIO()
    else:
        # The new behavior, the Python 3 behavior, is to accept unicode and
        # encode it using a specific encoding.  For older versions of Python
        # 3, the encoding is determined from LANG (bad) but for newer Python
        # 3, the encoding is always utf-8 (good).  Tests can pass in different
        # encodings to exercise different behaviors.
        stdout = TextIOWrapper(BytesIO(), encoding)
        stderr = TextIOWrapper(BytesIO(), encoding)
    d = defer.succeed(argv)
    d.addCallback(runner.parse_or_exit_with_explanation, stdout=stdout)
    d.addCallback(runner.dispatch,
                  stdin=StringIO(stdin),
                  stdout=stdout, stderr=stderr)

    # Read the streams back with _getvalue rather than .getvalue():
    # TextIOWrapper has no getvalue method, so only seek-and-read works for
    # both kinds of stream created above.
    def _done(rc):
        # The callback result is not used; normal completion is reported as
        # exit code 0.
        return 0, _getvalue(stdout), _getvalue(stderr)

    def _err(f):
        # Only SystemExit is translated into an exit code; any other failure
        # keeps propagating down the errback chain.
        f.trap(SystemExit)
        return f.value.code, _getvalue(stdout), _getvalue(stderr)
    d.addCallbacks(_done, _err)
    return d
def run_cli_unicode(verb, argv, nodeargs=None, stdin=None, encoding=None):
    """
    Run a Tahoe-LAFS CLI command given as text.

    The text arguments are encoded to bytes, handed to ``run_cli_bytes``, and
    any bytes that come back on stdout or stderr are decoded to text again.

    :param unicode verb: The command to run.  For example, ``u"create-node"``.

    :param [unicode] argv: The arguments to pass to the command.  For example,
        ``[u"--hostname=localhost"]``.

    :param [unicode] nodeargs: Extra arguments to pass to the Tahoe executable
        before ``verb``.

    :param unicode stdin: Text to pass to the command via stdin.

    :param NoneType|str encoding: The name of an encoding to use for all
        bytes/unicode conversions necessary *and* the encoding to cause stdio
        to declare with its ``encoding`` attribute.  ``None`` means ASCII will
        be used and no declaration will be made at all.

    :return Deferred: A Deferred that fires with the three-tuple produced by
        ``run_cli_bytes``, with stdout and stderr as text.
    """
    if nodeargs is None:
        nodeargs = []
    precondition(
        all(isinstance(arg, unicode) for arg in [verb] + nodeargs + argv),
        "arguments to run_cli_unicode must be unicode",
        verb=verb,
        nodeargs=nodeargs,
        argv=argv,
    )
    codec = encoding or "ascii"

    def to_bytes(text):
        # None (no stdin supplied) is passed through untouched.
        if text is None:
            return None
        return text.encode(codec)

    def to_text(value):
        # Streams that declare an encoding already yield text; only bytes
        # need decoding.
        if isinstance(value, bytes):
            return value.decode(codec)
        return value

    encoded_argv = [to_bytes(arg) for arg in argv]
    encoded_nodeargs = [to_bytes(arg) for arg in nodeargs]
    d = run_cli_bytes(
        to_bytes(verb),
        *encoded_argv,
        nodeargs=encoded_nodeargs,
        stdin=to_bytes(stdin),
        encoding=encoding
    )

    def decode_result(result):
        code, stdout, stderr = result
        return code, to_text(stdout), to_text(stderr)
    d.addCallback(decode_result)
    return d
run_cli = run_cli_bytes
def parse_cli(*argv):
# This parses the CLI options (synchronously), and returns the Options
# argument, or throws usage.UsageError if something went wrong.

View File

@ -2618,6 +2618,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
def _run_in_subprocess(ignored, verb, *args, **kwargs):
stdin = kwargs.get("stdin")
# XXX https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3548
env = kwargs.get("env", os.environ)
# Python warnings from the child process don't matter.
env["PYTHONWARNINGS"] = "ignore"

View File

@ -252,6 +252,16 @@ ESCAPABLE_UNICODE = re.compile(u'([\uD800-\uDBFF][\uDC00-\uDFFF])|' # valid sur
ESCAPABLE_8BIT = re.compile( br'[^ !#\x25-\x5B\x5D-\x5F\x61-\x7E]', re.DOTALL)
def quote_output_u(*args, **kwargs):
    """
    Like ``quote_output`` but always return ``unicode``.

    All arguments are passed through to ``quote_output`` unchanged.  If it
    returns bytes, they are decoded using the ``encoding`` the caller
    supplied, whether by keyword or as the fourth positional argument,
    falling back to ``io_encoding`` when none was given.

    :return unicode: The quoted text.
    """
    result = quote_output(*args, **kwargs)
    if isinstance(result, unicode):
        return result
    # ``encoding`` is the fourth parameter of ``quote_output``.  Honor it
    # however it was passed so that decoding uses the codec the caller asked
    # ``quote_output`` to use rather than silently using ``io_encoding``.
    encoding = kwargs.get("encoding")
    if encoding is None and len(args) > 3:
        encoding = args[3]
    return result.decode(encoding or io_encoding)
def quote_output(s, quotemarks=True, quote_newlines=None, encoding=None):
"""
Encode either a Unicode string or a UTF-8-encoded bytestring for representation