#!/usr/bin/env python3
#----------------------------------------------------------------------
# cmdtest.py
#----------------------------------------------------------------------
# Copyright 2013-2015 Johan Holmberg.
#----------------------------------------------------------------------
# This file is part of "cmdtest".
#
# "cmdtest" is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# "cmdtest" is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with "cmdtest".  If not, see <http://www.gnu.org/licenses/>.
#----------------------------------------------------------------------

# This is a minimal Python version of "cmdtest".
import argparse import contextlib import copy import glob import hashlib import io from itertools import zip_longest, dropwhile import os import re import shutil import subprocess import sys import time import types if sys.platform == 'win32': try: import win_unicode_console except ImportError: pass else: win_unicode_console.enable() class AssertFailed(Exception): pass ROOT_WD = os.getcwd() class Statistics: def __init__(self): self.classes = 0 self.methods = 0 self.commands = 0 self.errors = 0 self.fatals = 0 def __repr__(self): return "Statistics(classes=%s, methods=%s, command=%s, errors=%s, fatals=%s)" % ( self.classes, self.methods, self.commands, self.errors, self.fatals, ) #---------------------------------------------------------------------- def subranges(n, arr): arr = list(arr) for i in range(0,len(arr)-n+1): yield arr[i:i+n] def flatten(seq): for item in seq: if isinstance(item, (list,tuple)): yield from flatten(item) else: yield item def to_list(arg): return arg if isinstance(arg, list) else [arg] def to_content(lines): return ''.join(line + "\n" for line in lines) def to_lines(content, newline="\n"): lines = content.split("\n") if lines[-1] == '': lines.pop() else: lines[-1] += "" return lines def mkdir_for(filepath): dirpath = os.path.dirname(filepath) if dirpath: os.makedirs(dirpath, exist_ok=True) @contextlib.contextmanager def temp_chdir(path): global ROOT_WD starting_directory = os.getcwd() try: os.chdir(path) ROOT_WD = os.getcwd() yield finally: os.chdir(starting_directory) @contextlib.contextmanager def extra_sys_path(dname): try: old_sys_path = sys.path sys.path.append(dname) yield finally: sys.path = old_sys_path def progress(*args): print("###", "-" * 50, *args) def error_show(name, what, arg): try: msg = arg.error_msg(what) except: if re.match(r'(stdout|stderr|file|files)_', name): print(what) if len(arg) == 0: print(" <>") else: for line in arg: print(" ", line) else: print(what, arg) else: print(msg, end='') class Lines: def 
__init__(self, lines): self.lines = lines def error_msg(self, what): res = io.StringIO() print(what, file=res) for line in self.lines: print(" ", line, file=res) return res.getvalue() class Regexp: def __init__(self, pattern): self.pattern = pattern def error_msg(self, what): res = io.StringIO() print(what, file=res) print(" pattern '%s'" % self.pattern, file=res) return res.getvalue() #---------------------------------------------------------------------- class File: def __init__(self, fname): with open(fname, 'rb') as f: self.content = f.read() def lines(self, encoding): return to_lines(self.content.decode(encoding=encoding), os.linesep) #---------------------------------------------------------------------- class ExpectFile: def __init__(self, result, content, encoding): self.result = result self.encoding = encoding if isinstance(content, list): self.lines = content else: self.lines = to_lines(content) def check(self, name, actual_bytes): try: actual_lines = actual_bytes.lines(self.encoding) except UnicodeDecodeError: actual_lines = [""] if actual_lines != self.lines: print("--- ERROR:", name) error_show(name, "actual:", actual_lines) error_show(name, "expect:", self.lines) self.result._nerrors += 1 #---------------------------------------------------------------------- class ExpectPattern: def __init__(self, result, pattern, encoding): self.result = result self.encoding = encoding self.pattern = pattern def _match(self, lines): patterns = to_list(self.pattern) for some_lines in subranges(len(patterns), lines): for pattern, line in zip(patterns, some_lines): if not re.search(pattern, line): break else: return True def check(self, name, actual_bytes): try: actual_lines = actual_bytes.lines(self.encoding) except UnicodeDecodeError: actual_lines = [""] ok = False else: ok = self._match(actual_lines) if not ok: print("--- ERROR:", name) error_show(name, "actual:", actual_lines) error_show(name, "expect:", ['PATTERN:'] + to_list(self.pattern)) self.result._nerrors += 
1 #---------------------------------------------------------------------- class Result: def __init__(self, err, before, after, stdout, stderr, tmpdir, test_class_name): self._err = err self._before = before self._after = after self._stdout = stdout self._stderr = stderr self._checked_stdout = False self._checked_stderr = False self._checked_status = False self._checked_files = set() self._nerrors = 0 self.tmpdir = tmpdir self.test_class_name = test_class_name def __enter__(self, *args): return self def __exit__(self, exc_type, exc_value, traceback): if exc_type is not None: return False if "created" not in self._checked_files: self.created_files() if "modified" not in self._checked_files: self.modified_files() if "removed" not in self._checked_files: self.removed_files() if not self._checked_status: self.exit_zero() if not self._checked_stdout: self.stdout_equal([]) if not self._checked_stderr: self.stderr_equal([]) if self._nerrors > 0: self.tmpdir.preserve(self.test_class_name) raise AssertFailed("...") def _error(self, name, actual, expect): self._nerrors += 1 print("--- ERROR:", name) error_show(name, "actual:", actual) error_show(name, "expect:", expect) print() def exit_status(self, status): self._checked_status = True if self._err != status: self._error("exit_status", actual=self._err, expect=status) def exit_nonzero(self): self._checked_status = True if self._err == 0: self._error("exit_nonzero", actual=self._err, expect="") def exit_zero(self): self._checked_status = True if self._err != 0: self._error("exit_zero", actual=self._err, expect=0) def stdout_match(self, pattern, encoding='utf-8'): self._checked_stdout = True expect = ExpectPattern(self, pattern, encoding) expect.check("stdout_match", self._stdout) def stderr_match(self, pattern, encoding='utf-8'): self._checked_stderr = True expect = ExpectPattern(self, pattern, encoding) expect.check("stderr_match", self._stderr) def stdout_equal(self, content, encoding='utf-8'): self._checked_stdout = True 
expect = ExpectFile(self, content, encoding) expect.check("stdout_equal", self._stdout) def stderr_equal(self, content, encoding='utf-8'): self._checked_stderr = True expect = ExpectFile(self, content, encoding) expect.check("stderr_equal", self._stderr) def file_match(self, fname, pattern, encoding='utf-8'): expect = ExpectPattern(self, pattern, encoding) expect.check("file_match %s" % fname, File(fname)) def file_equal(self, fname, content, encoding='utf-8'): expect = ExpectFile(self, content, encoding) expect.check("file_equal %s" % fname, File(fname)) def files_equal(self, fname1, fname2, encoding='utf-8'): with open(fname1, "r", encoding=encoding) as f: lines1 = list(f) with open(fname2, "r", encoding=encoding) as f: lines2 = list(f) if lines1 != lines2: both = list(zip_longest(lines1, lines2)) n1 = len(both) rest = list(dropwhile(lambda x: x[0] == x[1], both)) n2 = len(rest) nequal = n1 - n2 self._error("files_equal", actual=[ "<%d equal lines>" % nequal, both[nequal][0].rstrip("\n"), "<...>", ], expect=[ "<%d equal lines>" % nequal, both[nequal][1].rstrip("\n"), "<...>", ]) TESTS = { "created_files" : (lambda before,after: not before and after, {"created"}), "modified_files" : (lambda before,after: before and after and before != after, {"modified"}), "written_files" : (lambda before,after: (not before and after or before and after and before != after), {"created","modified"}), "affected_files" : (lambda before,after: (bool(before) != bool(after) or before and after and before != after), {"created","modified","removed"}), "removed_files" : (lambda before,after: before and not after, {"removed"}), } def __getattr__(self, name): try: f, tags = self.TESTS[name] except KeyError: raise AttributeError("'%s' object has no attribute '%s'" % (type(self).__name__, name)) self._checked_files |= tags def method(*fnames): expect = set(fnames) known = self._after.files() | self._before.files() actual = set() for x in known: after = self._after.get(x) before = 
self._before.get(x) if f(before, after): actual.add(x) if actual != expect: self._error(name, actual=sorted(actual), expect=sorted(expect)) return method #---------------------------------------------------------------------- class TestCase: def __init__(self, tmpdir, statistics): self.__tmpdir = tmpdir self.__statistics = statistics self.__always_ignored_files = set() def setup(self): pass def teardown(self): pass def prepend_path(self, dirpath): os.environ['PATH'] = os.pathsep.join((os.path.join(ROOT_WD, dirpath), os.environ['PATH'])) def prepend_local_path(self, dirpath): os.environ['PATH'] = os.pathsep.join((os.path.join(self.__tmpdir.top, dirpath), os.environ['PATH'])) def always_ignore_file(self, fname): self.__always_ignored_files.add(fname) def import_file(self, src, tgt): mkdir_for(tgt) shutil.copy(os.path.join(ROOT_WD, src), tgt) def create_file(self, fname, content, encoding='utf-8'): mkdir_for(fname) with open(fname, "w", encoding=encoding) as f: if isinstance(content, (list,tuple)): for line in flatten(content): print(line, file=f) else: f.write(content) def transcode_file(self, src_file, tgt_file=None, src_encoding='utf-8', tgt_encoding='utf-8'): if tgt_file: mkdir_for(tgt_file) else: tgt_file = src_file with open(src_file, "r", encoding=src_encoding) as f: data = f.read() with open(tgt_file, "w", encoding=tgt_encoding) as f: f.write(data) def cmd(self, cmdline, *, timeout=None): tmpdir = self.__tmpdir before = tmpdir.snapshot(self.__always_ignored_files) stdout_log = tmpdir.stdout_log() stderr_log = tmpdir.stderr_log() self._wait_for_new_second() self.__statistics.commands += 1 print("### cmdline:", cmdline) with open(stdout_log, "w") as stdout, open(stderr_log, "w") as stderr: if cmdline: err = subprocess.call(cmdline, stdout=stdout, stderr=stderr, shell=True, timeout=timeout) else: err = 0 after = tmpdir.snapshot(self.__always_ignored_files) return Result(err, before, after, File(stdout_log), File(stderr_log), tmpdir, type(self).__name__) def 
_wait_for_new_second(self): newest = self._newest_file_time() while self._current_file_time() == newest: time.sleep(0.1) def _newest_file_time(self): newest = None for dirpath, dirnames, filenames in os.walk(self.__tmpdir.top): for filename in filenames: path = os.path.join(dirpath, filename) mtime = os.path.getmtime(path) if not newest or mtime > newest: newest = mtime return newest def _current_file_time(self): with open(self.__tmpdir.timestamp_file(), "w") as f: print("file written to detect 'mtime'", file=f) return os.path.getmtime(self.__tmpdir.timestamp_file()) #---------------------------------------------------------------------- class DirInfo: def __init__(self, dirpath, prefix=""): self.dirpath = dirpath self.prefix = prefix self.display_path = prefix def entries(self): for entry in os.listdir(self.dirpath): path = os.path.join(self.dirpath, entry) if os.path.isdir(path): yield DirInfo(path, self.prefix + entry + "/") else: yield FileInfo(path, self.prefix + entry) def __eq__(self, other): return self.display_path == other.display_path def __ne__(self, other): return not (self == other) class FileInfo: def __init__(self, path, relpath): self.path = path self.relpath = relpath self.display_path = relpath self.stat = os.stat(self.path) m = hashlib.md5() with open(path, "rb") as f: m.update(f.read()) self.digest = m.hexdigest() def __eq__(self, other): return (self.display_path == other.display_path and self.digest == other.digest and self.stat.st_ino == other.stat.st_ino and self.stat.st_mtime == other.stat.st_mtime) def __ne__(self, other): return not (self == other) #---------------------------------------------------------------------- class FsSnapshot: def __init__(self, topdir, ignored_files): self.topdir = topdir self.ignored_files = ignored_files self.bypath = {} self._collect_files(DirInfo(topdir)) def __getitem__(self, path): return self.bypath[path] def get(self, path): try: return self[path] except KeyError: return None def _collect_files(self, 
dirinfo): for entry in dirinfo.entries(): if entry.display_path in self.ignored_files: continue self.bypath[entry.display_path] = entry if isinstance(entry, DirInfo): self._collect_files(entry) def files(self): return set(self.bypath.keys()) #---------------------------------------------------------------------- class Tmpdir: def __init__(self): self.top = os.path.abspath("tmp-cmdtest-python/work") self.top_save = self.top + "-save" self.logdir = os.path.dirname(self.top) self.environ_path = os.environ['PATH'] self.old_cwds = [] self.remove_all_preserve() self.test_method_name = None def stdout_log(self): return os.path.join(self.logdir, "tmp.stdout") def stderr_log(self): return os.path.join(self.logdir, "tmp.stderr") def timestamp_file(self): return os.path.join(self.logdir, "tmp.timestamp") def snapshot(self, ignored_files): return FsSnapshot(self.top, ignored_files) def prepare_for_test(self, test_method_name): self.clear() self.test_method_name = test_method_name def clear(self): if os.path.exists(self.top): shutil.rmtree(self.top) os.makedirs(self.top) def remove_all_preserve(self): if os.path.exists(self.top_save): shutil.rmtree(self.top_save) def preserve(self, test_class_name): shutil.move(self.top, os.path.join(self.top_save, test_class_name, self.test_method_name)) def __enter__(self): self.old_cwds.append(os.getcwd()) os.chdir(self.top) def __exit__(self, exc_type, exc_value, traceback): os.chdir(self.old_cwds.pop()) os.environ['PATH'] = self.environ_path if exc_type is not None: return False #---------------------------------------------------------------------- class Tmethod: def __init__(self, method, tclass): self.method = method self.tclass = tclass def name(self): return self.method.__name__ def run(self, tmpdir, statistics): obj = self.tclass.klass(tmpdir, statistics) tmpdir.prepare_for_test(self.name()) with tmpdir: try: obj.setup() self.method(obj) except AssertFailed as e: statistics.errors += 1 except Exception as e: print("--- exception in 
test: %s: %s" % (sys.exc_info()[0].__name__, e)) import traceback traceback.print_tb(sys.exc_info()[2]) statistics.fatals += 1 obj.teardown() #---------------------------------------------------------------------- class Tclass: def __init__(self, klass, tfile): self.klass = klass self.tfile = tfile def name(self): return self.klass.__name__ def tmethods(self): for name in sorted(self.klass.__dict__.keys()): if re.match(r'test_', name): yield Tmethod(self.klass.__dict__[name], self) #---------------------------------------------------------------------- class Tfile: def __init__(self, filename): try: with open(filename, encoding='utf-8') as f: co = compile(f.read(), filename, "exec") except IOError as e: print("cmdtest: error: failed to read %s" % filename, file=sys.stderr) sys.exit(1) except SyntaxError as e: print("cmdtest: error: syntax error reading %s: %s" % (filename, e), file=sys.stderr) sys.exit(1) self.glob = dict() self.glob['TestCase'] = TestCase self.glob['__file__'] = os.path.abspath(filename) with extra_sys_path(os.path.dirname(filename)): exec(co, self.glob) def tclasses(self): for name in sorted(self.glob.keys()): if re.match(r'TC_', name): yield Tclass(self.glob[name], self) #---------------------------------------------------------------------- # Run cmdtest in given directory def cmdtest_in_dir(path, *, py_files=None, selected_classes=None, selected_methods=None, quiet=False): with temp_chdir(path): if not py_files: py_files = glob.glob("CMDTEST_*.py") return test_files(py_files, selected_classes=selected_classes, selected_methods=selected_methods, quiet=quiet) def test_files(py_files, *, selected_classes=None, selected_methods=None, quiet=False): statistics = Statistics() tmpdir = Tmpdir() for py_file in py_files: tfile = Tfile(py_file) for tclass in tfile.tclasses(): statistics.classes += 1 if not quiet: progress(tclass.name()) if not selected_classes or tclass.name() in selected_classes: for tmethod in tclass.tmethods(): if not selected_methods 
or tmethod.name() in selected_methods: statistics.methods += 1 if not quiet: progress(tmethod.name()) tmethod.run(tmpdir, statistics) return statistics def parse_options(): parser = argparse.ArgumentParser('cmdtest') parser.add_argument("-v", "--verbose", action="store_true", help="be more verbose") parser.add_argument("-q", "--quiet", action="store_true", help="be more quiet") parser.add_argument("arg", nargs="*", help="CMDTEST_*.py files / test methods") options = parser.parse_args() py_files = [] selected_methods = set() for arg in options.arg: if os.path.isdir(arg): py_files.extend(glob.glob('%s/CMDTEST_*.py' % arg)) elif re.match(r'CMDTEST_.*\.py$', os.path.basename(arg)): py_files.append(arg) else: selected_methods.add(arg) if not py_files: py_files = glob.glob("CMDTEST_*.py") if not py_files: print("ERROR: no CMDTEST_*.py files found") exit(1) return options, py_files, selected_methods def main(): options, py_files, selected_methods = parse_options() statistics = test_files(py_files, selected_methods=selected_methods, quiet=options.quiet) if not options.quiet: print() print(statistics) print() exit(0 if statistics.errors == 0 and statistics.fatals == 0 else 1) if __name__ == '__main__': main()