Add --prune-states + root path fix #10

Merged
mdaleo404 merged 2 commits from restore-root-folder into main 2026-05-23 10:37:01 +00:00
4 changed files with 282 additions and 162 deletions
Showing only changes of commit 4abc89def9 - Show all commits
+20
View File
@@ -203,6 +203,26 @@ chguard --restore app-baseline --permissions
chguard --restore app-baseline --owner chguard --restore app-baseline --owner
``` ```
### Remove old states
```
chguard --prune-states
```
This removes states older than the number of days set in `CHGUARD_STATES_LIFE`
### Remove states older than _N_ days
```
chguard --prune-states=14
```
This removes all states older than 14 days.
### Remove all states
```
chguard --prune-states=all
```
### Environment Variable
`CHGUARD_STATES_LIFE`controls the default number of days to keep when using `chguard --prune-states`.
### Wrapper mode ### Wrapper mode
Use `--` to separate `chguard` arguments from the wrapped command: Use `--` to separate `chguard` arguments from the wrapped command:
+234 -159
View File
@@ -2,33 +2,37 @@ from __future__ import annotations
import argparse import argparse
import argcomplete import argcomplete
import grp
import importlib.metadata import importlib.metadata
import os import os
import sys
import stat
import pwd import pwd
import grp import stat
import subprocess import subprocess
import sys
from collections import Counter, defaultdict from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
from datetime import datetime
from rich import box
from rich.console import Console from rich.console import Console
from rich.table import Table from rich.table import Table
from rich import box
from chguard.db import ( from chguard.db import (
connect, connect,
init_db,
create_state, create_state,
delete_state, delete_state,
get_state, get_state,
init_db,
prune_all_states,
prune_states_before,
state_exists, state_exists,
) )
from chguard.restore import apply_restore, plan_restore
from chguard.scan import scan_tree from chguard.scan import scan_tree
from chguard.restore import plan_restore, apply_restore
from chguard.util import normalize_root from chguard.util import normalize_root
PRUNE_STATES_FROM_ENV = "__CHGUARD_PRUNE_STATES_FROM_ENV__"
def get_version(): def get_version():
try: try:
@@ -88,6 +92,49 @@ def _is_root() -> bool:
return os.geteuid() == 0 return os.geteuid() == 0
def _parse_prune_states_value(value: str | None) -> int | str:
if value is None:
value = os.environ.get("CHGUARD_STATES_LIFE")
if not value:
raise SystemExit(
"Missing prune age. Use --prune-states=N or set "
"CHGUARD_STATES_LIFE."
)
value = value.strip().lower()
if value == "all":
return "all"
try:
days = int(value)
except ValueError as exc:
raise SystemExit(
"Invalid prune age. Use an integer number of days or 'all'."
) from exc
if days < 0:
raise SystemExit("Invalid prune age. Value must be >= 0.")
return days
def _confirm_or_abort(*, yes: bool, prompt: str) -> None:
if yes:
return
if not sys.stdin.isatty():
raise SystemExit(
"Refusing to continue without confirmation (no TTY).\n"
"Use --yes to force."
)
answer = input(f"\n{prompt} (y/N) ").strip().lower()
if answer not in ("y", "yes"):
raise SystemExit("Aborted.")
def complete_state_names(prefix, parsed_args, **kwargs): def complete_state_names(prefix, parsed_args, **kwargs):
try: try:
conn = connect( conn = connect(
@@ -113,27 +160,12 @@ def _extract_paths_from_command(cmd: list[str]) -> list[Path]:
def main() -> None: def main() -> None:
"""
Entry point for the CLI.
Behavior summary:
- --save snapshots ownership and permissions
- --restore previews changes, then asks for confirmation
- Root privileges are required only when necessary
- Symlinks are skipped during scanning
"""
wrapper_cmd = None wrapper_cmd = None
if "--" in sys.argv: if "--" in sys.argv:
idx = sys.argv.index("--") idx = sys.argv.index("--")
wrapper_cmd = sys.argv[idx + 1 :] wrapper_cmd = sys.argv[idx + 1 :]
sys.argv = sys.argv[:idx] sys.argv = sys.argv[:idx]
parser = argparse.ArgumentParser(
prog="chguard",
description="Snapshot and restore filesystem ownership and permissions.",
)
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog="chguard", prog="chguard",
description="Snapshot and restore filesystem ownership and permissions.", description="Snapshot and restore filesystem ownership and permissions.",
@@ -158,75 +190,53 @@ def main() -> None:
) )
actions.add_argument( actions.add_argument(
"--save", "--save", metavar="PATH", help="Save state for PATH"
metavar="PATH",
help="Save state for PATH",
).completer = argcomplete.FilesCompleter() ).completer = argcomplete.FilesCompleter()
actions.add_argument( actions.add_argument(
"--restore", "--restore", action="store_true", help="Restore a saved state"
action="store_true", )
help="Restore a saved state", actions.add_argument(
"--list", action="store_true", help="List saved states"
) )
actions.add_argument( actions.add_argument(
"--list", "--delete", metavar="STATE", help="Delete a saved state"
action="store_true",
help="List saved states",
)
actions.add_argument(
"--delete",
metavar="STATE",
help="Delete a saved state",
).completer = complete_state_names ).completer = complete_state_names
# positional STATE actions.add_argument(
parser.add_argument( "--prune-states",
"state",
nargs="?", nargs="?",
help="State name (required with --restore)", const=PRUNE_STATES_FROM_ENV,
).completer = complete_state_names metavar="N",
help=(
"Delete states older than N days. If N is omitted, "
"CHGUARD_STATES_LIFE is used. Use 'all' to delete all states."
),
)
parser.add_argument("state", nargs="?", help="State name").completer = (
complete_state_names
)
parser.add_argument("--name", help="State name")
parser.add_argument( parser.add_argument(
"--name", "--overwrite", action="store_true", help="Overwrite existing state"
help="State name (required with --save)", )
parser.add_argument(
"--permissions", action="store_true", help="Restore MODE only"
)
parser.add_argument(
"--owner", action="store_true", help="Restore OWNER only"
)
parser.add_argument(
"--dry-run", action="store_true", help="Preview only; do not apply"
)
parser.add_argument(
"--yes", action="store_true", help="Apply without confirmation"
) )
parser.add_argument( parser.add_argument(
"--overwrite", "--root", metavar="PATH", help="Override restore root"
action="store_true",
help="Overwrite existing state",
)
parser.add_argument(
"--permissions",
action="store_true",
help="Restore MODE only",
)
parser.add_argument(
"--owner",
action="store_true",
help="Restore OWNER only",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Preview only; do not apply",
)
parser.add_argument(
"--yes",
action="store_true",
help="Apply without confirmation",
)
parser.add_argument(
"--root",
metavar="PATH",
help="Override restore root",
).completer = argcomplete.FilesCompleter() ).completer = argcomplete.FilesCompleter()
parser.add_argument( parser.add_argument(
@@ -237,9 +247,7 @@ def main() -> None:
).completer = argcomplete.FilesCompleter() ).completer = argcomplete.FilesCompleter()
parser.add_argument( parser.add_argument(
"--db", "--db", metavar="PATH", help="Override database path"
metavar="PATH",
help="Override database path",
).completer = argcomplete.FilesCompleter() ).completer = argcomplete.FilesCompleter()
argcomplete.autocomplete(parser) argcomplete.autocomplete(parser)
@@ -250,7 +258,6 @@ def main() -> None:
raise SystemExit("No command provided after '--'") raise SystemExit("No command provided after '--'")
cmd = Path(wrapper_cmd[0]).name cmd = Path(wrapper_cmd[0]).name
if cmd not in ("chown", "chmod", "chgrp"): if cmd not in ("chown", "chmod", "chgrp"):
raise SystemExit( raise SystemExit(
"Wrapper mode only supports chown, chmod, and chgrp" "Wrapper mode only supports chown, chmod, and chgrp"
@@ -263,12 +270,14 @@ def main() -> None:
if wrapper_cmd: if wrapper_cmd:
paths = _extract_paths_from_command(wrapper_cmd) paths = _extract_paths_from_command(wrapper_cmd)
if paths: if paths:
auto_name = f"auto-{datetime.now().strftime('%Y%m%d-%H%M%S')}" auto_name = f"auto-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
root_path = paths[0].resolve()
with conn: with conn:
root_path = str(Path(paths[0]).resolve())
state_id = create_state( state_id = create_state(
conn, auto_name, root_path, os.getuid(), commit=False conn, auto_name, str(root_path), os.getuid(), commit=False
) )
for path in paths: for path in paths:
@@ -281,7 +290,8 @@ def main() -> None:
) )
conn.execute( conn.execute(
""" """
INSERT INTO entries (state_id, path, type, mode, uid, gid) INSERT INTO entries
(state_id, path, type, mode, uid, gid)
VALUES (?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?)
""", """,
( (
@@ -300,15 +310,32 @@ def main() -> None:
"This command affects root-owned files.\n" "This command affects root-owned files.\n"
"Please re-run with sudo." "Please re-run with sudo."
) )
if stat.S_ISLNK(st.st_mode):
typ = "symlink"
elif stat.S_ISREG(st.st_mode):
typ = "file"
elif stat.S_ISDIR(st.st_mode):
typ = "dir"
else:
continue
rel = (
""
if path.resolve() == root_path
else str(path.resolve().relative_to(root_path))
)
conn.execute( conn.execute(
""" """
INSERT INTO entries (state_id, path, type, mode, uid, gid) INSERT INTO entries
(state_id, path, type, mode, uid, gid)
VALUES (?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?)
""", """,
( (
state_id, state_id,
str(path), rel,
"file", typ,
stat.S_IMODE(st.st_mode), stat.S_IMODE(st.st_mode),
st.st_uid, st.st_uid,
st.st_gid, st.st_gid,
@@ -322,40 +349,107 @@ def main() -> None:
proc = subprocess.run(wrapper_cmd) proc = subprocess.run(wrapper_cmd)
sys.exit(proc.returncode) sys.exit(proc.returncode)
if args.prune_states is not None:
value = (
None
if args.prune_states == PRUNE_STATES_FROM_ENV
else args.prune_states
)
life = _parse_prune_states_value(value)
if life == "all":
rows = conn.execute("""
SELECT name, root_path, created_at
FROM states
ORDER BY created_at
""").fetchall()
cutoff_iso = None
else:
cutoff = datetime.now(timezone.utc) - timedelta(days=life)
cutoff_iso = cutoff.isoformat(timespec="seconds")
rows = conn.execute(
"""
SELECT name, root_path, created_at
FROM states
WHERE created_at < ?
ORDER BY created_at
""",
(cutoff_iso,),
).fetchall()
if not rows:
console.print("No states matched.")
return
console.print(
f"\nThe following {len(rows)} state(s) will be deleted:\n"
)
table = Table(box=box.SIMPLE, header_style="bold")
table.add_column("State")
table.add_column("Root path")
table.add_column("Created")
for name, root, created in rows:
state_name = (
f"[bright_cyan]{name}[/bright_cyan]"
if name.startswith("auto-")
else name
)
table.add_row(
state_name,
f"[bright_magenta]{root}[/bright_magenta]",
f"[bright_cyan]{created}[/bright_cyan]",
)
console.print(table)
if args.dry_run:
console.print(
"\n[yellow]Dry-run only. No states were deleted.[/yellow]"
)
return
_confirm_or_abort(yes=args.yes, prompt="Delete these states?")
if life == "all":
deleted = prune_all_states(conn)
else:
deleted = prune_states_before(conn, cutoff_iso)
console.print(f"\nDeleted {deleted} state(s).")
return
if args.list: if args.list:
rows = conn.execute( rows = conn.execute("""
"SELECT name, root_path, created_at FROM states ORDER BY created_at DESC" SELECT name, root_path, created_at
).fetchall() FROM states
ORDER BY created_at DESC
""").fetchall()
if not rows: if not rows:
console.print("No saved states.") console.print("No saved states.")
return return
table = Table(box=box.SIMPLE, header_style="bold") table = Table(box=box.SIMPLE, header_style="bold")
table.add_column("State") table.add_column("State")
table.add_column("Root path") table.add_column("Root path")
table.add_column("Created") table.add_column("Created")
for name, root, created in rows: for name, root, created in rows:
dt = datetime.fromisoformat(created)
ts = dt.strftime("%Y-%m-%d %H:%M:%S %z")
state_name = ( state_name = (
f"[bright_cyan]{name}[/bright_cyan]" f"[bright_cyan]{name}[/bright_cyan]"
if name.startswith("auto-") if name.startswith("auto-")
else name else name
) )
root = f"[bright_magenta]{root}[/bright_magenta]"
ts = f"[bright_cyan]{created}[/bright_cyan]"
table.add_row( table.add_row(
state_name, state_name,
root, f"[bright_magenta]{root}[/bright_magenta]",
ts, f"[bright_cyan]{created}[/bright_cyan]",
) )
console.print(table) console.print(table)
return
if args.delete: if args.delete:
if delete_state(conn, args.delete) == 0: if delete_state(conn, args.delete) == 0:
@@ -369,49 +463,43 @@ def main() -> None:
root = normalize_root(args.save) root = normalize_root(args.save)
try: with conn:
with conn: # start transaction if state_exists(conn, args.name):
if state_exists(conn, args.name): if not args.overwrite:
if not args.overwrite: raise SystemExit(
raise SystemExit( f"State '{args.name}' already exists (use --overwrite)"
f"State '{args.name}' already exists (use --overwrite)" )
) delete_state(conn, args.name, commit=False)
# if the new save fails, this delete_state step will also roll back
delete_state(conn, args.name, commit=False)
state_id = create_state( state_id = create_state(
conn, args.name, str(root), os.getuid(), commit=False conn, args.name, str(root), os.getuid(), commit=False
) )
# Abort early if root-owned files exist and user is not root. for entry in scan_tree(root, excludes=args.exclude):
# This prevents creating snapshots that cannot be meaningfully restored. if entry.uid == 0 and not _is_root():
for entry in scan_tree(root, excludes=args.exclude): raise SystemExit(
if entry.uid == 0 and not _is_root(): "This path contains root-owned files.\n"
raise SystemExit( "Saving this state requires sudo."
"This path contains root-owned files.\n"
"Saving this state requires sudo."
)
conn.execute(
"""
INSERT INTO entries (state_id, path, type, mode, uid, gid)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
state_id,
entry.path,
entry.type,
entry.mode,
entry.uid,
entry.gid,
),
) )
console.print(f"Saved state '{args.name}' for {root}") conn.execute(
return """
INSERT INTO entries
(state_id, path, type, mode, uid, gid)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
state_id,
entry.path,
entry.type,
entry.mode,
entry.uid,
entry.gid,
),
)
except SystemExit: console.print(f"Saved state '{args.name}' for {root}")
raise return
if args.restore: if args.restore:
if not args.state: if not args.state:
@@ -424,7 +512,6 @@ def main() -> None:
snapshot_root = Path(state.root_path) snapshot_root = Path(state.root_path)
target_root = normalize_root(args.root) if args.root else snapshot_root target_root = normalize_root(args.root) if args.root else snapshot_root
# Default restore behavior is OWNER + MODE unless narrowed explicitly.
restore_permissions = args.permissions or ( restore_permissions = args.permissions or (
not args.permissions and not args.owner not args.permissions and not args.owner
) )
@@ -447,7 +534,6 @@ def main() -> None:
needs_root = False needs_root = False
current_uid = os.geteuid() current_uid = os.geteuid()
# Build a per-path view of owner/mode changes and detect privilege needs.
for ch in changes: for ch in changes:
if ch.kind not in ("owner", "mode"): if ch.kind not in ("owner", "mode"):
continue continue
@@ -467,8 +553,11 @@ def main() -> None:
] = f"{_format_owner(bu, bg)}{_format_owner(au, ag)}" ] = f"{_format_owner(bu, bg)}{_format_owner(au, ag)}"
counts["owner"] += 1 counts["owner"] += 1
if ch.path.stat().st_uid != current_uid: try:
needs_root = True if ch.path.stat().st_uid != current_uid:
needs_root = True
except FileNotFoundError:
pass
elif ch.kind == "mode" and restore_permissions: elif ch.kind == "mode" and restore_permissions:
b, a = ch.detail.split(" -> ") b, a = ch.detail.split(" -> ")
@@ -497,9 +586,7 @@ def main() -> None:
for path in sorted(per_path): for path in sorted(per_path):
row = per_path[path] row = per_path[path]
table.add_row( table.add_row(
str(path), str(path), row.get("owner", ""), row.get("mode", "")
row.get("owner", ""),
row.get("mode", ""),
) )
console.print(table) console.print(table)
@@ -520,21 +607,9 @@ def main() -> None:
"Please re-run the command with sudo." "Please re-run the command with sudo."
) )
if not args.yes: _confirm_or_abort(
if not sys.stdin.isatty(): yes=args.yes, prompt="Do you want to restore this state?"
raise SystemExit( )
"Refusing to apply changes without confirmation (no TTY).\n"
"Use --yes to force."
)
answer = (
input("\nDo you want to restore this state? (y/N) ")
.strip()
.lower()
)
if answer not in ("y", "yes"):
console.print("\nAborted.")
return
apply_restore( apply_restore(
root=target_root, root=target_root,
+27 -2
View File
@@ -4,6 +4,7 @@ import sqlite3
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from platformdirs import user_data_dir from platformdirs import user_data_dir
APP_NAME = "chguard" APP_NAME = "chguard"
@@ -66,7 +67,8 @@ def create_state(
commit: bool = True, commit: bool = True,
) -> int: ) -> int:
cur = conn.execute( cur = conn.execute(
"INSERT INTO states (name, root_path, created_at, created_by_uid) VALUES (?, ?, ?, ?)", "INSERT INTO states (name, root_path, created_at, created_by_uid) "
"VALUES (?, ?, ?, ?)",
(name, root_path, utc_now_iso(), created_by_uid), (name, root_path, utc_now_iso(), created_by_uid),
) )
if commit: if commit:
@@ -83,6 +85,28 @@ def delete_state(
return cur.rowcount return cur.rowcount
def prune_states_before(
conn: sqlite3.Connection,
cutoff_iso: str,
*,
commit: bool = True,
) -> int:
cur = conn.execute(
"DELETE FROM states WHERE created_at < ?",
(cutoff_iso,),
)
if commit:
conn.commit()
return cur.rowcount
def prune_all_states(conn: sqlite3.Connection, *, commit: bool = True) -> int:
cur = conn.execute("DELETE FROM states")
if commit:
conn.commit()
return cur.rowcount
@dataclass(frozen=True) @dataclass(frozen=True)
class State: class State:
id: int id: int
@@ -94,7 +118,8 @@ class State:
def get_state(conn: sqlite3.Connection, name: str) -> State | None: def get_state(conn: sqlite3.Connection, name: str) -> State | None:
cur = conn.execute( cur = conn.execute(
"SELECT id, name, root_path, created_at, created_by_uid FROM states WHERE name = ?", "SELECT id, name, root_path, created_at, created_by_uid "
"FROM states WHERE name = ?",
(name,), (name,),
) )
row = cur.fetchone() row = cur.fetchone()
+1 -1
View File
@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "chguard" name = "chguard"
version = "0.3.4" version = "0.4.0"
description = "Safety-first tool to snapshot and restore filesystem ownership and permissions." description = "Safety-first tool to snapshot and restore filesystem ownership and permissions."
authors = ["Marco D'Aleo <marco@marcodaleo.com>"] authors = ["Marco D'Aleo <marco@marcodaleo.com>"]
license = "GPL-3.0-or-later" license = "GPL-3.0-or-later"