Refactor command line arguments parsing

This commit is contained in:
grossmj 2025-01-17 07:57:42 +07:00
parent 035a104957
commit 8e96c4c2e8
No known key found for this signature in database
GPG Key ID: 1E7DD6DBB53FF3D7
3 changed files with 99 additions and 68 deletions

View File

@ -30,6 +30,7 @@ import gns3server.utils.get_resource
import os
import sys
import asyncio
import argparse
def daemonize():
@ -59,6 +60,42 @@ def daemonize():
print("Second fork failed: %d (%s)\n" % (e.errno, e.strerror), file=sys.stderr)
sys.exit(1)
def parse_arguments(argv):
"""
Parse command line arguments
:param argv: Array of command line arguments
"""
from gns3server.version import __version__
parser = argparse.ArgumentParser(description=f"GNS3 server version {__version__}")
parser.add_argument("-v", "--version", help="show the version", action="version", version=__version__)
parser.add_argument("--host", help="run on the given host/IP address")
parser.add_argument("--port", help="run on the given port", type=int)
parser.add_argument("--ssl", action="store_true", help="run in SSL mode")
parser.add_argument("--config", help="Configuration file")
parser.add_argument("--certfile", help="SSL cert file")
parser.add_argument("--certkey", help="SSL key file")
parser.add_argument("-L", "--local", action="store_true", help="local mode (allows some insecure operations)")
parser.add_argument(
"-A", "--allow", action="store_true", help="allow remote connections to local console ports"
)
parser.add_argument("-q", "--quiet", default=False, action="store_true", help="do not show logs on stdout")
parser.add_argument("-d", "--debug", default=False, action="store_true", help="show debug logs")
parser.add_argument("--logfile", "--log", help="send output to logfile instead of console")
parser.add_argument("--logmaxsize", default=10000000, help="maximum logfile size in bytes (default is 10MB)")
parser.add_argument(
"--logbackupcount", default=10, help="number of historical log files to keep (default is 10)"
)
parser.add_argument(
"--logcompression", default=False, action="store_true", help="compress inactive (historical) logs"
)
parser.add_argument("--daemon", action="store_true", help="start as a daemon")
parser.add_argument("--pid", help="store process pid")
parser.add_argument("--profile", help="Settings profile (blank will use default settings files)")
args = parser.parse_args(argv)
return parser, args
def main():
"""
@ -69,10 +106,11 @@ def main():
raise SystemExit("Windows is not a supported platform to run the GNS3 server")
if "--daemon" in sys.argv:
daemonize()
from gns3server.server import Server
try:
asyncio.run(Server().run())
parser, args = parse_arguments(sys.argv[1:])
from gns3server.server import Server
asyncio.run(Server().run(parser, args))
except KeyboardInterrupt:
pass

View File

@ -23,7 +23,6 @@ Start the program. Use main.py to load it.
import os
import datetime
import locale
import argparse
import psutil
import sys
import asyncio
@ -33,13 +32,10 @@ import uvicorn
import secrets
import string
from gns3server.controller import Controller
from gns3server.compute.port_manager import PortManager
from gns3server.logger import init_logger
from gns3server.version import __version__
from gns3server.config import Config
from gns3server.crash_report import CrashReport
from gns3server.api.server import app
from pydantic import ValidationError, SecretStr
import logging
@ -90,40 +86,13 @@ class Server:
else:
log.info(f"Current locale is {language}.{encoding}")
def _parse_arguments(self, argv):
def _setup_logging(self, args):
"""
Parse command line arguments and override local configuration
Setup logging.
:params args: Array of command line arguments
:param args: command line arguments
"""
parser = argparse.ArgumentParser(description=f"GNS3 server version {__version__}")
parser.add_argument("-v", "--version", help="show the version", action="version", version=__version__)
parser.add_argument("--host", help="run on the given host/IP address")
parser.add_argument("--port", help="run on the given port", type=int)
parser.add_argument("--ssl", action="store_true", help="run in SSL mode")
parser.add_argument("--config", help="Configuration file")
parser.add_argument("--certfile", help="SSL cert file")
parser.add_argument("--certkey", help="SSL key file")
parser.add_argument("-L", "--local", action="store_true", help="local mode (allows some insecure operations)")
parser.add_argument(
"-A", "--allow", action="store_true", help="allow remote connections to local console ports"
)
parser.add_argument("-q", "--quiet", default=False, action="store_true", help="do not show logs on stdout")
parser.add_argument("-d", "--debug", default=False, action="store_true", help="show debug logs")
parser.add_argument("--logfile", "--log", help="send output to logfile instead of console")
parser.add_argument("--logmaxsize", default=10000000, help="maximum logfile size in bytes (default is 10MB)")
parser.add_argument(
"--logbackupcount", default=10, help="number of historical log files to keep (default is 10)"
)
parser.add_argument(
"--logcompression", default=False, action="store_true", help="compress inactive (historical) logs"
)
parser.add_argument("--daemon", action="store_true", help="start as a daemon")
parser.add_argument("--pid", help="store process pid")
parser.add_argument("--profile", help="Settings profile (blank will use default settings files)")
args = parser.parse_args(argv)
level = logging.INFO
if args.debug:
level = logging.DEBUG
@ -137,6 +106,15 @@ class Server:
quiet=args.quiet,
)
@staticmethod
def _load_config_and_set_defaults(parser, args, argv=None):
"""
Parse command line arguments and override local configuration
:param parser: ArgumentParser instance
:param args: command line arguments
"""
try:
if args.config:
Config.instance(files=[args.config], profile=args.profile)
@ -157,7 +135,10 @@ class Server:
}
parser.set_defaults(**defaults)
return parser.parse_args(argv)
if argv is None:
argv = sys.argv[1:]
args = parser.parse_args(argv)
return args
@staticmethod
def _set_config_defaults_from_command_line(args):
@ -174,6 +155,8 @@ class Server:
config.Server.enable_ssl = args.ssl
def _signal_handling(self):
from gns3server.controller import Controller
def signal_handler(signame, *args):
try:
@ -239,9 +222,10 @@ class Server:
log.critical("Can't write pid file %s: %s", path, str(e))
sys.exit(1)
async def run(self):
async def run(self, parser, args):
args = self._parse_arguments(sys.argv[1:])
self._setup_logging(args)
args = self._load_config_and_set_defaults(parser, args)
if args.pid:
self._pid_lock(args.pid)
@ -256,7 +240,6 @@ class Server:
self._set_config_defaults_from_command_line(args)
config = Config.instance().settings
if not config.Server.compute_password.get_secret_value():
alphabet = string.ascii_letters + string.digits + string.punctuation
generated_password = ''.join(secrets.choice(alphabet) for _ in range(16))
@ -297,11 +280,13 @@ class Server:
host = config.Server.host
port = config.Server.port
from gns3server.compute.port_manager import PortManager
PortManager.instance().console_host = host
self._signal_handling()
try:
log.info(f"Starting server on {host}:{port}")
from gns3server.api.server import app
# only show uvicorn access logs in debug mode
access_log = False

View File

@ -21,6 +21,7 @@ import pytest
import locale
import tempfile
from gns3server.main import parse_arguments as parse
from gns3server.server import Server
from gns3server.config import Config
@ -35,12 +36,18 @@ def test_locale_check():
assert locale.getlocale() == ('fr_FR', 'UTF-8')
def parse_arguments(server, argv):
parser, args= parse(argv)
return server._load_config_and_set_defaults(parser, args, argv)
def test_parse_arguments(capsys, config, tmpdir):
server = Server()
server_config = config.settings.Server
with pytest.raises(SystemExit):
server._parse_arguments(["--fail"])
parse_arguments(server, ["--fail"])
out, err = capsys.readouterr()
assert "usage" in err
assert "fail" in err
@ -67,48 +74,49 @@ def test_parse_arguments(capsys, config, tmpdir):
# assert __version__ in out
# assert "optional arguments" in out
assert server._parse_arguments(["--host", "192.168.1.1"]).host == "192.168.1.1"
assert server._parse_arguments([]).host == "0.0.0.0"
assert parse_arguments(server, ["--host", "192.168.1.1"]).host == "192.168.1.1"
assert parse_arguments(server, []).host == "0.0.0.0"
server_config.host = "192.168.1.2"
assert server._parse_arguments(["--host", "192.168.1.1"]).host == "192.168.1.1"
assert server._parse_arguments([]).host == "192.168.1.2"
assert server._parse_arguments(["--port", "8002"]).port == 8002
assert server._parse_arguments([]).port == 3080
assert parse_arguments(server, ["--host", "192.168.1.1"]).host == "192.168.1.1"
assert parse_arguments(server, []).host == "192.168.1.2"
assert parse_arguments(server, ["--port", "8002"]).port == 8002
assert parse_arguments(server, []).port == 3080
server_config.port = 8003
assert server._parse_arguments([]).port == 8003
assert parse_arguments(server, []).port == 8003
assert server._parse_arguments(["--ssl"]).ssl
assert server._parse_arguments([]).ssl is False
assert parse_arguments(server, ["--ssl"]).ssl
assert parse_arguments(server,[]).ssl is False
with tempfile.NamedTemporaryFile(dir=str(tmpdir)) as f:
server_config.certfile = f.name
server_config.certkey = f.name
server_config.enable_ssl = True
assert server._parse_arguments([]).ssl
assert parse_arguments(server, []).ssl
assert server._parse_arguments(["--certfile", "bla"]).certfile == "bla"
assert server._parse_arguments(["--certkey", "blu"]).certkey == "blu"
assert parse_arguments(server, ["--certfile", "bla"]).certfile == "bla"
assert parse_arguments(server,["--certkey", "blu"]).certkey == "blu"
assert server._parse_arguments(["-L"]).local
assert server._parse_arguments(["--local"]).local
assert parse_arguments(server,["-L"]).local
assert parse_arguments(server,["--local"]).local
server_config.local = False
assert server._parse_arguments([]).local is False
assert parse_arguments(server,[]).local is False
server_config.local = True
assert server._parse_arguments([]).local
assert parse_arguments(server,[]).local
assert server._parse_arguments(["-A"]).allow
assert server._parse_arguments(["--allow"]).allow
assert server._parse_arguments([]).allow is False
assert parse_arguments(server,["-A"]).allow
assert parse_arguments(server,["--allow"]).allow
assert parse_arguments(server,[]).allow is False
server_config.allow_remote_console = True
assert server._parse_arguments([]).allow
assert parse_arguments(server,[]).allow
assert server._parse_arguments(["-q"]).quiet
assert server._parse_arguments(["--quiet"]).quiet
assert server._parse_arguments([]).quiet is False
assert parse_arguments(server,["-q"]).quiet
assert parse_arguments(server,["--quiet"]).quiet
assert parse_arguments(server,[]).quiet is False
assert server._parse_arguments(["-d"]).debug
assert server._parse_arguments(["--debug"]).debug
assert server._parse_arguments([]).debug is False
assert parse_arguments(server,["-d"]).debug
assert parse_arguments(server,["--debug"]).debug
assert parse_arguments(server,[]).debug is False
def test_set_config_with_args(tmpdir):
@ -118,7 +126,7 @@ def test_set_config_with_args(tmpdir):
with tempfile.NamedTemporaryFile(dir=str(tmpdir)) as f:
certfile = f.name
certkey = f.name
args = server._parse_arguments(["--host",
args = parse_arguments(server, ["--host",
"192.168.1.1",
"--local",
"--allow",