gns3-server/gns3server/config.py
grossmj 71725aade6 Rename ssl and auth configuration file settings.
Add enable SSL config validator.
Strict configuration file validation: any error will prevent the server to start.
Core server logic moved to a Server class.
2021-04-12 23:26:42 +09:30

305 lines
10 KiB
Python

# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Reads the configuration file and store the settings for the server.
"""
import sys
import os
import shutil
import secrets
import configparser
from pydantic import ValidationError
from .schemas import ServerConfig
from .version import __version_info__
from .utils.file_watcher import FileWatcher
import logging
log = logging.getLogger(__name__)
class Config:
"""
Configuration file management using configparser.
:param files: Array of configuration files (optional)
:param profile: Profile settings (default use standard config file)
"""
def __init__(self, files=None, profile=None):
self._settings = None
self._files = files
self._profile = profile
if files and len(files):
directory_name = os.path.dirname(files[0])
if not directory_name or directory_name == "":
files[0] = os.path.dirname(os.path.abspath(files[0])) + os.path.sep + files[0]
self._main_config_file = files[0]
else:
self._main_config_file = None
# Monitor configuration files for changes
self._watched_files = {}
self._watch_callback = []
appname = "GNS3"
version = "{}.{}".format(__version_info__[0], __version_info__[1])
if sys.platform.startswith("win"):
# On windows, the configuration file location can be one of the following:
# 1: %APPDATA%/GNS3/gns3_server.ini
# 2: %APPDATA%/GNS3.ini
# 3: %COMMON_APPDATA%/GNS3/gns3_server.ini
# 4: %COMMON_APPDATA%/GNS3.ini
# 5: server.ini in the current working directory
appdata = os.path.expandvars("%APPDATA%")
common_appdata = os.path.expandvars("%COMMON_APPDATA%")
if self._profile:
legacy_user_dir = os.path.join(appdata, appname, "profiles", self._profile)
versioned_user_dir = os.path.join(appdata, appname, version, "profiles", self._profile)
else:
legacy_user_dir = os.path.join(appdata, appname)
versioned_user_dir = os.path.join(appdata, appname, version)
server_filename = "gns3_server.ini"
if self._files is None and not hasattr(sys, "_called_from_test"):
self._files = [os.path.join(os.getcwd(), server_filename),
os.path.join(versioned_user_dir, server_filename),
os.path.join(appdata, appname + ".ini"),
os.path.join(common_appdata, appname, server_filename),
os.path.join(common_appdata, appname + ".ini")]
else:
# On UNIX-like platforms, the configuration file location can be one of the following:
# 1: $HOME/.config/GNS3/gns3_server.conf
# 2: $HOME/.config/GNS3.conf
# 3: /etc/xdg/GNS3/gns3_server.conf
# 4: /etc/xdg/GNS3.conf
# 5: gns3_server.conf in the current working directory
home = os.path.expanduser("~")
server_filename = "gns3_server.conf"
if self._profile:
legacy_user_dir = os.path.join(home, ".config", appname, "profiles", self._profile)
versioned_user_dir = os.path.join(home, ".config", appname, version, "profiles", self._profile)
else:
legacy_user_dir = os.path.join(home, ".config", appname)
versioned_user_dir = os.path.join(home, ".config", appname, version)
if self._files is None and not hasattr(sys, "_called_from_test"):
self._files = [os.path.join(os.getcwd(), server_filename),
os.path.join(versioned_user_dir, server_filename),
os.path.join(home, ".config", appname + ".conf"),
os.path.join("/etc/gns3", server_filename),
os.path.join("/etc/xdg", appname, server_filename),
os.path.join("/etc/xdg", appname + ".conf")]
if self._files is None:
self._files = []
if self._main_config_file is None:
# TODO: migrate versioned config file from a previous version of GNS3 (for instance 2.2 -> 3.0) + support profiles
# migrate post version 2.2.0 config files if they exist
os.makedirs(versioned_user_dir, exist_ok=True)
try:
# migrate the server config file
old_server_config = os.path.join(legacy_user_dir, server_filename)
new_server_config = os.path.join(versioned_user_dir, server_filename)
if not os.path.exists(new_server_config) and os.path.exists(old_server_config):
shutil.copyfile(old_server_config, new_server_config)
except OSError as e:
log.error("Cannot migrate old config files: {}".format(e))
self._main_config_file = os.path.join(versioned_user_dir, server_filename)
for file in self._files:
if os.path.exists(file):
self._main_config_file = file
break
self.clear()
self._watch_config_file()
@property
def settings(self) -> ServerConfig:
"""
Return the settings.
"""
if self._settings is None:
return ServerConfig()
return self._settings
def listen_for_config_changes(self, callback):
"""
Call the callback when the configuration file change
"""
self._watch_callback.append(callback)
@property
def profile(self):
"""
Settings profile
"""
return self._profile
@property
def config_dir(self):
"""
Return the directory where the configuration file is located.
"""
return os.path.dirname(self._main_config_file)
@property
def server_config(self):
"""
Return the server configuration file path.
"""
if sys.platform.startswith("win"):
server_config_filename = "gns3_server.ini"
else:
server_config_filename = "gns3_server.conf"
return os.path.join(self.config_dir, server_config_filename)
def clear(self):
"""
Restart with a clean config
"""
self.read_config()
def _watch_config_file(self):
"""
Add config files to be monitored for changes.
"""
for file in self._files:
if os.path.exists(file):
self._watched_files[file] = FileWatcher(file, self._config_file_change)
def _config_file_change(self, file_path):
"""
Callback when a config file has been updated.
"""
log.info(f"'{file_path}' has been updated, reloading the config...")
self.read_config()
for callback in self._watch_callback:
callback()
def reload(self):
"""
Reload configuration
"""
self.read_config()
def get_config_files(self):
"""
Return the config files in use.
"""
return self._watched_files
def _load_jwt_secret_key(self):
"""
Load the JWT secret key.
"""
jwt_secret_key_path = os.path.join(self._settings.Server.secrets_dir, "gns3_jwt_secret_key")
if not os.path.exists(jwt_secret_key_path):
log.info(f"No JWT secret key configured, generating one in '{jwt_secret_key_path}'...")
try:
with open(jwt_secret_key_path, "w+", encoding="utf-8") as fd:
fd.write(secrets.token_hex(32))
except OSError as e:
log.error(f"Could not create JWT secret key file '{jwt_secret_key_path}': {e}")
try:
with open(jwt_secret_key_path, encoding="utf-8") as fd:
jwt_secret_key_content = fd.read()
self._settings.Controller.jwt_secret_key = jwt_secret_key_content
except OSError as e:
log.error(f"Could not read JWT secret key file '{jwt_secret_key_path}': {e}")
def _load_secret_files(self):
"""
Load the secret files.
"""
if not self._settings.Server.secrets_dir:
self._settings.Server.secrets_dir = os.path.dirname(self.server_config)
self._load_jwt_secret_key()
def read_config(self):
"""
Read the configuration files and validate the settings.
"""
config = configparser.ConfigParser(interpolation=None)
try:
parsed_files = config.read(self._files, encoding="utf-8")
except configparser.Error as e:
log.error("Can't parse configuration file: %s", str(e))
return
if not parsed_files:
log.warning("No configuration file could be found or read")
return
for file in parsed_files:
log.info(f"Configuration file '{file}' loaded")
self._watched_files[file] = os.stat(file).st_mtime
try:
self._settings = ServerConfig(**config._sections)
except ValidationError as e:
log.critical(f"Could not validate configuration file settings: {e}")
raise
self._load_secret_files()
@staticmethod
def instance(*args, **kwargs):
"""
Singleton to return only one instance of Config.
:returns: instance of Config
"""
if not hasattr(Config, "_instance") or Config._instance is None:
Config._instance = Config(*args, **kwargs)
return Config._instance
@staticmethod
def reset():
"""
Reset singleton
"""
Config._instance = None