#!/usr/bin/env python3 """ Prune communishift_projects under ansible/inventory/group_vars/all: - Keeps entries where do_not_delete is truthy (true / yes / 1 / on, case-insensitive strings). - Removes every other child mapping under communishift_projects. Only the communishift_projects mapping (that key plus its body up to—but not including—the next line at indentation 0) is rewritten. All other bytes in the file are preserved. Default: dry-run (no writes). Print summary; use --diff for a unified file diff. Dependencies: PyYAML (python3-pyyaml). Examples: ansible/scripts/communishift_prune_projects ansible/scripts/communishift_prune_projects --diff ansible/scripts/communishift_prune_projects --write """ from __future__ import annotations import argparse import difflib import os import re import sys from pathlib import Path from tempfile import NamedTemporaryFile from typing import Any, Dict, List, Mapping, Sequence, Tuple try: import yaml except ImportError as exc: sys.stderr.write( "This script requires PyYAML (install python3-pyyaml on this host).\n" ) raise SystemExit(1) from exc _STANZA_START = re.compile(r"^([A-Za-z0-9_.-]+):\s*(?:\#.*)?\s*$") _CHILD_INDENT = 2 def ansible_truthy(value: Any) -> bool: if isinstance(value, bool): return value if isinstance(value, str): return value.strip().lower() in {"true", "yes", "1", "on"} return False def _expanded_indent(raw: str) -> int: return len(raw.expandtabs()) - len(raw.expandtabs().lstrip(" ")) def locate_communishift_block(lines: Sequence[str]) -> Tuple[int, int]: start = None for i, line in enumerate(lines): if line.lstrip("\ufeff").startswith("communishift_projects:"): start = i break if start is None: raise ValueError('No root key line starting with "communishift_projects:"') root_indent = _expanded_indent(lines[start]) j = start + 1 n_lines = len(lines) while j < n_lines: line = lines[j] if line.strip("\r\n") == "": j += 1 continue if _expanded_indent(line) <= root_indent: break j += 1 return start, j def parse_stanza_blocks( section_body: Sequence[str], ) -> Tuple[List[str], Dict[str, List[str]]]: """ Lines AFTER the communishift_projects: line. Preamble = comments/blanks/stanzas whose first line is not a two-space child key line. """ preamble: List[str] = [] blocks: Dict[str, List[str]] = {} idx = 0 maximum = len(section_body) while idx < maximum: line = section_body[idx] stripped_for_match = line.rstrip("\r\n") indent = _expanded_indent(line) matched = indent == _CHILD_INDENT and bool( _STANZA_START.match(stripped_for_match.lstrip(" ")) ) if matched: key_match = _STANZA_START.match(stripped_for_match.lstrip(" ")) assert key_match is not None key = key_match.group(1) if key in blocks: raise ValueError("Duplicate Communishift stanza key {0!r}".format(key)) stanza_lines: List[str] = [line] idx += 1 while idx < maximum: nxt = section_body[idx] n_indent = _expanded_indent(nxt) nn = _STANZA_START.match(nxt.rstrip("\r\n").lstrip(" ")) if n_indent == _CHILD_INDENT and nn: break stanza_lines.append(nxt) idx += 1 blocks[key] = list(stanza_lines) continue preamble.append(line) idx += 1 return preamble, blocks def stanza_keeps(meta: Any) -> bool: return isinstance(meta, Mapping) and ansible_truthy(meta.get("do_not_delete")) def load_projects_map(path: Path) -> Mapping[str, Any]: raw = yaml.safe_load(path.read_text(encoding="utf-8")) if not isinstance(raw, dict): raise ValueError("YAML root must be a mapping") proj = raw.get("communishift_projects") if proj is None: raise ValueError("communishift_projects missing from YAML") if not isinstance(proj, dict): raise ValueError("communishift_projects must be a mapping") return proj def splice_communishift_projects( text: str, proj_data: Mapping[str, Any], ) -> Tuple[str, Dict[str, Any]]: lines = text.splitlines(keepends=True) start, end = locate_communishift_block(lines) header_line = lines[start] body = lines[start + 1 : end] preamble, blocks = parse_stanza_blocks(body) removed: List[str] = [] skipped_non_mapping: List[str] = [] kept_names: List[str] = [] for key in sorted(blocks.keys()): meta = proj_data.get(key) if not stanza_keeps(meta): removed.append(key) if not isinstance(meta, dict): skipped_non_mapping.append(key) continue kept_names.append(str(meta.get("name", key))) sorted_kept = sorted(k for k in blocks if stanza_keeps(proj_data.get(k))) new_section: List[str] = [header_line] new_section.extend(preamble) for k in sorted_kept: new_section.extend(blocks[k]) after = "".join(lines[:start] + new_section + lines[end:]) stats = { "kept_count": len(sorted_kept), "kept_names": sorted(kept_names), "removed": removed, "skipped_non_mapping": skipped_non_mapping, } return after, stats def main(argv: List[str]) -> int: ansible_root = Path(__file__).resolve().parent.parent parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--group-vars", type=Path, default=ansible_root / "inventory" / "group_vars" / "all", help="YAML file defining communishift_projects", ) parser.add_argument( "--diff", action="store_true", help="Print unified diff of the whole file", ) parser.add_argument( "--write", action="store_true", help="Write the updated file", ) args = parser.parse_args(argv) group_vars_path = Path(args.group_vars) original_text = group_vars_path.read_text(encoding="utf-8") try: proj_data = load_projects_map(group_vars_path) except ValueError as err: sys.stderr.write("{0}: {1}\n".format(group_vars_path, err)) return 1 try: after_text, stats = splice_communishift_projects(original_text, proj_data) except ValueError as err: sys.stderr.write("{0}: {1}\n".format(group_vars_path, err)) return 1 print("File: {0}".format(group_vars_path)) extra = "" if stats["skipped_non_mapping"]: extra = " (+ {0} non-mapping skipped)".format(len(stats["skipped_non_mapping"])) print( "Keeping {0} project stanza(s){1}".format(stats["kept_count"], extra), ) if stats["kept_names"]: print(" " + ", ".join(sorted(stats["kept_names"]))) print("Removing {0} stanza key(s)".format(len(stats["removed"]))) for name in sorted(stats["removed"], key=lambda s: str(s).lower()): print(" - {0}".format(name)) if args.diff: sys.stdout.writelines( difflib.unified_diff( original_text.splitlines(True), after_text.splitlines(True), fromfile="{0} (before)".format(group_vars_path), tofile="{0} (after)".format(group_vars_path), ) ) if after_text == original_text: print("\nNo changes needed.") return 0 if not args.write: print("\nDry-run only — no file modified. Pass --write to apply.") return 0 fh = NamedTemporaryFile( mode="w", encoding="utf-8", suffix=".tmp", prefix="communishift-prune.", dir=str(group_vars_path.parent), delete=False, ) tmp_path = fh.name try: fh.write(after_text) fh.flush() os.fsync(fh.fileno()) fh.close() os.replace(tmp_path, group_vars_path) except OSError: fh.close() try: os.unlink(tmp_path) except OSError: pass raise print("\nUpdated {0}".format(group_vars_path)) return 0 if __name__ == "__main__": raise SystemExit(main(sys.argv[1:]))