From 297b713ad4a6c989dc9ebbc3dbea9969bf68e588 Mon Sep 17 00:00:00 2001 From: "G.Reijn" Date: Sat, 7 Jun 2025 07:34:18 +0200 Subject: [PATCH 1/5] Init with get method Python TSToy resource --- samples/python/resources/first/.gitignore | 166 +++++++++++++ samples/python/resources/first/README.md | 3 + samples/python/resources/first/build.ps1 | 95 +++++++ .../resources/first/src/commands/get.py | 51 ++++ .../resources/first/src/config/config.py | 233 ++++++++++++++++++ .../resources/first/src/config/manager.py | 171 +++++++++++++ .../resources/first/src/core/console.py | 12 + samples/python/resources/first/src/main.py | 15 ++ .../resources/first/src/models/models.py | 25 ++ .../python/resources/first/src/pyproject.toml | 21 ++ .../resources/first/src/resources/strings.py | 7 + .../resources/first/src/schema/schema.py | 48 ++++ .../resources/first/src/utils/logger.py | 109 ++++++++ .../first/tests/acceptance.tests.ps1 | 81 ++++++ 14 files changed, 1037 insertions(+) create mode 100644 samples/python/resources/first/.gitignore create mode 100644 samples/python/resources/first/README.md create mode 100644 samples/python/resources/first/build.ps1 create mode 100644 samples/python/resources/first/src/commands/get.py create mode 100644 samples/python/resources/first/src/config/config.py create mode 100644 samples/python/resources/first/src/config/manager.py create mode 100644 samples/python/resources/first/src/core/console.py create mode 100644 samples/python/resources/first/src/main.py create mode 100644 samples/python/resources/first/src/models/models.py create mode 100644 samples/python/resources/first/src/pyproject.toml create mode 100644 samples/python/resources/first/src/resources/strings.py create mode 100644 samples/python/resources/first/src/schema/schema.py create mode 100644 samples/python/resources/first/src/utils/logger.py create mode 100644 samples/python/resources/first/tests/acceptance.tests.ps1 diff --git a/samples/python/resources/first/.gitignore b/samples/python/resources/first/.gitignore new file mode 100644 index 0000000..863ea61 --- /dev/null +++ b/samples/python/resources/first/.gitignore @@ -0,0 +1,166 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be added to the global gitignore or merged into this project gitignore. For PyCharm +# Community Edition, use 'PyCharm CE' in the configurations. +.idea/ + +# Build output +python.zip + +# uv +.uv/ +uv.lock \ No newline at end of file diff --git a/samples/python/resources/first/README.md b/samples/python/resources/first/README.md new file mode 100644 index 0000000..3b4bcce --- /dev/null +++ b/samples/python/resources/first/README.md @@ -0,0 +1,3 @@ +# Python TSToy resource + +To be filled in diff --git a/samples/python/resources/first/build.ps1 b/samples/python/resources/first/build.ps1 new file mode 100644 index 0000000..44d662f --- /dev/null +++ b/samples/python/resources/first/build.ps1 @@ -0,0 +1,95 @@ +[CmdletBinding()] +param ( + [ValidateSet('build', 'test')] + [string]$mode = 'build', + [string]$name = 'pythontstoy' +) + +function Build-PythonProject { + [CmdletBinding()] + param ( + [Parameter()] + [string]$Name + ) + begin { + Write-Verbose -Message "Starting Python project build process" + + $sourceDir = Join-Path -Path $PSScriptRoot -ChildPath 'src' + $outputDir = Join-Path -Path $PSScriptRoot -ChildPath 'dist' + } + + process { + Install-Uv + + Push-Location -Path $sourceDir -ErrorAction Stop + + try { + # Create virtual environment + & uv venv + + # Activate it + & .\.venv\Scripts\activate.ps1 + + # Sync all the dependencies + & uv sync + + # Create executable + $pyInstallerArgs = @( + 'main.py', + '-F', + '--clean', + '--distpath', $outputDir, + '--name', $Name + ) + & pyinstaller.exe @pyInstallerArgs + } + finally { + deactivate + Pop-Location -ErrorAction Ignore + } + } + + end { + Write-Verbose -Message "Python project build process completed" + } +} + +function Install-Uv() { + begin { + Write-Verbose -Message "Installing Uv dependencies" + } + + process { + if ($IsWindows) { + if (-not (Get-Command uv -ErrorAction SilentlyContinue)) { + Write-Verbose -Message "Installing uv package manager on Windows" + Invoke-RestMethod https://astral.sh/uv/install.ps1 | Invoke-Expression + + } + $env:Path = "$env:USERPROFILE\.local\bin;$env:Path" + } + elseif ($IsLinux) { + curl -LsSf https://astral.sh/uv/install.sh | sh + } + } + + end { + Write-Verbose -Message "Uv installation process completed" + } +} + +switch ($mode) { + 'build' { + Build-PythonProject -Name $name + } + 'test' { + Build-PythonProject -Name $name + + $testContainer = New-PesterContainer -Path (Join-Path 'tests' 'acceptance.tests.ps1') -Data @{ + Name = $name + } + + Invoke-Pester -Container $testContainer -Output Detailed + } + +} \ No newline at end of file diff --git a/samples/python/resources/first/src/commands/get.py b/samples/python/resources/first/src/commands/get.py new file mode 100644 index 0000000..f66ec61 --- /dev/null +++ b/samples/python/resources/first/src/commands/get.py @@ -0,0 +1,51 @@ +import click +import json +import sys +from utils.logger import Logger +from config.config import Settings + +# Create a logger instance for this module +logger = Logger() + +@click.command('get') +@click.option('--input', 'input_json', + help='JSON input data with required scope field', + required=True, + type=str) +def get_command(input_json): + """Get command that retrieves configuration based on scope from validated JSON input""" + + try: + try: + data = json.loads(input_json) + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON format - {e}", "get_command", input_data=input_json) + sys.exit(1) + + if not Settings.validate_input(data, logger): + sys.exit(1) + + settings = Settings.from_dict(data) + + if not settings.validate(): + logger.error(f"Settings validation failed for scope: {settings.scope}", "get_command", scope=settings.scope) + sys.exit(1) + + logger.info(f"Processing settings for scope: {settings.scope}", "get_command", scope=settings.scope) + + result_settings, err = settings.enforce() + if err: + logger.error(f"Failed to enforce settings for scope {settings.scope}", "get_command", + scope=settings.scope, error_details=str(err)) + sys.exit(1) + + result_settings.print_config() + logger.info(f"Settings retrieved successfully for scope: {settings.scope}", "get_command", + scope=settings.scope) + + return result_settings + + except Exception as e: + logger.critical(f"Unexpected error in get_command", "get_command", + error_type=type(e).__name__, error_message=str(e)) + sys.exit(1) \ No newline at end of file diff --git a/samples/python/resources/first/src/config/config.py b/samples/python/resources/first/src/config/config.py new file mode 100644 index 0000000..e015991 --- /dev/null +++ b/samples/python/resources/first/src/config/config.py @@ -0,0 +1,233 @@ +from dataclasses import dataclass, field +from typing import Literal, Optional, Dict, Any, Tuple, Set +from pathlib import Path +from config.manager import ConfigManager +from utils.logger import Logger +from schema.schema import validate_resource, RESOURCE_SCHEMA +import json + +@dataclass +class Settings: + scope: Literal["user", "machine"] + _exist: bool = True + updateAutomatically: Optional[bool] = None + updateFrequency: Optional[int] = None + config_path: str = field(default="", init=False, repr=False) + _provided_properties: Set[str] = field(default_factory=set, init=False, repr=False) + + def __post_init__(self): + """Initialize dependencies after dataclass creation""" + self.config_manager = ConfigManager() + self.logger = Logger() + + @classmethod + def validate_input(cls, data: Dict[str, Any], logger: Logger) -> bool: + is_valid, validation_error = validate_resource(data) + if not is_valid: + logger.error(f"Input validation failed: {validation_error}", "Settings") + return False + + allowed_properties = list(RESOURCE_SCHEMA["properties"].keys()) + logger.info(f"Valid properties per schema: {allowed_properties}", "Settings") + logger.info(f"Provided properties: {list(data.keys())}", "Settings") + + return True + + def get_config_path(self) -> Tuple[str, Optional[Exception]]: + try: + if not self.scope: + return "", ValueError("scope is required") + + if self.scope == "machine": + config_path = self.config_manager.get_machine_config_path() + elif self.scope == "user": + config_path = self.config_manager.get_user_config_path() + else: + return "", ValueError(f"invalid scope: {self.scope}") + + self.config_path = str(config_path) + return self.config_path, None + + except Exception as e: + return "", e + + def get_config_map(self) -> Tuple[Optional[Dict[str, Any]], Optional[Exception]]: + try: + config_path, err = self.get_config_path() + if err: + return None, err + + if not Path(config_path).exists(): + self.logger.info(f"Config file not found: {config_path}", "Settings") + return {}, None # Return empty map if file doesn't exist + + # Load configuration + config_data = self.config_manager.load_config_file(Path(config_path)) + if config_data is None: + self.logger.info(f"Config file loaded but empty: {config_path}", "Settings") + return {}, None + + self.logger.info(f"Config loaded successfully: {json.dumps(config_data)}", "Settings") + return config_data, None + + except Exception as e: + return None, e + + def get_config_settings(self) -> Tuple[Optional['Settings'], Optional[Exception]]: + try: + config_map, err = self.get_config_map() + if err: + return None, err + + if not config_map: + self.logger.info("No configuration found, returning with _exist=False", "Settings") + return Settings(scope=self.scope, _exist=False), None + + settings = Settings(scope=self.scope, _exist=True) + + if 'updates' in config_map: + updates = config_map['updates'] + + if 'updateAutomatically' in updates: + settings.updateAutomatically = bool(updates['updateAutomatically']) + + if 'updateFrequency' in updates: + settings.updateFrequency = int(updates['updateFrequency']) + + self.logger.info(f"Loaded settings - updateAutomatically: {settings.updateAutomatically}, " + + f"updateFrequency: {settings.updateFrequency}", "Settings") + else: + self.logger.info("No 'updates' section found in config file", "Settings") + + return settings, None + + except Exception as e: + self.logger.error(f"Error in get_config_settings: {str(e)}", "Settings") + return None, e + + def enforce(self) -> Tuple[Optional['Settings'], Optional[Exception]]: + try: + current_settings, err = self.get_config_settings() + if err: + return None, err + + if not current_settings: + self.logger.info("Failed to get settings, returning with scope only", "Settings") + return Settings(scope=self.scope, _exist=True), None + + if not Path(self.config_path).exists(): + self.logger.info("Config file doesn't exist, returning scope only with _exist=false", "Settings") + result = Settings(scope=self.scope, _exist=False) + result.updateAutomatically = None + result.updateFrequency = None + return result, None + + self.logger.info("Current settings from config file: " + + f"updateAutomatically={current_settings.updateAutomatically}, " + + f"updateFrequency={current_settings.updateFrequency}", "Settings") + + if self._has_properties_to_validate(): + validated_settings = self._validate_settings(current_settings) + return validated_settings, None + else: + self.logger.info("No properties to validate, returning all: " + + f"updateAutomatically={current_settings.updateAutomatically}, " + + f"updateFrequency={current_settings.updateFrequency}", "Settings") + + return current_settings, None + + except Exception as e: + self.logger.error(f"Error in enforce: {str(e)}", "Settings") + return None, e + + def _has_properties_to_validate(self) -> bool: + properties = set(self._provided_properties) + if 'scope' in properties: + properties.remove('scope') + if '_exist' in properties: + properties.remove('_exist') + + has_properties = len(properties) > 0 + self.logger.info(f"Properties to validate: {properties}, has_properties: {has_properties}", "Settings") + return has_properties + + def _validate_settings(self, current_settings: 'Settings') -> 'Settings': + validated_settings = Settings( + scope=self.scope, + _exist=True, # Start with True, will be set to False if validation fails + updateAutomatically=current_settings.updateAutomatically, + updateFrequency=current_settings.updateFrequency + ) + + validation_failed = False + + if 'updateAutomatically' in self._provided_properties: + if current_settings.updateAutomatically is None: + validation_failed = True + self.logger.info(f"updateAutomatically not found in config (requested: {self.updateAutomatically})", "Settings") + elif current_settings.updateAutomatically != self.updateAutomatically: + validation_failed = True + self.logger.info(f"updateAutomatically mismatch - requested: {self.updateAutomatically}, found: {current_settings.updateAutomatically}", "Settings") + else: + self.logger.info(f"updateAutomatically validation passed: {self.updateAutomatically}", "Settings") + + if 'updateFrequency' in self._provided_properties: + if current_settings.updateFrequency is None: + validation_failed = True + self.logger.info(f"updateFrequency not found in config (requested: {self.updateFrequency})", "Settings") + elif current_settings.updateFrequency != self.updateFrequency: + validation_failed = True + self.logger.info(f"updateFrequency mismatch - requested: {self.updateFrequency}, found: {current_settings.updateFrequency}", "Settings") + else: + self.logger.info(f"updateFrequency validation passed: {self.updateFrequency}", "Settings") + + if validation_failed: + validated_settings._exist = False + self.logger.info("Validation failed, setting _exist to False", "Settings") + else: + self.logger.info("All validations passed, _exist remains True", "Settings") + + return validated_settings + + def to_json(self, exclude_private: bool = False, exclude_none: bool = True) -> str: + data = {} + data["_exist"] = self._exist + + if self.scope is not None: + data["scope"] = self.scope + + if not exclude_none or self.updateAutomatically is not None: + data["updateAutomatically"] = self.updateAutomatically + + if not exclude_none or self.updateFrequency is not None: + data["updateFrequency"] = self.updateFrequency + + if exclude_none: + data = {k: v for k, v in data.items() if v is not None or k == "_exist"} + + self.logger.info(f"Serializing to JSON: {data}", "Settings") + + return json.dumps(data) + + def print_config(self) -> None: + json_output = self.to_json(exclude_private=False, exclude_none=True) + self.logger.info(f"Printing configuration: {json_output}", "Settings") + print(json_output) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'Settings': + settings = cls( + scope=data.get('scope'), + _exist=data.get('_exist', True), # Default to True if not specified + updateAutomatically=data.get('updateAutomatically'), + updateFrequency=data.get('updateFrequency') + ) + + settings._provided_properties = set(data.keys()) + + return settings + + def validate(self) -> bool: + if not self.scope or self.scope not in ['user', 'machine']: + return False + return True \ No newline at end of file diff --git a/samples/python/resources/first/src/config/manager.py b/samples/python/resources/first/src/config/manager.py new file mode 100644 index 0000000..25f5d45 --- /dev/null +++ b/samples/python/resources/first/src/config/manager.py @@ -0,0 +1,171 @@ +import os +import json +import pathlib +from typing import Dict, Any +from resources.strings import Strings +from utils.logger import Logger + + +class ConfigSource: + """Represents a configuration source.""" + DEFAULT = "default" + MACHINE = "machine" + USER = "user" + ENV = "environment" + CLI = "cli" + +class ConfigManager: + def __init__(self): + self.default_config = { + "updates": { + "updateAutomatically": False, + "updateFrequency": 180 + } + } + self.machine_config = {} + self.user_config = {} + self.env_config = {} + self.cli_config = {} + self.config = {} + + # Track loaded sources for reporting + self.loaded_sources = [] + + self.logger = Logger() + + def get_machine_config_path(self) -> pathlib.Path: + if os.name == 'nt': # Windows + return pathlib.Path(os.environ.get('PROGRAMDATA', 'C:/ProgramData')) / 'tstoy' / 'config.json' + else: # Unix-like + return pathlib.Path('/etc/tstoy/config.json') + + def get_user_config_path(self) -> pathlib.Path: + if os.name == 'nt': # Windows + config_dir = pathlib.Path(os.environ.get('APPDATA')) + else: # Unix-like + config_dir = pathlib.Path.home() / '.config' + + return config_dir / 'tstoy' / 'config.json' + + def load_config_file(self, path: pathlib.Path) -> Dict[str, Any]: + if not path.exists(): + self.logger.info(Strings.CONFIG_NOT_FOUND.format(path)) + return {} + + try: + with open(path, 'r') as f: + config = json.load(f) + self.logger.info(Strings.CONFIG_LOADED.format(path)) + return config + except json.JSONDecodeError as e: + self.logger.error(Strings.CONFIG_INVALID.format(path, str(e))) + return {} + except IOError as e: + self.logger.error(Strings.CONFIG_INVALID.format(path, str(e))) + return {} + + def save_config_file(self, path: pathlib.Path, config: Dict[str, Any]) -> bool: + try: + # Create parent directories if they don't exist + path.parent.mkdir(parents=True, exist_ok=True) + + with open(path, 'w') as f: + json.dump(config, f, indent=2) + self.logger.info(Strings.CONFIG_UPDATED.format(path)) + return True + except Exception as e: + self.logger.error(Strings.ERROR_WRITE_CONFIG.format(path, str(e))) + return False + + def load_default_config(self): + self.config = self.default_config.copy() + self.loaded_sources.append(ConfigSource.DEFAULT) + + def load_machine_config(self): + path = self.get_machine_config_path() + self.machine_config = self.load_config_file(path) + if self.machine_config: + self._merge_config(self.machine_config) + self.loaded_sources.append(ConfigSource.MACHINE) + + def load_user_config(self): + path = self.get_user_config_path() + self.user_config = self.load_config_file(path) + if self.user_config: + self._merge_config(self.user_config) + self.loaded_sources.append(ConfigSource.USER) + + def load_environment_config(self, prefix: str): + env_config = {} + for key, value in os.environ.items(): + if key.startswith(prefix): + # Convert DSCPY_UPDATES_AUTOMATIC to updates.automatic + config_key = key[len(prefix):].lower().replace('_', '.') + + # Convert string value to appropriate type + if value.lower() in ('true', 'yes', '1'): + env_config[config_key] = True + elif value.lower() in ('false', 'no', '0'): + env_config[config_key] = False + elif value.isdigit(): + env_config[config_key] = int(value) + else: + env_config[config_key] = value + + if env_config: + self.env_config = env_config + self._merge_config(env_config) + self.loaded_sources.append(ConfigSource.ENV) + + + def _merge_config(self, source: Dict[str, Any]): + def deep_merge(target, source): + for key, value in source.items(): + if key in target and isinstance(target[key], dict) and isinstance(value, dict): + deep_merge(target[key], value) + else: + target[key] = value + + deep_merge(self.config, source) + + def get_merged_config(self) -> Dict[str, Any]: + return self.config + + def get_config_sources(self) -> list: + return self.loaded_sources + + def get_all_config_files(self) -> list: + try: + user_config_dir = self.get_user_config_path().parent + if user_config_dir.exists(): + # Return a list of all JSON files in the config directory + return list(user_config_dir.glob('*.json')) + else: + self.logger.warning(f"Config directory does not exist: {user_config_dir}") + return [] + except Exception as e: + self.logger.error(f"Error enumerating config files: {str(e)}") + return [] + + def get_config_by_name(self, name: str) -> Dict[str, Any]: + if name == 'default': + return self.default_config.copy() + + # Try to find a specific config file with this name + user_config_dir = self.get_user_config_path().parent + config_path = user_config_dir / f"{name}.json" + + if config_path.exists(): + config = self.load_config_file(config_path) + return config + else: + # If no specific file exists, return the merged config + # This behavior can be changed based on requirements + self.logger.warning(f"No configuration found for name: {name}") + return self.get_merged_config() + + def load_all_configs(self, env_prefix: str = "TSTOY_"): + self.load_default_config() + self.load_machine_config() + self.load_user_config() + self.load_environment_config(env_prefix) diff --git a/samples/python/resources/first/src/core/console.py b/samples/python/resources/first/src/core/console.py new file mode 100644 index 0000000..d5e95ac --- /dev/null +++ b/samples/python/resources/first/src/core/console.py @@ -0,0 +1,12 @@ +class Console: + @staticmethod + def info(message: str): + print(f"INFO: {message}") + + @staticmethod + def error(message: str): + print(f"ERROR: {message}") + + @staticmethod + def warning(message: str): + print(f"WARNING: {message}") diff --git a/samples/python/resources/first/src/main.py b/samples/python/resources/first/src/main.py new file mode 100644 index 0000000..28c2ce3 --- /dev/null +++ b/samples/python/resources/first/src/main.py @@ -0,0 +1,15 @@ +import click +from commands.get import get_command + + +@click.group() +def main(): + """Python TSToy CLI tool.""" + pass + +# TODO: Add more commands and move away in main.py +main.add_command(get_command) + + +if __name__ == '__main__': + main() diff --git a/samples/python/resources/first/src/models/models.py b/samples/python/resources/first/src/models/models.py new file mode 100644 index 0000000..501e57a --- /dev/null +++ b/samples/python/resources/first/src/models/models.py @@ -0,0 +1,25 @@ +from dataclasses import dataclass, asdict +from typing import Literal, Optional + +import json + +@dataclass +class TsToy: + scope: Literal["user", "machine"] + _exist: bool = True + updateAutomatically: Optional[bool] = None + updateFrequency: Optional[int] = None + + def to_json(self, include_none: bool = False) -> str: + data = asdict(self) + if not include_none: + data = {k: v for k, v in data.items() if v is not None} + return json.dumps(data) + + def to_dict(self, include_none: bool = False) -> dict: + data = asdict(self) + + if not include_none: + data = {k: v for k, v in data.items() if v is not None} + + return data \ No newline at end of file diff --git a/samples/python/resources/first/src/pyproject.toml b/samples/python/resources/first/src/pyproject.toml new file mode 100644 index 0000000..e3a0914 --- /dev/null +++ b/samples/python/resources/first/src/pyproject.toml @@ -0,0 +1,21 @@ +[build-system] +requires = ["setuptools>=45", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "tstoy" +version = "0.1.0" +description = "A command-line interface application built with Click" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "click>=8.2.1", + "jsonschema>=4.24.0", + "pyinstaller>=6.13.0", +] + +[project.scripts] +tstoy = "main:main" + +[tool.setuptools] +packages = ["commands", "core", "resources", "utils"] diff --git a/samples/python/resources/first/src/resources/strings.py b/samples/python/resources/first/src/resources/strings.py new file mode 100644 index 0000000..fe7bc6e --- /dev/null +++ b/samples/python/resources/first/src/resources/strings.py @@ -0,0 +1,7 @@ +# TODO: Convert to localization strings +class Strings: + CONFIG_NOT_FOUND = "Configuration file not found: {}" + CONFIG_LOADED = "Configuration loaded from: {}" + CONFIG_INVALID = "Invalid configuration file {}: {}" + CONFIG_UPDATED = "Configuration saved to: {}" + ERROR_WRITE_CONFIG = "Error writing configuration to {}: {}" diff --git a/samples/python/resources/first/src/schema/schema.py b/samples/python/resources/first/src/schema/schema.py new file mode 100644 index 0000000..b0208ce --- /dev/null +++ b/samples/python/resources/first/src/schema/schema.py @@ -0,0 +1,48 @@ +import jsonschema +import json + +RESOURCE_SCHEMA = { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "Python TSToy Resource", + "type": "object", + "required": ["scope"], + "additionalProperties": False, + "properties": { + "scope": { + "title": "Target configuration scope", + "description": "Defines which of TSToy's config files to manage.", + "type": "string", + "enum": ["machine", "user"], + }, + "_exist": { + "title": "Should configuration exist", + "description": "Defines whether the config file should exist.", + "type": "boolean", + "default": True, + }, + "updateAutomatically": { + "title": "Should update automatically", + "description": "Indicates whether TSToy should check for updates when it starts.", + "type": "boolean", + }, + "updateFrequency": { + "title": "Update check frequency", + "description": "Indicates how many days TSToy should wait before checking for updates.", + "type": "integer", + "minimum": 1, + "maximum": 180, + }, + } +} + +def validate_resource(instance): + """Validate resource instance against schema.""" + try: + jsonschema.validate(instance=instance, schema=RESOURCE_SCHEMA) + return True, None + except jsonschema.exceptions.ValidationError as err: + return False, f"Validation error: {err.message}" + +def get_schema(): + """Dump the schema as formatted JSON string.""" + return json.dumps(RESOURCE_SCHEMA, separators=(',', ':')) \ No newline at end of file diff --git a/samples/python/resources/first/src/utils/logger.py b/samples/python/resources/first/src/utils/logger.py new file mode 100644 index 0000000..c158c96 --- /dev/null +++ b/samples/python/resources/first/src/utils/logger.py @@ -0,0 +1,109 @@ +import json +import sys +import datetime +import inspect +from typing import Dict, Any +from enum import Enum + + +class LogLevel(Enum): + """Enumeration for log levels""" + DEBUG = "DEBUG" + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + CRITICAL = "CRITICAL" + + +class Logger: + """ + A structured JSON logger class that outputs messages to stderr. + + Features: + - JSON formatted output + - Configurable log levels + - Automatic timestamp generation + - Caller information tracking + - Customizable output stream + """ + + def __init__(self, output_stream=None, include_caller_info: bool = True): + self.output_stream = output_stream or sys.stderr + self.include_caller_info = include_caller_info + + def _get_caller_info(self) -> Dict[str, Any]: + if not self.include_caller_info: + return {} + + try: + # Get the frame of the caller (skip internal methods) + frame = inspect.currentframe() + for _ in range(3): # Skip _get_caller_info, _log, and the log level method + frame = frame.f_back + if frame is None: + break + + if frame: + return { + "file": frame.f_code.co_filename.split('\\')[-1], # Just filename + "line": frame.f_lineno, + "function": frame.f_code.co_name + } + except Exception: + pass + + return {} + + def _log(self, level: LogLevel, message: str, target: str = None, **kwargs): + log_entry = { + "timestamp": datetime.datetime.now().isoformat() + "Z", + "level": level.value, + "fields": {"message": message}, + "target": target or "unknown" + } + + # Add caller information if enabled + caller_info = self._get_caller_info() + if caller_info: + log_entry["line_number"] = caller_info.get("line", "Unknown") + log_entry["file"] = caller_info.get("file", "Unknown") + log_entry["function"] = caller_info.get("function", "Unknown") + + # Add any additional fields to the fields section + if kwargs: + log_entry["fields"].update(kwargs) + + try: + json_output = json.dumps(log_entry, separators=(",", ":")) + self.output_stream.write(json_output + '\n') + self.output_stream.flush() + except Exception as e: + # Fallback to basic error output + fallback_msg = f"[LOG ERROR] Failed to write log: {str(e)}\n" + self.output_stream.write(fallback_msg) + self.output_stream.flush() + + def debug(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.DEBUG, message, target, **kwargs) + + def info(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.INFO, message, target, **kwargs) + + def warning(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.WARNING, message, target, **kwargs) + + def error(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.ERROR, message, target, **kwargs) + + def critical(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.CRITICAL, message, target, **kwargs) + + def log_config_loaded(self, config_path: str, config_type: str, **kwargs): + self.info(f"Loaded {config_type} configuration", "config_manager", + config_path=config_path, **kwargs) + + def log_config_error(self, error_msg: str, config_path: str = None, **kwargs): + self.error(f"Configuration error: {error_msg}", "config_manager", + config_path=config_path, **kwargs) + + diff --git a/samples/python/resources/first/tests/acceptance.tests.ps1 b/samples/python/resources/first/tests/acceptance.tests.ps1 new file mode 100644 index 0000000..54a468d --- /dev/null +++ b/samples/python/resources/first/tests/acceptance.tests.ps1 @@ -0,0 +1,81 @@ +param ( + [string]$Name = 'pythontstoy' +) + +BeforeAll { + $oldPath = $env:Path + $env:Path = [System.IO.Path]::PathSeparator + (Join-Path (Split-Path $PSScriptRoot -Parent) 'dist') + + if ($IsWindows) { + $script:machinePath = Join-Path $env:ProgramData 'tstoy' 'config.json' + $script:userPath = Join-Path $env:APPDATA 'tstoy' 'config.json' + } + else { + $script:machinePath = Join-Path $env:HOME '.config' 'tstoy' 'config.json' + $script:userPath = Join-Path $env:HOME '.config' 'tstoy' 'config.json' + } +} + +Describe 'TSToy acceptance tests' { + Context "Help command" { + It 'Should return help' { + $help = & $Name --help + $help | Should -Not -BeNullOrEmpty + $LASTEXITCODE | Should -Be 0 + } + } + + Context "Input validation" { + It 'Should fail with invalid input' { + $out = & $Name get --input '{}' 2>&1 + $LASTEXITCODE | Should -Be 1 + $out | Should -BeLike '*"level":"ERROR"*"message":"Input validation failed: Validation error: ''scope'' is a required property"*' + } + } + + Context "Scope validation" -ForEach @( @{ scope = 'user' }, @{ scope = 'machine' } ) { + BeforeAll { + if ($IsWindows) { + Remove-Item -Path $script:userPath -ErrorAction Ignore + Remove-Item -Path $script:machinePath -ErrorAction Ignore + } + elseif ($IsLinux) { + Remove-Item -Path $script:userPath -ErrorAction Ignore + Remove-Item -Path $script:machinePath -ErrorAction Ignore + } + } + + It "Should not exist scope: " { + $out = & $Name get --input ($_ | ConvertTo-Json -Depth 10) | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $out._exist | Should -BeFalse + } + + It 'Should exist when file is present' { + $config = @{ + updates = @{ + updateAutomatically = $false + updateFrequency = 180 + } + } | ConvertTo-Json -Depth 10 + + if ($_.scope -eq 'user') { + $scriptPath = $script:userPath + } + else { + $scriptPath = $script:machinePath + } + + New-Item -Path $scriptPath -ItemType File -Value $config -Force | Out-Null + + $out = & $Name get --input ($_ | ConvertTo-Json -Depth 10) | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $out._exist | Should -BeTrue + } + } +} + + +AfterAll { + $env:Path = $oldPath +} \ No newline at end of file From c83cd788b7be2ba549810bead97ad3c4da3d1e1d Mon Sep 17 00:00:00 2001 From: "G.Reijn" Date: Fri, 13 Jun 2025 11:12:37 +0200 Subject: [PATCH 2/5] Init setup for TuxCtl resource --- samples/python/resources/first/.gitignore | 166 --------- samples/python/resources/first/README.md | 3 - samples/python/resources/first/build.ps1 | 95 ------ .../resources/first/src/commands/get.py | 51 --- .../resources/first/src/config/config.py | 233 ------------- .../resources/first/src/config/manager.py | 171 ---------- .../resources/first/src/core/console.py | 12 - samples/python/resources/first/src/main.py | 15 - .../resources/first/src/models/models.py | 25 -- .../python/resources/first/src/pyproject.toml | 21 -- .../resources/first/src/resources/strings.py | 7 - .../resources/first/src/schema/schema.py | 48 --- .../resources/first/src/utils/logger.py | 109 ------ .../first/tests/acceptance.tests.ps1 | 81 ----- samples/python/second/.gitignore | 109 ++++++ samples/python/second/commands/common.py | 57 ++++ samples/python/second/commands/get.py | 28 ++ samples/python/second/commands/root.py | 30 ++ samples/python/second/commands/set.py | 51 +++ samples/python/second/main.py | 4 + samples/python/second/models/dsc_user.py | 159 +++++++++ samples/python/second/schema/schema.py | 53 +++ samples/python/second/utils/logger.py | 37 ++ samples/python/second/utils/utils.py | 323 ++++++++++++++++++ 24 files changed, 851 insertions(+), 1037 deletions(-) delete mode 100644 samples/python/resources/first/.gitignore delete mode 100644 samples/python/resources/first/README.md delete mode 100644 samples/python/resources/first/build.ps1 delete mode 100644 samples/python/resources/first/src/commands/get.py delete mode 100644 samples/python/resources/first/src/config/config.py delete mode 100644 samples/python/resources/first/src/config/manager.py delete mode 100644 samples/python/resources/first/src/core/console.py delete mode 100644 samples/python/resources/first/src/main.py delete mode 100644 samples/python/resources/first/src/models/models.py delete mode 100644 samples/python/resources/first/src/pyproject.toml delete mode 100644 samples/python/resources/first/src/resources/strings.py delete mode 100644 samples/python/resources/first/src/schema/schema.py delete mode 100644 samples/python/resources/first/src/utils/logger.py delete mode 100644 samples/python/resources/first/tests/acceptance.tests.ps1 create mode 100644 samples/python/second/.gitignore create mode 100644 samples/python/second/commands/common.py create mode 100644 samples/python/second/commands/get.py create mode 100644 samples/python/second/commands/root.py create mode 100644 samples/python/second/commands/set.py create mode 100644 samples/python/second/main.py create mode 100644 samples/python/second/models/dsc_user.py create mode 100644 samples/python/second/schema/schema.py create mode 100644 samples/python/second/utils/logger.py create mode 100644 samples/python/second/utils/utils.py diff --git a/samples/python/resources/first/.gitignore b/samples/python/resources/first/.gitignore deleted file mode 100644 index 863ea61..0000000 --- a/samples/python/resources/first/.gitignore +++ /dev/null @@ -1,166 +0,0 @@ -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -cover/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -.pybuilder/ -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock - -# pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/#use-with-ide -.pdm.toml - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# Cython debug symbols -cython_debug/ - -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be added to the global gitignore or merged into this project gitignore. For PyCharm -# Community Edition, use 'PyCharm CE' in the configurations. -.idea/ - -# Build output -python.zip - -# uv -.uv/ -uv.lock \ No newline at end of file diff --git a/samples/python/resources/first/README.md b/samples/python/resources/first/README.md deleted file mode 100644 index 3b4bcce..0000000 --- a/samples/python/resources/first/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Python TSToy resource - -To be filled in diff --git a/samples/python/resources/first/build.ps1 b/samples/python/resources/first/build.ps1 deleted file mode 100644 index 44d662f..0000000 --- a/samples/python/resources/first/build.ps1 +++ /dev/null @@ -1,95 +0,0 @@ -[CmdletBinding()] -param ( - [ValidateSet('build', 'test')] - [string]$mode = 'build', - [string]$name = 'pythontstoy' -) - -function Build-PythonProject { - [CmdletBinding()] - param ( - [Parameter()] - [string]$Name - ) - begin { - Write-Verbose -Message "Starting Python project build process" - - $sourceDir = Join-Path -Path $PSScriptRoot -ChildPath 'src' - $outputDir = Join-Path -Path $PSScriptRoot -ChildPath 'dist' - } - - process { - Install-Uv - - Push-Location -Path $sourceDir -ErrorAction Stop - - try { - # Create virtual environment - & uv venv - - # Activate it - & .\.venv\Scripts\activate.ps1 - - # Sync all the dependencies - & uv sync - - # Create executable - $pyInstallerArgs = @( - 'main.py', - '-F', - '--clean', - '--distpath', $outputDir, - '--name', $Name - ) - & pyinstaller.exe @pyInstallerArgs - } - finally { - deactivate - Pop-Location -ErrorAction Ignore - } - } - - end { - Write-Verbose -Message "Python project build process completed" - } -} - -function Install-Uv() { - begin { - Write-Verbose -Message "Installing Uv dependencies" - } - - process { - if ($IsWindows) { - if (-not (Get-Command uv -ErrorAction SilentlyContinue)) { - Write-Verbose -Message "Installing uv package manager on Windows" - Invoke-RestMethod https://astral.sh/uv/install.ps1 | Invoke-Expression - - } - $env:Path = "$env:USERPROFILE\.local\bin;$env:Path" - } - elseif ($IsLinux) { - curl -LsSf https://astral.sh/uv/install.sh | sh - } - } - - end { - Write-Verbose -Message "Uv installation process completed" - } -} - -switch ($mode) { - 'build' { - Build-PythonProject -Name $name - } - 'test' { - Build-PythonProject -Name $name - - $testContainer = New-PesterContainer -Path (Join-Path 'tests' 'acceptance.tests.ps1') -Data @{ - Name = $name - } - - Invoke-Pester -Container $testContainer -Output Detailed - } - -} \ No newline at end of file diff --git a/samples/python/resources/first/src/commands/get.py b/samples/python/resources/first/src/commands/get.py deleted file mode 100644 index f66ec61..0000000 --- a/samples/python/resources/first/src/commands/get.py +++ /dev/null @@ -1,51 +0,0 @@ -import click -import json -import sys -from utils.logger import Logger -from config.config import Settings - -# Create a logger instance for this module -logger = Logger() - -@click.command('get') -@click.option('--input', 'input_json', - help='JSON input data with required scope field', - required=True, - type=str) -def get_command(input_json): - """Get command that retrieves configuration based on scope from validated JSON input""" - - try: - try: - data = json.loads(input_json) - except json.JSONDecodeError as e: - logger.error(f"Invalid JSON format - {e}", "get_command", input_data=input_json) - sys.exit(1) - - if not Settings.validate_input(data, logger): - sys.exit(1) - - settings = Settings.from_dict(data) - - if not settings.validate(): - logger.error(f"Settings validation failed for scope: {settings.scope}", "get_command", scope=settings.scope) - sys.exit(1) - - logger.info(f"Processing settings for scope: {settings.scope}", "get_command", scope=settings.scope) - - result_settings, err = settings.enforce() - if err: - logger.error(f"Failed to enforce settings for scope {settings.scope}", "get_command", - scope=settings.scope, error_details=str(err)) - sys.exit(1) - - result_settings.print_config() - logger.info(f"Settings retrieved successfully for scope: {settings.scope}", "get_command", - scope=settings.scope) - - return result_settings - - except Exception as e: - logger.critical(f"Unexpected error in get_command", "get_command", - error_type=type(e).__name__, error_message=str(e)) - sys.exit(1) \ No newline at end of file diff --git a/samples/python/resources/first/src/config/config.py b/samples/python/resources/first/src/config/config.py deleted file mode 100644 index e015991..0000000 --- a/samples/python/resources/first/src/config/config.py +++ /dev/null @@ -1,233 +0,0 @@ -from dataclasses import dataclass, field -from typing import Literal, Optional, Dict, Any, Tuple, Set -from pathlib import Path -from config.manager import ConfigManager -from utils.logger import Logger -from schema.schema import validate_resource, RESOURCE_SCHEMA -import json - -@dataclass -class Settings: - scope: Literal["user", "machine"] - _exist: bool = True - updateAutomatically: Optional[bool] = None - updateFrequency: Optional[int] = None - config_path: str = field(default="", init=False, repr=False) - _provided_properties: Set[str] = field(default_factory=set, init=False, repr=False) - - def __post_init__(self): - """Initialize dependencies after dataclass creation""" - self.config_manager = ConfigManager() - self.logger = Logger() - - @classmethod - def validate_input(cls, data: Dict[str, Any], logger: Logger) -> bool: - is_valid, validation_error = validate_resource(data) - if not is_valid: - logger.error(f"Input validation failed: {validation_error}", "Settings") - return False - - allowed_properties = list(RESOURCE_SCHEMA["properties"].keys()) - logger.info(f"Valid properties per schema: {allowed_properties}", "Settings") - logger.info(f"Provided properties: {list(data.keys())}", "Settings") - - return True - - def get_config_path(self) -> Tuple[str, Optional[Exception]]: - try: - if not self.scope: - return "", ValueError("scope is required") - - if self.scope == "machine": - config_path = self.config_manager.get_machine_config_path() - elif self.scope == "user": - config_path = self.config_manager.get_user_config_path() - else: - return "", ValueError(f"invalid scope: {self.scope}") - - self.config_path = str(config_path) - return self.config_path, None - - except Exception as e: - return "", e - - def get_config_map(self) -> Tuple[Optional[Dict[str, Any]], Optional[Exception]]: - try: - config_path, err = self.get_config_path() - if err: - return None, err - - if not Path(config_path).exists(): - self.logger.info(f"Config file not found: {config_path}", "Settings") - return {}, None # Return empty map if file doesn't exist - - # Load configuration - config_data = self.config_manager.load_config_file(Path(config_path)) - if config_data is None: - self.logger.info(f"Config file loaded but empty: {config_path}", "Settings") - return {}, None - - self.logger.info(f"Config loaded successfully: {json.dumps(config_data)}", "Settings") - return config_data, None - - except Exception as e: - return None, e - - def get_config_settings(self) -> Tuple[Optional['Settings'], Optional[Exception]]: - try: - config_map, err = self.get_config_map() - if err: - return None, err - - if not config_map: - self.logger.info("No configuration found, returning with _exist=False", "Settings") - return Settings(scope=self.scope, _exist=False), None - - settings = Settings(scope=self.scope, _exist=True) - - if 'updates' in config_map: - updates = config_map['updates'] - - if 'updateAutomatically' in updates: - settings.updateAutomatically = bool(updates['updateAutomatically']) - - if 'updateFrequency' in updates: - settings.updateFrequency = int(updates['updateFrequency']) - - self.logger.info(f"Loaded settings - updateAutomatically: {settings.updateAutomatically}, " + - f"updateFrequency: {settings.updateFrequency}", "Settings") - else: - self.logger.info("No 'updates' section found in config file", "Settings") - - return settings, None - - except Exception as e: - self.logger.error(f"Error in get_config_settings: {str(e)}", "Settings") - return None, e - - def enforce(self) -> Tuple[Optional['Settings'], Optional[Exception]]: - try: - current_settings, err = self.get_config_settings() - if err: - return None, err - - if not current_settings: - self.logger.info("Failed to get settings, returning with scope only", "Settings") - return Settings(scope=self.scope, _exist=True), None - - if not Path(self.config_path).exists(): - self.logger.info("Config file doesn't exist, returning scope only with _exist=false", "Settings") - result = Settings(scope=self.scope, _exist=False) - result.updateAutomatically = None - result.updateFrequency = None - return result, None - - self.logger.info("Current settings from config file: " + - f"updateAutomatically={current_settings.updateAutomatically}, " + - f"updateFrequency={current_settings.updateFrequency}", "Settings") - - if self._has_properties_to_validate(): - validated_settings = self._validate_settings(current_settings) - return validated_settings, None - else: - self.logger.info("No properties to validate, returning all: " + - f"updateAutomatically={current_settings.updateAutomatically}, " + - f"updateFrequency={current_settings.updateFrequency}", "Settings") - - return current_settings, None - - except Exception as e: - self.logger.error(f"Error in enforce: {str(e)}", "Settings") - return None, e - - def _has_properties_to_validate(self) -> bool: - properties = set(self._provided_properties) - if 'scope' in properties: - properties.remove('scope') - if '_exist' in properties: - properties.remove('_exist') - - has_properties = len(properties) > 0 - self.logger.info(f"Properties to validate: {properties}, has_properties: {has_properties}", "Settings") - return has_properties - - def _validate_settings(self, current_settings: 'Settings') -> 'Settings': - validated_settings = Settings( - scope=self.scope, - _exist=True, # Start with True, will be set to False if validation fails - updateAutomatically=current_settings.updateAutomatically, - updateFrequency=current_settings.updateFrequency - ) - - validation_failed = False - - if 'updateAutomatically' in self._provided_properties: - if current_settings.updateAutomatically is None: - validation_failed = True - self.logger.info(f"updateAutomatically not found in config (requested: {self.updateAutomatically})", "Settings") - elif current_settings.updateAutomatically != self.updateAutomatically: - validation_failed = True - self.logger.info(f"updateAutomatically mismatch - requested: {self.updateAutomatically}, found: {current_settings.updateAutomatically}", "Settings") - else: - self.logger.info(f"updateAutomatically validation passed: {self.updateAutomatically}", "Settings") - - if 'updateFrequency' in self._provided_properties: - if current_settings.updateFrequency is None: - validation_failed = True - self.logger.info(f"updateFrequency not found in config (requested: {self.updateFrequency})", "Settings") - elif current_settings.updateFrequency != self.updateFrequency: - validation_failed = True - self.logger.info(f"updateFrequency mismatch - requested: {self.updateFrequency}, found: {current_settings.updateFrequency}", "Settings") - else: - self.logger.info(f"updateFrequency validation passed: {self.updateFrequency}", "Settings") - - if validation_failed: - validated_settings._exist = False - self.logger.info("Validation failed, setting _exist to False", "Settings") - else: - self.logger.info("All validations passed, _exist remains True", "Settings") - - return validated_settings - - def to_json(self, exclude_private: bool = False, exclude_none: bool = True) -> str: - data = {} - data["_exist"] = self._exist - - if self.scope is not None: - data["scope"] = self.scope - - if not exclude_none or self.updateAutomatically is not None: - data["updateAutomatically"] = self.updateAutomatically - - if not exclude_none or self.updateFrequency is not None: - data["updateFrequency"] = self.updateFrequency - - if exclude_none: - data = {k: v for k, v in data.items() if v is not None or k == "_exist"} - - self.logger.info(f"Serializing to JSON: {data}", "Settings") - - return json.dumps(data) - - def print_config(self) -> None: - json_output = self.to_json(exclude_private=False, exclude_none=True) - self.logger.info(f"Printing configuration: {json_output}", "Settings") - print(json_output) - - @classmethod - def from_dict(cls, data: Dict[str, Any]) -> 'Settings': - settings = cls( - scope=data.get('scope'), - _exist=data.get('_exist', True), # Default to True if not specified - updateAutomatically=data.get('updateAutomatically'), - updateFrequency=data.get('updateFrequency') - ) - - settings._provided_properties = set(data.keys()) - - return settings - - def validate(self) -> bool: - if not self.scope or self.scope not in ['user', 'machine']: - return False - return True \ No newline at end of file diff --git a/samples/python/resources/first/src/config/manager.py b/samples/python/resources/first/src/config/manager.py deleted file mode 100644 index 25f5d45..0000000 --- a/samples/python/resources/first/src/config/manager.py +++ /dev/null @@ -1,171 +0,0 @@ -import os -import json -import pathlib -from typing import Dict, Any -from resources.strings import Strings -from utils.logger import Logger - - -class ConfigSource: - """Represents a configuration source.""" - DEFAULT = "default" - MACHINE = "machine" - USER = "user" - ENV = "environment" - CLI = "cli" - -class ConfigManager: - def __init__(self): - self.default_config = { - "updates": { - "updateAutomatically": False, - "updateFrequency": 180 - } - } - self.machine_config = {} - self.user_config = {} - self.env_config = {} - self.cli_config = {} - self.config = {} - - # Track loaded sources for reporting - self.loaded_sources = [] - - self.logger = Logger() - - def get_machine_config_path(self) -> pathlib.Path: - if os.name == 'nt': # Windows - return pathlib.Path(os.environ.get('PROGRAMDATA', 'C:/ProgramData')) / 'tstoy' / 'config.json' - else: # Unix-like - return pathlib.Path('/etc/tstoy/config.json') - - def get_user_config_path(self) -> pathlib.Path: - if os.name == 'nt': # Windows - config_dir = pathlib.Path(os.environ.get('APPDATA')) - else: # Unix-like - config_dir = pathlib.Path.home() / '.config' - - return config_dir / 'tstoy' / 'config.json' - - def load_config_file(self, path: pathlib.Path) -> Dict[str, Any]: - if not path.exists(): - self.logger.info(Strings.CONFIG_NOT_FOUND.format(path)) - return {} - - try: - with open(path, 'r') as f: - config = json.load(f) - self.logger.info(Strings.CONFIG_LOADED.format(path)) - return config - except json.JSONDecodeError as e: - self.logger.error(Strings.CONFIG_INVALID.format(path, str(e))) - return {} - except IOError as e: - self.logger.error(Strings.CONFIG_INVALID.format(path, str(e))) - return {} - - def save_config_file(self, path: pathlib.Path, config: Dict[str, Any]) -> bool: - try: - # Create parent directories if they don't exist - path.parent.mkdir(parents=True, exist_ok=True) - - with open(path, 'w') as f: - json.dump(config, f, indent=2) - self.logger.info(Strings.CONFIG_UPDATED.format(path)) - return True - except Exception as e: - self.logger.error(Strings.ERROR_WRITE_CONFIG.format(path, str(e))) - return False - - def load_default_config(self): - self.config = self.default_config.copy() - self.loaded_sources.append(ConfigSource.DEFAULT) - - def load_machine_config(self): - path = self.get_machine_config_path() - self.machine_config = self.load_config_file(path) - if self.machine_config: - self._merge_config(self.machine_config) - self.loaded_sources.append(ConfigSource.MACHINE) - - def load_user_config(self): - path = self.get_user_config_path() - self.user_config = self.load_config_file(path) - if self.user_config: - self._merge_config(self.user_config) - self.loaded_sources.append(ConfigSource.USER) - - def load_environment_config(self, prefix: str): - env_config = {} - for key, value in os.environ.items(): - if key.startswith(prefix): - # Convert DSCPY_UPDATES_AUTOMATIC to updates.automatic - config_key = key[len(prefix):].lower().replace('_', '.') - - # Convert string value to appropriate type - if value.lower() in ('true', 'yes', '1'): - env_config[config_key] = True - elif value.lower() in ('false', 'no', '0'): - env_config[config_key] = False - elif value.isdigit(): - env_config[config_key] = int(value) - else: - env_config[config_key] = value - - if env_config: - self.env_config = env_config - self._merge_config(env_config) - self.loaded_sources.append(ConfigSource.ENV) - - - def _merge_config(self, source: Dict[str, Any]): - def deep_merge(target, source): - for key, value in source.items(): - if key in target and isinstance(target[key], dict) and isinstance(value, dict): - deep_merge(target[key], value) - else: - target[key] = value - - deep_merge(self.config, source) - - def get_merged_config(self) -> Dict[str, Any]: - return self.config - - def get_config_sources(self) -> list: - return self.loaded_sources - - def get_all_config_files(self) -> list: - try: - user_config_dir = self.get_user_config_path().parent - if user_config_dir.exists(): - # Return a list of all JSON files in the config directory - return list(user_config_dir.glob('*.json')) - else: - self.logger.warning(f"Config directory does not exist: {user_config_dir}") - return [] - except Exception as e: - self.logger.error(f"Error enumerating config files: {str(e)}") - return [] - - def get_config_by_name(self, name: str) -> Dict[str, Any]: - if name == 'default': - return self.default_config.copy() - - # Try to find a specific config file with this name - user_config_dir = self.get_user_config_path().parent - config_path = user_config_dir / f"{name}.json" - - if config_path.exists(): - config = self.load_config_file(config_path) - return config - else: - # If no specific file exists, return the merged config - # This behavior can be changed based on requirements - self.logger.warning(f"No configuration found for name: {name}") - return self.get_merged_config() - - def load_all_configs(self, env_prefix: str = "TSTOY_"): - self.load_default_config() - self.load_machine_config() - self.load_user_config() - self.load_environment_config(env_prefix) diff --git a/samples/python/resources/first/src/core/console.py b/samples/python/resources/first/src/core/console.py deleted file mode 100644 index d5e95ac..0000000 --- a/samples/python/resources/first/src/core/console.py +++ /dev/null @@ -1,12 +0,0 @@ -class Console: - @staticmethod - def info(message: str): - print(f"INFO: {message}") - - @staticmethod - def error(message: str): - print(f"ERROR: {message}") - - @staticmethod - def warning(message: str): - print(f"WARNING: {message}") diff --git a/samples/python/resources/first/src/main.py b/samples/python/resources/first/src/main.py deleted file mode 100644 index 28c2ce3..0000000 --- a/samples/python/resources/first/src/main.py +++ /dev/null @@ -1,15 +0,0 @@ -import click -from commands.get import get_command - - -@click.group() -def main(): - """Python TSToy CLI tool.""" - pass - -# TODO: Add more commands and move away in main.py -main.add_command(get_command) - - -if __name__ == '__main__': - main() diff --git a/samples/python/resources/first/src/models/models.py b/samples/python/resources/first/src/models/models.py deleted file mode 100644 index 501e57a..0000000 --- a/samples/python/resources/first/src/models/models.py +++ /dev/null @@ -1,25 +0,0 @@ -from dataclasses import dataclass, asdict -from typing import Literal, Optional - -import json - -@dataclass -class TsToy: - scope: Literal["user", "machine"] - _exist: bool = True - updateAutomatically: Optional[bool] = None - updateFrequency: Optional[int] = None - - def to_json(self, include_none: bool = False) -> str: - data = asdict(self) - if not include_none: - data = {k: v for k, v in data.items() if v is not None} - return json.dumps(data) - - def to_dict(self, include_none: bool = False) -> dict: - data = asdict(self) - - if not include_none: - data = {k: v for k, v in data.items() if v is not None} - - return data \ No newline at end of file diff --git a/samples/python/resources/first/src/pyproject.toml b/samples/python/resources/first/src/pyproject.toml deleted file mode 100644 index e3a0914..0000000 --- a/samples/python/resources/first/src/pyproject.toml +++ /dev/null @@ -1,21 +0,0 @@ -[build-system] -requires = ["setuptools>=45", "wheel"] -build-backend = "setuptools.build_meta" - -[project] -name = "tstoy" -version = "0.1.0" -description = "A command-line interface application built with Click" -readme = "README.md" -requires-python = ">=3.12" -dependencies = [ - "click>=8.2.1", - "jsonschema>=4.24.0", - "pyinstaller>=6.13.0", -] - -[project.scripts] -tstoy = "main:main" - -[tool.setuptools] -packages = ["commands", "core", "resources", "utils"] diff --git a/samples/python/resources/first/src/resources/strings.py b/samples/python/resources/first/src/resources/strings.py deleted file mode 100644 index fe7bc6e..0000000 --- a/samples/python/resources/first/src/resources/strings.py +++ /dev/null @@ -1,7 +0,0 @@ -# TODO: Convert to localization strings -class Strings: - CONFIG_NOT_FOUND = "Configuration file not found: {}" - CONFIG_LOADED = "Configuration loaded from: {}" - CONFIG_INVALID = "Invalid configuration file {}: {}" - CONFIG_UPDATED = "Configuration saved to: {}" - ERROR_WRITE_CONFIG = "Error writing configuration to {}: {}" diff --git a/samples/python/resources/first/src/schema/schema.py b/samples/python/resources/first/src/schema/schema.py deleted file mode 100644 index b0208ce..0000000 --- a/samples/python/resources/first/src/schema/schema.py +++ /dev/null @@ -1,48 +0,0 @@ -import jsonschema -import json - -RESOURCE_SCHEMA = { - "$schema": "https://json-schema.org/draft/2020-12/schema", - "title": "Python TSToy Resource", - "type": "object", - "required": ["scope"], - "additionalProperties": False, - "properties": { - "scope": { - "title": "Target configuration scope", - "description": "Defines which of TSToy's config files to manage.", - "type": "string", - "enum": ["machine", "user"], - }, - "_exist": { - "title": "Should configuration exist", - "description": "Defines whether the config file should exist.", - "type": "boolean", - "default": True, - }, - "updateAutomatically": { - "title": "Should update automatically", - "description": "Indicates whether TSToy should check for updates when it starts.", - "type": "boolean", - }, - "updateFrequency": { - "title": "Update check frequency", - "description": "Indicates how many days TSToy should wait before checking for updates.", - "type": "integer", - "minimum": 1, - "maximum": 180, - }, - } -} - -def validate_resource(instance): - """Validate resource instance against schema.""" - try: - jsonschema.validate(instance=instance, schema=RESOURCE_SCHEMA) - return True, None - except jsonschema.exceptions.ValidationError as err: - return False, f"Validation error: {err.message}" - -def get_schema(): - """Dump the schema as formatted JSON string.""" - return json.dumps(RESOURCE_SCHEMA, separators=(',', ':')) \ No newline at end of file diff --git a/samples/python/resources/first/src/utils/logger.py b/samples/python/resources/first/src/utils/logger.py deleted file mode 100644 index c158c96..0000000 --- a/samples/python/resources/first/src/utils/logger.py +++ /dev/null @@ -1,109 +0,0 @@ -import json -import sys -import datetime -import inspect -from typing import Dict, Any -from enum import Enum - - -class LogLevel(Enum): - """Enumeration for log levels""" - DEBUG = "DEBUG" - INFO = "INFO" - WARNING = "WARNING" - ERROR = "ERROR" - CRITICAL = "CRITICAL" - - -class Logger: - """ - A structured JSON logger class that outputs messages to stderr. - - Features: - - JSON formatted output - - Configurable log levels - - Automatic timestamp generation - - Caller information tracking - - Customizable output stream - """ - - def __init__(self, output_stream=None, include_caller_info: bool = True): - self.output_stream = output_stream or sys.stderr - self.include_caller_info = include_caller_info - - def _get_caller_info(self) -> Dict[str, Any]: - if not self.include_caller_info: - return {} - - try: - # Get the frame of the caller (skip internal methods) - frame = inspect.currentframe() - for _ in range(3): # Skip _get_caller_info, _log, and the log level method - frame = frame.f_back - if frame is None: - break - - if frame: - return { - "file": frame.f_code.co_filename.split('\\')[-1], # Just filename - "line": frame.f_lineno, - "function": frame.f_code.co_name - } - except Exception: - pass - - return {} - - def _log(self, level: LogLevel, message: str, target: str = None, **kwargs): - log_entry = { - "timestamp": datetime.datetime.now().isoformat() + "Z", - "level": level.value, - "fields": {"message": message}, - "target": target or "unknown" - } - - # Add caller information if enabled - caller_info = self._get_caller_info() - if caller_info: - log_entry["line_number"] = caller_info.get("line", "Unknown") - log_entry["file"] = caller_info.get("file", "Unknown") - log_entry["function"] = caller_info.get("function", "Unknown") - - # Add any additional fields to the fields section - if kwargs: - log_entry["fields"].update(kwargs) - - try: - json_output = json.dumps(log_entry, separators=(",", ":")) - self.output_stream.write(json_output + '\n') - self.output_stream.flush() - except Exception as e: - # Fallback to basic error output - fallback_msg = f"[LOG ERROR] Failed to write log: {str(e)}\n" - self.output_stream.write(fallback_msg) - self.output_stream.flush() - - def debug(self, message: str, target: str = None, **kwargs): - self._log(LogLevel.DEBUG, message, target, **kwargs) - - def info(self, message: str, target: str = None, **kwargs): - self._log(LogLevel.INFO, message, target, **kwargs) - - def warning(self, message: str, target: str = None, **kwargs): - self._log(LogLevel.WARNING, message, target, **kwargs) - - def error(self, message: str, target: str = None, **kwargs): - self._log(LogLevel.ERROR, message, target, **kwargs) - - def critical(self, message: str, target: str = None, **kwargs): - self._log(LogLevel.CRITICAL, message, target, **kwargs) - - def log_config_loaded(self, config_path: str, config_type: str, **kwargs): - self.info(f"Loaded {config_type} configuration", "config_manager", - config_path=config_path, **kwargs) - - def log_config_error(self, error_msg: str, config_path: str = None, **kwargs): - self.error(f"Configuration error: {error_msg}", "config_manager", - config_path=config_path, **kwargs) - - diff --git a/samples/python/resources/first/tests/acceptance.tests.ps1 b/samples/python/resources/first/tests/acceptance.tests.ps1 deleted file mode 100644 index 54a468d..0000000 --- a/samples/python/resources/first/tests/acceptance.tests.ps1 +++ /dev/null @@ -1,81 +0,0 @@ -param ( - [string]$Name = 'pythontstoy' -) - -BeforeAll { - $oldPath = $env:Path - $env:Path = [System.IO.Path]::PathSeparator + (Join-Path (Split-Path $PSScriptRoot -Parent) 'dist') - - if ($IsWindows) { - $script:machinePath = Join-Path $env:ProgramData 'tstoy' 'config.json' - $script:userPath = Join-Path $env:APPDATA 'tstoy' 'config.json' - } - else { - $script:machinePath = Join-Path $env:HOME '.config' 'tstoy' 'config.json' - $script:userPath = Join-Path $env:HOME '.config' 'tstoy' 'config.json' - } -} - -Describe 'TSToy acceptance tests' { - Context "Help command" { - It 'Should return help' { - $help = & $Name --help - $help | Should -Not -BeNullOrEmpty - $LASTEXITCODE | Should -Be 0 - } - } - - Context "Input validation" { - It 'Should fail with invalid input' { - $out = & $Name get --input '{}' 2>&1 - $LASTEXITCODE | Should -Be 1 - $out | Should -BeLike '*"level":"ERROR"*"message":"Input validation failed: Validation error: ''scope'' is a required property"*' - } - } - - Context "Scope validation" -ForEach @( @{ scope = 'user' }, @{ scope = 'machine' } ) { - BeforeAll { - if ($IsWindows) { - Remove-Item -Path $script:userPath -ErrorAction Ignore - Remove-Item -Path $script:machinePath -ErrorAction Ignore - } - elseif ($IsLinux) { - Remove-Item -Path $script:userPath -ErrorAction Ignore - Remove-Item -Path $script:machinePath -ErrorAction Ignore - } - } - - It "Should not exist scope: " { - $out = & $Name get --input ($_ | ConvertTo-Json -Depth 10) | ConvertFrom-Json - $LASTEXITCODE | Should -Be 0 - $out._exist | Should -BeFalse - } - - It 'Should exist when file is present' { - $config = @{ - updates = @{ - updateAutomatically = $false - updateFrequency = 180 - } - } | ConvertTo-Json -Depth 10 - - if ($_.scope -eq 'user') { - $scriptPath = $script:userPath - } - else { - $scriptPath = $script:machinePath - } - - New-Item -Path $scriptPath -ItemType File -Value $config -Force | Out-Null - - $out = & $Name get --input ($_ | ConvertTo-Json -Depth 10) | ConvertFrom-Json - $LASTEXITCODE | Should -Be 0 - $out._exist | Should -BeTrue - } - } -} - - -AfterAll { - $env:Path = $oldPath -} \ No newline at end of file diff --git a/samples/python/second/.gitignore b/samples/python/second/.gitignore new file mode 100644 index 0000000..63547ab --- /dev/null +++ b/samples/python/second/.gitignore @@ -0,0 +1,109 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json \ No newline at end of file diff --git a/samples/python/second/commands/common.py b/samples/python/second/commands/common.py new file mode 100644 index 0000000..37911e8 --- /dev/null +++ b/samples/python/second/commands/common.py @@ -0,0 +1,57 @@ +import click + +def common(function): + """Add common options to click commands""" + function = click.option( + "--input", + "input_json", + help="JSON input data with settings", + required=False, + type=str, + )(function) + + function = click.option( + "--username", + help="Username for authentication", + type=str, + required=False, + )(function) + + function = click.option( + "--password", + help="Password for authentication", + type=str, + required=False, + hide_input=True, + )(function) + + # Add additional user properties + function = click.option( + "--uid", + help="User ID number", + type=int, + required=False, + )(function) + + function = click.option( + "--gid", + help="Primary group ID", + type=int, + required=False, + )(function) + + function = click.option( + "--home", + help="Home directory path", + type=str, + required=False, + )(function) + + function = click.option( + "--shell", + help="Login shell path", + type=str, + required=False, + )(function) + + return function \ No newline at end of file diff --git a/samples/python/second/commands/get.py b/samples/python/second/commands/get.py new file mode 100644 index 0000000..3596fb1 --- /dev/null +++ b/samples/python/second/commands/get.py @@ -0,0 +1,28 @@ +import click +from utils.utils import get_input, get_requested_properties +from commands.common import common + + + +@click.command() +@common +def get(username, password, input_json, uid, gid, home, shell): + """ + Get Linux user information. + + This command takes either command line parameters or a JSON input to retrieve + information about a Linux user. + """ + user = get_input( + username=username, + password=password, + input_json=input_json, + uid=uid, + gid=gid, + home=home, + shell=shell + ) + + requested_properties = get_requested_properties(user) + + user.get_current_state(requested_properties) \ No newline at end of file diff --git a/samples/python/second/commands/root.py b/samples/python/second/commands/root.py new file mode 100644 index 0000000..8d4d49f --- /dev/null +++ b/samples/python/second/commands/root.py @@ -0,0 +1,30 @@ +import click +import sys +import os +from utils.logger import setup_logger +from commands.get import get +from commands.set import set + +# Setup logger for root commands +logger = setup_logger('commands.root') + +@click.group(invoke_without_command=True) +@click.pass_context +def root_command(ctx): + """Linux User Management CLI - Root Command.""" + + ctx.ensure_object(dict) + ctx.obj['logger'] = logger + + if os.name != 'nt' and hasattr(os, 'geteuid') and os.geteuid() != 0: + click.echo("This program requires root privileges to manage users. Please run with sudo.") + sys.exit(1) + + ctx.obj['logger'].info("Starting Linux User Management CLI") + + if ctx.invoked_subcommand is None: + click.echo(ctx.get_help()) + +# Register all available commands +root_command.add_command(get, name="get") +root_command.add_command(set, name="set") \ No newline at end of file diff --git a/samples/python/second/commands/set.py b/samples/python/second/commands/set.py new file mode 100644 index 0000000..a8a3086 --- /dev/null +++ b/samples/python/second/commands/set.py @@ -0,0 +1,51 @@ +import click +import sys +from typing import Optional +from commands.common import common +from utils.utils import get_input + +@click.command() +@common +@click.option('--what-if', is_flag=True, help='Show what would happen without making changes') +def set( + username: Optional[str], + password: Optional[str], + input_json: Optional[str], + uid: Optional[int], + gid: Optional[int], + home: Optional[str], + shell: Optional[str], + what_if: bool +): + """ + Create or update a Linux user. + + This command takes either command line parameters or a JSON input to create or + modify a Linux user. If the user already exists, it will be updated. + + Note: Group membership is read-only and cannot be modified with this command. + """ + try: + # Use get_input to create a user object from inputs + user = get_input( + username=username, + password=password, + input_json=input_json, + uid=uid, + gid=gid, + home=home, + shell=shell + ) + + # Call the modify method to create or update the user + user.modify(what_if=what_if) + + if not what_if: + click.echo(f"User '{user.username}' has been created or updated successfully.") + + except ValueError as e: + click.echo(f"Error: {str(e)}", err=True) + sys.exit(1) + except Exception as e: + click.echo(f"Error: {str(e)}", err=True) + sys.exit(1) \ No newline at end of file diff --git a/samples/python/second/main.py b/samples/python/second/main.py new file mode 100644 index 0000000..8313ecb --- /dev/null +++ b/samples/python/second/main.py @@ -0,0 +1,4 @@ +from commands.root import root_command + +if __name__ == "__main__": + root_command() \ No newline at end of file diff --git a/samples/python/second/models/dsc_user.py b/samples/python/second/models/dsc_user.py new file mode 100644 index 0000000..3b14639 --- /dev/null +++ b/samples/python/second/models/dsc_user.py @@ -0,0 +1,159 @@ +import json +from typing import Dict, Any +from utils.utils import check_user_exist, create_user, update_user, get_passwd_entry, get_user_groups +import sys + +def dsc_resource(cls): + """ + Decorator that marks a class as a DSC (Desired State Configuration) resource. + This allows the class to be used in configuration management. + """ + cls._is_dsc_resource = True + + # Add methods needed for DSC if they don't exist + if not hasattr(cls, 'test'): + cls.test = lambda self: self._exist + + if not hasattr(cls, 'get'): + cls.get = lambda self: self.__dict__ + + return cls + +@dsc_resource +class User: + """Linux User resource that can be managed via DSC.""" + def __init__(self): + """Initialize User with default values.""" + self.username = "" + self.password = "" + self.uid = None + self.gid = None + self.home = "" + self.shell = "" + self.groups = None + self._exist = False + + + def get_current_state(self, requested_properties=None): + """ + Check if the user exists and return only the requested properties. + + Args: + requested_properties: List of property names that were explicitly requested + If None, returns all available properties + + Returns: + Dict containing the requested user properties + """ + result = {} + + try: + user = check_user_exist(self.username) + + if user is not None: + self._exist = True + result["exist"] = True + + if user: + passwd_entry = get_passwd_entry(self.username) + user_groups = get_user_groups(self.username) + + self.uid = user.get('uid', 0) + self.gid = user.get('gid', 0) + self.groups = user_groups + self.home = passwd_entry.get('home', '') if passwd_entry else '' + self.shell = passwd_entry.get('shell', '/bin/bash') if passwd_entry else '/bin/bash' + + if requested_properties: + for prop_name, expected_value in requested_properties: + if prop_name in ['username', '_exist']: + continue + current_value = getattr(self, prop_name) + if expected_value != current_value: + self._exist = False + break + + if requested_properties: + for prop_name, _ in requested_properties: + if hasattr(self, prop_name): + result[prop_name] = getattr(self, prop_name) + else: + result.update({ + "uid": self.uid, + "gid": self.gid, + "groups": self.groups, + "home": self.home, + "shell": self.shell + }) + + if not self._exist: + print(json.dumps({ + "username": self.username, + "exist": False, + })) + else: + print(self.to_json(self)) + + except Exception as e: + print(f"Error occurred while getting current state for user '{self.username}': {e}") + sys.exit(1) + + def modify(self, what_if: bool = False) -> None: + exists = check_user_exist(self.username) + if what_if: + if exists: + print(f"Would update user '{self.username}'") + else: + print(f"Would create user '{self.username}'") + return + + if exists: + update_user( + username=self.username, + uid=self.uid, + gid=self.gid, + home=self.home, + shell=self.shell + ) + else: + create_user( + username=self.username, + uid=self.uid, + gid=self.gid, + home=self.home, + shell=self.shell + ) + + @classmethod + def to_dict(cls, item: 'User') -> Dict[str, Any]: + """Convert User to dictionary for serialization.""" + result = { + "username": item.username, + "uid": item.uid, + "gid": item.gid, + "home": item.home, + "shell": item.shell, + "groups": item.groups, + "exist": item._exist + } + + return result + + @classmethod + def to_json(cls, user: 'User') -> str: + return json.dumps(cls.to_dict(user), separators=(',', ':')) + + @classmethod + def from_json(cls, json_str: str) -> 'User': + """Create a User object from JSON string.""" + data = json.loads(json_str) + user = cls() + user.username = data.get('username', '') + user.password = data.get('password', '') + user.uid = data.get('uid') + user.gid = data.get('gid') + user.home = data.get('home') + user.shell = data.get('shell', '/bin/bash') + user.groups = data.get('groups', []) + user._exist = data.get('exists', False) + return user \ No newline at end of file diff --git a/samples/python/second/schema/schema.py b/samples/python/second/schema/schema.py new file mode 100644 index 0000000..c846be8 --- /dev/null +++ b/samples/python/second/schema/schema.py @@ -0,0 +1,53 @@ +import os +import json +import jsonschema +from typing import Dict, Any +from utils.logger import setup_logger + +logger = setup_logger('utils.schema_validator') + +# Default schema with username as required field +DEFAULT_USER_SCHEMA = { + "type": "object", + "properties": { + "username": {"type": "string"}, + "password": {"type": ["string", "null"]}, + "uid": {"type": ["integer", "null"]}, + "gid": {"type": ["integer", "null"]}, + "home": {"type": ["string", "null"]}, + "shell": {"type": "string"}, + "groups": { + "type": "array", + "items": {"type": "string"}, + "readOnly": True + } + }, + "required": ["username"], + "additionalProperties": False +} + +# TODO: Add proper error message handling and create dsc.resource.json file +def get_schema() -> Dict[str, Any]: + schema_path = os.path.join(os.path.dirname(__file__), '..', '.dsc.resource.json') + + if os.path.exists(schema_path): + try: + logger.info(f"Loading schema from {schema_path}") + with open(schema_path, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, IOError) as e: + logger.warning(f"Failed to load schema from file: {e}, falling back to default") + else: + logger.info("Schema file not found, using default schema") + + # Fall back to embedded schema + return DEFAULT_USER_SCHEMA + +def validate_user_data(data: Dict[str, Any]) -> None: + schema = get_schema() + + try: + jsonschema.validate(instance=data, schema=schema) + except jsonschema.exceptions.ValidationError as e: + logger.error(f"Schema validation failed: {e}") + raise \ No newline at end of file diff --git a/samples/python/second/utils/logger.py b/samples/python/second/utils/logger.py new file mode 100644 index 0000000..a442947 --- /dev/null +++ b/samples/python/second/utils/logger.py @@ -0,0 +1,37 @@ +import logging +import sys + +# TODO: Fix up logger for DSC v3 engine +def setup_logger(name, level=logging.INFO): + logger = logging.getLogger(name) + logger.setLevel(level) + + # Don't add handlers if they already exist + if not logger.handlers: + # Create console handler with a higher log level + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(level) + + # Create formatter and add it to the handler + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + + # Add the handler to the logger + logger.addHandler(handler) + + return logger + +# Create a default logger for general use +app_logger = setup_logger('linux_user_mgmt') + +def log_info(message): + app_logger.info(message) + +def log_error(message): + app_logger.error(message) + +def log_debug(message): + app_logger.debug(message) + +def log_warning(message): + app_logger.warning(message) \ No newline at end of file diff --git a/samples/python/second/utils/utils.py b/samples/python/second/utils/utils.py new file mode 100644 index 0000000..a87aea1 --- /dev/null +++ b/samples/python/second/utils/utils.py @@ -0,0 +1,323 @@ +import sys +import json +import subprocess +import grp +import pwd +import os +from schema.schema import validate_user_data +from typing import Dict, Any, Optional + + +def read_stdin() -> str: + return sys.stdin.read().strip() + + +def parse_json_input(data: str) -> Dict[str, Any]: + try: + return json.loads(data) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON input: {str(e)}") + + +def get_input( + username: Optional[str] = None, + password: Optional[str] = None, + input_json: Optional[str] = None, + uid: Optional[int] = None, + gid: Optional[int] = None, + home: Optional[str] = None, + shell: Optional[str] = None, +): + + from models.dsc_user import User + + combined_data = collect_input_data( + username=username, + password=password, + input_json=input_json, + uid=uid, + gid=gid, + home=home, + shell=shell, + ) + + validate_user_data(combined_data) + + user = User() + + for key, value in combined_data.items(): + if hasattr(user, key): + setattr(user, key, value) + + return user + + +def collect_input_data( + username: Optional[str] = None, + password: Optional[str] = None, + input_json: Optional[str] = None, + uid: Optional[int] = None, + gid: Optional[int] = None, + home: Optional[str] = None, + shell: Optional[str] = None, +) -> Dict[str, Any]: + + combined_data = {} + + # Process JSON input if provided + if input_json: + try: + if input_json.startswith("{"): + json_data = parse_json_input(input_json) + else: + with open(input_json, "r") as f: + json_data = json.load(f) + + combined_data.update(json_data) + + except (ValueError, IOError, json.JSONDecodeError) as e: + raise ValueError(f"Failed to parse JSON input: {str(e)}") + else: + # Try to read from stdin if no JSON input was explicitly provided + try: + if not sys.stdin.isatty(): + stdin_data = read_stdin() + if stdin_data: + json_data = parse_json_input(stdin_data) + combined_data.update(json_data) + except (ValueError, EOFError) as e: + pass + + cli_data = {} + + if username is not None: + cli_data["username"] = username + + if password is not None: + cli_data["password"] = password + + if uid is not None: + cli_data["uid"] = uid + + if gid is not None: + cli_data["gid"] = gid + + if home is not None: + cli_data["home"] = home + + if shell is not None: + cli_data["shell"] = shell + + if cli_data: + combined_data.update(cli_data) + + return combined_data + + +def check_user_exist(user_id: int) -> Optional[Dict[str, Any]]: + """ + Check if a user exists in the system and return user information. + + Args: + user_id: The user ID to check + + Returns: + Dictionary with username, uid, and gid if user exists, None otherwise + """ + try: + result = subprocess.run( + ["id", str(user_id)], + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + if result.returncode == 0: + output = result.stdout.strip() + + uid_part = output.split()[0] # uid=1000(username) + username = uid_part.split("(")[1].rstrip(")") + uid = int(uid_part.split("=")[1].split("(")[0]) + + gid_part = output.split()[1] # gid=1000(groupname) + gid = int(gid_part.split("=")[1].split("(")[0]) + + return {"username": username, "uid": uid, "gid": gid} + except (subprocess.SubprocessError, ValueError, IndexError): + pass + return None + + +def create_user( + username: str, + password: Optional[str] = None, + uid: Optional[int] = None, + gid: Optional[int] = None, + home: Optional[str] = None, + shell: Optional[str] = None, +) -> None: + + cmd = ["adduser", "--quiet"] + + # Add optional parameters if provided + if home: + cmd.extend(["--home", home]) + + if shell: + cmd.extend(["--shell", shell]) + if uid is not None: + cmd.extend(["--uid", str(uid)]) + + if gid is not None: + try: + group_name = grp.getgrgid(gid).gr_name + cmd.extend(["--gid", group_name]) + except KeyError: + raise ValueError(f"Group ID {gid} does not exist") + + if password: + cmd.extend(["--password", password]) + + cmd.append(username) + + try: + with open(os.devnull, "w") as DEVNULL: + process = subprocess.run( + cmd, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=DEVNULL, # Redirect stdin from /dev/null to prevent hangs + universal_newlines=True, + ) + + if process.returncode != 0: + raise RuntimeError(f"Failed to create user: {process.stderr.strip()}") + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to create user: {e.stderr}") + + +def update_user( + username: str, + password: Optional[str] = None, + uid: Optional[int] = None, + gid: Optional[int] = None, + home: Optional[str] = None, + shell: Optional[str] = None, +) -> None: + # TODO: Fix + cmd = ["usermod"] + + if uid is not None: + cmd.extend(["-u", str(uid)]) + + if gid is not None: + cmd.extend(["-g", str(gid)]) + + if home: + cmd.extend(["-d", home, "-m"]) + + if shell: + cmd.extend(["-s", shell]) + + if password: + cmd.extend(["-p", password]) + + cmd.append(username) + + try: + print(f"Updating user with command: {' '.join(cmd)}") + subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to update user: {e.stderr.decode('utf-8')}") + + +def get_passwd_entry(username: str) -> Dict[str, Any]: + """ + Get user info from /etc/passwd. + + Args: + username: The username to look up + + Returns: + Dictionary with user information from passwd file + """ + try: + result = subprocess.run( + ["grep", f"^{username}:", "/etc/passwd"], + check=False, + stdout=subprocess.PIPE, + text=True, + ) + if result.returncode == 0: + parts = result.stdout.strip().split(":") + return { + "username": parts[0], + "uid": int(parts[2]), + "gid": int(parts[3]), + "gecos": parts[4], + "home": parts[5], + "shell": parts[6], + } + except Exception: + pass + return {} + + +def get_user_groups(username: str) -> list: + """ + Get all groups the user belongs to. + + Args: + username: The username to check group membership for + + Returns: + List of group names + """ + try: + result = subprocess.run( + ["groups", username], check=False, stdout=subprocess.PIPE, text=True + ) + if result.returncode == 0: + # Output format: "username : group1 group2 group3" + return result.stdout.split(":", 1)[1].strip().split() + + # Alternative method using Python's grp module + groups = [] + for group in grp.getgrall(): + if username in group.gr_mem: + groups.append(group.gr_name) + + try: + pw_entry = pwd.getpwnam(username) + primary_group = grp.getgrgid(pw_entry.pw_gid).gr_name + if primary_group not in groups: + groups.append(primary_group) + except KeyError: + pass + + return groups + except Exception: + pass + return [] + + +def get_requested_properties(user) -> list: + """ + Get a list of properties that have non-None and non-empty values from a user object. + + Args: + user: User object to inspect + + Returns: + List of property names that have values + """ + requested_properties = [] + + for attr in dir(user): + if not attr.startswith("_") and not callable(getattr(user, attr)): + user_value = getattr(user, attr) + if user_value is not None and user_value != "": + requested_properties.append((attr, user_value)) + return requested_properties From 0cf32d496fe227a359e101916521738f7a5248cd Mon Sep 17 00:00:00 2001 From: "G.Reijn" Date: Thu, 19 Jun 2025 14:36:06 +0200 Subject: [PATCH 3/5] Add new commands --- samples/python/second/{ => src}/.gitignore | 4 +- .../second/{ => src}/commands/common.py | 0 samples/python/second/src/commands/delete.py | 40 ++++ samples/python/second/src/commands/export.py | 13 + .../python/second/{ => src}/commands/get.py | 0 .../python/second/{ => src}/commands/root.py | 15 +- .../python/second/{ => src}/commands/set.py | 20 +- samples/python/second/{ => src}/main.py | 1 + .../second/{ => src}/models/dsc_user.py | 97 +++++--- samples/python/second/src/requirements.txt | 3 + .../python/second/{ => src}/schema/schema.py | 17 +- .../second/src/tuxctl.dsc.resource.json | 60 +++++ samples/python/second/src/utils/logger.py | 109 +++++++++ .../python/second/{ => src}/utils/utils.py | 226 ++++++++++++++++-- .../python/second/tests/acceptance.tests.ps1 | 85 +++++++ samples/python/second/utils/logger.py | 37 --- 16 files changed, 589 insertions(+), 138 deletions(-) rename samples/python/second/{ => src}/.gitignore (97%) rename samples/python/second/{ => src}/commands/common.py (100%) create mode 100644 samples/python/second/src/commands/delete.py create mode 100644 samples/python/second/src/commands/export.py rename samples/python/second/{ => src}/commands/get.py (100%) rename samples/python/second/{ => src}/commands/root.py (64%) rename samples/python/second/{ => src}/commands/set.py (57%) rename samples/python/second/{ => src}/main.py (78%) rename samples/python/second/{ => src}/models/dsc_user.py (65%) create mode 100644 samples/python/second/src/requirements.txt rename samples/python/second/{ => src}/schema/schema.py (70%) create mode 100644 samples/python/second/src/tuxctl.dsc.resource.json create mode 100644 samples/python/second/src/utils/logger.py rename samples/python/second/{ => src}/utils/utils.py (55%) create mode 100644 samples/python/second/tests/acceptance.tests.ps1 delete mode 100644 samples/python/second/utils/logger.py diff --git a/samples/python/second/.gitignore b/samples/python/second/src/.gitignore similarity index 97% rename from samples/python/second/.gitignore rename to samples/python/second/src/.gitignore index 63547ab..9a66ff3 100644 --- a/samples/python/second/.gitignore +++ b/samples/python/second/src/.gitignore @@ -106,4 +106,6 @@ venv.bak/ # mypy .mypy_cache/ .dmypy.json -dmypy.json \ No newline at end of file +dmypy.json + +*.py text eol=lf \ No newline at end of file diff --git a/samples/python/second/commands/common.py b/samples/python/second/src/commands/common.py similarity index 100% rename from samples/python/second/commands/common.py rename to samples/python/second/src/commands/common.py diff --git a/samples/python/second/src/commands/delete.py b/samples/python/second/src/commands/delete.py new file mode 100644 index 0000000..1a20b4c --- /dev/null +++ b/samples/python/second/src/commands/delete.py @@ -0,0 +1,40 @@ +import click +import sys +from typing import Optional +from utils.utils import delete_user, collect_input_data +from utils.logger import dfl_logger as Logger + + +@click.command() +@click.option("--username", "-u", help="Username of the user to delete", type=str) +@click.option("--input", "input_json", help="JSON input with username", type=str) +@click.option( + "-w", + "--what-if", + is_flag=True, + help="Show what would happen without making changes", +) +def delete(username: Optional[str], input_json: Optional[str], what_if: bool): + """ + Delete a Linux user. + + This command deletes a specified Linux user from the system. + """ + try: + data = collect_input_data(username=username, input_json=input_json) + + username_to_delete = data.get("username") + + if not username_to_delete: + Logger.error("Username is required to delete a user.", target="delete") + sys.exit(1) + + Logger.info( + f"Processing delete request for user: {username_to_delete}", target="delete" + ) + + delete_user(username=username_to_delete, what_if=what_if) + + except Exception as e: + Logger.error(f"Failed to process delete command: {str(e)}", target="delete") + sys.exit(1) diff --git a/samples/python/second/src/commands/export.py b/samples/python/second/src/commands/export.py new file mode 100644 index 0000000..0e66e4d --- /dev/null +++ b/samples/python/second/src/commands/export.py @@ -0,0 +1,13 @@ +import click +from models.dsc_user import User + +@click.command() +def export(): + """ + Export Linux user information. + + This command exports information about Linux users in JSON format. + """ + + user = User() + user.export() \ No newline at end of file diff --git a/samples/python/second/commands/get.py b/samples/python/second/src/commands/get.py similarity index 100% rename from samples/python/second/commands/get.py rename to samples/python/second/src/commands/get.py diff --git a/samples/python/second/commands/root.py b/samples/python/second/src/commands/root.py similarity index 64% rename from samples/python/second/commands/root.py rename to samples/python/second/src/commands/root.py index 8d4d49f..9f7f8ca 100644 --- a/samples/python/second/commands/root.py +++ b/samples/python/second/src/commands/root.py @@ -1,30 +1,27 @@ import click import sys import os -from utils.logger import setup_logger from commands.get import get from commands.set import set - -# Setup logger for root commands -logger = setup_logger('commands.root') +from commands.delete import delete +from commands.export import export @click.group(invoke_without_command=True) @click.pass_context def root_command(ctx): - """Linux User Management CLI - Root Command.""" + """Linux User Management CLI.""" ctx.ensure_object(dict) - ctx.obj['logger'] = logger if os.name != 'nt' and hasattr(os, 'geteuid') and os.geteuid() != 0: click.echo("This program requires root privileges to manage users. Please run with sudo.") sys.exit(1) - ctx.obj['logger'].info("Starting Linux User Management CLI") - if ctx.invoked_subcommand is None: click.echo(ctx.get_help()) # Register all available commands root_command.add_command(get, name="get") -root_command.add_command(set, name="set") \ No newline at end of file +root_command.add_command(set, name="set") +root_command.add_command(delete, name="delete") +root_command.add_command(export, name="export") \ No newline at end of file diff --git a/samples/python/second/commands/set.py b/samples/python/second/src/commands/set.py similarity index 57% rename from samples/python/second/commands/set.py rename to samples/python/second/src/commands/set.py index a8a3086..3754e9b 100644 --- a/samples/python/second/commands/set.py +++ b/samples/python/second/src/commands/set.py @@ -1,12 +1,11 @@ import click -import sys from typing import Optional from commands.common import common from utils.utils import get_input @click.command() @common -@click.option('--what-if', is_flag=True, help='Show what would happen without making changes') +@click.option('-w', '--what-if', is_flag=True, help='Show what would happen without making changes') def set( username: Optional[str], password: Optional[str], @@ -25,9 +24,7 @@ def set( Note: Group membership is read-only and cannot be modified with this command. """ - try: - # Use get_input to create a user object from inputs - user = get_input( + user = get_input( username=username, password=password, input_json=input_json, @@ -37,15 +34,4 @@ def set( shell=shell ) - # Call the modify method to create or update the user - user.modify(what_if=what_if) - - if not what_if: - click.echo(f"User '{user.username}' has been created or updated successfully.") - - except ValueError as e: - click.echo(f"Error: {str(e)}", err=True) - sys.exit(1) - except Exception as e: - click.echo(f"Error: {str(e)}", err=True) - sys.exit(1) \ No newline at end of file + user.modify(what_if=what_if) \ No newline at end of file diff --git a/samples/python/second/main.py b/samples/python/second/src/main.py similarity index 78% rename from samples/python/second/main.py rename to samples/python/second/src/main.py index 8313ecb..ae05d7b 100644 --- a/samples/python/second/main.py +++ b/samples/python/second/src/main.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python3 from commands.root import root_command if __name__ == "__main__": diff --git a/samples/python/second/models/dsc_user.py b/samples/python/second/src/models/dsc_user.py similarity index 65% rename from samples/python/second/models/dsc_user.py rename to samples/python/second/src/models/dsc_user.py index 3b14639..02645c4 100644 --- a/samples/python/second/models/dsc_user.py +++ b/samples/python/second/src/models/dsc_user.py @@ -2,28 +2,12 @@ from typing import Dict, Any from utils.utils import check_user_exist, create_user, update_user, get_passwd_entry, get_user_groups import sys +import pwd +from utils.logger import dfl_logger as Logger -def dsc_resource(cls): - """ - Decorator that marks a class as a DSC (Desired State Configuration) resource. - This allows the class to be used in configuration management. - """ - cls._is_dsc_resource = True - - # Add methods needed for DSC if they don't exist - if not hasattr(cls, 'test'): - cls.test = lambda self: self._exist - - if not hasattr(cls, 'get'): - cls.get = lambda self: self.__dict__ - - return cls - -@dsc_resource class User: """Linux User resource that can be managed via DSC.""" def __init__(self): - """Initialize User with default values.""" self.username = "" self.password = "" self.uid = None @@ -33,18 +17,38 @@ def __init__(self): self.groups = None self._exist = False + def export(self): + """Export all users on the system.""" + result = [] + + try: + for passwd_entry in pwd.getpwall(): + username = passwd_entry.pw_name + user = User() + user.username = username + user.uid = passwd_entry.pw_uid + user.gid = passwd_entry.pw_gid + user.home = passwd_entry.pw_dir + user.shell = passwd_entry.pw_shell + user._exist = True + + # Get groups for this user + try: + user_groups = get_user_groups(username) + user.groups = user_groups + except Exception: + user.groups = [] + + result.append(self.to_dict(user)) + + print(json.dumps(result, separators=(',', ':'))) + + except Exception as e: + Logger.error(f"Error occurred while exporting users: {e}", target="Export") + sys.exit(1) + def get_current_state(self, requested_properties=None): - """ - Check if the user exists and return only the requested properties. - - Args: - requested_properties: List of property names that were explicitly requested - If None, returns all available properties - - Returns: - Dict containing the requested user properties - """ result = {} try: @@ -52,7 +56,7 @@ def get_current_state(self, requested_properties=None): if user is not None: self._exist = True - result["exist"] = True + result["_exist"] = True if user: passwd_entry = get_passwd_entry(self.username) @@ -89,22 +93,37 @@ def get_current_state(self, requested_properties=None): if not self._exist: print(json.dumps({ "username": self.username, - "exist": False, + "_exist": False, })) else: print(self.to_json(self)) except Exception as e: - print(f"Error occurred while getting current state for user '{self.username}': {e}") + Logger.error(f"Error occurred while getting current state for user '{self.username}': {e}", target="GetCurrentState") sys.exit(1) def modify(self, what_if: bool = False) -> None: exists = check_user_exist(self.username) + if what_if: if exists: - print(f"Would update user '{self.username}'") + print(json.dumps({ + "username": self.username, + "_metadata": { + "whatIf": [ + f"User '{self.username}' exists and will be updated." + ] + } + })) else: - print(f"Would create user '{self.username}'") + print(json.dumps({ + "username": self.username, + "_metadata": { + "whatIf": [ + f"User '{self.username}' does not exist and will be created." + ] + } + })) return if exists: @@ -113,7 +132,8 @@ def modify(self, what_if: bool = False) -> None: uid=self.uid, gid=self.gid, home=self.home, - shell=self.shell + shell=self.shell, + password=self.password ) else: create_user( @@ -121,9 +141,10 @@ def modify(self, what_if: bool = False) -> None: uid=self.uid, gid=self.gid, home=self.home, - shell=self.shell + shell=self.shell, + password=self.password ) - + @classmethod def to_dict(cls, item: 'User') -> Dict[str, Any]: """Convert User to dictionary for serialization.""" @@ -134,7 +155,7 @@ def to_dict(cls, item: 'User') -> Dict[str, Any]: "home": item.home, "shell": item.shell, "groups": item.groups, - "exist": item._exist + "_exist": item._exist } return result @@ -155,5 +176,5 @@ def from_json(cls, json_str: str) -> 'User': user.home = data.get('home') user.shell = data.get('shell', '/bin/bash') user.groups = data.get('groups', []) - user._exist = data.get('exists', False) + user._exist = data.get('_exist', False) return user \ No newline at end of file diff --git a/samples/python/second/src/requirements.txt b/samples/python/second/src/requirements.txt new file mode 100644 index 0000000..ff94205 --- /dev/null +++ b/samples/python/second/src/requirements.txt @@ -0,0 +1,3 @@ +# Core dependencies +click>=8.0.0 +jsonschema>=4.0.0 \ No newline at end of file diff --git a/samples/python/second/schema/schema.py b/samples/python/second/src/schema/schema.py similarity index 70% rename from samples/python/second/schema/schema.py rename to samples/python/second/src/schema/schema.py index c846be8..81269a5 100644 --- a/samples/python/second/schema/schema.py +++ b/samples/python/second/src/schema/schema.py @@ -1,10 +1,9 @@ import os import json +import sys import jsonschema from typing import Dict, Any -from utils.logger import setup_logger - -logger = setup_logger('utils.schema_validator') +from utils.logger import dfl_logger as logger # Default schema with username as required field DEFAULT_USER_SCHEMA = { @@ -26,21 +25,17 @@ "additionalProperties": False } -# TODO: Add proper error message handling and create dsc.resource.json file def get_schema() -> Dict[str, Any]: schema_path = os.path.join(os.path.dirname(__file__), '..', '.dsc.resource.json') if os.path.exists(schema_path): try: - logger.info(f"Loading schema from {schema_path}") with open(schema_path, 'r') as f: return json.load(f) except (json.JSONDecodeError, IOError) as e: - logger.warning(f"Failed to load schema from file: {e}, falling back to default") - else: - logger.info("Schema file not found, using default schema") + logger.error(f"Failed to load schema from {schema_path}: {e}") + sys.exit(1) - # Fall back to embedded schema return DEFAULT_USER_SCHEMA def validate_user_data(data: Dict[str, Any]) -> None: @@ -49,5 +44,5 @@ def validate_user_data(data: Dict[str, Any]) -> None: try: jsonschema.validate(instance=data, schema=schema) except jsonschema.exceptions.ValidationError as e: - logger.error(f"Schema validation failed: {e}") - raise \ No newline at end of file + logger.error(f"Schema validation failed: {e.message}") + sys.exit(1) \ No newline at end of file diff --git a/samples/python/second/src/tuxctl.dsc.resource.json b/samples/python/second/src/tuxctl.dsc.resource.json new file mode 100644 index 0000000..a44569d --- /dev/null +++ b/samples/python/second/src/tuxctl.dsc.resource.json @@ -0,0 +1,60 @@ +{ + "$schema": "https://aka.ms/dsc/schemas/v3/bundled/resource/manifest.json", + "type": "DSC.UserManagement/TuxCtl", + "description": "DSC resource to manage users on Linux", + "tags": [ + "Linux", + "UserManagement" + ], + "version": "0.1.0", + "schema": { + "embedded": { + "$schema": "https://json-schema.org/draft/2020-12/schema#", + "$id": "https://raw.githubusercontent.com/PowerShell/DSC/main/schemas/resources/DSC/UserManagement/TuxCtl/v0.1.0/schema.json", + "title": "Linux User Management", + "description": "Manages Linux users", + "type": "object", + "required": ["username"], + "additionalProperties": false, + "properties": { + "username": { + "type": "string", + "title": "Username", + "description": "Defines the name of the Linux user" + }, + "password": { + "type": ["string", "null"], + "title": "Password", + "description": "The user's password" + }, + "uid": { + "type": ["integer", "null"], + "title": "User ID", + "description": "The user's numeric ID" + }, + "gid": { + "type": ["integer", "null"], + "title": "Group ID", + "description": "The user's primary group ID" + }, + "home": { + "type": ["string", "null"], + "title": "Home Directory", + "description": "The user's home directory path" + }, + "shell": { + "type": "string", + "title": "Shell", + "description": "The user's login shell" + }, + "groups": { + "type": "array", + "items": {"type": "string"}, + "readOnly": true, + "title": "Groups", + "description": "List of groups the user belongs to" + } + } + } + } +} \ No newline at end of file diff --git a/samples/python/second/src/utils/logger.py b/samples/python/second/src/utils/logger.py new file mode 100644 index 0000000..d07edf3 --- /dev/null +++ b/samples/python/second/src/utils/logger.py @@ -0,0 +1,109 @@ +import json +import sys +import datetime +import inspect +from typing import Dict, Any +from enum import Enum + + +class LogLevel(Enum): + """Enumeration for log levels""" + TRACE = "TRACE" + DEBUG = "DEBUG" + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + + +class Logger: + """ + A structured JSON logger class that outputs messages to stderr. + + Features: + - JSON formatted output + - Configurable log levels + - Automatic timestamp generation + - Caller information tracking + - Customizable output stream + """ + + def __init__(self, output_stream=None, include_caller_info: bool = True): + self.output_stream = output_stream or sys.stderr + self.include_caller_info = include_caller_info + + def _get_caller_info(self) -> Dict[str, Any]: + if not self.include_caller_info: + return {} + + try: + # Get the frame of the caller (skip internal methods) + frame = inspect.currentframe() + for _ in range(3): # Skip _get_caller_info, _log, and the log level method + frame = frame.f_back + if frame is None: + break + + if frame: + return { + "file": frame.f_code.co_filename.split('\\')[-1], # Just filename + "line": frame.f_lineno, + "function": frame.f_code.co_name + } + except Exception: + pass + + return {} + + def _log(self, level: LogLevel, message: str, target: str = None, **kwargs): + log_entry = { + "timestamp": datetime.datetime.now().isoformat() + "Z", + "level": level.value, + "fields": {"message": message}, + "target": target or "unknown" + } + + # Add caller information if enabled + caller_info = self._get_caller_info() + if caller_info: + log_entry["line_number"] = caller_info.get("line", "Unknown") + log_entry["file"] = caller_info.get("file", "Unknown") + log_entry["function"] = caller_info.get("function", "Unknown") + + # Add any additional fields to the fields section + if kwargs: + log_entry["fields"].update(kwargs) + + try: + json_output = json.dumps(log_entry, separators=(",", ":")) + self.output_stream.write(json_output + '\n') + self.output_stream.flush() + except Exception as e: + # Fallback to basic error output + fallback_msg = f"[LOG ERROR] Failed to write log: {str(e)}\n" + self.output_stream.write(fallback_msg) + self.output_stream.flush() + + def trace(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.TRACE, message, target, **kwargs) + + def debug(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.DEBUG, message, target, **kwargs) + + def info(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.INFO, message, target, **kwargs) + + def warning(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.WARNING, message, target, **kwargs) + + def error(self, message: str, target: str = None, **kwargs): + self._log(LogLevel.ERROR, message, target, **kwargs) + + def log_config_loaded(self, config_path: str, config_type: str, **kwargs): + self.info(f"Loaded {config_type} configuration", "config_manager", + config_path=config_path, **kwargs) + + def log_config_error(self, error_msg: str, config_path: str = None, **kwargs): + self.error(f"Configuration error: {error_msg}", "config_manager", + config_path=config_path, **kwargs) + +dfl_logger = Logger() \ No newline at end of file diff --git a/samples/python/second/utils/utils.py b/samples/python/second/src/utils/utils.py similarity index 55% rename from samples/python/second/utils/utils.py rename to samples/python/second/src/utils/utils.py index a87aea1..95f663b 100644 --- a/samples/python/second/utils/utils.py +++ b/samples/python/second/src/utils/utils.py @@ -6,6 +6,7 @@ import os from schema.schema import validate_user_data from typing import Dict, Any, Optional +from utils.logger import dfl_logger as Logger def read_stdin() -> str: @@ -18,7 +19,6 @@ def parse_json_input(data: str) -> Dict[str, Any]: except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON input: {str(e)}") - def get_input( username: Optional[str] = None, password: Optional[str] = None, @@ -28,7 +28,6 @@ def get_input( home: Optional[str] = None, shell: Optional[str] = None, ): - from models.dsc_user import User combined_data = collect_input_data( @@ -61,7 +60,6 @@ def collect_input_data( home: Optional[str] = None, shell: Optional[str] = None, ) -> Dict[str, Any]: - combined_data = {} # Process JSON input if provided @@ -157,7 +155,6 @@ def create_user( home: Optional[str] = None, shell: Optional[str] = None, ) -> None: - cmd = ["adduser", "--quiet"] # Add optional parameters if provided @@ -176,26 +173,70 @@ def create_user( except KeyError: raise ValueError(f"Group ID {gid} does not exist") - if password: - cmd.extend(["--password", password]) - cmd.append(username) + Logger.info( + "Creating user", "user_management", command="adduser", username=username + ) + + result = run_command( + cmd, prevent_input=True, command_name="Create user", username=username + ) + + if result and result.returncode == 0 and password: + set_password(username, password) + + +def set_password(username: str, password: str) -> None: + """ + Set or update a user's password. + + Args: + username: The username to set password for + password: The new password + + Raises: + RuntimeError: If setting the password fails + """ + if not username or not password: + Logger.error( + "Username and password are required for setting password", + "user_management", + username=username, + ) + sys.exit(4) + + Logger.info("Setting password for user", "user_management", username=username) + + input_str = f"{username}:{password}\n" try: - with open(os.devnull, "w") as DEVNULL: - process = subprocess.run( - cmd, - check=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=DEVNULL, # Redirect stdin from /dev/null to prevent hangs - universal_newlines=True, - ) + run_command( + ["chpasswd"], + input_str=input_str, + command_name="Set password", + username=username, + ) + except RuntimeError as e: + error_msg = str(e) + sanitized_error = error_msg.replace(password, "********") + raise RuntimeError(f"Failed to set password: {sanitized_error}") - if process.returncode != 0: - raise RuntimeError(f"Failed to create user: {process.stderr.strip()}") except subprocess.CalledProcessError as e: - raise RuntimeError(f"Failed to create user: {e.stderr}") + error_msg = e.stderr.decode("utf-8") if hasattr(e, "stderr") else str(e) + # Don't include the password in error messages + sanitized_error = error_msg.replace(password, "********") + Logger.error( + "Failed to set password", + "user_management", + username=username, + error=sanitized_error, + ) + raise RuntimeError(f"Failed to set password: {sanitized_error}") + except Exception as e: + Logger.error( + "Error setting password", "user_management", username=username, error=str(e) + ) + raise RuntimeError(f"Error setting password: {str(e)}") def update_user( @@ -206,7 +247,6 @@ def update_user( home: Optional[str] = None, shell: Optional[str] = None, ) -> None: - # TODO: Fix cmd = ["usermod"] if uid is not None: @@ -222,15 +262,76 @@ def update_user( cmd.extend(["-s", shell]) if password: - cmd.extend(["-p", password]) + print("Setting password for user", username) + set_password(username, password) + + if len(cmd) == 1: + Logger.debug( + "No changes specified for user", "user_management", username=username + ) + return + + cmd.append(username) + + Logger.info( + "Updating user", "user_management", command="usermod", username=username + ) + + run_command(cmd, command_name="Update user", username=username) + + +def delete_user(username: str, what_if: bool = False) -> None: + user = check_user_exist(username) + if not user: + return + cmd = ["userdel"] cmd.append(username) + if what_if: + print( + json.dumps( + { + "username": username, + "_metadata": { + "whatIf": [f"User '{username}' exists and will be deleted."] + }, + } + ) + ) + return + + Logger.info( + "Deleting user", "user_management", command="delete_user", username=username + ) + + run_command(cmd, command_name="Delete user", username=username) + + +def export_user() -> str: + """ + Export all users in JSON format. + + Returns: + JSON string containing all user information + """ + users = [] + try: - print(f"Updating user with command: {' '.join(cmd)}") - subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - except subprocess.CalledProcessError as e: - raise RuntimeError(f"Failed to update user: {e.stderr.decode('utf-8')}") + for user_entry in pwd.getpwall(): + user_info = { + "username": user_entry.pw_name, + "uid": user_entry.pw_uid, + "gid": user_entry.pw_gid, + "home": user_entry.pw_dir, + "shell": user_entry.pw_shell, + "groups": get_user_groups(user_entry.pw_name), + } + users.append(user_info) + + return json.dumps(users, indent=2) + except Exception as e: + raise RuntimeError(f"Failed to export users: {str(e)}") def get_passwd_entry(username: str) -> Dict[str, Any]: @@ -321,3 +422,78 @@ def get_requested_properties(user) -> list: if user_value is not None and user_value != "": requested_properties.append((attr, user_value)) return requested_properties + + +def run_command( + cmd: list, + input_str: bytes = None, + check: bool = True, + prevent_input: bool = False, + command_name: str = None, + username: str = None, + universal_newlines: bool = True, +) -> subprocess.CompletedProcess: + """ + Run a system command with consistent error handling and logging. + + Args: + cmd: Command and arguments as a list + input_str: Optional input to send to process stdin + check: Whether to check return code and raise exception + prevent_input: Whether to redirect stdin from /dev/null + command_name: Name of command for logging + username: Username for logging + universal_newlines: Convert output to strings + + Returns: + CompletedProcess instance with return code, stdout, stderr + + Raises: + RuntimeError: If command fails and check is True + """ + + try: + kwargs = { + "check": check, + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + "universal_newlines": universal_newlines, + } + + # If input provided, pass it to process + if input_str: + kwargs["input"] = input_str + + # If preventing input, redirect stdin from /dev/null + if prevent_input: + with open(os.devnull, "r") as DEVNULL: + kwargs["stdin"] = DEVNULL + return subprocess.run(cmd, **kwargs) + else: + return subprocess.run(cmd, **kwargs) + + except subprocess.CalledProcessError as e: + error_msg = e.stderr if universal_newlines else e.stderr.decode("utf-8") + if command_name: + Logger.error( + f"{command_name} failed", + "user_management", + command=cmd[0], + error=error_msg, + username=username, + ) + if check: + raise RuntimeError(f"Command failed: {error_msg}") + return e + except Exception as e: + if command_name: + Logger.error( + f"{command_name} failed unexpectedly", + "user_management", + command=cmd[0], + error=str(e), + username=username, + ) + if check: + raise RuntimeError(f"Command execution error: {str(e)}") + return None diff --git a/samples/python/second/tests/acceptance.tests.ps1 b/samples/python/second/tests/acceptance.tests.ps1 new file mode 100644 index 0000000..f0a9797 --- /dev/null +++ b/samples/python/second/tests/acceptance.tests.ps1 @@ -0,0 +1,85 @@ +$global:executable = Join-Path (Split-Path -Parent $PSScriptRoot) 'src' 'main.py' + +Write-Verbose -Message "Executable path: $global:executable" -Verbose + +Describe "TuxCtl acceptance tests - Help command" -Skip:(!$IsLinux) { + It "Should display help information when --help is passed" { + $result = & $global:executable --help + $LASTEXITCODE | Should -Be 0 + $result | Should -Not -BeNullOrEmpty + } +} + +Describe "TuxCtl acceptance tests - Get command" -Skip:(!$IsLinux) { + It "Should get a username using the --username option" { + $result = & sudo $global:executable get --username root | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result | Should -Not -BeNullOrEmpty + $result.username | Should -Be "root" + $result.uid | Should -Be 0 + $result.gid | Should -Be 0 + $result.home | Should -Be "/root" + $result.shell | Should -Be "/bin/bash" + $result.groups | Should -Contain "root" + $result._exist | Should -Be $true + } + + It "Should work with all options" -Skip:(!($IsLinux)) { + $result = & sudo $global:executable get --username root --uid 0 --gid 0 --home /root --shell /bin/bash | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result | Should -Not -BeNullOrEmpty + $result.username | Should -Be "root" + $result.uid | Should -Be 0 + $result.gid | Should -Be 0 + $result.home | Should -Be "/root" + $result.shell | Should -Be "/bin/bash" + $result.groups | Should -Contain "root" + $result._exist | Should -Be $true + } + + It "Should work with JSON input" { + $in = @{username = "root"; uid = 0; gid = 0; shell = "/bin/bash"; home = "/root"} | ConvertTo-Json + $result = & sudo $global:executable get --input $in | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result | Should -Not -BeNullOrEmpty + $result.username | Should -Be "root" + $result.uid | Should -Be 0 + $result.gid | Should -Be 0 + $result.home | Should -Be "/root" + $result.shell | Should -Be "/bin/bash" + $result.groups | Should -Contain "root" + $result._exist | Should -Be $true + } +} + +Describe "TuxCtl acceptance tests - Set command" -Skip:(!$IsLinux) { + It "Should set a username using the --username option" { + & sudo $global:executable set --username testuser --password randompassword + $LASTEXITCODE | Should -Be 0 + + # Check if the user was created + $result = & sudo $global:executable get --username testuser | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result.username | Should -Be "testuser" + } +} + +Describe "TuxCtl acceptance tests - Delete command" -Skip:(!$IsLinux) { + It "Should delete a user using the --username option" { + & sudo $global:executable delete --username testuser + $LASTEXITCODE | Should -Be 0 + + # Check if the user was deleted + $result = & sudo $global:executable get --username testuser | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result._exist | Should -Be $false + } +} + +Describe "TuxCtl acceptance tests - Export command" -Skip:(!$IsLinux) { + It "Should list all users" { + $result = & sudo $global:executable export | ConvertFrom-Json + $LASTEXITCODE | Should -Be 0 + $result.Count | Should -BeGreaterThan 1 + } +} \ No newline at end of file diff --git a/samples/python/second/utils/logger.py b/samples/python/second/utils/logger.py deleted file mode 100644 index a442947..0000000 --- a/samples/python/second/utils/logger.py +++ /dev/null @@ -1,37 +0,0 @@ -import logging -import sys - -# TODO: Fix up logger for DSC v3 engine -def setup_logger(name, level=logging.INFO): - logger = logging.getLogger(name) - logger.setLevel(level) - - # Don't add handlers if they already exist - if not logger.handlers: - # Create console handler with a higher log level - handler = logging.StreamHandler(sys.stdout) - handler.setLevel(level) - - # Create formatter and add it to the handler - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - handler.setFormatter(formatter) - - # Add the handler to the logger - logger.addHandler(handler) - - return logger - -# Create a default logger for general use -app_logger = setup_logger('linux_user_mgmt') - -def log_info(message): - app_logger.info(message) - -def log_error(message): - app_logger.error(message) - -def log_debug(message): - app_logger.debug(message) - -def log_warning(message): - app_logger.warning(message) \ No newline at end of file From 9bf8a0aea285dcb8ec2615b1933007b6382d95fb Mon Sep 17 00:00:00 2001 From: "G.Reijn" Date: Thu, 19 Jun 2025 14:54:07 +0200 Subject: [PATCH 4/5] Remove unused methods --- samples/python/second/src/utils/logger.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/samples/python/second/src/utils/logger.py b/samples/python/second/src/utils/logger.py index d07edf3..0029ca8 100644 --- a/samples/python/second/src/utils/logger.py +++ b/samples/python/second/src/utils/logger.py @@ -98,12 +98,5 @@ def warning(self, message: str, target: str = None, **kwargs): def error(self, message: str, target: str = None, **kwargs): self._log(LogLevel.ERROR, message, target, **kwargs) - def log_config_loaded(self, config_path: str, config_type: str, **kwargs): - self.info(f"Loaded {config_type} configuration", "config_manager", - config_path=config_path, **kwargs) - - def log_config_error(self, error_msg: str, config_path: str = None, **kwargs): - self.error(f"Configuration error: {error_msg}", "config_manager", - config_path=config_path, **kwargs) dfl_logger = Logger() \ No newline at end of file From a4a1d228187c609f746131ff4394d264ed68829d Mon Sep 17 00:00:00 2001 From: "G.Reijn" Date: Thu, 19 Jun 2025 15:46:12 +0200 Subject: [PATCH 5/5] Update exit code --- samples/python/second/src/utils/utils.py | 38 ++++++++++++++++-------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/samples/python/second/src/utils/utils.py b/samples/python/second/src/utils/utils.py index 95f663b..7e7ceba 100644 --- a/samples/python/second/src/utils/utils.py +++ b/samples/python/second/src/utils/utils.py @@ -114,10 +114,10 @@ def collect_input_data( def check_user_exist(user_id: int) -> Optional[Dict[str, Any]]: """ - Check if a user exists in the system and return user information. + Check if a user exists in the system and return user information. Can be used with both username and UID. Args: - user_id: The user ID to check + user_id: The username or ID to check Returns: Dictionary with username, uid, and gid if user exists, None otherwise @@ -157,7 +157,6 @@ def create_user( ) -> None: cmd = ["adduser", "--quiet"] - # Add optional parameters if provided if home: cmd.extend(["--home", home]) @@ -171,7 +170,13 @@ def create_user( group_name = grp.getgrgid(gid).gr_name cmd.extend(["--gid", group_name]) except KeyError: - raise ValueError(f"Group ID {gid} does not exist") + Logger.error( + "Group ID does not exist", + "user_management", + command="adduser", + gid=gid, + ) + sys.exit(3) cmd.append(username) Logger.info( @@ -193,9 +198,6 @@ def set_password(username: str, password: str) -> None: Args: username: The username to set password for password: The new password - - Raises: - RuntimeError: If setting the password fails """ if not username or not password: Logger.error( @@ -219,7 +221,13 @@ def set_password(username: str, password: str) -> None: except RuntimeError as e: error_msg = str(e) sanitized_error = error_msg.replace(password, "********") - raise RuntimeError(f"Failed to set password: {sanitized_error}") + Logger.error( + "Failed to set password", + "user_management", + username=username, + error=sanitized_error, + ) + sys.exit(3) except subprocess.CalledProcessError as e: error_msg = e.stderr.decode("utf-8") if hasattr(e, "stderr") else str(e) @@ -231,12 +239,12 @@ def set_password(username: str, password: str) -> None: username=username, error=sanitized_error, ) - raise RuntimeError(f"Failed to set password: {sanitized_error}") + sys.exit(3) except Exception as e: Logger.error( "Error setting password", "user_management", username=username, error=str(e) ) - raise RuntimeError(f"Error setting password: {str(e)}") + sys.exit(3) def update_user( @@ -483,7 +491,13 @@ def run_command( username=username, ) if check: - raise RuntimeError(f"Command failed: {error_msg}") + Logger.error( + f"Command '{cmd[0]}' failed", + "user_management", + command=cmd[0], + error=error_msg, + username=username, + ) return e except Exception as e: if command_name: @@ -495,5 +509,5 @@ def run_command( username=username, ) if check: - raise RuntimeError(f"Command execution error: {str(e)}") + sys.exit(3) return None