Files
chguard/chguard/cli.py
2025-12-20 19:01:10 +00:00

342 lines
10 KiB
Python

from __future__ import annotations
import argparse
import os
import sys
import stat
import pwd
import grp
from collections import Counter, defaultdict
from pathlib import Path
from datetime import datetime
from rich.console import Console
from rich.table import Table
from rich import box
from chguard.db import (
connect,
init_db,
create_state,
delete_state,
get_state,
state_exists,
)
from chguard.scan import scan_tree
from chguard.restore import plan_restore, apply_restore
from chguard.util import normalize_root
def _uid_to_name(uid: int) -> str:
    """Resolve *uid* to its account name.

    When the password database has no entry for *uid*, the numeric id is
    returned as a decimal string instead.
    """
    try:
        record = pwd.getpwuid(uid)
    except KeyError:
        # No passwd entry for this id: show the raw number.
        return str(uid)
    return record.pw_name
def _gid_to_name(gid: int) -> str:
    """Resolve *gid* to its group name.

    When the group database has no entry for *gid*, the numeric id is
    returned as a decimal string instead.
    """
    try:
        record = grp.getgrgid(gid)
    except KeyError:
        # No group entry for this id: show the raw number.
        return str(gid)
    return record.gr_name
def _format_owner(uid: int, gid: int) -> str:
    """Render an owner pair as ``user:group``.

    Each half is whatever ``_uid_to_name`` / ``_gid_to_name`` return for
    the given id.
    """
    user = _uid_to_name(uid)
    group = _gid_to_name(gid)
    return ":".join((user, group))
def _mode_to_rwx(mode: int) -> str:
    """Convert a numeric mode to a nine-character ``rwx`` permission string.

    Setuid, setgid and sticky bits are rendered in the execute slots the
    same way ``ls -l`` shows them (``s``/``S``/``t``/``T``), so a change
    that only touches those bits is visible in the restore preview. For
    modes without special bits the result is the plain ``rwxr-xr-x`` form.

    Args:
        mode: Permission bits, with or without file-type bits set.

    Returns:
        The nine permission characters, without a leading file-type column.
    """
    # stat.filemode() returns ten characters such as "-rwxr-xr-x"; the first
    # one describes the file type, which is not part of the permissions.
    return stat.filemode(mode)[1:]
def _is_root() -> bool:
    """Report whether the process's effective user id is 0 (root)."""
    effective_uid = os.geteuid()
    return effective_uid == 0
def _build_parser() -> argparse.ArgumentParser:
    """Build the argparse parser describing the chguard command line."""
    parser = argparse.ArgumentParser(
        prog="chguard",
        description="Snapshot and restore filesystem ownership and permissions.",
    )
    # Exactly one action must be selected per invocation.
    actions = parser.add_mutually_exclusive_group(required=True)
    actions.add_argument("--save", metavar="PATH", help="Save state for PATH")
    actions.add_argument(
        "--restore", action="store_true", help="Restore a saved state"
    )
    actions.add_argument(
        "--list", action="store_true", help="List saved states"
    )
    actions.add_argument(
        "--delete", metavar="STATE", help="Delete a saved state"
    )
    parser.add_argument(
        "state", nargs="?", help="State name (required with --restore)"
    )
    parser.add_argument("--name", help="State name (required with --save)")
    parser.add_argument(
        "--overwrite", action="store_true", help="Overwrite existing state"
    )
    parser.add_argument(
        "--permissions", action="store_true", help="Restore MODE only"
    )
    parser.add_argument(
        "--owner", action="store_true", help="Restore OWNER only"
    )
    parser.add_argument(
        "--dry-run", action="store_true", help="Preview only; do not apply"
    )
    parser.add_argument(
        "--yes", action="store_true", help="Apply without confirmation"
    )
    parser.add_argument("--root", metavar="PATH", help="Override restore root")
    parser.add_argument(
        "--exclude", action="append", default=[], help="Exclude path prefix"
    )
    parser.add_argument("--db", metavar="PATH", help="Override database path")
    return parser


def _list_states(conn, console) -> None:
    """Print every saved state (name, root path, creation time), newest first."""
    rows = conn.execute(
        "SELECT name, root_path, created_at FROM states ORDER BY created_at DESC"
    ).fetchall()
    if not rows:
        console.print("No saved states.")
        return
    for name, root, created in rows:
        stamp = datetime.fromisoformat(created)
        # %z expands to an empty string for naive timestamps; strip the
        # trailing space that would otherwise be left behind.
        ts = stamp.strftime("%Y-%m-%d %H:%M:%S %z").rstrip()
        console.print(f"{name}\t{root}\t[dim]{ts}[/dim]")


def _save_state(conn, console, args: argparse.Namespace) -> None:
    """Scan ``args.save`` and store the result as the state ``args.name``.

    Raises:
        SystemExit: If the state already exists and ``--overwrite`` was not
            given, or if the tree contains root-owned entries while the
            process is not running as root.
    """
    root = normalize_root(args.save)
    replacing = state_exists(conn, args.name)
    if replacing and not args.overwrite:
        raise SystemExit(
            f"State '{args.name}' already exists (use --overwrite)"
        )

    # Finish the scan and the privilege check BEFORE modifying the database.
    # delete_state() appears to persist on its own (the --delete action never
    # commits), so aborting after it would lose the previous snapshot and
    # could leave a half-written replacement behind.
    entries = list(scan_tree(root, excludes=args.exclude))
    if not _is_root() and any(entry.uid == 0 for entry in entries):
        # A snapshot containing root-owned files could not be meaningfully
        # restored by an unprivileged user, so refuse to create it.
        raise SystemExit(
            "This path contains root-owned files.\n"
            "Saving this state requires sudo."
        )

    if replacing:
        delete_state(conn, args.name)
    state_id = create_state(conn, args.name, str(root), os.getuid())
    for entry in entries:
        conn.execute(
            """
            INSERT INTO entries (state_id, path, type, mode, uid, gid)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                state_id,
                entry.path,
                entry.type,
                entry.mode,
                entry.uid,
                entry.gid,
            ),
        )
    conn.commit()
    console.print(f"Saved state '{args.name}' for {root}")


def _summarize_changes(changes, target_root, restore_owner, restore_permissions):
    """Collapse planned changes into a per-path preview.

    Returns:
        A ``(per_path, counts, needs_root)`` tuple: ``per_path`` maps each
        path (relative to *target_root* when possible) to its ``"owner"``
        and/or ``"mode"`` description, ``counts`` tallies changes by kind,
        and ``needs_root`` is True when at least one change cannot be made
        by the current effective user.
    """
    per_path = defaultdict(dict)
    counts = Counter()
    needs_root = False
    current_uid = os.geteuid()

    for ch in changes:
        if ch.kind not in ("owner", "mode"):
            continue
        try:
            rel = ch.path.relative_to(target_root)
        except ValueError:
            # Path lies outside the restore root; show it unmodified.
            rel = ch.path

        if ch.kind == "owner" and restore_owner:
            # detail has the form "uid:gid -> uid:gid".
            before, after = ch.detail.split(" -> ")
            bu, bg = map(int, before.split(":"))
            au, ag = map(int, after.split(":"))
            per_path[rel]["owner"] = (
                f"{_format_owner(bu, bg)} -> {_format_owner(au, ag)}"
            )
            counts["owner"] += 1
            # An unprivileged process cannot change a file's uid and can
            # only change the group of files it owns, so this is doable
            # without root only when both sides belong to the current user.
            if bu != current_uid or au != current_uid:
                needs_root = True
        elif ch.kind == "mode" and restore_permissions:
            # detail has the form "<octal> -> <octal>".
            b, a = ch.detail.split(" -> ")
            per_path[rel]["mode"] = (
                f"{_mode_to_rwx(int(b, 8))} -> {_mode_to_rwx(int(a, 8))}"
            )
            counts["mode"] += 1
            try:
                if ch.path.stat().st_uid != current_uid:
                    needs_root = True
            except FileNotFoundError:
                # Entry vanished since planning; nothing to chmod.
                pass
            except PermissionError:
                # We cannot even inspect the entry, so changing it will
                # need elevated privileges as well.
                needs_root = True

    return per_path, counts, needs_root


def _confirm_restore() -> bool:
    """Ask interactively whether to apply the restore; True only on y/yes.

    Raises:
        SystemExit: If stdin is not a TTY, because no confirmation can be
            obtained.
    """
    if not sys.stdin.isatty():
        raise SystemExit(
            "Refusing to apply changes without confirmation (no TTY).\n"
            "Use --yes to force."
        )
    try:
        answer = input("\nDo you want to restore this state? (y/N) ")
    except EOFError:
        # Ctrl-D at the prompt counts as declining.
        return False
    return answer.strip().lower() in ("y", "yes")


def _restore_state(conn, console, args: argparse.Namespace) -> None:
    """Preview the restore of ``args.state`` and apply it once confirmed.

    Raises:
        SystemExit: If the state does not exist, if elevated privileges are
            required but missing, or if confirmation is needed without a TTY.
    """
    state = get_state(conn, args.state)
    if not state:
        raise SystemExit(f"No such state: {args.state}")

    snapshot_root = Path(state.root_path)
    target_root = normalize_root(args.root) if args.root else snapshot_root

    # Default restore behavior is OWNER + MODE unless narrowed explicitly.
    restore_permissions = args.permissions or not args.owner
    restore_owner = args.owner or not args.permissions

    rows = conn.execute(
        "SELECT path, type, mode, uid, gid FROM entries WHERE state_id = ?",
        (state.id,),
    ).fetchall()
    changes = plan_restore(
        root=target_root,
        rows=rows,
        restore_permissions=restore_permissions,
        restore_owner=restore_owner,
    )
    per_path, counts, needs_root = _summarize_changes(
        changes, target_root, restore_owner, restore_permissions
    )
    if not per_path:
        console.print("No differences found.")
        return

    console.print(f"\nRestoring under: {target_root}\n")
    table = Table(box=box.SIMPLE, header_style="bold")
    table.add_column("Path")
    table.add_column("Owner change", style="cyan")
    table.add_column("Mode change", style="green")
    for path in sorted(per_path):
        row = per_path[path]
        table.add_row(str(path), row.get("owner", ""), row.get("mode", ""))
    console.print(table)
    console.print(
        f"\nSummary: {counts['mode']} mode change(s), "
        f"{counts['owner']} owner change(s)"
    )

    if args.dry_run:
        console.print(
            "\n[yellow]Dry-run only. No changes were applied.[/yellow]"
        )
        return
    if needs_root and not _is_root():
        raise SystemExit(
            "This restore requires elevated privileges.\n"
            "Please re-run the command with sudo."
        )
    if not args.yes and not _confirm_restore():
        console.print("\nAborted.")
        return

    apply_restore(
        root=target_root,
        rows=rows,
        restore_permissions=restore_permissions,
        restore_owner=restore_owner,
    )
    console.print("\n[green]Restore complete.[/green]")


def main() -> None:
    """
    Entry point for the CLI.

    Behavior summary:
    - --save snapshots ownership and permissions
    - --restore previews changes, then asks for confirmation
    - --list and --delete manage the saved states
    - Root privileges are required only when necessary
    - Symlinks are skipped during scanning

    Raises:
        SystemExit: On usage errors (exit status 2, via argparse) and on
            the operational failures reported by the individual actions.
    """
    parser = _build_parser()
    args = parser.parse_args()

    # Reject incomplete command lines before opening the database, so a
    # usage error never creates or initialises a database as a side effect.
    if args.save and not args.name:
        parser.error("--name is required with --save")
    if args.restore and not args.state:
        parser.error("STATE is required with --restore")

    console = Console()
    conn = connect(Path(args.db).expanduser().resolve() if args.db else None)
    try:
        init_db(conn)
        if args.list:
            _list_states(conn, console)
        elif args.delete:
            if delete_state(conn, args.delete) == 0:
                raise SystemExit(f"No such state: {args.delete}")
            console.print(f"Deleted state '{args.delete}'")
        elif args.save:
            _save_state(conn, console, args)
        elif args.restore:
            _restore_state(conn, console, args)
    finally:
        # Release the database handle on every exit path, including
        # SystemExit. Assumes connect() returns a DB-API style connection.
        conn.close()