296 lines
10 KiB
Python
Executable file
296 lines
10 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
from debian.deb822 import Deb822, Release
|
|
import email.utils
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import subprocess
|
|
import argparse
|
|
import time
|
|
from datetime import timedelta
|
|
from collections import defaultdict
|
|
|
|
have_qemu = os.getenv("HAVE_QEMU", "yes") == "yes"
|
|
have_unshare = os.getenv("HAVE_UNSHARE", "yes") == "yes"
|
|
have_binfmt = os.getenv("HAVE_BINFMT", "yes") == "yes"
|
|
run_ma_same_tests = os.getenv("RUN_MA_SAME_TESTS", "yes") == "yes"
|
|
cmd = os.getenv("CMD", "./mmdebstrap")
|
|
|
|
default_dist = os.getenv("DEFAULT_DIST", "unstable")
|
|
all_dists = ["oldstable", "stable", "testing", "unstable"]
|
|
default_mode = "auto" if have_unshare else "root"
|
|
all_modes = ["auto", "root", "unshare", "fakechroot", "chrootless"]
|
|
default_variant = "apt"
|
|
all_variants = [
|
|
"extract",
|
|
"custom",
|
|
"essential",
|
|
"apt",
|
|
"minbase",
|
|
"buildd",
|
|
"-",
|
|
"standard",
|
|
]
|
|
default_format = "auto"
|
|
all_formats = ["auto", "directory", "tar", "squashfs", "ext2", "null"]
|
|
|
|
mirror = os.getenv("mirror", "http://127.0.0.1/debian")
|
|
hostarch = subprocess.check_output(["dpkg", "--print-architecture"]).decode().strip()
|
|
|
|
release_path = f"./shared/cache/debian/dists/{default_dist}/Release"
|
|
if not os.path.exists(release_path):
|
|
print("path doesn't exist:", release_path, file=sys.stderr)
|
|
print("run ./make_mirror.sh first", file=sys.stderr)
|
|
exit(1)
|
|
if os.getenv("SOURCE_DATE_EPOCH") is not None:
|
|
s_d_e = os.getenv("SOURCE_DATE_EPOCH")
|
|
else:
|
|
with open(release_path) as f:
|
|
rel = Release(f)
|
|
s_d_e = str(email.utils.mktime_tz(email.utils.parsedate_tz(rel["Date"])))
|
|
|
|
separator = (
|
|
"------------------------------------------------------------------------------"
|
|
)
|
|
|
|
|
|
def skip(condition, dist, mode, variant, fmt):
|
|
if not condition:
|
|
return ""
|
|
for line in condition.splitlines():
|
|
if not line:
|
|
continue
|
|
if eval(line):
|
|
return line.strip()
|
|
return ""
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("test", nargs="*", help="only run these tests")
|
|
parser.add_argument(
|
|
"-x",
|
|
"--exitfirst",
|
|
action="store_const",
|
|
dest="maxfail",
|
|
const=1,
|
|
help="exit instantly on first error or failed test.",
|
|
)
|
|
parser.add_argument(
|
|
"--maxfail",
|
|
metavar="num",
|
|
action="store",
|
|
type=int,
|
|
dest="maxfail",
|
|
default=0,
|
|
help="exit after first num failures or errors.",
|
|
)
|
|
parser.add_argument("--mode", metavar="mode", help="only run tests with this mode")
|
|
parser.add_argument("--dist", metavar="dist", help="only run tests with this dist")
|
|
args = parser.parse_args()
|
|
|
|
# copy over files from git or as distributed
|
|
for (git, dist, target) in [
|
|
("./mmdebstrap", "/usr/bin/mmdebstrap", "mmdebstrap"),
|
|
("./taridshift", "/usr/bin/mmtaridshift", "taridshift"),
|
|
("./tarfilter", "/usr/bin/mmtarfilter", "tarfilter"),
|
|
(
|
|
"./proxysolver",
|
|
"/usr/lib/apt/solvers/mmdebstrap-dump-solution",
|
|
"proxysolver",
|
|
),
|
|
(
|
|
"./ldconfig.fakechroot",
|
|
"/usr/libexec/mmdebstrap/ldconfig.fakechroot",
|
|
"ldconfig.fakechroot",
|
|
),
|
|
]:
|
|
if os.path.exists(git):
|
|
shutil.copy(git, f"shared/{target}")
|
|
else:
|
|
shutil.copy(dist, f"shared/{target}")
|
|
# copy over hooks from git or as distributed
|
|
if os.path.exists("hooks"):
|
|
shutil.copytree("hooks", "shared/hooks", dirs_exist_ok=True)
|
|
else:
|
|
shutil.copytree(
|
|
"/usr/share/mmdebstrap/hooks", "shared/hooks", dirs_exist_ok=True
|
|
)
|
|
|
|
tests = []
|
|
with open("coverage.txt") as f:
|
|
for test in Deb822.iter_paragraphs(f):
|
|
name = test["Test"]
|
|
dists = test.get("Dists", default_dist)
|
|
if dists == "any":
|
|
dists = all_dists
|
|
elif dists == "default":
|
|
dists = [default_dist]
|
|
else:
|
|
dists = dists.split()
|
|
modes = test.get("Modes", default_mode)
|
|
if modes == "any":
|
|
modes = all_modes
|
|
elif modes == "default":
|
|
modes = [default_mode]
|
|
else:
|
|
modes = modes.split()
|
|
variants = test.get("Variants", default_variant)
|
|
if variants == "any":
|
|
variants = all_variants
|
|
elif variants == "default":
|
|
variants = [default_variant]
|
|
else:
|
|
variants = variants.split()
|
|
formats = test.get("Formats", default_format)
|
|
if formats == "any":
|
|
formats = all_formats
|
|
elif formats == "default":
|
|
formats = [default_format]
|
|
else:
|
|
formats = formats.split()
|
|
for dist in dists:
|
|
for mode in modes:
|
|
for variant in variants:
|
|
for fmt in formats:
|
|
skipreason = skip(
|
|
test.get("Skip-If"), dist, mode, variant, fmt
|
|
)
|
|
if skipreason:
|
|
tt = ("skip", skipreason)
|
|
elif have_qemu:
|
|
tt = "qemu"
|
|
elif test.get("Needs-QEMU", "false") == "true":
|
|
tt = ("skip", "test needs QEMU")
|
|
elif test.get("Needs-Root", "false") == "true":
|
|
tt = "sudo"
|
|
elif mode == "auto" and not have_unshare:
|
|
tt = "sudo"
|
|
elif mode == "root":
|
|
tt = "sudo"
|
|
elif mode == "unshare" and not have_unshare:
|
|
tt = ("skip", "test needs unshare")
|
|
else:
|
|
tt = "null"
|
|
tests.append((tt, name, dist, mode, variant, fmt))
|
|
|
|
torun = []
|
|
num_tests = len(tests)
|
|
if args.test:
|
|
# check if all given tests are either a valid name or a valid number
|
|
for test in args.test:
|
|
if test in [name for (_, name, _, _, _, _) in tests]:
|
|
continue
|
|
if not test.isdigit():
|
|
print(f"cannot find test named {test}", file=sys.stderr)
|
|
exit(1)
|
|
if int(test) >= len(tests) or int(test) <= 0 or str(int(test)) != test:
|
|
print(f"test number {test} doesn't exist", file=sys.stderr)
|
|
exit(1)
|
|
|
|
for i, (_, name, _, _, _, _) in enumerate(tests):
|
|
# if either the number or test name matches, then we use this test,
|
|
# otherwise we skip it
|
|
if name in args.test:
|
|
torun.append(i)
|
|
if str(i + 1) in args.test:
|
|
torun.append(i)
|
|
num_tests = len(torun)
|
|
|
|
starttime = time.time()
|
|
skipped = defaultdict(list)
|
|
failed = []
|
|
num_success = 0
|
|
num_finished = 0
|
|
for i, (test, name, dist, mode, variant, fmt) in enumerate(tests):
|
|
if torun and i not in torun:
|
|
continue
|
|
print(separator, file=sys.stderr)
|
|
print("(%d/%d) %s" % (i + 1, len(tests), name), file=sys.stderr)
|
|
print("dist: %s" % dist, file=sys.stderr)
|
|
print("mode: %s" % mode, file=sys.stderr)
|
|
print("variant: %s" % variant, file=sys.stderr)
|
|
print("format: %s" % fmt, file=sys.stderr)
|
|
if num_finished > 0:
|
|
currenttime = time.time()
|
|
timeleft = timedelta(
|
|
seconds=int(
|
|
(num_tests - num_finished)
|
|
* (currenttime - starttime)
|
|
/ num_finished
|
|
)
|
|
)
|
|
print("time left: %s" % timeleft, file=sys.stderr)
|
|
if failed:
|
|
print("failed: %d" % len(failed), file=sys.stderr)
|
|
num_finished += 1
|
|
with open("tests/" + name) as fin, open("shared/test.sh", "w") as fout:
|
|
for line in fin:
|
|
line = line.replace("{{ CMD }}", cmd)
|
|
line = line.replace("{{ SOURCE_DATE_EPOCH }}", s_d_e)
|
|
line = line.replace("{{ DIST }}", dist)
|
|
line = line.replace("{{ MIRROR }}", mirror)
|
|
line = line.replace("{{ MODE }}", mode)
|
|
line = line.replace("{{ VARIANT }}", variant)
|
|
line = line.replace("{{ FORMAT }}", fmt)
|
|
line = line.replace("{{ HOSTARCH }}", hostarch)
|
|
fout.write(line)
|
|
argv = None
|
|
match test:
|
|
case "qemu":
|
|
argv = ["./run_qemu.sh"]
|
|
case "sudo":
|
|
argv = ["./run_null.sh", "SUDO"]
|
|
case "null":
|
|
argv = ["./run_null.sh"]
|
|
case ("skip", reason):
|
|
skipped[reason].append(
|
|
("(%d/%d) %s" % (i + 1, len(tests), name), dist, mode, variant, fmt)
|
|
)
|
|
print(f"skipped because of {reason}", file=sys.stderr)
|
|
continue
|
|
print(separator, file=sys.stderr)
|
|
if args.dist and args.dist != dist:
|
|
print(f"skipping because of --dist={args.dist}", file=sys.stderr)
|
|
continue
|
|
if args.mode and args.mode != mode:
|
|
print(f"skipping because of --mode={args.mode}", file=sys.stderr)
|
|
continue
|
|
proc = subprocess.Popen(argv)
|
|
try:
|
|
proc.wait()
|
|
except KeyboardInterrupt:
|
|
proc.terminate()
|
|
proc.wait()
|
|
break
|
|
print(separator, file=sys.stderr)
|
|
if proc.returncode != 0:
|
|
failed.append(
|
|
("(%d/%d) %s" % (i + 1, len(tests), name), dist, mode, variant, fmt)
|
|
)
|
|
print("result: FAILURE", file=sys.stderr)
|
|
else:
|
|
print("result: SUCCESS", file=sys.stderr)
|
|
num_success += 1
|
|
if args.maxfail and len(failed) >= args.maxfail:
|
|
break
|
|
print(
|
|
"successfully ran %d tests" % num_success,
|
|
file=sys.stderr,
|
|
)
|
|
if skipped:
|
|
print("skipped %d:" % sum([len(v) for v in skipped.values()]), file=sys.stderr)
|
|
for reason, l in skipped.items():
|
|
print(f"skipped because of {reason}:", file=sys.stderr)
|
|
for t in l:
|
|
print(f" {t}", file=sys.stderr)
|
|
if failed:
|
|
print("failed %d:" % len(failed), file=sys.stderr)
|
|
for t in failed:
|
|
print(f" {t}", file=sys.stderr)
|
|
exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|