1
0
mirror of https://github.com/TheLocehiliosan/yadm synced 2024-10-27 20:34:27 +00:00
TheLocehiliosan_yadm/test/conftest.py

598 lines
16 KiB
Python
Raw Normal View History

"""Global tests configuration and fixtures"""
import collections
2019-09-26 13:46:05 +00:00
import contextlib
import copy
import os
import platform
import pwd
2023-07-09 16:13:13 +00:00
import shutil
2023-07-10 14:14:33 +00:00
from subprocess import PIPE, Popen
import py
import pytest
def pytest_addoption(parser):
"""Add options to pytest"""
parser.addoption(
"--force-linters",
action="store_true",
default=False,
help="Run linters regardless of installed versions",
)
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def shellcheck_version():
"""Version of shellcheck supported"""
2023-07-10 19:43:17 +00:00
return "0.9.0"
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def pylint_version():
"""Version of pylint supported"""
2023-07-10 19:43:17 +00:00
return "2.17.0"
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
2023-07-10 14:14:33 +00:00
def isort_version():
"""Version of isort supported"""
2023-07-10 19:43:17 +00:00
return "5.12.0"
2023-07-10 14:14:33 +00:00
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def flake8_version():
"""Version of flake8 supported"""
2023-07-10 19:43:17 +00:00
return "6.0.0"
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
2023-07-10 14:18:36 +00:00
def black_version():
"""Version of black supported"""
2023-07-10 19:43:17 +00:00
return "23.1.0"
2023-07-10 14:18:36 +00:00
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
2019-03-21 12:38:38 +00:00
def yamllint_version():
"""Version of yamllint supported"""
2023-07-10 19:43:17 +00:00
return "1.30.0"
2019-03-21 12:38:38 +00:00
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def tst_user():
"""Test session's user id"""
return pwd.getpwuid(os.getuid()).pw_name
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def tst_host():
"""Test session's short hostname value"""
2023-07-10 19:43:17 +00:00
return platform.node().split(".")[0]
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def tst_distro(runner):
"""Test session's distro"""
2023-07-10 19:43:17 +00:00
distro = ""
2019-09-26 13:46:05 +00:00
with contextlib.suppress(Exception):
2023-07-10 19:43:17 +00:00
run = runner(command=["lsb_release", "-si"], report=False)
distro = run.out.strip()
return distro
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def tst_distro_family(runner):
"""Test session's distro_family"""
2023-07-10 19:43:17 +00:00
family = ""
with contextlib.suppress(Exception):
2023-07-10 19:43:17 +00:00
run = runner(command=["grep", "-oP", r"ID_LIKE=\K.+", "/etc/os-release"], report=False)
family = run.out.strip()
return family
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def tst_sys():
"""Test session's uname value"""
return platform.system()
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def tst_arch():
"""Test session's uname value"""
return platform.machine()
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def supported_commands():
"""List of supported commands
This list should be updated every time yadm learns a new command.
"""
return [
2023-07-10 19:43:17 +00:00
"alt",
"bootstrap",
"clean",
"clone",
"config",
"decrypt",
"encrypt",
"enter",
"git-crypt",
"gitconfig",
"help",
"init",
"introspect",
"list",
"perms",
"transcrypt",
"upgrade",
"version",
]
@pytest.fixture(scope="session")
def supported_configs():
"""List of supported config options
This list should be updated every time yadm learns a new config.
"""
return [
2023-07-10 19:43:17 +00:00
"local.arch",
"local.class",
"local.hostname",
"local.os",
"local.user",
"yadm.alt-copy",
"yadm.auto-alt",
"yadm.auto-exclude",
"yadm.auto-perms",
"yadm.auto-private-dirs",
"yadm.cipher",
"yadm.git-program",
"yadm.gpg-perms",
"yadm.gpg-program",
"yadm.gpg-recipient",
"yadm.openssl-ciphername",
"yadm.openssl-old",
"yadm.openssl-program",
"yadm.ssh-perms",
]
@pytest.fixture(scope="session")
def supported_switches():
"""List of supported switches
This list should be updated every time yadm learns a new switch.
"""
return [
2023-07-10 19:43:17 +00:00
"--yadm-archive",
"--yadm-bootstrap",
"--yadm-config",
"--yadm-data",
"--yadm-dir",
"--yadm-encrypt",
"--yadm-repo",
"-Y",
]
@pytest.fixture(scope="session")
def supported_local_configs(supported_configs):
"""List of supported local config options"""
2023-07-10 19:43:17 +00:00
return [c for c in supported_configs if c.startswith("local.")]
2023-07-10 19:43:17 +00:00
class Runner:
"""Class for running commands
Within yadm tests, this object should be used when running commands that
require:
* Acting on the status code
* Parsing the output of the command
* Passing input to the command
Other instances of simply running commands should use os.system().
"""
2023-07-10 19:43:17 +00:00
def __init__(self, command, inp=None, shell=False, cwd=None, env=None, expect=None, report=True):
if shell:
2023-07-10 19:43:17 +00:00
self.command = " ".join([str(cmd) for cmd in command])
else:
self.command = command
if env is None:
env = {}
merged_env = os.environ.copy()
merged_env.update(env)
self.inp = inp
self.wrap(expect)
2023-07-09 16:13:13 +00:00
with Popen(
self.command,
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
shell=shell,
cwd=cwd,
env=merged_env,
2023-07-09 16:13:13 +00:00
) as process:
input_bytes = self.inp
if self.inp:
input_bytes = self.inp.encode()
(out_bstream, err_bstream) = process.communicate(input=input_bytes)
self.out = out_bstream.decode()
self.err = err_bstream.decode()
self.code = process.wait()
self.success = self.code == 0
self.failure = self.code != 0
if report:
self.report()
def __repr__(self):
2023-07-10 19:43:17 +00:00
return f"Runner({self.command})"
def report(self):
"""Print code/stdout/stderr"""
2023-07-10 19:43:17 +00:00
print(f"{self}")
print(f" RUN: code:{self.code}")
if self.inp:
2023-07-10 19:43:17 +00:00
print(f" RUN: input:\n{self.inp}")
print(f" RUN: stdout:\n{self.out}")
print(f" RUN: stderr:\n{self.err}")
def wrap(self, expect):
"""Wrap command with expect"""
if not expect:
return
2023-07-10 19:43:17 +00:00
cmdline = " ".join([f'"{w}"' for w in self.command])
expect_script = f"set timeout 2\nspawn {cmdline}\n"
for question, answer in expect:
2023-07-10 19:43:17 +00:00
expect_script += "expect {\n" f'"{question}" {{send "{answer}\\r"}}\n' "timeout {close;exit 128}\n" "}\n"
expect_script += "expect eof\n" "foreach {pid spawnid os_error_flag value} [wait] break\n" "exit $value"
self.inp = expect_script
2023-07-10 19:43:17 +00:00
print(f"EXPECT:{expect_script}")
self.command = ["expect"]
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def runner():
"""Class for running commands"""
return Runner
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def config_git():
"""Configure global git configuration, if missing"""
2023-07-10 19:43:17 +00:00
os.system("git config init.defaultBranch || git config --global init.defaultBranch master")
os.system('git config user.name || git config --global user.name "test"')
os.system('git config user.email || git config --global user.email "test@test.test"')
@pytest.fixture()
def repo_config(runner, paths):
"""Function to query a yadm repo configuration value"""
def query_func(key):
"""Query a yadm repo configuration value"""
run = runner(
2023-07-10 19:43:17 +00:00
command=("git", "config", "--local", key),
env={"GIT_DIR": paths.repo},
report=False,
2023-07-10 19:43:17 +00:00
)
return run.out.rstrip()
return query_func
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def yadm():
"""Path to yadm program to be tested"""
2023-07-10 19:43:17 +00:00
full_path = os.path.realpath("yadm")
assert os.path.isfile(full_path), "yadm program file isn't present"
return full_path
@pytest.fixture()
def paths(tmpdir, yadm):
"""Function scoped test paths"""
2023-07-10 19:43:17 +00:00
dir_root = tmpdir.mkdir("root")
dir_remote = dir_root.mkdir("remote")
dir_work = dir_root.mkdir("work")
dir_xdg_data = dir_root.mkdir("xdg_data")
dir_xdg_home = dir_root.mkdir("xdg_home")
dir_data = dir_xdg_data.mkdir("yadm")
dir_yadm = dir_xdg_home.mkdir("yadm")
dir_hooks = dir_yadm.mkdir("hooks")
dir_repo = dir_data.mkdir("repo.git")
file_archive = dir_data.join("archive")
file_bootstrap = dir_yadm.join("bootstrap")
file_config = dir_yadm.join("config")
file_encrypt = dir_yadm.join("encrypt")
paths = collections.namedtuple(
2023-07-10 19:43:17 +00:00
"Paths",
[
"pgm",
"root",
"remote",
"work",
"xdg_data",
"xdg_home",
"data",
"yadm",
"hooks",
"repo",
"archive",
"bootstrap",
"config",
"encrypt",
],
)
os.environ["XDG_CONFIG_HOME"] = str(dir_xdg_home)
os.environ["XDG_DATA_HOME"] = str(dir_xdg_data)
return paths(
yadm,
dir_root,
dir_remote,
dir_work,
dir_xdg_data,
dir_xdg_home,
dir_data,
dir_yadm,
dir_hooks,
dir_repo,
file_archive,
file_bootstrap,
file_config,
file_encrypt,
2023-07-10 19:43:17 +00:00
)
@pytest.fixture()
def yadm_cmd(paths):
"""Generate custom command_list function"""
2023-07-10 19:43:17 +00:00
def command_list(*args):
"""Produce params for running yadm with -Y"""
return [paths.pgm] + list(args)
2023-07-10 19:43:17 +00:00
return command_list
2023-07-09 16:13:13 +00:00
class NoRelativePath(Exception):
"""Exception when finding relative paths"""
2023-07-10 19:43:17 +00:00
class DataFile:
"""Datafile object"""
def __init__(self, path, tracked=True, private=False):
self.__path = path
self.__parent = None
self.__tracked = tracked
self.__private = private
@property
def path(self):
"""Path property"""
return self.__path
@property
def relative(self):
"""Relative path property"""
if self.__parent:
return self.__parent.join(self.path)
2023-07-10 19:43:17 +00:00
raise NoRelativePath("Unable to provide relative path, no parent")
@property
def tracked(self):
"""Tracked property"""
return self.__tracked
@property
def private(self):
"""Private property"""
return self.__private
def relative_to(self, parent):
"""Update all relative paths to this py.path"""
self.__parent = parent
2023-07-10 19:43:17 +00:00
class DataSet:
"""Dataset object"""
def __init__(self):
2023-07-09 16:13:13 +00:00
self.__files = []
self.__dirs = []
self.__tracked_dirs = []
self.__private_dirs = []
self.__relpath = None
def __repr__(self):
2023-07-10 19:43:17 +00:00
return f"[DS with {len(self)} files; " f"{len(self.tracked)} tracked, " f"{len(self.private)} private]"
def __iter__(self):
return iter(self.__files)
def __len__(self):
return len(self.__files)
def __contains__(self, datafile):
if [f for f in self.__files if f.path == datafile]:
return True
if datafile in self.__files:
return True
return False
@property
def files(self):
"""List of DataFiles in DataSet"""
return list(self.__files)
@property
def tracked(self):
"""List of tracked DataFiles in DataSet"""
return [f for f in self.__files if f.tracked]
@property
def private(self):
"""List of private DataFiles in DataSet"""
return [f for f in self.__files if f.private]
@property
def dirs(self):
"""List of directories in DataSet"""
return list(self.__dirs)
@property
def plain_dirs(self):
"""List of directories in DataSet not starting with '.'"""
2023-07-10 19:43:17 +00:00
return [d for d in self.dirs if not d.startswith(".")]
@property
def hidden_dirs(self):
"""List of directories in DataSet starting with '.'"""
2023-07-10 19:43:17 +00:00
return [d for d in self.dirs if d.startswith(".")]
@property
def tracked_dirs(self):
"""List of directories in DataSet not starting with '.'"""
2023-07-10 19:43:17 +00:00
return [d for d in self.__tracked_dirs if not d.startswith(".")]
@property
def private_dirs(self):
"""List of directories in DataSet considered 'private'"""
return list(self.__private_dirs)
def add_file(self, path, tracked=True, private=False):
"""Add file to data set"""
if path not in self:
datafile = DataFile(path, tracked, private)
if self.__relpath:
datafile.relative_to(self.__relpath)
self.__files.append(datafile)
dname = os.path.dirname(path)
if dname and dname not in self.__dirs:
self.__dirs.append(dname)
if tracked:
self.__tracked_dirs.append(dname)
if private:
self.__private_dirs.append(dname)
def relative_to(self, relpath):
"""Update all relative paths to this py.path"""
self.__relpath = relpath
for datafile in self.files:
datafile.relative_to(self.__relpath)
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def ds1_dset(tst_sys):
"""Meta-data for dataset one files"""
dset = DataSet()
2023-07-10 19:43:17 +00:00
dset.add_file("t1")
dset.add_file("d1/t2")
dset.add_file(f"test_alt_copy##os.{tst_sys}")
dset.add_file("u1", tracked=False)
dset.add_file("d2/u2", tracked=False)
dset.add_file(".ssh/p1", tracked=False, private=True)
dset.add_file(".ssh/.p2", tracked=False, private=True)
dset.add_file(".gnupg/p3", tracked=False, private=True)
dset.add_file(".gnupg/.p4", tracked=False, private=True)
return dset
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def ds1_data(tmpdir_factory, config_git, ds1_dset, runner):
"""A set of test data, worktree & repo"""
# pylint: disable=unused-argument
# This is ignored because
# @pytest.mark.usefixtures('config_git')
# cannot be applied to another fixture.
2023-07-10 19:43:17 +00:00
data = tmpdir_factory.mktemp("ds1")
2023-07-10 19:43:17 +00:00
work = data.mkdir("work")
for datafile in ds1_dset:
work.join(datafile.path).write(datafile.path, ensure=True)
2023-07-10 19:43:17 +00:00
repo = data.mkdir("repo.git")
env = os.environ.copy()
2023-07-10 19:43:17 +00:00
env["GIT_DIR"] = str(repo)
runner(command=["git", "init", "--shared=0600", "--bare", str(repo)], report=False)
runner(command=["git", "config", "core.bare", "false"], env=env, report=False)
runner(command=["git", "config", "status.showUntrackedFiles", "no"], env=env, report=False)
runner(command=["git", "config", "yadm.managed", "true"], env=env, report=False)
runner(command=["git", "config", "core.worktree", str(work)], env=env, report=False)
runner(command=["git", "add"] + [str(work.join(f.path)) for f in ds1_dset if f.tracked], env=env)
runner(command=["git", "commit", "--allow-empty", "-m", "Initial commit"], env=env, report=False)
data = collections.namedtuple("Data", ["work", "repo"])
return data(work, repo)
@pytest.fixture()
def ds1_work_copy(ds1_data, paths):
"""Function scoped copy of ds1_data.work"""
2023-07-09 16:13:13 +00:00
shutil.copytree(str(ds1_data.work), str(paths.work), dirs_exist_ok=True)
@pytest.fixture()
def ds1_repo_copy(runner, ds1_data, paths):
"""Function scoped copy of ds1_data.repo"""
2023-07-09 16:13:13 +00:00
shutil.copytree(str(ds1_data.repo), str(paths.repo), dirs_exist_ok=True)
env = os.environ.copy()
2023-07-10 19:43:17 +00:00
env["GIT_DIR"] = str(paths.repo)
runner(command=["git", "config", "core.worktree", str(paths.work)], env=env, report=False)
@pytest.fixture()
def ds1_copy(ds1_work_copy, ds1_repo_copy):
"""Function scoped copy of ds1_data"""
# pylint: disable=unused-argument
# This is ignored because
# @pytest.mark.usefixtures('ds1_work_copy', 'ds1_repo_copy')
# cannot be applied to another fixture.
return None
@pytest.fixture()
def ds1(ds1_work_copy, paths, ds1_dset):
"""Function scoped ds1_dset w/paths"""
# pylint: disable=unused-argument
# This is ignored because
# @pytest.mark.usefixtures('ds1_copy')
# cannot be applied to another fixture.
dscopy = copy.deepcopy(ds1_dset)
dscopy.relative_to(copy.deepcopy(paths.work))
return dscopy
2023-07-10 19:43:17 +00:00
@pytest.fixture(scope="session")
def gnupg(tmpdir_factory, runner):
"""Location of GNUPGHOME"""
def register_gpg_password(password):
"""Publish a new GPG mock password"""
2023-07-10 19:43:17 +00:00
py.path.local("/tmp/mock-password").write(password)
2023-07-10 19:43:17 +00:00
home = tmpdir_factory.mktemp("gnupghome")
home.chmod(0o700)
2023-07-10 19:43:17 +00:00
conf = home.join("gpg.conf")
conf.write("no-secmem-warning\n")
2019-12-03 16:40:40 +00:00
conf.chmod(0o600)
2023-07-10 19:43:17 +00:00
agentconf = home.join("gpg-agent.conf")
agentconf.write(f'pinentry-program {os.path.abspath("test/pinentry-mock")}\n' "max-cache-ttl 0\n")
2019-12-03 16:40:40 +00:00
agentconf.chmod(0o600)
2023-07-10 19:43:17 +00:00
data = collections.namedtuple("GNUPG", ["home", "pw"])
env = os.environ.copy()
2023-07-10 19:43:17 +00:00
env["GNUPGHOME"] = home
# this pre-populates std files in the GNUPGHOME
2023-07-10 19:43:17 +00:00
runner(["gpg", "-k"], env=env)
return data(home, register_gpg_password)