mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-08 11:24:25 +00:00
Merge pull request #965 from LeastAuthority/3581.unicode_to_argv.1
Remove unicode_to_argv, argv_to_unicode and weird unicode mangling Fixes: ticket:3581
This commit is contained in:
commit
33d566ec87
1
newsfragments/3588.incompat
Normal file
1
newsfragments/3588.incompat
Normal file
@ -0,0 +1 @@
|
||||
The Tahoe command line now always uses UTF-8 to decode its arguments, regardless of locale.
|
@ -501,7 +501,7 @@ def list_aliases(options):
|
||||
rc = tahoe_add_alias.list_aliases(options)
|
||||
return rc
|
||||
|
||||
def list(options):
|
||||
def list_(options):
|
||||
from allmydata.scripts import tahoe_ls
|
||||
rc = tahoe_ls.list(options)
|
||||
return rc
|
||||
@ -587,7 +587,7 @@ dispatch = {
|
||||
"add-alias": add_alias,
|
||||
"create-alias": create_alias,
|
||||
"list-aliases": list_aliases,
|
||||
"ls": list,
|
||||
"ls": list_,
|
||||
"get": get,
|
||||
"put": put,
|
||||
"cp": cp,
|
||||
|
@ -1,4 +1,5 @@
|
||||
from ...util.encodingutil import unicode_to_argv
|
||||
from six import ensure_str
|
||||
|
||||
from ...scripts import runner
|
||||
from ..common_util import ReallyEqualMixin, run_cli, run_cli_unicode
|
||||
|
||||
@ -45,6 +46,12 @@ class CLITestMixin(ReallyEqualMixin):
|
||||
# client_num is used to execute client CLI commands on a specific
|
||||
# client.
|
||||
client_num = kwargs.pop("client_num", 0)
|
||||
client_dir = unicode_to_argv(self.get_clientdir(i=client_num))
|
||||
# If we were really going to launch a child process then
|
||||
# `unicode_to_argv` would be the right thing to do here. However,
|
||||
# we're just going to call some Python functions directly and those
|
||||
# Python functions want native strings. So ignore the requirements
|
||||
# for passing arguments to another process and make sure this argument
|
||||
# is a native string.
|
||||
client_dir = ensure_str(self.get_clientdir(i=client_num))
|
||||
nodeargs = [ b"--node-directory", client_dir ]
|
||||
return run_cli(verb, *args, nodeargs=nodeargs, **kwargs)
|
||||
|
@ -99,22 +99,6 @@ class ListAlias(GridTestMixin, CLITestMixin, unittest.TestCase):
|
||||
)
|
||||
|
||||
|
||||
def test_list_latin_1(self):
|
||||
"""
|
||||
An alias composed of all Latin-1-encodeable code points can be created
|
||||
when the active encoding is Latin-1.
|
||||
|
||||
This is very similar to ``test_list_utf_8`` but the assumption of
|
||||
UTF-8 is nearly ubiquitous and explicitly exercising the codepaths
|
||||
with a UTF-8-incompatible encoding helps flush out unintentional UTF-8
|
||||
assumptions.
|
||||
"""
|
||||
return self._check_create_alias(
|
||||
u"taho\N{LATIN SMALL LETTER E WITH ACUTE}",
|
||||
encoding="latin-1",
|
||||
)
|
||||
|
||||
|
||||
def test_list_utf_8(self):
|
||||
"""
|
||||
An alias composed of all UTF-8-encodeable code points can be created when
|
||||
|
@ -7,7 +7,7 @@ from allmydata.scripts.common import get_aliases
|
||||
from allmydata.scripts import cli
|
||||
from ..no_network import GridTestMixin
|
||||
from ..common_util import skip_if_cannot_represent_filename
|
||||
from allmydata.util.encodingutil import get_io_encoding, unicode_to_argv
|
||||
from allmydata.util.encodingutil import get_io_encoding
|
||||
from allmydata.util.fileutil import abspath_expanduser_unicode
|
||||
from .common import CLITestMixin
|
||||
|
||||
@ -46,21 +46,21 @@ class Put(GridTestMixin, CLITestMixin, unittest.TestCase):
|
||||
self.basedir = "cli/Put/unlinked_immutable_from_file"
|
||||
self.set_up_grid(oneshare=True)
|
||||
|
||||
rel_fn = os.path.join(self.basedir, "DATAFILE")
|
||||
abs_fn = unicode_to_argv(abspath_expanduser_unicode(unicode(rel_fn)))
|
||||
rel_fn = unicode(os.path.join(self.basedir, "DATAFILE"))
|
||||
abs_fn = abspath_expanduser_unicode(rel_fn)
|
||||
# we make the file small enough to fit in a LIT file, for speed
|
||||
fileutil.write(rel_fn, "short file")
|
||||
d = self.do_cli("put", rel_fn)
|
||||
d = self.do_cli_unicode(u"put", [rel_fn])
|
||||
def _uploaded(args):
|
||||
(rc, out, err) = args
|
||||
readcap = out
|
||||
self.failUnless(readcap.startswith("URI:LIT:"), readcap)
|
||||
self.readcap = readcap
|
||||
d.addCallback(_uploaded)
|
||||
d.addCallback(lambda res: self.do_cli("put", "./" + rel_fn))
|
||||
d.addCallback(lambda res: self.do_cli_unicode(u"put", [u"./" + rel_fn]))
|
||||
d.addCallback(lambda rc_stdout_stderr:
|
||||
self.failUnlessReallyEqual(rc_stdout_stderr[1], self.readcap))
|
||||
d.addCallback(lambda res: self.do_cli("put", abs_fn))
|
||||
d.addCallback(lambda res: self.do_cli_unicode(u"put", [abs_fn]))
|
||||
d.addCallback(lambda rc_stdout_stderr:
|
||||
self.failUnlessReallyEqual(rc_stdout_stderr[1], self.readcap))
|
||||
# we just have to assume that ~ is handled properly
|
||||
|
@ -9,10 +9,15 @@ __all__ = [
|
||||
"flush_logged_errors",
|
||||
"skip",
|
||||
"skipIf",
|
||||
|
||||
# Selected based on platform and re-exported for convenience.
|
||||
"Popen",
|
||||
"PIPE",
|
||||
]
|
||||
|
||||
from past.builtins import chr as byteschr, unicode
|
||||
|
||||
import sys
|
||||
import os, random, struct
|
||||
import six
|
||||
import tempfile
|
||||
@ -101,6 +106,21 @@ from .eliotutil import (
|
||||
)
|
||||
from .common_util import ShouldFailMixin # noqa: F401
|
||||
|
||||
if sys.platform == "win32":
|
||||
# Python 2.7 doesn't have good options for launching a process with
|
||||
# non-ASCII in its command line. So use this alternative that does a
|
||||
# better job. However, only use it on Windows because it doesn't work
|
||||
# anywhere else.
|
||||
from ._win_subprocess import (
|
||||
Popen,
|
||||
)
|
||||
else:
|
||||
from subprocess import (
|
||||
Popen,
|
||||
)
|
||||
from subprocess import (
|
||||
PIPE,
|
||||
)
|
||||
|
||||
TEST_RSA_KEY_SIZE = 522
|
||||
|
||||
|
@ -70,7 +70,7 @@ if __name__ == "__main__":
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
import os, sys, locale
|
||||
import os, sys
|
||||
from unittest import skipIf
|
||||
|
||||
from twisted.trial import unittest
|
||||
@ -81,99 +81,28 @@ from allmydata.test.common_util import (
|
||||
ReallyEqualMixin, skip_if_cannot_represent_filename,
|
||||
)
|
||||
from allmydata.util import encodingutil, fileutil
|
||||
from allmydata.util.encodingutil import argv_to_unicode, unicode_to_url, \
|
||||
from allmydata.util.encodingutil import unicode_to_url, \
|
||||
unicode_to_output, quote_output, quote_path, quote_local_unicode_path, \
|
||||
quote_filepath, unicode_platform, listdir_unicode, FilenameEncodingError, \
|
||||
get_io_encoding, get_filesystem_encoding, to_bytes, from_utf8_or_none, _reload, \
|
||||
get_filesystem_encoding, to_bytes, from_utf8_or_none, _reload, \
|
||||
to_filepath, extend_filepath, unicode_from_filepath, unicode_segments_from, \
|
||||
unicode_to_argv
|
||||
from twisted.python import usage
|
||||
|
||||
|
||||
class MockStdout(object):
|
||||
pass
|
||||
|
||||
class EncodingUtilErrors(ReallyEqualMixin, unittest.TestCase):
|
||||
def test_get_io_encoding(self):
|
||||
mock_stdout = MockStdout()
|
||||
self.patch(sys, 'stdout', mock_stdout)
|
||||
|
||||
mock_stdout.encoding = 'UTF-8'
|
||||
_reload()
|
||||
self.failUnlessReallyEqual(get_io_encoding(), 'utf-8')
|
||||
|
||||
mock_stdout.encoding = 'cp65001'
|
||||
_reload()
|
||||
self.assertEqual(get_io_encoding(), 'utf-8')
|
||||
|
||||
mock_stdout.encoding = 'koi8-r'
|
||||
expected = sys.platform == "win32" and 'utf-8' or 'koi8-r'
|
||||
_reload()
|
||||
self.failUnlessReallyEqual(get_io_encoding(), expected)
|
||||
|
||||
mock_stdout.encoding = 'nonexistent_encoding'
|
||||
if sys.platform == "win32":
|
||||
_reload()
|
||||
self.failUnlessReallyEqual(get_io_encoding(), 'utf-8')
|
||||
else:
|
||||
self.failUnlessRaises(AssertionError, _reload)
|
||||
|
||||
def test_get_io_encoding_not_from_stdout(self):
|
||||
preferredencoding = 'koi8-r'
|
||||
def call_locale_getpreferredencoding():
|
||||
return preferredencoding
|
||||
self.patch(locale, 'getpreferredencoding', call_locale_getpreferredencoding)
|
||||
mock_stdout = MockStdout()
|
||||
self.patch(sys, 'stdout', mock_stdout)
|
||||
|
||||
expected = sys.platform == "win32" and 'utf-8' or 'koi8-r'
|
||||
_reload()
|
||||
self.failUnlessReallyEqual(get_io_encoding(), expected)
|
||||
|
||||
mock_stdout.encoding = None
|
||||
_reload()
|
||||
self.failUnlessReallyEqual(get_io_encoding(), expected)
|
||||
|
||||
preferredencoding = None
|
||||
_reload()
|
||||
self.assertEqual(get_io_encoding(), 'utf-8')
|
||||
|
||||
def test_argv_to_unicode(self):
|
||||
encodingutil.io_encoding = 'utf-8'
|
||||
self.failUnlessRaises(usage.UsageError,
|
||||
argv_to_unicode,
|
||||
lumiere_nfc.encode('latin1'))
|
||||
|
||||
@skipIf(PY3, "Python 2 only.")
|
||||
def test_unicode_to_output(self):
|
||||
encodingutil.io_encoding = 'koi8-r'
|
||||
self.failUnlessRaises(UnicodeEncodeError, unicode_to_output, lumiere_nfc)
|
||||
|
||||
def test_no_unicode_normalization(self):
|
||||
# Pretend to run on a Unicode platform.
|
||||
# listdir_unicode normalized to NFC in 1.7beta, but now doesn't.
|
||||
|
||||
def call_os_listdir(path):
|
||||
return [Artonwall_nfd]
|
||||
self.patch(os, 'listdir', call_os_listdir)
|
||||
self.patch(sys, 'platform', 'darwin')
|
||||
|
||||
_reload()
|
||||
self.failUnlessReallyEqual(listdir_unicode(u'/dummy'), [Artonwall_nfd])
|
||||
|
||||
|
||||
# The following tests apply only to platforms that don't store filenames as
|
||||
# Unicode entities on the filesystem.
|
||||
class EncodingUtilNonUnicodePlatform(unittest.TestCase):
|
||||
@skipIf(PY3, "Python 3 is always Unicode, regardless of OS.")
|
||||
def setUp(self):
|
||||
# Mock sys.platform because unicode_platform() uses it
|
||||
self.original_platform = sys.platform
|
||||
sys.platform = 'linux'
|
||||
# Make sure everything goes back to the way it was at the end of the
|
||||
# test.
|
||||
self.addCleanup(_reload)
|
||||
|
||||
def tearDown(self):
|
||||
sys.platform = self.original_platform
|
||||
_reload()
|
||||
# Mock sys.platform because unicode_platform() uses it. Cleanups run
|
||||
# in reverse order so we do this second so it gets undone first.
|
||||
self.patch(sys, "platform", "linux")
|
||||
|
||||
def test_listdir_unicode(self):
|
||||
# What happens if latin1-encoded filenames are encountered on an UTF-8
|
||||
@ -206,25 +135,8 @@ class EncodingUtilNonUnicodePlatform(unittest.TestCase):
|
||||
|
||||
class EncodingUtil(ReallyEqualMixin):
|
||||
def setUp(self):
|
||||
self.original_platform = sys.platform
|
||||
sys.platform = self.platform
|
||||
|
||||
def tearDown(self):
|
||||
sys.platform = self.original_platform
|
||||
_reload()
|
||||
|
||||
def test_argv_to_unicode(self):
|
||||
if 'argv' not in dir(self):
|
||||
return
|
||||
|
||||
mock_stdout = MockStdout()
|
||||
mock_stdout.encoding = self.io_encoding
|
||||
self.patch(sys, 'stdout', mock_stdout)
|
||||
|
||||
argu = lumiere_nfc
|
||||
argv = self.argv
|
||||
_reload()
|
||||
self.failUnlessReallyEqual(argv_to_unicode(argv), argu)
|
||||
self.addCleanup(_reload)
|
||||
self.patch(sys, "platform", self.platform)
|
||||
|
||||
def test_unicode_to_url(self):
|
||||
self.failUnless(unicode_to_url(lumiere_nfc), b"lumi\xc3\xa8re")
|
||||
@ -245,15 +157,19 @@ class EncodingUtil(ReallyEqualMixin):
|
||||
def test_unicode_to_output_py3(self):
|
||||
self.failUnlessReallyEqual(unicode_to_output(lumiere_nfc), lumiere_nfc)
|
||||
|
||||
@skipIf(PY3, "Python 2 only.")
|
||||
def test_unicode_to_argv_py2(self):
|
||||
"""unicode_to_argv() converts to bytes on Python 2."""
|
||||
self.assertEqual(unicode_to_argv("abc"), u"abc".encode(self.io_encoding))
|
||||
def test_unicode_to_argv(self):
|
||||
"""
|
||||
unicode_to_argv() returns its unicode argument on Windows and Python 2 and
|
||||
converts to bytes using UTF-8 elsewhere.
|
||||
"""
|
||||
result = unicode_to_argv(lumiere_nfc)
|
||||
if PY3 or self.platform == "win32":
|
||||
expected_value = lumiere_nfc
|
||||
else:
|
||||
expected_value = lumiere_nfc.encode(self.io_encoding)
|
||||
|
||||
@skipIf(PY2, "Python 3 only.")
|
||||
def test_unicode_to_argv_py3(self):
|
||||
"""unicode_to_argv() is noop on Python 3."""
|
||||
self.assertEqual(unicode_to_argv("abc"), "abc")
|
||||
self.assertIsInstance(result, type(expected_value))
|
||||
self.assertEqual(result, expected_value)
|
||||
|
||||
@skipIf(PY3, "Python 3 only.")
|
||||
def test_unicode_platform_py2(self):
|
||||
@ -463,13 +379,6 @@ class QuoteOutput(ReallyEqualMixin, unittest.TestCase):
|
||||
check(u"\n", u"\"\\x0a\"", quote_newlines=True)
|
||||
|
||||
def test_quote_output_default(self):
|
||||
self.patch(encodingutil, 'io_encoding', 'ascii')
|
||||
self.test_quote_output_ascii(None)
|
||||
|
||||
self.patch(encodingutil, 'io_encoding', 'latin1')
|
||||
self.test_quote_output_latin1(None)
|
||||
|
||||
self.patch(encodingutil, 'io_encoding', 'utf-8')
|
||||
self.test_quote_output_utf8(None)
|
||||
|
||||
|
||||
@ -581,14 +490,6 @@ class UbuntuKarmicUTF8(EncodingUtil, unittest.TestCase):
|
||||
io_encoding = 'UTF-8'
|
||||
dirlist = [b'test_file', b'\xc3\x84rtonwall.mp3', b'Blah blah.txt']
|
||||
|
||||
class UbuntuKarmicLatin1(EncodingUtil, unittest.TestCase):
|
||||
uname = 'Linux korn 2.6.31-14-generic #48-Ubuntu SMP Fri Oct 16 14:05:01 UTC 2009 x86_64'
|
||||
argv = b'lumi\xe8re'
|
||||
platform = 'linux2'
|
||||
filesystem_encoding = 'ISO-8859-1'
|
||||
io_encoding = 'ISO-8859-1'
|
||||
dirlist = [b'test_file', b'Blah blah.txt', b'\xc4rtonwall.mp3']
|
||||
|
||||
class Windows(EncodingUtil, unittest.TestCase):
|
||||
uname = 'Windows XP 5.1.2600 x86 x86 Family 15 Model 75 Step ping 2, AuthenticAMD'
|
||||
argv = b'lumi\xc3\xa8re'
|
||||
@ -605,20 +506,6 @@ class MacOSXLeopard(EncodingUtil, unittest.TestCase):
|
||||
io_encoding = 'UTF-8'
|
||||
dirlist = [u'A\u0308rtonwall.mp3', u'Blah blah.txt', u'test_file']
|
||||
|
||||
class MacOSXLeopard7bit(EncodingUtil, unittest.TestCase):
|
||||
uname = 'Darwin g5.local 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:57:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_PPC Power Macintosh powerpc'
|
||||
platform = 'darwin'
|
||||
filesystem_encoding = 'utf-8'
|
||||
io_encoding = 'US-ASCII'
|
||||
dirlist = [u'A\u0308rtonwall.mp3', u'Blah blah.txt', u'test_file']
|
||||
|
||||
class OpenBSD(EncodingUtil, unittest.TestCase):
|
||||
uname = 'OpenBSD 4.1 GENERIC#187 i386 Intel(R) Celeron(R) CPU 2.80GHz ("GenuineIntel" 686-class)'
|
||||
platform = 'openbsd4'
|
||||
filesystem_encoding = '646'
|
||||
io_encoding = '646'
|
||||
# Oops, I cannot write filenames containing non-ascii characters
|
||||
|
||||
|
||||
class TestToFromStr(ReallyEqualMixin, unittest.TestCase):
|
||||
def test_to_bytes(self):
|
||||
|
@ -6,6 +6,10 @@ from __future__ import (
|
||||
import os.path, re, sys
|
||||
from os import linesep
|
||||
|
||||
from eliot import (
|
||||
log_call,
|
||||
)
|
||||
|
||||
from twisted.trial import unittest
|
||||
|
||||
from twisted.internet import reactor
|
||||
@ -19,22 +23,25 @@ from twisted.python.runtime import (
|
||||
platform,
|
||||
)
|
||||
from allmydata.util import fileutil, pollmixin
|
||||
from allmydata.util.encodingutil import unicode_to_argv, unicode_to_output
|
||||
from allmydata.util.encodingutil import unicode_to_argv, get_filesystem_encoding
|
||||
from allmydata.test import common_util
|
||||
import allmydata
|
||||
from .common_util import parse_cli, run_cli
|
||||
from .common import (
|
||||
PIPE,
|
||||
Popen,
|
||||
)
|
||||
from .common_util import (
|
||||
parse_cli,
|
||||
run_cli,
|
||||
)
|
||||
from .cli_node_api import (
|
||||
CLINodeAPI,
|
||||
Expect,
|
||||
on_stdout,
|
||||
on_stdout_and_stderr,
|
||||
)
|
||||
from ._twisted_9607 import (
|
||||
getProcessOutputAndValue,
|
||||
)
|
||||
from ..util.eliotutil import (
|
||||
inline_callbacks,
|
||||
log_call_deferred,
|
||||
)
|
||||
|
||||
def get_root_from_file(src):
|
||||
@ -54,93 +61,92 @@ srcfile = allmydata.__file__
|
||||
rootdir = get_root_from_file(srcfile)
|
||||
|
||||
|
||||
class RunBinTahoeMixin(object):
|
||||
@log_call_deferred(action_type="run-bin-tahoe")
|
||||
def run_bintahoe(self, args, stdin=None, python_options=[], env=None):
|
||||
command = sys.executable
|
||||
argv = python_options + ["-m", "allmydata.scripts.runner"] + args
|
||||
@log_call(action_type="run-bin-tahoe")
|
||||
def run_bintahoe(extra_argv, python_options=None):
|
||||
"""
|
||||
Run the main Tahoe entrypoint in a child process with the given additional
|
||||
arguments.
|
||||
|
||||
if env is None:
|
||||
env = os.environ
|
||||
:param [unicode] extra_argv: More arguments for the child process argv.
|
||||
|
||||
d = getProcessOutputAndValue(command, argv, env, stdinBytes=stdin)
|
||||
def fix_signal(result):
|
||||
# Mirror subprocess.Popen.returncode structure
|
||||
(out, err, signal) = result
|
||||
return (out, err, -signal)
|
||||
d.addErrback(fix_signal)
|
||||
return d
|
||||
:return: A three-tuple of stdout (unicode), stderr (unicode), and the
|
||||
child process "returncode" (int).
|
||||
"""
|
||||
argv = [sys.executable.decode(get_filesystem_encoding())]
|
||||
if python_options is not None:
|
||||
argv.extend(python_options)
|
||||
argv.extend([u"-m", u"allmydata.scripts.runner"])
|
||||
argv.extend(extra_argv)
|
||||
argv = list(unicode_to_argv(arg) for arg in argv)
|
||||
p = Popen(argv, stdout=PIPE, stderr=PIPE)
|
||||
out = p.stdout.read().decode("utf-8")
|
||||
err = p.stderr.read().decode("utf-8")
|
||||
returncode = p.wait()
|
||||
return (out, err, returncode)
|
||||
|
||||
|
||||
class BinTahoe(common_util.SignalMixin, unittest.TestCase, RunBinTahoeMixin):
|
||||
class BinTahoe(common_util.SignalMixin, unittest.TestCase):
|
||||
def test_unicode_arguments_and_output(self):
|
||||
"""
|
||||
The runner script receives unmangled non-ASCII values in argv.
|
||||
"""
|
||||
tricky = u"\u2621"
|
||||
try:
|
||||
tricky_arg = unicode_to_argv(tricky, mangle=True)
|
||||
tricky_out = unicode_to_output(tricky)
|
||||
except UnicodeEncodeError:
|
||||
raise unittest.SkipTest("A non-ASCII argument/output could not be encoded on this platform.")
|
||||
out, err, returncode = run_bintahoe([tricky])
|
||||
self.assertEqual(returncode, 1)
|
||||
self.assertIn(u"Unknown command: " + tricky, out)
|
||||
|
||||
d = self.run_bintahoe([tricky_arg])
|
||||
def _cb(res):
|
||||
out, err, rc_or_sig = res
|
||||
self.failUnlessEqual(rc_or_sig, 1, str(res))
|
||||
self.failUnlessIn("Unknown command: "+tricky_out, out)
|
||||
d.addCallback(_cb)
|
||||
return d
|
||||
def test_with_python_options(self):
|
||||
"""
|
||||
Additional options for the Python interpreter don't prevent the runner
|
||||
script from receiving the arguments meant for it.
|
||||
"""
|
||||
# This seems like a redundant test for someone else's functionality
|
||||
# but on Windows we parse the whole command line string ourselves so
|
||||
# we have to have our own implementation of skipping these options.
|
||||
|
||||
def test_run_with_python_options(self):
|
||||
# -t is a harmless option that warns about tabs.
|
||||
d = self.run_bintahoe(["--version"], python_options=["-t"])
|
||||
def _cb(res):
|
||||
out, err, rc_or_sig = res
|
||||
self.assertEqual(rc_or_sig, 0, str(res))
|
||||
self.assertTrue(out.startswith(allmydata.__appname__ + '/'), str(res))
|
||||
d.addCallback(_cb)
|
||||
return d
|
||||
# -t is a harmless option that warns about tabs so we can add it
|
||||
# without impacting other behavior noticably.
|
||||
out, err, returncode = run_bintahoe([u"--version"], python_options=[u"-t"])
|
||||
self.assertEqual(returncode, 0)
|
||||
self.assertTrue(out.startswith(allmydata.__appname__ + '/'))
|
||||
|
||||
@inlineCallbacks
|
||||
def test_help_eliot_destinations(self):
|
||||
out, err, rc_or_sig = yield self.run_bintahoe(["--help-eliot-destinations"])
|
||||
self.assertIn("\tfile:<path>", out)
|
||||
self.assertEqual(rc_or_sig, 0)
|
||||
out, err, returncode = run_bintahoe([u"--help-eliot-destinations"])
|
||||
self.assertIn(u"\tfile:<path>", out)
|
||||
self.assertEqual(returncode, 0)
|
||||
|
||||
@inlineCallbacks
|
||||
def test_eliot_destination(self):
|
||||
out, err, rc_or_sig = yield self.run_bintahoe([
|
||||
out, err, returncode = run_bintahoe([
|
||||
# Proves little but maybe more than nothing.
|
||||
"--eliot-destination=file:-",
|
||||
u"--eliot-destination=file:-",
|
||||
# Throw in *some* command or the process exits with error, making
|
||||
# it difficult for us to see if the previous arg was accepted or
|
||||
# not.
|
||||
"--help",
|
||||
u"--help",
|
||||
])
|
||||
self.assertEqual(rc_or_sig, 0)
|
||||
self.assertEqual(returncode, 0)
|
||||
|
||||
@inlineCallbacks
|
||||
def test_unknown_eliot_destination(self):
|
||||
out, err, rc_or_sig = yield self.run_bintahoe([
|
||||
"--eliot-destination=invalid:more",
|
||||
out, err, returncode = run_bintahoe([
|
||||
u"--eliot-destination=invalid:more",
|
||||
])
|
||||
self.assertEqual(1, rc_or_sig)
|
||||
self.assertIn("Unknown destination description", out)
|
||||
self.assertIn("invalid:more", out)
|
||||
self.assertEqual(1, returncode)
|
||||
self.assertIn(u"Unknown destination description", out)
|
||||
self.assertIn(u"invalid:more", out)
|
||||
|
||||
@inlineCallbacks
|
||||
def test_malformed_eliot_destination(self):
|
||||
out, err, rc_or_sig = yield self.run_bintahoe([
|
||||
"--eliot-destination=invalid",
|
||||
out, err, returncode = run_bintahoe([
|
||||
u"--eliot-destination=invalid",
|
||||
])
|
||||
self.assertEqual(1, rc_or_sig)
|
||||
self.assertIn("must be formatted like", out)
|
||||
self.assertEqual(1, returncode)
|
||||
self.assertIn(u"must be formatted like", out)
|
||||
|
||||
@inlineCallbacks
|
||||
def test_escape_in_eliot_destination(self):
|
||||
out, err, rc_or_sig = yield self.run_bintahoe([
|
||||
"--eliot-destination=file:@foo",
|
||||
out, err, returncode = run_bintahoe([
|
||||
u"--eliot-destination=file:@foo",
|
||||
])
|
||||
self.assertEqual(1, rc_or_sig)
|
||||
self.assertIn("Unsupported escape character", out)
|
||||
self.assertEqual(1, returncode)
|
||||
self.assertIn(u"Unsupported escape character", out)
|
||||
|
||||
|
||||
class CreateNode(unittest.TestCase):
|
||||
@ -250,8 +256,7 @@ class CreateNode(unittest.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin,
|
||||
RunBinTahoeMixin):
|
||||
class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin):
|
||||
"""
|
||||
exercise "tahoe run" for both introducer and client node, by spawning
|
||||
"tahoe run" as a subprocess. This doesn't get us line-level coverage, but
|
||||
@ -271,18 +276,18 @@ class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin,
|
||||
The introducer furl is stable across restarts.
|
||||
"""
|
||||
basedir = self.workdir("test_introducer")
|
||||
c1 = os.path.join(basedir, "c1")
|
||||
c1 = os.path.join(basedir, u"c1")
|
||||
tahoe = CLINodeAPI(reactor, FilePath(c1))
|
||||
self.addCleanup(tahoe.stop_and_wait)
|
||||
|
||||
out, err, rc_or_sig = yield self.run_bintahoe([
|
||||
"--quiet",
|
||||
"create-introducer",
|
||||
"--basedir", c1,
|
||||
"--hostname", "127.0.0.1",
|
||||
out, err, returncode = run_bintahoe([
|
||||
u"--quiet",
|
||||
u"create-introducer",
|
||||
u"--basedir", c1,
|
||||
u"--hostname", u"127.0.0.1",
|
||||
])
|
||||
|
||||
self.assertEqual(rc_or_sig, 0)
|
||||
self.assertEqual(returncode, 0)
|
||||
|
||||
# This makes sure that node.url is written, which allows us to
|
||||
# detect when the introducer restarts in _node_has_restarted below.
|
||||
@ -350,18 +355,18 @@ class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin,
|
||||
3) Verify that the pid file is removed after SIGTERM (on POSIX).
|
||||
"""
|
||||
basedir = self.workdir("test_client")
|
||||
c1 = os.path.join(basedir, "c1")
|
||||
c1 = os.path.join(basedir, u"c1")
|
||||
|
||||
tahoe = CLINodeAPI(reactor, FilePath(c1))
|
||||
# Set this up right now so we don't forget later.
|
||||
self.addCleanup(tahoe.cleanup)
|
||||
|
||||
out, err, rc_or_sig = yield self.run_bintahoe([
|
||||
"--quiet", "create-node", "--basedir", c1,
|
||||
"--webport", "0",
|
||||
"--hostname", "localhost",
|
||||
out, err, returncode = run_bintahoe([
|
||||
u"--quiet", u"create-node", u"--basedir", c1,
|
||||
u"--webport", u"0",
|
||||
u"--hostname", u"localhost",
|
||||
])
|
||||
self.failUnlessEqual(rc_or_sig, 0)
|
||||
self.failUnlessEqual(returncode, 0)
|
||||
|
||||
# Check that the --webport option worked.
|
||||
config = fileutil.read(tahoe.config_file.path)
|
||||
|
@ -51,6 +51,10 @@ from twisted.python.filepath import (
|
||||
FilePath,
|
||||
)
|
||||
|
||||
from ._twisted_9607 import (
|
||||
getProcessOutputAndValue,
|
||||
)
|
||||
|
||||
from .common import (
|
||||
TEST_RSA_KEY_SIZE,
|
||||
SameProcessStreamEndpointAssigner,
|
||||
@ -61,13 +65,32 @@ from .web.common import (
|
||||
)
|
||||
|
||||
# TODO: move this to common or common_util
|
||||
from allmydata.test.test_runner import RunBinTahoeMixin
|
||||
from . import common_util as testutil
|
||||
from .common_util import run_cli_unicode
|
||||
from ..scripts.common import (
|
||||
write_introducer,
|
||||
)
|
||||
|
||||
class RunBinTahoeMixin(object):
|
||||
def run_bintahoe(self, args, stdin=None, python_options=[], env=None):
|
||||
# test_runner.run_bintahoe has better unicode support but doesn't
|
||||
# support env yet and is also synchronous. If we could get rid of
|
||||
# this in favor of that, though, it would probably be an improvement.
|
||||
command = sys.executable
|
||||
argv = python_options + ["-m", "allmydata.scripts.runner"] + args
|
||||
|
||||
if env is None:
|
||||
env = os.environ
|
||||
|
||||
d = getProcessOutputAndValue(command, argv, env, stdinBytes=stdin)
|
||||
def fix_signal(result):
|
||||
# Mirror subprocess.Popen.returncode structure
|
||||
(out, err, signal) = result
|
||||
return (out, err, -signal)
|
||||
d.addErrback(fix_signal)
|
||||
return d
|
||||
|
||||
|
||||
def run_cli(*args, **kwargs):
|
||||
"""
|
||||
Run a Tahoe-LAFS CLI utility, but inline.
|
||||
|
@ -29,11 +29,6 @@ from json import (
|
||||
from textwrap import (
|
||||
dedent,
|
||||
)
|
||||
from subprocess import (
|
||||
PIPE,
|
||||
Popen,
|
||||
)
|
||||
|
||||
from twisted.python.filepath import (
|
||||
FilePath,
|
||||
)
|
||||
@ -66,6 +61,8 @@ from hypothesis.strategies import (
|
||||
)
|
||||
|
||||
from .common import (
|
||||
PIPE,
|
||||
Popen,
|
||||
SyncTestCase,
|
||||
)
|
||||
|
||||
@ -132,13 +129,6 @@ class GetArgvTests(SyncTestCase):
|
||||
``get_argv`` returns a list representing the result of tokenizing the
|
||||
"command line" argument string provided to Windows processes.
|
||||
"""
|
||||
# Python 2.7 doesn't have good options for launching a process with
|
||||
# non-ASCII in its command line. So use this alternative that does a
|
||||
# better job. Bury the import here because it only works on Windows.
|
||||
from ._win_subprocess import (
|
||||
Popen
|
||||
)
|
||||
|
||||
working_path = FilePath(self.mktemp())
|
||||
working_path.makedirs()
|
||||
save_argv_path = working_path.child("script.py")
|
||||
|
@ -18,8 +18,9 @@ if PY2:
|
||||
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, max, min # noqa: F401
|
||||
|
||||
from past.builtins import unicode
|
||||
from six import ensure_str
|
||||
|
||||
import sys, os, re, locale
|
||||
import sys, os, re
|
||||
import unicodedata
|
||||
import warnings
|
||||
|
||||
@ -50,36 +51,25 @@ def check_encoding(encoding):
|
||||
try:
|
||||
u"test".encode(encoding)
|
||||
except (LookupError, AttributeError):
|
||||
raise AssertionError("The character encoding '%s' is not supported for conversion." % (encoding,))
|
||||
raise AssertionError(
|
||||
"The character encoding '%s' is not supported for conversion." % (encoding,),
|
||||
)
|
||||
|
||||
# On Windows we install UTF-8 stream wrappers for sys.stdout and
|
||||
# sys.stderr, and reencode the arguments as UTF-8 (see scripts/runner.py).
|
||||
#
|
||||
# On POSIX, we are moving towards a UTF-8-everything and ignore the locale.
|
||||
io_encoding = "utf-8"
|
||||
|
||||
filesystem_encoding = None
|
||||
io_encoding = None
|
||||
is_unicode_platform = False
|
||||
use_unicode_filepath = False
|
||||
|
||||
def _reload():
|
||||
global filesystem_encoding, io_encoding, is_unicode_platform, use_unicode_filepath
|
||||
global filesystem_encoding, is_unicode_platform, use_unicode_filepath
|
||||
|
||||
filesystem_encoding = canonical_encoding(sys.getfilesystemencoding())
|
||||
check_encoding(filesystem_encoding)
|
||||
|
||||
if sys.platform == 'win32':
|
||||
# On Windows we install UTF-8 stream wrappers for sys.stdout and
|
||||
# sys.stderr, and reencode the arguments as UTF-8 (see scripts/runner.py).
|
||||
io_encoding = 'utf-8'
|
||||
else:
|
||||
ioenc = None
|
||||
if hasattr(sys.stdout, 'encoding'):
|
||||
ioenc = sys.stdout.encoding
|
||||
if ioenc is None:
|
||||
try:
|
||||
ioenc = locale.getpreferredencoding()
|
||||
except Exception:
|
||||
pass # work around <http://bugs.python.org/issue1443504>
|
||||
io_encoding = canonical_encoding(ioenc)
|
||||
|
||||
check_encoding(io_encoding)
|
||||
|
||||
is_unicode_platform = PY3 or sys.platform in ["win32", "darwin"]
|
||||
|
||||
# Despite the Unicode-mode FilePath support added to Twisted in
|
||||
@ -110,6 +100,8 @@ def get_io_encoding():
|
||||
def argv_to_unicode(s):
|
||||
"""
|
||||
Decode given argv element to unicode. If this fails, raise a UsageError.
|
||||
|
||||
This is the inverse of ``unicode_to_argv``.
|
||||
"""
|
||||
if isinstance(s, unicode):
|
||||
return s
|
||||
@ -133,26 +125,22 @@ def argv_to_abspath(s, **kwargs):
|
||||
% (quote_output(s), quote_output(os.path.join('.', s))))
|
||||
return abspath_expanduser_unicode(decoded, **kwargs)
|
||||
|
||||
|
||||
def unicode_to_argv(s, mangle=False):
|
||||
"""
|
||||
Encode the given Unicode argument as a bytestring.
|
||||
If the argument is to be passed to a different process, then the 'mangle' argument
|
||||
should be true; on Windows, this uses a mangled encoding that will be reversed by
|
||||
code in runner.py.
|
||||
Make the given unicode string suitable for use in an argv list.
|
||||
|
||||
On Python 3, just return the string unchanged, since argv is unicode.
|
||||
On Python 2 on POSIX, this encodes using UTF-8. On Python 3 and on
|
||||
Windows, this returns the input unmodified.
|
||||
"""
|
||||
precondition(isinstance(s, unicode), s)
|
||||
if PY3:
|
||||
warnings.warn("This will be unnecessary once Python 2 is dropped.",
|
||||
DeprecationWarning)
|
||||
if sys.platform == "win32":
|
||||
return s
|
||||
return ensure_str(s)
|
||||
|
||||
if mangle and sys.platform == "win32":
|
||||
# This must be the same as 'mangle' in bin/tahoe-script.template.
|
||||
return bytes(re.sub(u'[^\\x20-\\x7F]', lambda m: u'\x7F%x;' % (ord(m.group(0)),), s), io_encoding)
|
||||
else:
|
||||
return s.encode(io_encoding)
|
||||
|
||||
def unicode_to_url(s):
|
||||
"""
|
||||
|
@ -13,7 +13,7 @@ from __future__ import print_function
|
||||
import sys
|
||||
assert sys.platform == "win32"
|
||||
|
||||
import codecs, re
|
||||
import codecs
|
||||
from functools import partial
|
||||
|
||||
from ctypes import WINFUNCTYPE, windll, POINTER, c_int, WinError, byref, get_last_error
|
||||
@ -174,37 +174,14 @@ def initialize():
|
||||
except Exception as e:
|
||||
_complain("exception %r while fixing up sys.stdout and sys.stderr" % (e,))
|
||||
|
||||
# This works around <http://bugs.python.org/issue2128>.
|
||||
|
||||
# Because of <http://bugs.python.org/issue8775> (and similar limitations in
|
||||
# twisted), the 'bin/tahoe' script cannot invoke us with the actual Unicode arguments.
|
||||
# Instead it "mangles" or escapes them using \x7F as an escape character, which we
|
||||
# unescape here.
|
||||
def unmangle(s):
|
||||
return re.sub(
|
||||
u'\\x7F[0-9a-fA-F]*\\;',
|
||||
# type ignored for 'unichr' (Python 2 only)
|
||||
lambda m: unichr(int(m.group(0)[1:-1], 16)), # type: ignore
|
||||
s,
|
||||
)
|
||||
|
||||
argv_unicode = get_argv()
|
||||
try:
|
||||
argv = [unmangle(argv_u).encode('utf-8') for argv_u in argv_unicode]
|
||||
except Exception as e:
|
||||
_complain("%s: could not unmangle Unicode arguments.\n%r"
|
||||
% (sys.argv[0], argv_unicode))
|
||||
raise
|
||||
argv = list(arg.encode("utf-8") for arg in get_argv())
|
||||
|
||||
# Take only the suffix with the same number of arguments as sys.argv.
|
||||
# This accounts for anything that can cause initial arguments to be stripped,
|
||||
# for example, the Python interpreter or any options passed to it, or runner
|
||||
# scripts such as 'coverage run'. It works even if there are no such arguments,
|
||||
# as in the case of a frozen executable created by bb-freeze or similar.
|
||||
|
||||
sys.argv = argv[-len(sys.argv):]
|
||||
if sys.argv[0].endswith('.pyscript'):
|
||||
sys.argv[0] = sys.argv[0][:-9]
|
||||
|
||||
|
||||
def a_console(handle):
|
||||
|
Loading…
x
Reference in New Issue
Block a user