diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4f097ba..ec9a59d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,7 +10,7 @@ repos: rev: 26.3.1 hooks: - id: black - language_version: python3.13 + language_version: python3 - repo: https://github.com/pre-commit/pre-commit-hooks rev: v6.0.0 diff --git a/README.md b/README.md index cc363d4..2ae5bb2 100644 --- a/README.md +++ b/README.md @@ -203,6 +203,26 @@ chguard --restore app-baseline --permissions chguard --restore app-baseline --owner ``` +### Remove old states +``` +chguard --prune-states +``` +This removes states older than the number of days set in `CHGUARD_STATES_LIFE` + +### Remove states older than _N_ days +``` +chguard --prune-states=14 +``` +This removes all states older than 14 days. + +### Remove all states +``` +chguard --prune-states=all +``` + +### Environment Variable +`CHGUARD_STATES_LIFE`controls the default number of days to keep when using `chguard --prune-states`. + ### Wrapper mode Use `--` to separate `chguard` arguments from the wrapped command: diff --git a/chguard/cli.py b/chguard/cli.py index 58a7130..f771935 100644 --- a/chguard/cli.py +++ b/chguard/cli.py @@ -2,33 +2,37 @@ from __future__ import annotations import argparse import argcomplete +import grp import importlib.metadata import os -import sys -import stat import pwd -import grp +import stat import subprocess +import sys from collections import Counter, defaultdict +from datetime import datetime, timedelta, timezone from pathlib import Path -from datetime import datetime +from rich import box from rich.console import Console from rich.table import Table -from rich import box from chguard.db import ( connect, - init_db, create_state, delete_state, get_state, + init_db, + prune_all_states, + prune_states_before, state_exists, ) +from chguard.restore import apply_restore, plan_restore from chguard.scan import scan_tree -from chguard.restore import plan_restore, apply_restore from chguard.util import normalize_root +PRUNE_STATES_FROM_ENV = "__CHGUARD_PRUNE_STATES_FROM_ENV__" + def get_version(): try: @@ -88,6 +92,49 @@ def _is_root() -> bool: return os.geteuid() == 0 +def _parse_prune_states_value(value: str | None) -> int | str: + if value is None: + value = os.environ.get("CHGUARD_STATES_LIFE") + + if not value: + raise SystemExit( + "Missing prune age. Use --prune-states=N or set " + "CHGUARD_STATES_LIFE." + ) + + value = value.strip().lower() + + if value == "all": + return "all" + + try: + days = int(value) + except ValueError as exc: + raise SystemExit( + "Invalid prune age. Use an integer number of days or 'all'." + ) from exc + + if days < 0: + raise SystemExit("Invalid prune age. Value must be >= 0.") + + return days + + +def _confirm_or_abort(*, yes: bool, prompt: str) -> None: + if yes: + return + + if not sys.stdin.isatty(): + raise SystemExit( + "Refusing to continue without confirmation (no TTY).\n" + "Use --yes to force." + ) + + answer = input(f"\n{prompt} (y/N) ").strip().lower() + if answer not in ("y", "yes"): + raise SystemExit("Aborted.") + + def complete_state_names(prefix, parsed_args, **kwargs): try: conn = connect( @@ -113,27 +160,12 @@ def _extract_paths_from_command(cmd: list[str]) -> list[Path]: def main() -> None: - """ - Entry point for the CLI. - - Behavior summary: - - --save snapshots ownership and permissions - - --restore previews changes, then asks for confirmation - - Root privileges are required only when necessary - - Symlinks are skipped during scanning - """ - wrapper_cmd = None if "--" in sys.argv: idx = sys.argv.index("--") wrapper_cmd = sys.argv[idx + 1 :] sys.argv = sys.argv[:idx] - parser = argparse.ArgumentParser( - prog="chguard", - description="Snapshot and restore filesystem ownership and permissions.", - ) - parser = argparse.ArgumentParser( prog="chguard", description="Snapshot and restore filesystem ownership and permissions.", @@ -158,75 +190,53 @@ def main() -> None: ) actions.add_argument( - "--save", - metavar="PATH", - help="Save state for PATH", + "--save", metavar="PATH", help="Save state for PATH" ).completer = argcomplete.FilesCompleter() actions.add_argument( - "--restore", - action="store_true", - help="Restore a saved state", + "--restore", action="store_true", help="Restore a saved state" + ) + actions.add_argument( + "--list", action="store_true", help="List saved states" ) actions.add_argument( - "--list", - action="store_true", - help="List saved states", - ) - - actions.add_argument( - "--delete", - metavar="STATE", - help="Delete a saved state", + "--delete", metavar="STATE", help="Delete a saved state" ).completer = complete_state_names - # positional STATE - parser.add_argument( - "state", + actions.add_argument( + "--prune-states", nargs="?", - help="State name (required with --restore)", - ).completer = complete_state_names + const=PRUNE_STATES_FROM_ENV, + metavar="N", + help=( + "Delete states older than N days. If N is omitted, " + "CHGUARD_STATES_LIFE is used. Use 'all' to delete all states." + ), + ) + parser.add_argument("state", nargs="?", help="State name").completer = ( + complete_state_names + ) + parser.add_argument("--name", help="State name") parser.add_argument( - "--name", - help="State name (required with --save)", + "--overwrite", action="store_true", help="Overwrite existing state" + ) + parser.add_argument( + "--permissions", action="store_true", help="Restore MODE only" + ) + parser.add_argument( + "--owner", action="store_true", help="Restore OWNER only" + ) + parser.add_argument( + "--dry-run", action="store_true", help="Preview only; do not apply" + ) + parser.add_argument( + "--yes", action="store_true", help="Apply without confirmation" ) parser.add_argument( - "--overwrite", - action="store_true", - help="Overwrite existing state", - ) - - parser.add_argument( - "--permissions", - action="store_true", - help="Restore MODE only", - ) - - parser.add_argument( - "--owner", - action="store_true", - help="Restore OWNER only", - ) - - parser.add_argument( - "--dry-run", - action="store_true", - help="Preview only; do not apply", - ) - - parser.add_argument( - "--yes", - action="store_true", - help="Apply without confirmation", - ) - - parser.add_argument( - "--root", - metavar="PATH", - help="Override restore root", + "--root", metavar="PATH", help="Override restore root" ).completer = argcomplete.FilesCompleter() parser.add_argument( @@ -237,9 +247,7 @@ def main() -> None: ).completer = argcomplete.FilesCompleter() parser.add_argument( - "--db", - metavar="PATH", - help="Override database path", + "--db", metavar="PATH", help="Override database path" ).completer = argcomplete.FilesCompleter() argcomplete.autocomplete(parser) @@ -250,7 +258,6 @@ def main() -> None: raise SystemExit("No command provided after '--'") cmd = Path(wrapper_cmd[0]).name - if cmd not in ("chown", "chmod", "chgrp"): raise SystemExit( "Wrapper mode only supports chown, chmod, and chgrp" @@ -263,12 +270,14 @@ def main() -> None: if wrapper_cmd: paths = _extract_paths_from_command(wrapper_cmd) + if paths: auto_name = f"auto-{datetime.now().strftime('%Y%m%d-%H%M%S')}" + root_path = paths[0].resolve() + with conn: - root_path = str(Path(paths[0]).resolve()) state_id = create_state( - conn, auto_name, root_path, os.getuid(), commit=False + conn, auto_name, str(root_path), os.getuid(), commit=False ) for path in paths: @@ -281,7 +290,8 @@ def main() -> None: ) conn.execute( """ - INSERT INTO entries (state_id, path, type, mode, uid, gid) + INSERT INTO entries + (state_id, path, type, mode, uid, gid) VALUES (?, ?, ?, ?, ?, ?) """, ( @@ -300,15 +310,32 @@ def main() -> None: "This command affects root-owned files.\n" "Please re-run with sudo." ) + + if stat.S_ISLNK(st.st_mode): + typ = "symlink" + elif stat.S_ISREG(st.st_mode): + typ = "file" + elif stat.S_ISDIR(st.st_mode): + typ = "dir" + else: + continue + + rel = ( + "" + if path.resolve() == root_path + else str(path.resolve().relative_to(root_path)) + ) + conn.execute( """ - INSERT INTO entries (state_id, path, type, mode, uid, gid) + INSERT INTO entries + (state_id, path, type, mode, uid, gid) VALUES (?, ?, ?, ?, ?, ?) """, ( state_id, - str(path), - "file", + rel, + typ, stat.S_IMODE(st.st_mode), st.st_uid, st.st_gid, @@ -322,40 +349,107 @@ def main() -> None: proc = subprocess.run(wrapper_cmd) sys.exit(proc.returncode) + if args.prune_states is not None: + value = ( + None + if args.prune_states == PRUNE_STATES_FROM_ENV + else args.prune_states + ) + life = _parse_prune_states_value(value) + + if life == "all": + rows = conn.execute(""" + SELECT name, root_path, created_at + FROM states + ORDER BY created_at + """).fetchall() + cutoff_iso = None + else: + cutoff = datetime.now(timezone.utc) - timedelta(days=life) + cutoff_iso = cutoff.isoformat(timespec="seconds") + rows = conn.execute( + """ + SELECT name, root_path, created_at + FROM states + WHERE created_at < ? + ORDER BY created_at + """, + (cutoff_iso,), + ).fetchall() + + if not rows: + console.print("No states matched.") + return + + console.print( + f"\nThe following {len(rows)} state(s) will be deleted:\n" + ) + + table = Table(box=box.SIMPLE, header_style="bold") + table.add_column("State") + table.add_column("Root path") + table.add_column("Created") + + for name, root, created in rows: + state_name = ( + f"[bright_cyan]{name}[/bright_cyan]" + if name.startswith("auto-") + else name + ) + table.add_row( + state_name, + f"[bright_magenta]{root}[/bright_magenta]", + f"[bright_cyan]{created}[/bright_cyan]", + ) + + console.print(table) + + if args.dry_run: + console.print( + "\n[yellow]Dry-run only. No states were deleted.[/yellow]" + ) + return + + _confirm_or_abort(yes=args.yes, prompt="Delete these states?") + + if life == "all": + deleted = prune_all_states(conn) + else: + deleted = prune_states_before(conn, cutoff_iso) + + console.print(f"\nDeleted {deleted} state(s).") + return + if args.list: - rows = conn.execute( - "SELECT name, root_path, created_at FROM states ORDER BY created_at DESC" - ).fetchall() + rows = conn.execute(""" + SELECT name, root_path, created_at + FROM states + ORDER BY created_at DESC + """).fetchall() if not rows: console.print("No saved states.") return table = Table(box=box.SIMPLE, header_style="bold") - table.add_column("State") table.add_column("Root path") table.add_column("Created") for name, root, created in rows: - dt = datetime.fromisoformat(created) - ts = dt.strftime("%Y-%m-%d %H:%M:%S %z") - state_name = ( f"[bright_cyan]{name}[/bright_cyan]" if name.startswith("auto-") else name ) - root = f"[bright_magenta]{root}[/bright_magenta]" - ts = f"[bright_cyan]{created}[/bright_cyan]" - table.add_row( state_name, - root, - ts, + f"[bright_magenta]{root}[/bright_magenta]", + f"[bright_cyan]{created}[/bright_cyan]", ) console.print(table) + return if args.delete: if delete_state(conn, args.delete) == 0: @@ -369,49 +463,43 @@ def main() -> None: root = normalize_root(args.save) - try: - with conn: # start transaction - if state_exists(conn, args.name): - if not args.overwrite: - raise SystemExit( - f"State '{args.name}' already exists (use --overwrite)" - ) - # if the new save fails, this delete_state step will also roll back - delete_state(conn, args.name, commit=False) + with conn: + if state_exists(conn, args.name): + if not args.overwrite: + raise SystemExit( + f"State '{args.name}' already exists (use --overwrite)" + ) + delete_state(conn, args.name, commit=False) - state_id = create_state( - conn, args.name, str(root), os.getuid(), commit=False - ) + state_id = create_state( + conn, args.name, str(root), os.getuid(), commit=False + ) - # Abort early if root-owned files exist and user is not root. - # This prevents creating snapshots that cannot be meaningfully restored. - for entry in scan_tree(root, excludes=args.exclude): - if entry.uid == 0 and not _is_root(): - raise SystemExit( - "This path contains root-owned files.\n" - "Saving this state requires sudo." - ) - - conn.execute( - """ - INSERT INTO entries (state_id, path, type, mode, uid, gid) - VALUES (?, ?, ?, ?, ?, ?) - """, - ( - state_id, - entry.path, - entry.type, - entry.mode, - entry.uid, - entry.gid, - ), + for entry in scan_tree(root, excludes=args.exclude): + if entry.uid == 0 and not _is_root(): + raise SystemExit( + "This path contains root-owned files.\n" + "Saving this state requires sudo." ) - console.print(f"Saved state '{args.name}' for {root}") - return + conn.execute( + """ + INSERT INTO entries + (state_id, path, type, mode, uid, gid) + VALUES (?, ?, ?, ?, ?, ?) + """, + ( + state_id, + entry.path, + entry.type, + entry.mode, + entry.uid, + entry.gid, + ), + ) - except SystemExit: - raise + console.print(f"Saved state '{args.name}' for {root}") + return if args.restore: if not args.state: @@ -424,7 +512,6 @@ def main() -> None: snapshot_root = Path(state.root_path) target_root = normalize_root(args.root) if args.root else snapshot_root - # Default restore behavior is OWNER + MODE unless narrowed explicitly. restore_permissions = args.permissions or ( not args.permissions and not args.owner ) @@ -447,7 +534,6 @@ def main() -> None: needs_root = False current_uid = os.geteuid() - # Build a per-path view of owner/mode changes and detect privilege needs. for ch in changes: if ch.kind not in ("owner", "mode"): continue @@ -467,8 +553,11 @@ def main() -> None: ] = f"{_format_owner(bu, bg)} → {_format_owner(au, ag)}" counts["owner"] += 1 - if ch.path.stat().st_uid != current_uid: - needs_root = True + try: + if ch.path.stat().st_uid != current_uid: + needs_root = True + except FileNotFoundError: + pass elif ch.kind == "mode" and restore_permissions: b, a = ch.detail.split(" -> ") @@ -497,9 +586,7 @@ def main() -> None: for path in sorted(per_path): row = per_path[path] table.add_row( - str(path), - row.get("owner", "—"), - row.get("mode", "—"), + str(path), row.get("owner", "—"), row.get("mode", "—") ) console.print(table) @@ -520,21 +607,9 @@ def main() -> None: "Please re-run the command with sudo." ) - if not args.yes: - if not sys.stdin.isatty(): - raise SystemExit( - "Refusing to apply changes without confirmation (no TTY).\n" - "Use --yes to force." - ) - - answer = ( - input("\nDo you want to restore this state? (y/N) ") - .strip() - .lower() - ) - if answer not in ("y", "yes"): - console.print("\nAborted.") - return + _confirm_or_abort( + yes=args.yes, prompt="Do you want to restore this state?" + ) apply_restore( root=target_root, diff --git a/chguard/db.py b/chguard/db.py index 5f51a80..339bbd7 100644 --- a/chguard/db.py +++ b/chguard/db.py @@ -4,6 +4,7 @@ import sqlite3 from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path + from platformdirs import user_data_dir APP_NAME = "chguard" @@ -66,7 +67,8 @@ def create_state( commit: bool = True, ) -> int: cur = conn.execute( - "INSERT INTO states (name, root_path, created_at, created_by_uid) VALUES (?, ?, ?, ?)", + "INSERT INTO states (name, root_path, created_at, created_by_uid) " + "VALUES (?, ?, ?, ?)", (name, root_path, utc_now_iso(), created_by_uid), ) if commit: @@ -83,6 +85,28 @@ def delete_state( return cur.rowcount +def prune_states_before( + conn: sqlite3.Connection, + cutoff_iso: str, + *, + commit: bool = True, +) -> int: + cur = conn.execute( + "DELETE FROM states WHERE created_at < ?", + (cutoff_iso,), + ) + if commit: + conn.commit() + return cur.rowcount + + +def prune_all_states(conn: sqlite3.Connection, *, commit: bool = True) -> int: + cur = conn.execute("DELETE FROM states") + if commit: + conn.commit() + return cur.rowcount + + @dataclass(frozen=True) class State: id: int @@ -94,7 +118,8 @@ class State: def get_state(conn: sqlite3.Connection, name: str) -> State | None: cur = conn.execute( - "SELECT id, name, root_path, created_at, created_by_uid FROM states WHERE name = ?", + "SELECT id, name, root_path, created_at, created_by_uid " + "FROM states WHERE name = ?", (name,), ) row = cur.fetchone() diff --git a/chguard/scan.py b/chguard/scan.py index 7312816..ce256d2 100644 --- a/chguard/scan.py +++ b/chguard/scan.py @@ -27,9 +27,43 @@ def _is_excluded(rel: str, excludes: Iterable[str]) -> bool: return False +def _entry_for_path(p: Path, root: Path) -> Entry | None: + try: + st = p.lstat() # never follow symlinks + except FileNotFoundError: + return None + + if stat.S_ISDIR(st.st_mode): + typ = "dir" + elif stat.S_ISREG(st.st_mode): + typ = "file" + elif stat.S_ISLNK(st.st_mode): + typ = "symlink" + else: + # skip special files (devices, sockets, fifos) in v0.1 + return None + + rel = "" if p == root else str(p.relative_to(root)) + + return Entry( + path=rel, + type=typ, + mode=stat.S_IMODE(st.st_mode), + uid=st.st_uid, + gid=st.st_gid, + ) + + def scan_tree(root: Path, excludes: Iterable[str] = ()) -> Iterator[Entry]: root = root.resolve() + root_entry = _entry_for_path(root, root) + if root_entry is not None: + yield root_entry + + if not root.is_dir(): + return + for dirpath, dirnames, filenames in os.walk(root, followlinks=False): # prune excluded directories early rel_dir = ( @@ -56,24 +90,6 @@ def scan_tree(root: Path, excludes: Iterable[str] = ()) -> Iterator[Entry]: # record dirs and files for name in list(dirnames) + list(files): p = Path(dirpath) / name - try: - st = p.lstat() # never follow symlinks - except FileNotFoundError: - continue - - rel = str(p.relative_to(root)) - - if stat.S_ISDIR(st.st_mode): - typ = "dir" - elif stat.S_ISREG(st.st_mode): - typ = "file" - elif stat.S_ISLNK(st.st_mode): - typ = "symlink" - else: - # skip special files (devices, sockets, fifos) in v0.1 - continue - - mode = stat.S_IMODE(st.st_mode) - yield Entry( - path=rel, type=typ, mode=mode, uid=st.st_uid, gid=st.st_gid - ) + entry = _entry_for_path(p, root) + if entry is not None: + yield entry diff --git a/pyproject.toml b/pyproject.toml index 454e0c4..aa71d3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "chguard" -version = "0.3.4" +version = "0.4.0" description = "Safety-first tool to snapshot and restore filesystem ownership and permissions." authors = ["Marco D'Aleo "] license = "GPL-3.0-or-later"