tesh -- testing shell
========================
-Copyright (c) 2012-2021. The SimGrid Team. All rights reserved.
+Copyright (c) 2012-2022. The SimGrid Team. All rights reserved.
This program is free software; you can redistribute it and/or modify it
under the terms of the license (GNU LGPL) which comes with this package.
"""
import sys
+import errno
import os
import shlex
import re
import subprocess
import _thread
else:
- raise "This program is expected to run with Python3 only"
+ raise RuntimeError("This program is expected to run with Python3 only")
##############
#
#
#
-def isWindows():
+def is_windows():
+ """ Check if running on Windows """
return sys.platform.startswith('win')
# Singleton metaclass that works in Python 2 & 3
return cls._instances[cls]
class Singleton(_Singleton('SingletonMeta', (object,), {})):
- pass
+ """ The Singleton base class """
+ # pass
SIGNALS_TO_NAMES_DICT = dict((getattr(signal, n), n)
for n in dir(signal) if n.startswith('SIG') and '_' not in n)
-return_code = 0
-
-# exit correctly
def tesh_exit(errcode):
+ """ Exit correctly """
# If you do not flush some prints are skipped
sys.stdout.flush()
# os._exit exit even when executed within a thread
+ # pylint: disable=protected-access
os._exit(errcode)
def fatal_error(msg):
+ """ Exit with error """
print("[Tesh/CRITICAL] " + str(msg))
tesh_exit(1)
-# Set an environment variable.
-# arg must be a string with the format "variable=value"
def setenv(arg):
+ """
+ Set an environment variable.
+ arg must be a string with the format "variable=value"
+ """
print("[Tesh/INFO] setenv " + arg)
- t = arg.split("=", 1)
- os.environ[t[0]] = t[1]
- # os.putenv(t[0], t[1]) does not work
+ (var, val) = arg.split("=", 1)
+ os.environ[var] = val
+ # os.putenv(var, val) does not work
# see http://stackoverflow.com/questions/17705419/python-os-environ-os-putenv-usr-bin-env
-# http://stackoverflow.com/questions/30734967/how-to-expand-environment-variables-in-python-as-bash-does
def expandvars2(path):
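+ """
+ Expand environment variables as bash does: references to undefined variables are removed.
+ See http://stackoverflow.com/questions/30734967/how-to-expand-environment-variables-in-python-as-bash-does
+ """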
return re.sub(r'(?<!\\)\$[A-Za-z_][A-Za-z0-9_]*', '', os.path.expandvars(path))
##############
#
#
-# Global variable. Stores which process group should be killed (or None otherwise)
-running_pids = list()
-
-# Tests whether the process is dead already
def process_is_dead(pid):
+ """ Tests whether the process is dead already """
try:
os.kill(pid, 0)
except ProcessLookupError:
return True
return False
-# This function send TERM signal + KILL signal after 0.2s to the group of the specified process
def kill_process_group(pid):
- if pid is None: # Nobody to kill. We don't know who to kill on windows, or we don't have anyone to kill on signal handler
+ """ Send the TERM signal, then the KILL signal after 0.2s, to the group of the specified process """
+ if pid is None:
+ # Nobody to kill. We don't know who to kill on Windows, or we don't have anyone to kill in the signal handler
return
try:
pgid = os.getpgid(pid)
- except:
+ except OSError:
# os.getpgid failed. Ok, don't cleanup.
return
-
+
try:
os.killpg(pgid, signal.SIGTERM)
if process_is_dead(pid):
# os.killpg failed. OK. Some subprocesses may still be running.
pass
-def signal_handler(signal, frame):
- print("Caught signal {}".format(SIGNALS_TO_NAMES_DICT[signal]))
- global running_pids
- running_pids_copy = running_pids # Just in case of interthread conflicts.
- for pid in running_pids_copy:
+def signal_handler(signo, _frame):
+ """ Signal handler """
+ print("Caught signal {}".format(SIGNALS_TO_NAMES_DICT[signo]))
+ running_pids = TeshState().running_pids # Just in case of interthread conflicts.
+ for pid in running_pids:
kill_process_group(pid)
- running_pids.clear()
+ TeshState().running_pids.clear()
tesh_exit(5)
#
-# read file line per line (and concat line that ends with "\")
class FileReader(Singleton):
+ """ Read file line by line (and concatenate lines that end with "\\") """
def __init__(self, filename=None):
if filename is None:
self.filename = "(stdin)"
- self.f = sys.stdin
+ self.fileno = sys.stdin
else:
self.filename_raw = filename
self.filename = os.path.basename(filename)
self.abspath = os.path.abspath(filename)
- self.f = open(self.filename_raw)
+ self.fileno = open(self.filename_raw)
self.linenumber = 0
return self.filename + ":" + str(self.linenumber)
def readfullline(self):
+ """ Read a full line """
try:
- line = next(self.f)
+ line = next(self.fileno)
self.linenumber += 1
except StopIteration:
return None
txt = line
while len(line) > 1 and line[-2] == "\\":
txt = txt[0:-1]
- line = next(self.f)
+ line = next(self.fileno)
self.linenumber += 1
txt += line[0:-1]
return txt
-# keep the state of tesh (mostly configuration values)
class TeshState(Singleton):
+ """ Keep the state of tesh (mostly configuration values) """
def __init__(self):
+ self.running_pids = list() # stores which process group should be killed (or None otherwise)
self.threads = []
self.args_suffix = ""
self.ignore_regexps_common = []
self.timeout = 10 # default value: 10 sec
self.wrapper = None
self.keep = False
+ self.return_code = 0
def add_thread(self, thread):
+ """ Add another thread to wait for """
self.threads.append(thread)
def join_all_threads(self):
- for t in self.threads:
- t.acquire()
- t.release()
+ """ Wait for all threads """
+ for thread in self.threads:
+ thread.acquire()
+ thread.release()
-# Command line object
+ def set_return_code(self, value):
+ """ Set exit status """
+ if value > self.return_code:
+ self.return_code = value
-class Cmd(object):
+class Cmd:
+ """ Command line object """
def __init__(self):
self.input_pipe = []
self.output_pipe_stdout = []
self.ignore_regexps = TeshState().ignore_regexps_common
- def add_input_pipe(self, l):
- self.input_pipe.append(l)
+ def add_input_pipe(self, line):
+ """ Add a line to stdin input """
+ self.input_pipe.append(line)
- def add_output_pipe_stdout(self, l):
- self.output_pipe_stdout.append(l)
+ def add_output_pipe_stdout(self, line):
+ """ Add a line to stdout output """
+ self.output_pipe_stdout.append(line)
- def add_output_pipe_stderr(self, l):
- self.output_pipe_stderr.append(l)
+ def add_output_pipe_stderr(self, line):
+ """ Add a line to stderr output """
+ self.output_pipe_stderr.append(line)
def set_cmd(self, args, linenumber):
+ """ Set command line """
self.args = args
self.linenumber = linenumber
def add_ignore(self, txt):
+ """ Add regexp to ignore lines """
self.ignore_regexps.append(re.compile(txt))
def remove_ignored_lines(self, lines):
+ """ Remove ignored lines """
for ign in self.ignore_regexps:
lines = [l for l in lines if not ign.match(l)]
return lines
file.write("\n")
file.close()
- def _cmd_cd(self, argline):
+ def _cmd_cd(self, argline): # pylint: disable=no-self-use
args = shlex.split(argline)
if len(args) != 2:
fatal_error("Too many arguments to cd")
print("Test suite `" + FileReader().filename + "': NOK (system error)")
tesh_exit(4)
- # Run the Cmd if possible.
- # Return False if nothing has been ran.
-
def run_if_possible(self):
+ """
+ Run the Cmd if possible.
+ Return False if nothing has been run.
+ """
if not self.can_run():
return False
if self.background:
os.chdir(self.cwd)
# retrocompatibility: support ${aaa:=.} variable format
- def replace_perl_variables(m):
- vname = m.group(1)
- vdefault = m.group(2)
+ def replace_perl_variables(arg):
+ vname = arg.group(1)
+ vdefault = arg.group(2)
if vname in os.environ:
return "$" + vname
return vdefault
logs = list()
logs.append("[{file}:{number}] {args}".format(file=FileReader().filename,
- number=self.linenumber, args=self.args))
+ number=self.linenumber, args=self.args))
args = shlex.split(self.args)
- global running_pids
local_pid = None
- global return_code
try:
preexec_function = None
- if not isWindows():
+ if not is_windows():
preexec_function = lambda: os.setpgid(0, 0)
- proc = subprocess.Popen(
+ proc = subprocess.Popen( # pylint: disable=subprocess-popen-preexec-fn
args,
bufsize=1,
stdin=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
preexec_fn=preexec_function)
- if not isWindows():
+ if not is_windows():
local_pid = proc.pid
- running_pids.append(local_pid)
+ TeshState().running_pids.append(local_pid)
except PermissionError:
logs.append("[{file}:{number}] Cannot start '{cmd}': The binary is not executable.".format(
file=FileReader().filename, number=self.linenumber, cmd=args[0]))
logs.append("[{file}:{number}] Current dir: {dir}".format(file=FileReader().filename,
- number=self.linenumber, dir=os.getcwd()))
- return_code = max(3, return_code)
+ number=self.linenumber, dir=os.getcwd()))
+ TeshState().set_return_code(3)
print('\n'.join(logs))
return
except NotADirectoryError:
logs.append("[{file}:{number}] Cannot start '{cmd}': The path to binary does not exist.".format(
file=FileReader().filename, number=self.linenumber, cmd=args[0]))
logs.append("[{file}:{number}] Current dir: {dir}".format(file=FileReader().filename,
- number=self.linenumber, dir=os.getcwd()))
- return_code = max(3, return_code)
+ number=self.linenumber, dir=os.getcwd()))
+ TeshState().set_return_code(3)
print('\n'.join(logs))
return
except FileNotFoundError:
logs.append("[{file}:{number}] Cannot start '{cmd}': File not found.".format(
file=FileReader().filename, number=self.linenumber, cmd=args[0]))
- return_code = max(3, return_code)
+ TeshState().set_return_code(3)
print('\n'.join(logs))
return
- except OSError as osE:
- if osE.errno == 8:
- osE.strerror += "\nOSError: [Errno 8] Executed scripts should start with shebang line (like #!/usr/bin/env sh)"
- raise osE
+ except OSError as err:
+ if err.errno == 8:
+ err.strerror += \
+ "\nOSError: [Errno 8] Executed scripts should start with shebang line (like #!/usr/bin/env sh)"
+ raise err
- cmdName = FileReader().filename + ":" + str(self.linenumber)
+ cmd_name = FileReader().filename + ":" + str(self.linenumber)
try:
- (stdout_data, stderr_data) = proc.communicate("\n".join(self.input_pipe), self.timeout)
- local_pid = None
+ (stdout_data, _stderr_data) = proc.communicate("\n".join(self.input_pipe), self.timeout)
timeout_reached = False
except subprocess.TimeoutExpired:
timeout_reached = True
logs.append("Test suite `{file}': NOK (<{cmd}> timeout after {timeout} sec)".format(
- file=FileReader().filename, cmd=cmdName, timeout=self.timeout))
- running_pids.remove(local_pid)
+ file=FileReader().filename, cmd=cmd_name, timeout=self.timeout))
+ TeshState().running_pids.remove(local_pid)
kill_process_group(local_pid)
# Try to get the output of the timeout process, to help in debugging.
try:
- (stdout_data, stderr_data) = proc.communicate(timeout=1)
+ (stdout_data, _stderr_data) = proc.communicate(timeout=1)
except subprocess.TimeoutExpired:
logs.append("[{file}:{number}] Could not retrieve output. Killing the process group failed?".format(
file=FileReader().filename, number=self.linenumber))
- return_code = max(3, return_code)
+ TeshState().set_return_code(3)
print('\n'.join(logs))
return
stdout_data = ansi_escape.sub('', stdout_data)
if self.ignore_output:
- logs.append("(ignoring the output of <{cmd}> as requested)".format(cmd=cmdName))
+ logs.append("(ignoring the output of <{cmd}> as requested)".format(cmd=cmd_name))
else:
stdouta = stdout_data.split("\n")
+ stdouta = self.remove_ignored_lines(stdouta)
while stdouta and stdouta[-1] == "":
del stdouta[-1]
- stdouta = self.remove_ignored_lines(stdouta)
stdcpy = stdouta[:]
# Mimic the "sort" bash command, which is case unsensitive.
fromfile='expected',
tofile='obtained'))
if diff:
- logs.append("Output of <{cmd}> mismatch:".format(cmd=cmdName))
+ logs.append("Output of <{cmd}> mismatch:".format(cmd=cmd_name))
if self.sort >= 0: # If sorted, truncate the diff output and show the unsorted version
difflen = 0
for line in diff:
logs.append(line)
logs.append("Test suite `{file}': NOK (<{cmd}> output mismatch)".format(
- file=FileReader().filename, cmd=cmdName))
+ file=FileReader().filename, cmd=cmd_name))
if lock is not None:
lock.release()
if TeshState().keep:
- f = open('obtained', 'w')
+ file = open('obtained', 'w')
obtained = stdout_data.split("\n")
while obtained and obtained[-1] == "":
del obtained[-1]
obtained = self.remove_ignored_lines(obtained)
for line in obtained:
- f.write("> " + line + "\n")
- f.close()
+ file.write("> " + line + "\n")
+ file.close()
logs.append("Obtained output kept as requested: {path}".format(path=os.path.abspath("obtained")))
- return_code = max(2, return_code)
+ TeshState().set_return_code(2)
print('\n'.join(logs))
return
if timeout_reached:
- return_code = max(3, return_code)
+ TeshState().set_return_code(3)
print('\n'.join(logs))
return
if not proc.returncode in self.expect_return:
if proc.returncode >= 0:
logs.append("Test suite `{file}': NOK (<{cmd}> returned code {code})".format(
- file=FileReader().filename, cmd=cmdName, code=proc.returncode))
+ file=FileReader().filename, cmd=cmd_name, code=proc.returncode))
if lock is not None:
lock.release()
- return_code = max(2, return_code)
+ TeshState().set_return_code(2)
print('\n'.join(logs))
return
logs.append("Test suite `{file}': NOK (<{cmd}> got signal {sig})".format(
- file=FileReader().filename, cmd=cmdName,
+ file=FileReader().filename, cmd=cmd_name,
sig=SIGNALS_TO_NAMES_DICT[-proc.returncode]))
if lock is not None:
lock.release()
- return_code = max(max(-proc.returncode, 1), return_code)
+ TeshState().set_return_code(max(-proc.returncode, 1))
print('\n'.join(logs))
return
print('\n'.join(logs))
def can_run(self):
+ """ Check if ready to run """
return self.args is not None
##############
#
#
-if __name__ == '__main__':
+def main():
+ """ main function """
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
re.compile(r"Python runtime initialized with LC_CTYPE=C .*"),
# Seen on CircleCI
re.compile(r"cmake: /usr/local/lib/libcurl\.so\.4: no version information available \(required by cmake\)"),
- re.compile(r".*mmap broken on FreeBSD, but dlopen\+thread broken too\. Switching to dlopen\+raw contexts\."),
+ re.compile(
+ r".*mmap broken on FreeBSD, but dlopen\+thread broken too\. Switching to dlopen\+raw contexts\."),
re.compile(r".*dlopen\+thread broken on Apple and BSD\. Switching to raw contexts\."),
]
TeshState().jenkins = True # This is a Jenkins build
if options.teshfile is None:
- f = FileReader(None)
+ file = FileReader(None)
print("Test suite from stdin")
else:
if not os.path.isfile(options.teshfile):
print("Cannot open teshfile '" + options.teshfile + "': File not found")
tesh_exit(3)
- f = FileReader(options.teshfile)
- print("Test suite '" + f.abspath + "'")
+ file = FileReader(options.teshfile)
+ print("Test suite '" + file.abspath + "'")
if options.setenv is not None:
- for e in options.setenv:
- setenv(e)
+ for env in options.setenv:
+ setenv(env)
if options.cfg is not None:
- for c in options.cfg:
- TeshState().args_suffix += " --cfg=" + c
+ for cfg in options.cfg:
+ TeshState().args_suffix += " --cfg=" + cfg
if options.log is not None:
- for l in options.log:
- TeshState().args_suffix += " --log=" + l
+ for log in options.log:
+ TeshState().args_suffix += " --log=" + log
if options.wrapper is not None:
TeshState().wrapper = options.wrapper
# when ready, we execute it.
cmd = Cmd()
- line = f.readfullline()
+ line = file.readfullline()
while line is not None:
# print(">>============="+line+"==<<")
if not line:
elif line[0:2] == "$ ":
if cmd.run_if_possible():
cmd = Cmd()
- cmd.set_cmd(line[2:], f.linenumber)
+ cmd.set_cmd(line[2:], file.linenumber)
elif line[0:2] == "& ":
if cmd.run_if_possible():
cmd = Cmd()
- cmd.set_cmd(line[2:], f.linenumber)
+ cmd.set_cmd(line[2:], file.linenumber)
cmd.background = True
elif line[0:15] == "! output ignore":
else:
fatal_error("UNRECOGNIZED OPTION")
- line = f.readfullline()
+ line = file.readfullline()
cmd.run_if_possible()
TeshState().join_all_threads()
- if return_code == 0:
- if f.filename == "(stdin)":
+ if TeshState().return_code == 0:
+ if file.filename == "(stdin)":
print("Test suite from stdin OK")
else:
- print("Test suite `" + f.filename + "' OK")
- tesh_exit(return_code)
+ print("Test suite `" + file.filename + "' OK")
+ tesh_exit(TeshState().return_code)
+
+if __name__ == '__main__':
+ main()