Add new flags and their respective functionalities: --list, --restore-last, --prune-backups. Change Python dependecies version. Remove Black target-version from pyproject.toml. Mirro version bump.
This commit is contained in:
@@ -3,6 +3,7 @@ import argparse
|
||||
import tempfile
|
||||
import subprocess
|
||||
import os
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
import time
|
||||
|
||||
@@ -66,9 +67,207 @@ def main():
|
||||
version=f"mirro {get_version()}",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--list",
|
||||
action="store_true",
|
||||
help="List all backups in the backup directory and exit",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--restore-last",
|
||||
metavar="FILE",
|
||||
type=str,
|
||||
help="Restore the last backup of the given file and exit",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--prune-backups",
|
||||
nargs="?",
|
||||
const="default",
|
||||
help="Prune backups older than MIRRO_BACKUPS_LIFE days, or 'all' to delete all backups",
|
||||
)
|
||||
|
||||
# Parse only options. Leave everything else untouched.
|
||||
args, positional = parser.parse_known_args()
|
||||
|
||||
if args.list:
|
||||
import pwd, grp
|
||||
|
||||
backup_dir = Path(args.backup_dir).expanduser().resolve()
|
||||
if not backup_dir.exists():
|
||||
print("No backups found.")
|
||||
return
|
||||
|
||||
backups = sorted(
|
||||
backup_dir.iterdir(), key=os.path.getmtime, reverse=True
|
||||
)
|
||||
if not backups:
|
||||
print("No backups found.")
|
||||
return
|
||||
|
||||
def perms(mode):
|
||||
is_file = "-"
|
||||
perms = ""
|
||||
flags = [
|
||||
(mode & 0o400, "r"),
|
||||
(mode & 0o200, "w"),
|
||||
(mode & 0o100, "x"),
|
||||
(mode & 0o040, "r"),
|
||||
(mode & 0o020, "w"),
|
||||
(mode & 0o010, "x"),
|
||||
(mode & 0o004, "r"),
|
||||
(mode & 0o002, "w"),
|
||||
(mode & 0o001, "x"),
|
||||
]
|
||||
for bit, char in flags:
|
||||
perms += char if bit else "-"
|
||||
return is_file + perms
|
||||
|
||||
for b in backups:
|
||||
stat = b.stat()
|
||||
mode = perms(stat.st_mode)
|
||||
|
||||
try:
|
||||
owner = pwd.getpwuid(stat.st_uid).pw_name
|
||||
except KeyError:
|
||||
owner = str(stat.st_uid)
|
||||
|
||||
try:
|
||||
group = grp.getgrgid(stat.st_gid).gr_name
|
||||
except KeyError:
|
||||
group = str(stat.st_gid)
|
||||
|
||||
owner_group = f"{owner} {group}"
|
||||
|
||||
mtime = time.strftime(
|
||||
"%Y-%m-%d %H:%M:%S", time.gmtime(stat.st_mtime)
|
||||
)
|
||||
|
||||
print(f"{mode:11} {owner_group:20} {mtime} {b.name}")
|
||||
|
||||
return
|
||||
|
||||
if args.restore_last:
|
||||
backup_dir = Path(args.backup_dir).expanduser().resolve()
|
||||
target = Path(args.restore_last).expanduser().resolve()
|
||||
|
||||
if not backup_dir.exists():
|
||||
print("No backup directory found.")
|
||||
return 1
|
||||
|
||||
# backup filenames look like: <name>.orig.<timestamp>
|
||||
prefix = f"{target.name}.orig."
|
||||
|
||||
backups = [
|
||||
b for b in backup_dir.iterdir() if b.name.startswith(prefix)
|
||||
]
|
||||
|
||||
if not backups:
|
||||
print(f"No backups found for {target}")
|
||||
return 1
|
||||
|
||||
# newest backup
|
||||
last = max(backups, key=os.path.getmtime)
|
||||
|
||||
# read and strip header
|
||||
raw = last.read_text(encoding="utf-8", errors="replace")
|
||||
restored = []
|
||||
skipping = True
|
||||
for line in raw.splitlines(keepends=True):
|
||||
# header ends at first blank line after the dashed line block
|
||||
if skipping:
|
||||
if line.strip() == "" and restored == []:
|
||||
# allow only after header
|
||||
continue
|
||||
if line.startswith("#") or line.strip() == "":
|
||||
continue
|
||||
skipping = False
|
||||
restored.append(line)
|
||||
|
||||
# if header wasn't present, restored = raw
|
||||
if not restored:
|
||||
restored_text = raw
|
||||
else:
|
||||
restored_text = "".join(restored)
|
||||
|
||||
# write the restored file back
|
||||
target.write_text(restored_text, encoding="utf-8")
|
||||
|
||||
print(f"Restored {target} from backup {last.name}")
|
||||
return
|
||||
|
||||
if args.prune_backups is not None:
|
||||
mode = args.prune_backups
|
||||
|
||||
# ALL mode
|
||||
if mode == "all":
|
||||
prune_days = None
|
||||
|
||||
# default
|
||||
elif mode == "default":
|
||||
raw_env = os.environ.get("MIRRO_BACKUPS_LIFE", "30")
|
||||
try:
|
||||
prune_days = int(raw_env)
|
||||
if prune_days < 1:
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
print(
|
||||
f"Invalid MIRRO_BACKUPS_LIFE value: {raw_env}. "
|
||||
"It must be an integer >= 1. Falling back to 30."
|
||||
)
|
||||
prune_days = 30
|
||||
|
||||
# numeric mode e.g. --prune-backups=7
|
||||
else:
|
||||
try:
|
||||
prune_days = int(mode)
|
||||
if prune_days < 1:
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
msg = f"""
|
||||
Invalid value for --prune-backups: {mode}
|
||||
|
||||
--prune-backups use MIRRO_BACKUPS_LIFE (default: 30 days)
|
||||
--prune-backups=N expire backups older than N days (N >= 1)
|
||||
--prune-backups=all remove ALL backups
|
||||
"""
|
||||
print(textwrap.dedent(msg))
|
||||
return 1
|
||||
|
||||
backup_dir = Path(args.backup_dir).expanduser().resolve()
|
||||
|
||||
if not backup_dir.exists():
|
||||
print("No backup directory found.")
|
||||
return 0
|
||||
|
||||
# prune EVERYTHING
|
||||
if prune_days is None:
|
||||
removed = []
|
||||
for b in backup_dir.iterdir():
|
||||
if b.is_file():
|
||||
removed.append(b)
|
||||
b.unlink()
|
||||
print(f"Removed ALL backups ({len(removed)} file(s)).")
|
||||
return 0
|
||||
|
||||
# prune by age
|
||||
cutoff = time.time() - (prune_days * 86400)
|
||||
removed = []
|
||||
|
||||
for b in backup_dir.iterdir():
|
||||
if b.is_file() and b.stat().st_mtime < cutoff:
|
||||
removed.append(b)
|
||||
b.unlink()
|
||||
|
||||
if removed:
|
||||
print(
|
||||
f"Removed {len(removed)} backup(s) older than {prune_days} days."
|
||||
)
|
||||
else:
|
||||
print(f"No backups older than {prune_days} days.")
|
||||
|
||||
return 0
|
||||
|
||||
# Flexible positional parsing
|
||||
if not positional:
|
||||
parser.error("the following arguments are required: file")
|
||||
|
||||
Reference in New Issue
Block a user