#! /usr/bin/python3 # Create/view log files produced by ansible-plybook. # For examples, see the help command. import os import sys import argparse import gzip import fnmatch import json import shutil import subprocess import time import glob # Use utf8 prefixes in diff, these need to be a "normal" width 1 character conf_utf8 = True _conf_utf8_warn = '⚠' # Rebooted _conf_utf8_info = '⚐' # Rebooted and updated _conf_utf8_okay = '➚' # Arrow seperator. Doesn't need to be a single character conf_host_arrow_asci = '->' conf_host_arrow_utf8 = '→' # Use ansi codes. None means auto, aka. look for a tty on stdout. conf_ansi_terminal = None conf_term_cmd = 'dim' conf_term_user = '' conf_term_warn = '' conf_term_info = '' conf_term_okay = '' conf_term_highlight = 'bold,underline' conf_term_keyword = 'underline' conf_term_time = 'underline' conf_term_title = 'italic' # Use _ instead of , for number seperator. conf_num_sep_ = False # Make it easier to see different date's conf_ui_date = True # Do we use a shorter duration by default (drop minutes/seconds) conf_short_duration = True # Do we want a small osinfo in diff/list/etc. conf_small_osinfo = True # Try to print OS/ver even nicer (when small) ... but includes spaces. conf_align_osinfo_small = True # How many playbook runs to look at by default, _per_ playbook... conf_play_max = 16 # How many hosts to show in list/etc. conf_show_hosts_max = 4 # How old playbook runs to look at by default... conf_play_duration = "2w" # Hosts that we'll show info. for, by default. info/host cmds. conf_important_hosts = ["batcave*", "bastion01*", "noc*"] # Remove suffix noise in names. conf_suffix_dns_replace = { '.fedoraproject.org' : '..org', '.fedorainfracloud.org' : '..org', } _suffix_dns_replace = {} # Some of our playbook names are big, so make them nicer. conf_play_remap = { 'generate-updates-uptimes-per-host-file' : 'Updates+uptimes', 'communishift_send_email_notifications' : 'CS_emails', } # Dir. where we put, and look for, the files... conf_path = "/var/log/ansible/" conf_user_conf_path = "~/.config/ansilog-playbook/config.conf" # Now we can change the above conf_ variables, via. a conf file. def _user_conf(): ucp = os.path.expanduser(conf_user_conf_path) if not os.path.exists(ucp): return for line in open(ucp): _user_conf_line(line) def _user_conf_line(line): line = line.lstrip() if not line: return if line[0] == '#': return op = "+=" x = line.split(op, 2) if len(x) != 2: op = ":=" x = line.split(op, 2) if len(x) != 2: op = "=" x = line.split(op, 2) if len(x) != 2: print(" Error: Configuration: ", line, file=sys.stderr) return key,val = x key = 'conf_' + key.strip().lower() if key not in globals(): print(" Warn: Configuration not found: ", key, file=sys.stderr) return if False: pass elif op == '=': val = val.strip() if False: pass elif val.lower() in ("false", "no"): val = False elif val.lower() in ("true", "yes"): val = True elif val == '[]': val = [] elif val == '{}': val = {} elif val.isdigit(): val = int(val) if type(globals()[key]) != type(val): print(" Error: Configuration ", key,'bad:',val, file=sys.stderr) return globals()[key] = val elif op == '+=': val = val.strip() if type(globals()[key]) != type([]): print(" Error: Configuration ", key, 'not []', file=sys.stderr) return globals()[key].append(val) elif op == ':=': if type(globals()[key]) != type({}): print(" Error: Configuration ", key, 'not {}', file=sys.stderr) return if '=' not in val: print(" Error: Configuration bad :=", file=sys.stderr) return dkey, dval = val.split('=', 1) globals()[key][dkey.strip()] = dval.strip() else: print(" Error: Configuration ", key,'bad op', file=sys.stderr) return # This is kind of fast and kind of small. No re, and no allocation. # Sort as: 0, 00, 000, 01, 011, 1, 11, a01, a1, z01, z1, etc. def natcmp(x, y): """ Natural sort string comparison. https://en.wikipedia.org/wiki/Natural_sort_order Aka. vercmp() """ def _cmp_xy_mix(): # One is a digit, the other isn't. if inum is not None: # 0/1 vs. x/. return 1 if x[i] > y[i]: return 1 else: return -1 inum = None check_zeros = False for i in range(min(len(x), len(y))): if x[i] in "0123456789" and y[i] not in "0123456789": return _cmp_xy_mix() if x[i] not in "0123456789" and y[i] in "0123456789": return _cmp_xy_mix() if x[i] in "0123456789": # Both are digits... if inum is None: check_zeros = True inum = 0 if check_zeros: # Leading zeros... (0 < 00 < 01 < 011 < 1 < 11) if x[i] == '0' and y[i] == '0': continue elif x[i] == '0': return -1 elif y[i] == '0': return 1 else: check_zeros = False # If we are already in a number, we only care about the length or # the first digit that is different. if inum != 0: continue if x[i] == y[i]: continue # Non-zero first digit, Eg. 7 < 9 inum = int(x[i]) - int(y[i]) continue # Both are not digits... if inum is not None and inum != 0: return inum inum = None # Can be equal if x[i] > y[i]: return 1 if x[i] < y[i]: return -1 if len(x) > len(y): if inum is not None and inum != 0 and x[i+1] not in "0123456789": return inum return 1 if len(x) < len(y): if inum is not None and inum != 0 and y[i+1] not in "0123456789": return inum return -1 if inum is None: # Same length, not in a num. assert x == y return 0 # So the strings are equal. return inum class NatCmp(): __slots__ = ['s',] def __init__(self, s): self.s = s def __str__(self): return self.s def __eq__(self, other): return self.s == other.s def __gt__(self, other): ret = natcmp(self.s, other.s) if ret > 0: return True return False # Given a list of strings, sort them using natcmp() def nat_sorted(xs): for ret in sorted(NatCmp(x) for x in xs): yield ret.s def _fnmatchi(path, pat): """ Simple way to always use case insensitive filename matching. """ return fnmatch.fnmatch(path.lower(), pat.lower()) # Have nice "plain" numbers... def _ui_int(num): if conf_num_sep_: return "{:_}".format(int(num)) return "{:,}".format(int(num)) # See: https://en.wikipedia.org/wiki/ANSI_escape_code#Select_Graphic_Rendition_parameters # We merge the colours for 16 values so fg0-fgf bg0-bgf ansi = {'bold' : '\033[1m', 'dim' : '\033[2m', 'italic' : '\033[3m', 'underline' : '\033[4m', 'blink' : '\033[5m', 'reverse' :'\033[7m'} ansi_stop = '\033[0m' for i in range(7): ansi['fg:' + str(i)] = '\033[3' + str(i) + 'm' ansi['bg:' + str(i)] = '\033[4' + str(i) + 'm' for i, j in ((0, '8'), (1, '9'), (2, 'a'), (3, 'b'), (4, 'c'), (5, 'd'), (6, 'e'), (7, 'f')): ansi['fg:' + j] = '\033[9' + str(i) + 'm' ansi['bg:' + j] = '\033[10' + str(i) + 'm' def _ui_t_align(text, align=None, olen=None): if align is None or align == 0: return text if olen is None: olen = len(text) if abs(align) > olen: # "%*s", align, text extra = abs(align) - olen if align > 0: text = " " * extra + text else: text = text + " " * extra return text def _ui_t_ansi(text, codes, align=0): olen = len(text) text = _ui_t_align(text, align) if not conf_ansi_terminal or not codes or olen == 0: return text esc = '' for c in codes.split(','): if c == 'reset': esc = '' if c not in ansi: # Ignore bad codes continue esc += ansi[c] if not esc: return text # Deal with leading/trailing spaces, mainly for underline. olen = len(text) text = text.lstrip() prefix = olen - len(text) text = text.rstrip() suffix = (olen - prefix) - len(text) return "%*s%s%s%s%*s" % (prefix, '', esc, text, ansi_stop, suffix, '') def _ui_t_cmd(text, align=0): return _ui_t_ansi(text, conf_term_cmd, align=align) def _ui_t_high(text, align=0): return _ui_t_ansi(text, conf_term_highlight, align=align) def _ui_t_key(text, align=0): return _ui_t_ansi(text, conf_term_keyword, align=align) def _ui_t_time(text, align=0): return _ui_t_ansi(text, conf_term_time, align=align) def _ui_t_title(text, align=0): return _ui_t_ansi(text, conf_term_title, align=align) def _ui_t_user(text, align=0): return _ui_t_ansi(text, conf_term_user, align=align) def _ui_t_warn(text, align=0): return _ui_t_ansi(text, conf_term_warn, align=align) def _ui_t_info(text, align=0): return _ui_t_ansi(text, conf_term_info, align=align) def _ui_t_okay(text, align=0): return _ui_t_ansi(text, conf_term_okay, align=align) # Make it easier to spot date differences def _ui_date(d1, align=None, prev=None): if not conf_ui_date: return _ui_t_align(d1.date, align) if prev is not None and d1.date == prev: return _ui_t_align(" \" ", align) # YYYY-MM-DD HH:MM # 1234567890 23456 if prev is None: prev = _today if conf_ansi_terminal and d1.date != prev: for i in (15, 14, 12, 11, 9, 8, 7, 5): if d1.date[:i] == prev[:i]: ndate = d1.date[:i] + _ui_t_high(d1.date[i:]) return _ui_t_align(ndate, align, len(d1.date)) return _ui_t_align(d1.date, align) def _pre_cmd__setup(): global conf_path global plays global _today global _yesterday global conf_ansi_terminal conf_path = os.path.expanduser(conf_path) if conf_path[0] != '/': print(" Warning: Conf path isn't absolute", file=sys.stderr) plays = [] # Eg. /2026/02/13/23.08.09/playbook-4067657.info pat = "/????/??/??/??.??.??/playbook-*.info" cpr = parse_duration(conf_play_duration) # NOTE: Try to be a bit clever and only look at this years logs, if we can. # Could expand this. patyr = time.strftime("%Y", time.gmtime(time.time())) if patyr == time.strftime("%Y", time.gmtime(time.time() - cpr)): pat = "/" + patyr + "/??/??/??.??.??/playbook-*.info" for pbdir in glob.glob(conf_path + "/*"): num = 0 for pbi in reversed(glob.glob(pbdir + pat)): pb = Playbook(pbi) if not pb.hosts: continue if cpr > 0 and int(time.time() - pb.beg) > cpr: break plays.append(pb) num += 1 if conf_play_max > 0 and num >= conf_play_max: break # plays = list(sorted(plays)) plays = list(reversed(sorted(plays, key=lambda x: x.beg))) _suffix_dns_replace.clear() for x in conf_suffix_dns_replace: _suffix_dns_replace[x] = False if conf_ansi_terminal is None: conf_ansi_terminal = sys.stdout.isatty() tm_today = int(time.time()) _today = time.strftime("%Y-%m-%d", time.gmtime(tm_today)) tm_yesterday = int(time.time()) - (60*60*24) _yesterday = time.strftime("%Y-%m-%d", time.gmtime(tm_yesterday)) def _pre_cmd__verbose(args): if args.verbose <= 0: return if args.verbose >= 3: globals()['conf_ui_date'] = False if args.verbose >= 2: globals()['conf_small_osinfo'] = False globals()['conf_suffix_dns_replace'] = {} globals()['conf_hist_show'] = 0 globals()['conf_host_end_total_hostnum'] = 0 globals()['conf_host_skip_eq'] = False globals()['conf_info_machine_ids'] = True globals()['conf_short_duration'] = False globals()['conf_stat_4_hosts'] *= (args.verbose * 2) def _wild_eq(s1, s2): """ Compare two strings, but allow '?' to mean anything. """ if s1 == '?' or s2 == '?': return True return s1 == s2 class Task(): """ Class for holding the Task data. """ __slots__ = ['datetime', 'json', 'name', 'num', 'status'] def __init__ (self, line): data = line.split('\t', 4) self.datetime = data[0] self.json = data[4] self.name = data[3] self.num = data[1] self.status = data[2] def __str__(self): return self.name def __eq__(self, other): for key in self.__slots__: if getattr(self, key) != getattr(other, key): return False return True def __gt__(self, other): if self.num > other.num: return True if self.num != other.num: return False if self.name > other.name: return True return False def statuses_hosts(hosts): ret = set() for host in hosts: for task in host.tasks: ret.add(task.status.lower()) return list(sorted(ret)) class Host(): """ Class for holding the Host data from a line in the files. """ __slots__ = ['name', '_logfn', '_tasks', "t_ok", "t_failures", "t_unreachable", "t_changed", "t_skipped", "t_rescued", "t_ignored"] def __init__ (self, name, logfn, playbook_stats): self.name = name self._logfn = logfn self._tasks = None for key in self.__slots__: if key == 'name' or key[0] == '_': continue setattr(self, key, playbook_stats[key[2:]]) def __str__(self): return self.name def __eq__(self, other): for key in self.__slots__: if getattr(self, key) != getattr(other, key): return False return True def __gt__(self, other): ret = natcmp(self.name, other.name) if ret > 0: return True if ret < 0: return False for key in self.__slots__: if key == 'name' or key[0] == '_': continue if getattr(self, key) > getattr(other, key): return True if getattr(self, key) != getattr(other, key): return False return False # Pretend to be a dict... def __getitem__(self, key): if key not in self.__slots__: raise KeyError() return getattr(self, key) @property def ofcs(self): return "OK=%s Fail=%s Changed=%s" % (_ui_int(self.t_ok), _ui_int(self.t_failures), _ui_int(self.t_changed)) @property def statuses(self): return statuses_hosts([self]) @property def changed(self): fail = ' ' if self.t_failures > 0: fail = '!' return "%s%s" % (fail, _ui_int(self.t_changed)) @property def tasks(self): tasks = [] for line in gzip.open(self._logfn, 'rt'): tasks.append(Task(line)) return tasks def _json_load_multi(fo, multi=0): """ Read multiple json objects from a file. """ content = fo.read() decoder = json.JSONDecoder() pos = 0 results = [] while pos < len(content) and (multi <= 0 or len(results) < multi): # Skip leading whitespace/newlines if content[pos].isspace(): pos += 1 continue # raw_decode returns the parsed object and the index where it ended obj, pos = decoder.raw_decode(content, pos) results.append(obj) return results def ofcs_hosts(hosts): o, f, c = 0, 0, 0 for host in hosts: o += host.t_ok f += host.t_failures c += host.t_changed return "OK=%s Fail=%s Changed=%s" % (_ui_int(o), _ui_int(f), _ui_int(c)) class Playbook(): """ Class for holding the Playbook data from the log files. """ __slots__ = ['play', 'title', 'user', 'ro', 'beg', 'end', 'hosts'] def __init__ (self, path): # print("JDBG:", "loading:", path) jdata = _json_load_multi(open(path)) hdr = jdata[0] self.play = hdr['playbook'] self.user = hdr['userid'] self.beg = hdr['playbook_start'] inv = hdr['inventory'][0] if inv.endswith('/inventory'): inv = inv[:-len('/inventory')] if self.play.startswith(inv): self.play = self.play[len(inv):] # See if the playbook finished: self.end = None end = jdata[-1] if 'playbook_end' in end: self.end = end['playbook_end'] # See if we have stats. self.hosts = [] stats = jdata[-2] if 'stats' not in stats: return hosts = stats['stats'] for host in hosts: logfn = os.path.dirname(path) + "/" + host + ".log.gz" self.hosts.append(Host(host, logfn, hosts[host])) self.hosts = list(sorted(self.hosts)) if not self.hosts: return play = jdata[1] self.title = play['play'] self.ro = play.get('check', False) or play.get('diff', False) xtra_plays = 0 while xtra_plays < (len(jdata)-4): # FIXME: WTF to do with title? xtra_plays += 1 play = jdata[1+xtra_plays] if self.ro: self.ro = play['check'] or play['diff'] def __str__(self): return self.name def __eq__(self, other): if self.play != other.play: return False if self.beg != other.beg: return False return True def __gt__(self, other): ret = natcmp(self.play, other.play) if ret > 0: return True if ret < 0: return False if self.beg > other.beg: return True return False # Pretend to be a dict... def __getitem__(self, key): if key not in self.__slots__: raise KeyError() return getattr(self, key) @property def name(self): return os.path.basename(self.play) @property def date(self): return time.strftime("%Y-%m-%d %H:%M", time.gmtime(self.beg)) @property def date_end(self): if self.end is None: return "" return time.strftime("%Y-%m-%d %H:%M", time.gmtime(self.end)) @property def ofcs(self): return ofcs_hosts(self.hosts) @property def statuses(self): return statuses_hosts(self.hosts) @property def changed(self): f, c = 0, 0 for host in self.hosts: f += host.t_failures c += host.t_changed fail = ' ' if f > 0: fail = '!' return "%s%s" % (fail, _ui_int(c)) _tm_d = {'d' : 60*60*24, 'h' : 60*60, 'm' : 60, 's' : 1, 'w' : 60*60*24*7, 'q' : 60*60*24*7*13} def parse_duration(seconds): if seconds is None: return None if seconds.isdigit(): return int(seconds) ret = 0 for mark in ('w', 'd', 'h', 'm', 's'): pos = seconds.find(mark) if pos == -1: continue val = seconds[:pos] seconds = seconds[pos+1:] if not val.isdigit(): # dbg("!isdigit", val) return None ret += _tm_d[mark]*int(val) if seconds.isdigit(): ret += int(seconds) elif seconds != '': # dbg("!empty", seconds) return None return ret def _add_dur(dur, ret, nummod, suffix, static=False): mod = dur % nummod dur = dur // nummod if mod > 0 or (static and dur > 0): ret.append(suffix) if static and dur > 0: ret.append("%0*d" % (len(str(nummod)), mod)) else: ret.append(str(mod)) return dur def format_duration(seconds, short=False, static=False): if seconds is None: seconds = 0 dur = int(seconds) ret = [] dur = _add_dur(dur, ret, 60, "s", static=static) dur = _add_dur(dur, ret, 60, "m", static=static) if short: if dur == 0 and not static: return '<1h' if dur == 0: return '<01h' ret = [] dur = _add_dur(dur, ret, 24, "h", static=static) dur = _add_dur(dur, ret, 7, "d", static=static) if dur > 0: ret.append("w") ret.append(str(dur)) return "".join(reversed(ret)) # Duration in UI for lists/etc. def _ui_dur(dur, short=None): if short is None: short = conf_short_duration return format_duration(dur, short=short, static=True) __get_login_user = None # Get the username of the person who logged in, not root, cached. def _get_login(): global __get_login_user if __get_login_user is None: # Checked on 3.9.25 and it did the work everytime. __get_login_user = os.getlogin() return __get_login_user # Filter datas using name as a filename wildcard match. def filter_name_datas(datas, names): if not names: # Allow everything... for data in datas: yield data return for data in datas: found = False for host in data.hosts: for name in names: if _fnmatchi(host.name, name): found = True break if found: break if found: yield data # Use this to only get then get only the specific hosts that matched... def iter_name_play(play, names): if not names: # Allow everything... for host in play.hosts: yield host return for host in play.hosts: for name in names: if _fnmatchi(host.name, name): yield host break # Filter datas using name as a filename wildcard match. def filter_play_datas(datas, names): if not names: # Allow everything... for data in datas: yield data return for data in datas: nyml = data.name if nyml.endswith(".yml"): # Should be true always? nyml = nyml[:-len(".yml")] found = False for name in names: if False: pass elif _fnmatchi(data.name, name): found = True elif _fnmatchi(nyml, name): found = True elif _fnmatchi(_ui_play_name(data.name), name): found = True if found: break if found: yield data # Filter datas using name as a filename wildcard match. def filter_status_datas(datas, statuses): if not statuses: # Allow everything... for data in datas: yield data return for data in datas: for status in statuses: if status.lower() in data.statuses: yield data # Filter datas user using name as a filename wildcard match. def filter_user_datas(datas, names): if not names: # Allow everything... for data in datas: yield data return for data in datas: found = False for name in names: if name == '.': name = _get_login() if _fnmatchi(data.user, name): found = True break if found: yield data # Filter datas date using name as a filename wildcard match. def filter_date_datas(datas, dates): if not dates: # Allow everything... for data in datas: yield data return for data in datas: found = False for date in dates: if _fnmatchi(data.date, date): found = True break if found: yield data # Filter datas using osname/vers/info as a filename wildcard match. def filter_osname_datas(datas, names): if not names: # Allow everything... for data in datas: yield data return for data in datas: for name in names: if _fnmatchi(data.osinfo, name): break if _fnmatchi(data.osinfo_small, name): break if _fnmatchi(data.osname, name): break if _fnmatchi(data.osname_small, name): break if _fnmatchi(data.osvers, name): break off = data.osvers.find('.') if off != -1: vers = data.osvers[:off] if _fnmatchi(vers, name): break else: continue yield data # Sub. suffix of DNS names for UI def _ui_name(name): for suffix in conf_suffix_dns_replace: if name.endswith(suffix): _suffix_dns_replace[suffix] = True return name[:-len(suffix)] + conf_suffix_dns_replace[suffix] return name # Reset the usage after _max_update() def _reset_ui_name(): for suffix in sorted(_suffix_dns_replace): _suffix_dns_replace[suffix] = False # Explain if we used any suffix subs. def _explain_ui_name(): done = False pre = "* NOTE:" for suffix in sorted(_suffix_dns_replace): if _suffix_dns_replace[suffix]: print("%s %12s = %s" % (pre,conf_suffix_dns_replace[suffix],suffix)) pre = " :" done = True if done: print(" : Use -vv to show full names.") def _ui_play_name(name): if name.endswith('.yml'): name = name[:-len('.yml')] name = conf_play_remap.get(name, name) if conf_utf8: if len(name) > 20: name = name[:18] + '…' else: if len(name) > 20: name = name[:17] + '...' return name def _ui_hosts(data): return _ui_int(len(data.hosts)) _max_len_name = 0 _max_len_host = 0 # Host numbers _max_len_ofcs = 0 # OK/Failed/Changed numbers _max_len_date = 0 # YYYY-MM-DD HH:MM = 4+1+2+1+2 +1+2+1+2 _max_terminal_width = shutil.get_terminal_size().columns if _max_terminal_width < 20: _max_terminal_width = 80 # _max_terminal_width -= 14 def _max_update(datas): for data in datas: _max_update_data(data) def _max_update_data(data): global _max_len_name global _max_len_host global _max_len_ofcs global _max_len_date name = _ui_play_name(data.name) if len(name) > _max_len_name: _max_len_name = len(name) for host in data.hosts: hn = _ui_name(host.name) if len(hn) > _max_len_host: _max_len_host = len(hn) ofcs = host.changed if len(ofcs) > _max_len_ofcs: _max_len_ofcs = len(ofcs) if len(data.date) > _max_len_date: _max_len_date = len(data.date) def _max_update_correct(prefix): global _max_len_name global _max_len_host global _max_len_ofcs global _max_len_date mw = _max_terminal_width - len(prefix) if _max_len_name + _max_len_host + _max_len_ofcs + _max_len_date < (mw-4): _max_len_name += 1 _max_len_host += 1 _max_len_ofcs += 1 _max_len_date += 1 while _max_len_name + _max_len_host + _max_len_ofcs + _max_len_date >= mw: _max_len_host -= 1 # This is the real __main__ start ... def _usage(short=False): prog = "updates+uptime" if sys.argv: prog = os.path.basename(sys.argv[0]) print("""\ Usage: %s Optional arguments: --help, -h Show this help message and exit. --verbose, -v Increase verbosity. --conf CONF Specify configuration. --ansi ANSI Use ansi terminal codes. Cmds: """ % (prog,), end='') if short: print("""\ help info [host*] [backup] [backup]... json [host*] [task*] list [host*]... """, end='') else: # Also see: _cmd_help() below... print("""\ info [host*] [backup] [backup]... = See the current state, in long form, can be filtered by name. json [host*] [task*] --play playbook* = Get the JSON data from a task. list [host*]... --date date* --play playbook* = See the current state, can be filtered by name. """, end='') def inventory_hosts(): # The "correct" way to do this is something like: # ansible-inventory --list | jq -r '._meta.hostvars | keys[]' # ...but that is _much_ slower, as it's loading a lot of data/facts which # we ignore. cmds = ["ansible", "all", "--list-host"] p = subprocess.Popen(cmds, text=True, stdout=subprocess.PIPE) header = p.stdout.readline() if not header.strip().startswith("hosts ("): return set() ret = set() for line in p.stdout: ret.add(line.strip()) return ret def _pre_cmd__check_paths(): # Below here are the query commands, stuff needs to exist at this point. if not os.path.exists(conf_path): print(" Error: No log file dir. Run a playbook?", file=sys.stderr) sys.exit(4) def _date_suffix(dt): suffix = '' if dt == _today: suffix = ' (today)' if dt == _yesterday: suffix = ' (yesterday)' return suffix def _cli_match_host(args, data): if args.hosts: hosts = args.hosts[:] print("Matching:", ", ".join(hosts)) data = filter_name_datas(data, hosts) data = list(data) if not data: print("Not host(s) matched:", ", ".join(hosts)) sys.exit(2) return data def __cmd_std_filters(args, data, hosts): if args.play is not None: data = list(filter_play_datas(data, [args.play])) if args.user is not None: data = list(filter_user_datas(data, [args.user])) if args.date is not None: data = list(filter_date_datas(data, [args.date])) data = list(filter_name_datas(data, hosts)) if args.status: data = list(filter_status_datas(data, [args.status])) return data def _cmd_info(args): hosts = [] if args.host: # print("JDBG:", args.host) hosts = [args.host] tasks = [] if args.task is not None: tasks = [args.task] data = __cmd_std_filters(args, plays, hosts) for play in data: print("=" * 70) print("Play :", play.play) print("Title:", play.title) print("User :", play.user) print("Beg :", play.date) print("End :", play.date_end) print("OFC :", ofcs_hosts(iter_name_play(play, hosts))) done = False for host in iter_name_play(play, hosts): host_done = False for task in host.tasks: if args.status and args.status != task.status.lower(): continue if tasks and not _fnmatchi(task.name, tasks[0]): continue if not host_done: print(' ', host.name, host.ofcs) done = True host_done = True print(' ', task.num, "%s:" % task.status, task.name) if args.json: js = json.loads(task.json) if not js or 'results' not in js: continue js = js['results'] print(' ', json.dumps(js, indent=4, sort_keys=True)) if done and args.one: return # If save=True, and we haven't output anything then save the line as we might # not do anything. After something has gone out save=True does nothing. _prnt_line_saved = [] def _print_line_add(line): global _prnt_line_saved if _prnt_line_saved is None: _prnt_line_saved = [] _prnt_line_saved.append(line) def _print_line_reset(): global _prnt_line_saved ret = _prnt_line_saved is not None _prnt_line_saved = [] return ret def _print_play(prefix, data, hosts=[], high='', prev=None): global _prnt_line_saved if prev is not None: prev = prev.date done = False hosts = list(iter_name_play(data, hosts)) if len(hosts) > (conf_show_hosts_max+1): hosts = hosts[:conf_show_hosts_max] xtra = None if len(hosts) < len(data.hosts): if conf_utf8: ellipsis = '…' else: ellipsis = '...' xtra = " " + ellipsis xtra += "%s more hosts" % (_ui_int(len(data.hosts) - len(hosts))) xtra += " " + ellipsis name = data.name for host in hosts: uiname = "%-*s" % (_max_len_name, _ui_play_name(name)) uihost = "%-*s" % (_max_len_host, _ui_name(host.name)) if high and not done: uiname = _ui_t_ansi(uiname, high) done = True if high: uinhost = _ui_t_ansi(uihost, high) line = "%s%s %s %*s %s" % (prefix, uiname, uihost, _max_len_ofcs, host.changed, _ui_date(data, align=_max_len_date, prev=prev)) prev = data.date print(line) name = "" if xtra is not None: print(xtra) def _print_plays(prefix, data, hosts=[], explain=True): pd1 = None for d1 in data: _print_play(prefix, d1, hosts, prev=pd1) pd1 = d1 if explain: _explain_ui_name() # -n variants match multiple things, but only allow looking at current data def _cmd_list(args): # FIXME: Ideally argparse would do this for us :( hosts = [] if hasattr(args, 'hosts'): hosts = args.hosts[:] data = __cmd_std_filters(args, plays, hosts) _max_update(data) _max_update_correct('') print(_ui_t_title("Play", -_max_len_name), _ui_t_title("Hosts", -_max_len_host), _ui_t_title("Chg", _max_len_ofcs), _ui_t_title("Date", _max_len_date)) _print_plays('', data, hosts) def _cmd_json(args): hosts = [] if args.host is not None: hosts = [args.host] tasks = [] if args.task is not None: tasks = [args.task] data = __cmd_std_filters(args, plays, hosts) done = False for play in data: for host in iter_name_play(play, hosts): for task in host.tasks: if args.status and args.status != task.status.lower(): continue if tasks and not _fnmatchi(task.name, tasks[0]): continue done = True print(task.json) if done and args.one: return # CMDLINE validation: def _cmdline_arg_date(oval): if len(oval) > len("YYYY-MM-DD HH:MM"): raise argparse.ArgumentTypeError(f"{oval} is too big for a date") val = oval.lower() return val def _cmdline_arg_play(oval): val = oval.lower() return val def _cmdline_arg_user(oval): val = oval.lower() return val def _cmdline_arg_status(oval): val = oval.lower() if val not in ("changed", "failed", "ok", "skipped", "stats"): raise argparse.ArgumentTypeError(f"{oval} is not an ansible task status") return val def _cmdline_arg_ansi(oval): val = oval.lower() if val in ("true", "y", "yes", "on", "1", "always"): return True if val in ("false", "n", "no", "off", "0", "never"): return False if val in ("automatic", "?", "tty", "auto"): return None raise argparse.ArgumentTypeError(f"{oval} is not valid: always/never/auto") def _cmdline_arg_duration(oval): if oval == "forever": return 0 val = parse_duration(oval) if val is None: raise argparse.ArgumentTypeError(f"{oval} is not a duration") return val def _cmdline_arg_positive_integer(oval): try: val = int(oval) except: val = -1 if val <= 0: raise argparse.ArgumentTypeError(f"{oval} is not a positive integer") return val _cmds_als = { "information" : ["info"], "json" : [], "list" : [], None : set(), } for c in _cmds_als: if c is None: continue _cmds_als[c] = set(_cmds_als[c]) _cmds_als[None].update(_cmds_als[c]) _cmds_als[None].add(c) def _cmd_help(args): prog = "ansilog-playbook" if sys.argv: prog = os.path.basename(sys.argv[0]) if not args.hcmd: _usage() if args.hcmd not in _cmds_als[None]: print(" Unknown command:", args.hcmd) _usage() def _eq_cmd(x): return args.hcmd == x or args.hcmd in _cmds_als[x] def _hlp_als(x): if not _cmds_als[x]: return '' als = set() als.add(x) als.update(_cmds_als[x]) als.remove(args.hcmd) return f"""Aliases: {", ".join(sorted(als))}\n""" if False: pass elif _eq_cmd("information"): print(f"""\ Usage: {prog} {args.hcmd} [host*] [task*] {' '*len(prog)} {' '*len(args.hcmd)} --date date* {' '*len(prog)} {' '*len(args.hcmd)} --json {' '*len(prog)} {' '*len(args.hcmd)} --one -1 {' '*len(prog)} {' '*len(args.hcmd)} --play playbook* {' '*len(prog)} {' '*len(args.hcmd)} --status status {' '*len(prog)} {' '*len(args.hcmd)} --user user* See the results of playbook(s), in long form, can be filtered by: date, playbook name, hostname, task name, task status, user. --json pretty prints the JSON, if there are results. --one make it stop after the first playbook match. If you want to compare things by hand, use this. {_hlp_als("information")} Eg. {prog} {args.hcmd} {prog} {args.hcmd} 'batcave*' {prog} {args.hcmd} --play check-etc -1 --user=. 'noc*' 'Report*' """, end='') elif _eq_cmd("list"): print(f"""\ Usage: {prog} {args.hcmd} [host*] [host*]... {' '*len(prog)} {' '*len(args.hcmd)} --date date* {' '*len(prog)} {' '*len(args.hcmd)} --play playbook* {' '*len(prog)} {' '*len(args.hcmd)} --status status {' '*len(prog)} {' '*len(args.hcmd)} --user user* See the current state of the hosts/playbooks, can be filtered by: date, playbook name, hostname, task status, user. {_hlp_als("list")} Eg. {prog} {args.hcmd} {prog} {args.hcmd} --conf=play_duration=13w 'batcave*' {prog} {args.hcmd} 'batcave*' 'noc*' {prog} {args.hcmd} --play check-etc --user=. '*stg*' '*test*' """, end='') elif _eq_cmd("json"): print(f"""\ Usage: {prog} {args.hcmd} [host*] [task*] {' '*len(prog)} {' '*len(args.hcmd)} --date date* {' '*len(prog)} {' '*len(args.hcmd)} --one -1 {' '*len(prog)} {' '*len(args.hcmd)} --play playbook* {' '*len(prog)} {' '*len(args.hcmd)} --status status {' '*len(prog)} {' '*len(args.hcmd)} --user user* See JSON for the task on the host. Note that the json is just printed raw without filtering for every task (unlike "info --json"). {_hlp_als("json")} Eg. {prog} {args.hcmd} 'batcave*' {prog} {args.hcmd} --user=. --play check-etc --status=ok -1 \\* Report\\* \\ | jq .results[].msg \\ | sed 's/"UNKNOWN File: \\(- .*\)"/\\1/' \\ | sort -V | uniq | less """, end='') def _main(): global conf_ansi_terminal global conf_path global cmd _user_conf() parser = argparse.ArgumentParser(add_help=False) parser.add_argument('--verbose', '-v', action='count', default=0) parser.add_argument("--conf", action='append', default=[]) parser.add_argument("--ansi", type=_cmdline_arg_ansi, help="Use ansi terminal codes") parser.add_argument("--colour", type=_cmdline_arg_ansi, dest='ansi', help=argparse.SUPPRESS) parser.add_argument("--color", type=_cmdline_arg_ansi, dest='ansi', help=argparse.SUPPRESS) parser.add_argument('-h', '--help', action='store_true', help='Show this help message') # We do this here so that `$0 -v blah -v` works. margs, args = parser.parse_known_args() if margs.help: _usage(short=True) sys.exit(0) subparsers = parser.add_subparsers(dest="cmd") cmd = subparsers.add_parser("help") cmd.add_argument("hcmd", nargs='?', help="cmd to get help for") cmd.set_defaults(func=_cmd_help) def __defs(func): cmd.set_defaults(func=func) # HIDDEN commands... cmd = subparsers.add_parser("dur2secs", help=argparse.SUPPRESS) cmd.add_argument("dur", type=_cmdline_arg_duration, help="duration") __defs(func=lambda x: print("secs:", x.dur)) cmd = subparsers.add_parser("secs2dur", help=argparse.SUPPRESS) cmd.add_argument("secs", type=int, help="seconds") __defs(func=lambda x: print("dur:", _ui_dur(x.secs, short=False))) cmd = subparsers.add_parser("int2num", help=argparse.SUPPRESS) cmd.add_argument("num", type=int, help="int") __defs(func=lambda x: print("num:", _ui_int(x.num))) # -- Start of the real commands... # info command als = _cmds_als["information"] hlp = "show host information" cmd = subparsers.add_parser("information", aliases=als, help=hlp) cmd.add_argument("--date", type=_cmdline_arg_date, help="wildcard Date") cmd.add_argument("--play", type=_cmdline_arg_play, help="wildcard Playbook") cmd.add_argument("--status", type=_cmdline_arg_status, help="task status") cmd.add_argument("--user", type=_cmdline_arg_user, help="task user") cmd.add_argument("-1", "--one", action='store_true', default=False, help="latest play only") cmd.add_argument("--json", action='store_true', default=False, help="also show JSON") cmd.add_argument("host", nargs='?', help="wildcard hostname") cmd.add_argument("task", nargs='?', help="wildcard taskname") __defs(func=_cmd_info) # list cmd cmd = subparsers.add_parser("list", help="list hosts") cmd.add_argument("--date", type=_cmdline_arg_date, help="wildcard Date") cmd.add_argument("--play", type=_cmdline_arg_play, help="wildcard Playbook") cmd.add_argument("--user", type=_cmdline_arg_user, help="task user") cmd.add_argument("--status", type=_cmdline_arg_status, help="task status") cmd.add_argument("hosts", nargs='*', help="wildcard hostname(s)") __defs(func=_cmd_list) # json command cmd = subparsers.add_parser("json", help="get json") cmd.add_argument("--date", type=_cmdline_arg_date, help="wildcard Date") cmd.add_argument("--play", type=_cmdline_arg_play, help="wildcard Playbook") cmd.add_argument("--status", type=_cmdline_arg_status, help="task status") cmd.add_argument("--user", type=_cmdline_arg_user, help="task user") cmd.add_argument("-1", "--one", action='store_true', default=False, help="latest play only") cmd.add_argument("host", nargs='?', help="wildcard hostname") cmd.add_argument("task", nargs='?', help="wildcard taskname") __defs(func=_cmd_json) # Need to presetup for cmd line validation ... but conf can change # so just validate format? And revalidate later? # FIXME: We do no real validation on options, so skip doing the glob twice. # _pre_cmd__setup() # Parse the above options/cmds args = parser.parse_args(args) for line in margs.conf: _user_conf_line(line) if margs.ansi is not None: conf_ansi_terminal = margs.ansi # Setup based on the config. _pre_cmd__setup() _pre_cmd__verbose(margs) _pre_cmd__check_paths() # Run the actual command. if not hasattr(args, "func"): cmd = "list" args.date = None args.play = None args.hosts = [] _cmd_list(args) else: cmd = args.cmd args.func(args) if __name__ == "__main__": _main()