Source code for freva_deployment.deploy

"""Module to run the freva deployment."""
from __future__ import annotations
from getpass import getuser
import os
from pathlib import Path
import random
import shlex
import string
from subprocess import run
import sys
from urllib.parse import urlparse
from tempfile import TemporaryDirectory, mkdtemp
from typing import Any

from numpy import sign
import toml
import yaml

from .utils import (
    asset_dir,
    config_dir,
    get_passwd,
    get_email_credentials,
    logger,
    upload_server_map,
    RichConsole,
)


[docs]class DeployFactory: """Apply freva deployment and its services. Parameters ---------- project_name: str The name of the project to distinguish this instance from others. steps: list[str], default: ["services", "core", "web"] The components that are going to be deployed. config_file: os.PathLike, default: None Path to any existing deployment configuration file. Examples -------- >>> from freva_deployment import DeployFactory as DF >>> deploy = DF(steps=["solr"]) >>> deploy.play(ask_pass=True) """ step_order: tuple[str, ...] = ("vault", "db", "solr", "core", "web") _steps_with_cert: tuple[str, ...] = ("db", "vault", "core", "web") def __init__( self, steps: list[str] | None = None, config_file: Path | str | None = None, ) -> None: self._config_keys: list[str] = [] self.master_pass: str = "" self.email_password: str = "" self._td: TemporaryDirectory = TemporaryDirectory(prefix="inventory") self.inventory_file: Path = Path(self._td.name) / "inventory.yaml" self.eval_conf_file: Path = Path(self._td.name) / "evaluation_system.conf" self.web_conf_file: Path = Path(self._td.name) / "freva_web.toml" self.apache_config: Path = Path(self._td.name) / "freva_web.conf" self._db_pass: str = "" self._steps = steps or ["services", "core", "web"] self._inv_tmpl = Path(config_file or config_dir / "inventory.toml") self._cfg_tmpl = self.aux_dir / "evaluation_system.conf.tmpl" self.cfg = self._read_cfg() self.project_name = self.cfg.pop("project_name", None) if not self.project_name: raise ValueError("You must set a project name") @property def public_key_file(self) -> str: """Path to the public certificate file.""" public_keyfile = self.cfg["certificates"].get("public_keyfile") chain_keyfile = self.cfg["certificates"].get("chain_keyfile") if public_keyfile: return str(Path(public_keyfile).expanduser().absolute()) if chain_keyfile: return str(Path(chain_keyfile).expanduser().absolute()) raise ValueError("You must give a valid path to a public key file.") @property def private_key_file(self) -> str: """Path to the private key file.""" keyfile = self.cfg["certificates"].get("private_keyfile") if keyfile: return str(Path(keyfile).expanduser().absolute()) raise ValueError("You must give a valid path to a private key file.") @property def chain_key_file(self) -> str: """Path to the private key file.""" keyfile = self.cfg["certificates"].get("chain_keyfile") if keyfile: return str(Path(keyfile).expanduser().absolute()) return "" def _prep_vault(self) -> None: """Prepare the vault.""" self._config_keys.append("vault") self.cfg["vault"] = self.cfg["db"].copy() if not self.master_pass: self.master_pass = get_passwd() self.cfg["vault"]["config"]["root_passwd"] = self.master_pass self.cfg["vault"]["config"]["passwd"] = self.db_pass self.cfg["vault"]["config"]["keyfile"] = self.public_key_file self.cfg["vault"]["config"]["email"] = self.cfg["web"]["config"].get( "contacts", "" ) def _prep_db(self) -> None: """prepare the mariadb service.""" self._config_keys.append("db") if not self.master_pass: self.master_pass = get_passwd() host = self.cfg["db"]["hosts"] self.cfg["db"]["config"]["root_passwd"] = self.master_pass self.cfg["db"]["config"]["passwd"] = self.db_pass self.cfg["db"]["config"]["keyfile"] = self.public_key_file for key in ("name", "user", "db"): self.cfg["db"]["config"][key] = self.cfg["db"]["config"].get(key) or "freva" db_host = self.cfg["db"]["config"].get("host", "") if not db_host: self.cfg["db"]["config"]["host"] = host self.cfg["db"]["config"].setdefault("port", "3306") self.cfg["db"]["config"]["email"] = self.cfg["web"]["config"].get( "contacts", "" ) self._prep_vault() def _prep_solr(self) -> None: """prepare the apache solr service.""" self._config_keys.append("solr") self.cfg["solr"]["config"].pop("core", None) for key, default in dict(mem="4g", port=8983).items(): self.cfg["solr"]["config"][key] = ( self.cfg["solr"]["config"].get(key) or default ) self.cfg["solr"]["config"]["email"] = self.cfg["web"]["config"].get( "contacts", "" ) def _prep_core(self) -> None: """prepare the core deployment.""" self._config_keys.append("core") self.cfg["core"]["config"]["admins"] = ( self.cfg["core"]["config"].get("admins") or getuser() ) if not self.cfg["core"]["config"]["admins"]: self.cfg["core"]["config"]["admins"] = getuser() install_dir = self.cfg["core"]["config"]["install_dir"] root_dir = self.cfg["core"]["config"].get("root_dir", "") if not root_dir: self.cfg["core"]["config"]["root_dir"] = install_dir preview_path = self.cfg["core"]["config"].get("preview_path", "") base_dir_location = self.cfg["core"]["config"].get("base_dir_location", "") scheduler_output_dir = self.cfg["core"]["config"].get( "scheduler_output_dir", "" ) scheduler_system = self.cfg["core"]["config"].get("scheduler_system", "local") if not preview_path: if base_dir_location: self.cfg["core"]["config"]["preview_path"] = str( Path(base_dir_location) / "share" / "preview" ) else: self.cfg["core"]["config"]["preview_path"] = "" if not scheduler_output_dir: scheduler_output_dir = str(Path(base_dir_location) / "share") scheduler_output_dir = Path(scheduler_output_dir) / scheduler_system self.cfg["core"]["config"]["scheduler_output_dir"] = str(scheduler_output_dir) self.cfg["core"]["config"]["keyfile"] = self.public_key_file git_exe = self.cfg["core"]["config"].get("git_path") self.cfg["core"]["config"]["git_path"] = git_exe or "git" self.cfg["core"]["config"][ "git_url" ] = "https://gitlab.dkrz.de/freva/evaluation_system.git" def _prep_web(self) -> None: """prepare the web deployment.""" self._config_keys.append("web") self._prep_core() admin = self.cfg["core"]["config"]["admins"] if not isinstance(admin, str): self.cfg["web"]["config"]["admin"] = admin[0] else: self.cfg["web"]["config"]["admin"] = admin _webserver_items = {} try: _webserver_items = { k.replace("web_", "").upper(): v for (k, v) in self.cfg["web"]["config"].items() } except KeyError as error: raise KeyError( "No web config section given, please configure the web.config" ) from error _webserver_items["ALLOWED_HOSTS"].append(self.cfg["web"]["hosts"]) _webserver_items["REDIS_HOST"] = self.cfg["web"]["hosts"] try: with Path(_webserver_items["homepage_text"]).open("r") as f_obj: _webserver_items["homepage_text"] = f_obj.read() except (FileNotFoundError, IOError, KeyError): pass server_name = self.cfg["web"]["config"].pop("server_name", []) if isinstance(server_name, str): server_name = server_name.split(",") server_name = ",".join([a for a in server_name if a.strip()]) if not server_name: server_name = self.cfg["web"]["hosts"] self.cfg["web"]["config"]["server_name"] = server_name web_host = self.cfg["web"]["hosts"] if web_host == "127.0.0.1": web_host = "localhost" self.cfg["web"]["config"]["host"] = web_host _webserver_items["CSRF_TRUSTED_ORIGINS"] = [] for url in (server_name, self.cfg["web"]["config"]["project_website"]): trusted_origin = urlparse(url) if trusted_origin.scheme: _webserver_items["CSRF_TRUSTED_ORIGINS"].append( f"https://{trusted_origin.netloc}" ) else: _webserver_items["CSRF_TRUSTED_ORIGINS"].append( f"https://{trusted_origin.path}" ) _webserver_items["FREVA_BIN"] = os.path.join( self.cfg["core"]["config"]["install_dir"], "bin" ) try: with Path(_webserver_items["ABOUT_US_TEXT"]).open() as f_obj: _webserver_items["ABOUT_US_TEXT"] = f_obj.read() except (FileNotFoundError, IOError, KeyError): pass try: _webserver_items["IMPRINT"] = _webserver_items["IMPRINT"].split(",") except AttributeError: pass with self.web_conf_file.open("w") as f_obj: toml.dump(_webserver_items, f_obj) for key in ("core", "web"): self.cfg[key]["config"]["config_toml_file"] = str(self.web_conf_file) if not self.master_pass: self.master_pass = get_passwd() email_user, self.email_password = get_email_credentials() self._prep_vault() self.cfg["vault"]["config"]["ansible_python_interpreter"] = self.cfg["db"][ "config" ].get("ansible_python_interpreter", "/usr/bin/python3") self.cfg["vault"]["config"]["email_user"] = email_user self.cfg["vault"]["config"]["email_password"] = self.email_password self.cfg["web"]["config"]["root_passwd"] = self.master_pass self.cfg["web"]["config"]["private_keyfile"] = self.private_key_file self.cfg["web"]["config"]["public_keyfile"] = self.public_key_file self.cfg["web"]["config"]["chain_keyfile"] = ( self.chain_key_file or self.public_key_file ) self.cfg["web"]["config"]["apache_config_file"] = str(self.apache_config) self._prep_apache_config() def _prep_apache_config(self): config = [] with (Path(asset_dir) / "web" / "freva_web.conf").open() as f_obj: for line in f_obj.readlines(): if not self.chain_key_file and "SSLCertificateChainFile" in line: continue config.append(line) with open(self.apache_config, "w") as f_obj: f_obj.write("".join(config)) def __enter__(self): return self def __exit__(self, *args): self._td.cleanup() def _read_cfg(self) -> dict[str, Any]: try: with self._inv_tmpl.open() as f_obj: return dict(toml.load(f_obj)) except FileNotFoundError as error: raise FileNotFoundError(f"No such file {self._inv_tmpl}") from error def _check_config(self) -> None: sections = [] for section in self.cfg.keys(): for step in self._config_keys: if section.startswith(step) and section not in sections: sections.append(section) for section in sections: for key, value in self.cfg[section]["config"].items(): if not value and not self._empty_ok and not isinstance(value, bool): raise ValueError(f"{key} in {section} is empty in {self._inv_tmpl}") @property def _empty_ok(self) -> list[str]: """Define all keys that can be empty.""" return [ "admins", "conda_exec_path", ] def _get_files_copy(self, key) -> Path | None: return dict( core=self.eval_conf_file.absolute(), web=self.eval_conf_file.absolute(), ).get(key, None) @property def db_pass(self) -> str: """Create a password for the database.""" if self._db_pass: return self._db_pass punctuations = "!@$^&*()_+-;:|,.%" num_chars, num_digits, num_punctuations = 20, 4, 4 num_chars -= num_digits + num_punctuations characters = [ "".join([random.choice(string.ascii_letters) for i in range(num_chars)]), "".join([random.choice(string.digits) for i in range(num_digits)]), "".join([random.choice(punctuations) for i in range(num_punctuations)]), ] str_characters = "".join(characters) _db_pass = "".join(random.sample(str_characters, len(str_characters))) while _db_pass.startswith("@"): # Vault treats values starting with "@" as file names. _db_pass = "".join(random.sample(str_characters, len(str_characters))) self._db_pass = _db_pass return self._db_pass @property def _needs_core(self) -> list[str]: """Define the steps that need the core config.""" return ["web", "core"] def _set_additional_config_values( self, step: str, config: dict[str, dict[str, dict[str, str | int | bool]]], ) -> None: """Set additional values to the configuration.""" if step in self._needs_core: for key in ( "root_dir", "base_dir_location", "preview_path", "scheduler_output_dir", ): value = self.cfg["core"]["config"].get(key, "") config[step]["vars"][f"core_{key}"] = value config[step]["vars"][f"{step}_hostname"] = self.cfg[step]["hosts"] config[step]["vars"][f"{step}_name"] = f"{self.project_name}-{step}" config[step]["vars"]["asset_dir"] = str(asset_dir) config[step]["vars"]["ansible_user"] = ( self.cfg[step]["config"].get("ansible_user") or getuser() ) config[step]["vars"][f"{step}_ansible_python_interpreter"] = ( self.cfg[step]["config"].get("ansible_python_interpreter") or "/usr/bin/python3" ) dump_file = self._get_files_copy(step) if dump_file: config[step]["vars"][f"{step}_dump"] = str(dump_file)
[docs] def parse_config(self) -> str: """Create config files for anisble and evaluation_system.conf.""" logger.info("Parsing configurations") self._check_config() config: dict[str, dict[str, dict[str, str | int | bool]]] = {} for step in set(self._config_keys): config[step] = {} config[step]["hosts"] = self.cfg[step]["hosts"] config[step]["vars"] = {} for key, value in self.cfg[step]["config"].items(): if key in ("root_passwd", "wipe"): new_key = key else: new_key = f"{step}_{key}" config[step]["vars"][new_key] = value config[step]["vars"]["project_name"] = self.project_name # Add additional keys self._set_additional_config_values(step, config) return yaml.dump(config)
@property def _playbook_file(self) -> Path: return Path(self._td.name) / "ansible-playbook.yml" @property def python_prefix(self) -> Path: """Get the path of the new conda evnironment.""" return Path(sys.exec_prefix) / "bin" / "python3" @property def aux_dir(self) -> Path: """Directory with auxillary files.""" return asset_dir / "config" @property def playbook_dir(self) -> Path: """The location of all playbooks.""" return asset_dir / "playbooks" @property def steps(self) -> list[str]: """Set all the deploment steps.""" steps = [] for step in self._steps: steps.append(step) if step == "db": steps.append("vault") return [s for s in self.step_order if s in steps]
[docs] def create_playbooks(self): """Create the ansible playbook form all steps.""" logger.info("Creating Ansible playbooks") playbook = [] for step in self.steps: getattr(self, f"_prep_{step}")() playbook_file = self.playbook_dir / f"{step}-server-playbook.yml" with playbook_file.open() as f_obj: playbook += yaml.safe_load(f_obj) with self._playbook_file.open("w") as f_obj: yaml.dump(playbook, f_obj)
[docs] def create_eval_config(self) -> None: """Create and dump the evaluation_systme.config.""" logger.info("Creating evaluation_system.conf") keys = ( ("core", "admins"), ("web", "project_website"), ("core", "root_dir"), ("core", "base_dir_location"), ("core", "preview_path"), ("core", "scheduler_output_dir"), ("core", "scheduler_system"), ) cfg_file = asset_dir / "config" / "evaluation_system.conf.tmpl" with cfg_file.open() as f_obj: lines = f_obj.readlines() for num, line in enumerate(lines): if line.startswith("project_name"): lines[num] = f"project_name={self.project_name}\n" for key, value in keys: if line.startswith(value): cfg = self.cfg[key]["config"].get(value, "") if cfg: lines[num] = f"{value}={cfg}\n" for step in ("solr", "db"): cfg = self.cfg[step]["config"].get("port", "") if line.startswith(f"{step}.port"): lines[num] = f"{step}.port={cfg}\n" if line.startswith(f"{step}.host"): lines[num] = f"{step}.host={self.cfg[step]['hosts']}\n" dump_file = self._get_files_copy("core") if dump_file: with dump_file.open("w") as f_obj: f_obj.write("".join(lines))
[docs] def play( self, server_map: str | None = None, ask_pass: bool = True, verbosity: int = 0, ) -> None: """Play the ansible playbook. Parameters ---------- server_map: str, default: None Hostname running the service that keeps track of the server infrastructure, if None given (default) no new deployed services are added. ask_pass: bool, default: True Instruct Ansible to ask for the ssh passord instead of using a ssh key verbosity: int, default: 0 Verbosity level, default 0 """ self.create_playbooks() inventory = self.parse_config() self.create_eval_config() with self.inventory_file.open("w") as f_obj: f_obj.write(inventory) inventory_str = inventory for passwd in (self.email_password, self.master_pass): if passwd: inventory_str = inventory_str.replace(passwd, "*" * len(passwd)) RichConsole.print(inventory, style="bold", markup=True) logger.info("Playing the playbooks with ansible") RichConsole.print( "[b]Note:[/] The [blue]BECOME[/] password refers to the " "[blue]sudo[/] password", markup=True, ) v_string = sign(verbosity) * "-" + verbosity * "v" cmd = ( f"ansible-playbook {v_string} -i {self.inventory_file} " f"{self._playbook_file} --ask-become-pass" ) if ask_pass: cmd += " --ask-pass" try: _ = run( shlex.split(cmd), env=os.environ.copy(), cwd=str(asset_dir), check=True, ) except KeyboardInterrupt: pass if server_map: self.upload_server_info(server_map, inventory)
[docs] def upload_server_info(self, server_map: str, inventory: str) -> None: """Upload information on server information to shared nextcloud.""" deployment_setup: dict[str, dict[str, str]] = {} for step, info in yaml.safe_load(inventory).items(): ansible_step = dict( python_path=info["vars"][f"{step}_ansible_python_interpreter"] ) ansible_step["hosts"] = info["hosts"] deployment_setup[step] = ansible_step if step == "web": deployment_setup["redis"] = ansible_step upload_server_map(server_map, self.project_name, deployment_setup)