1
0
mirror of https://github.com/TheLocehiliosan/yadm synced 2026-03-02 03:49:29 +00:00

Change handling of dirs with alt conditions

Instead of creating symlinks pointing at the directory, create individual
symlinks for each file within the dir alternate (fixes #490).

Also rework how stale symlinks are removed. Now a (stale) symlink will only be
removed if it's pointing at a file that's an altnerate file (fixes #236).
This commit is contained in:
Erik Flodin
2025-01-27 21:06:34 +01:00
parent 7f76a455bb
commit 4214de8d91
6 changed files with 244 additions and 233 deletions

View File

@@ -40,15 +40,15 @@ def test_alt_source(runner, paths, tracked, encrypt, exclude, yadm_alt):
source_file_content = link_path + "##default"
source_file = basepath.join(source_file_content)
link_file = paths.work.join(link_path)
if link_path == utils.ALT_DIR:
source_file = source_file.join(utils.CONTAINED)
link_file = link_file.join(utils.CONTAINED)
if tracked or (encrypt and not exclude):
assert link_file.islink()
target = py.path.local(os.path.realpath(link_file))
if target.isfile():
assert link_file.read() == source_file_content
assert str(source_file) in linked
else:
assert link_file.join(utils.CONTAINED).read() == source_file_content
assert str(source_file) in linked
assert target.isfile()
assert link_file.read() == source_file_content
assert str(source_file) in linked
else:
assert not link_file.exists()
assert str(source_file) not in linked
@@ -73,6 +73,9 @@ def test_relative_link(runner, paths, yadm_alt):
source_file_content = link_path + "##default"
source_file = basepath.join(source_file_content)
link_file = paths.work.join(link_path)
if link_path == utils.ALT_DIR:
source_file = source_file.join(utils.CONTAINED)
link_file = link_file.join(utils.CONTAINED)
link = link_file.readlink()
relpath = os.path.relpath(source_file, start=os.path.dirname(link_file))
assert link == relpath
@@ -128,15 +131,17 @@ def test_alt_conditions(runner, paths, tst_arch, tst_sys, tst_distro, tst_distro
linked = utils.parse_alt_output(run.out)
for link_path in TEST_PATHS:
source_file = link_path + suffix
assert paths.work.join(link_path).islink()
target = py.path.local(os.path.realpath(paths.work.join(link_path)))
if target.isfile():
assert paths.work.join(link_path).read() == source_file
assert str(paths.work.join(source_file)) in linked
else:
assert paths.work.join(link_path).join(utils.CONTAINED).read() == source_file
assert str(paths.work.join(source_file)) in linked
source_file_content = link_path + suffix
source_file = paths.work.join(source_file_content)
link_file = paths.work.join(link_path)
if link_path == utils.ALT_DIR:
source_file = source_file.join(utils.CONTAINED)
link_file = link_file.join(utils.CONTAINED)
assert link_file.islink()
target = py.path.local(os.path.realpath(link_file))
assert target.isfile()
assert link_file.read() == source_file_content
assert str(source_file) in linked
@pytest.mark.usefixtures("ds1_copy")
@@ -156,6 +161,7 @@ def test_alt_templates(runner, paths, kind, label):
suffix = f"##{label}.{kind}"
if kind is None:
suffix = f"##{label}"
utils.create_alt_files(paths, suffix)
run = runner([paths.pgm, "-Y", yadm_dir, "--yadm-data", yadm_data, "alt"])
assert run.success
@@ -163,11 +169,15 @@ def test_alt_templates(runner, paths, kind, label):
created = utils.parse_alt_output(run.out, linked=False)
for created_path in TEST_PATHS:
if created_path != utils.ALT_DIR:
source_file = created_path + suffix
assert paths.work.join(created_path).isfile()
assert paths.work.join(created_path).read().strip() == source_file
assert str(paths.work.join(source_file)) in created
source_file_content = created_path + suffix
source_file = paths.work.join(source_file_content)
created_file = paths.work.join(created_path)
if created_path == utils.ALT_DIR:
source_file = source_file.join(utils.CONTAINED)
created_file = created_file.join(utils.CONTAINED)
assert created_file.isfile()
assert created_file.read().strip() == source_file_content
assert str(source_file) in created
@pytest.mark.usefixtures("ds1_copy")
@@ -201,20 +211,22 @@ def test_auto_alt(runner, yadm_cmd, paths, autoalt):
linked = utils.parse_alt_output(run.out)
for link_path in TEST_PATHS:
source_file = link_path + "##default"
source_file_content = link_path + "##default"
source_file = paths.work.join(source_file_content)
link_file = paths.work.join(link_path)
if link_path == utils.ALT_DIR:
source_file = source_file.join(utils.CONTAINED)
link_file = link_file.join(utils.CONTAINED)
if autoalt == "false":
assert not paths.work.join(link_path).exists()
assert not link_file.exists()
else:
assert paths.work.join(link_path).islink()
target = py.path.local(os.path.realpath(paths.work.join(link_path)))
if target.isfile():
assert paths.work.join(link_path).read() == source_file
# no linking output when run via auto-alt
assert str(paths.work.join(source_file)) not in linked
else:
assert paths.work.join(link_path).join(utils.CONTAINED).read() == source_file
# no linking output when run via auto-alt
assert str(paths.work.join(source_file)) not in linked
assert link_file.islink()
target = py.path.local(os.path.realpath(link_file))
assert target.isfile()
assert link_file.read() == source_file_content
# no linking output when run via auto-alt
assert str(source_file) not in linked
@pytest.mark.usefixtures("ds1_copy")
@@ -236,6 +248,8 @@ def test_alt_exclude(runner, yadm_cmd, paths, autoexclude):
status = run.out.split("\0")
for link_path in TEST_PATHS:
if link_path == utils.ALT_DIR:
link_path = f"{link_path}/{utils.CONTAINED}"
flags = "??" if autoexclude == "false" else "!!"
assert f"{flags} {link_path}" in status
@@ -262,16 +276,18 @@ def test_stale_link_removal(runner, yadm_cmd, paths):
linked = utils.parse_alt_output(run.out)
# assert the proper linking has occurred
for stale_path in TEST_PATHS:
source_file = stale_path + "##class." + tst_class
assert paths.work.join(stale_path).islink()
target = py.path.local(os.path.realpath(paths.work.join(stale_path)))
if target.isfile():
assert paths.work.join(stale_path).read() == source_file
assert str(paths.work.join(source_file)) in linked
else:
assert paths.work.join(stale_path).join(utils.CONTAINED).read() == source_file
assert str(paths.work.join(source_file)) in linked
for link_path in TEST_PATHS:
source_file_content = link_path + f"##class.{tst_class}"
source_file = paths.work.join(source_file_content)
link_file = paths.work.join(link_path)
if link_path == utils.ALT_DIR:
source_file = source_file.join(utils.CONTAINED)
link_file = link_file.join(utils.CONTAINED)
assert link_file.islink()
target = py.path.local(os.path.realpath(link_file))
assert target.isfile()
assert link_file.read() == source_file_content
assert str(source_file) in linked
# change the class so there are no valid alternates
utils.set_local(paths, "class", "changedclass")
@@ -284,9 +300,53 @@ def test_stale_link_removal(runner, yadm_cmd, paths):
# assert the linking is removed
for stale_path in TEST_PATHS:
source_file = stale_path + "##class." + tst_class
assert not paths.work.join(stale_path).exists()
assert str(paths.work.join(source_file)) not in linked
source_file_content = stale_path + f"##class.{tst_class}"
source_file = paths.work.join(source_file_content)
stale_file = paths.work.join(stale_path)
if stale_path == utils.ALT_DIR:
source_file = source_file.join(utils.CONTAINED)
stale_file = stale_file.join(utils.CONTAINED)
assert not stale_file.exists()
assert str(source_file) not in linked
@pytest.mark.usefixtures("ds1_copy")
def test_legacy_dir_link_removal(runner, yadm_cmd, paths):
"""Legacy link to alternative dir is removed
This test ensures that a legacy dir alternative (i.e. symlink to the dir
itself) is converted to indiividual links.
"""
utils.create_alt_files(paths, "##default")
# Create legacy link
link_dir = paths.work.join(utils.ALT_DIR)
link_dir.mksymlinkto(link_dir.basename + "##default")
assert link_dir.islink()
# run alt to trigger linking
run = runner(yadm_cmd("alt"))
assert run.success
assert run.err == ""
linked = utils.parse_alt_output(run.out)
# assert legacy link is removed
assert not link_dir.islink()
# assert the proper linking has occurred
for link_path in TEST_PATHS:
source_file_content = link_path + "##default"
source_file = paths.work.join(source_file_content)
link_file = paths.work.join(link_path)
if link_path == utils.ALT_DIR:
source_file = source_file.join(utils.CONTAINED)
link_file = link_file.join(utils.CONTAINED)
assert link_file.islink()
target = py.path.local(os.path.realpath(link_file))
assert target.isfile()
assert link_file.read() == source_file_content
assert str(source_file) in linked
@pytest.mark.usefixtures("ds1_repo_copy")

View File

@@ -1,39 +0,0 @@
"""Unit tests: remove_stale_links"""
import os
import pytest
@pytest.mark.parametrize("linked", [True, False])
@pytest.mark.parametrize("kind", ["file", "symlink"])
def test_remove_stale_links(runner, yadm, tmpdir, kind, linked):
"""Test remove_stale_links()"""
source_file = tmpdir.join("source_file")
source_file.write("source file", ensure=True)
link = tmpdir.join("link")
if kind == "file":
link.write("link file", ensure=True)
else:
os.system(f"ln -s {source_file} {link}")
alt_linked = ""
if linked:
alt_linked = source_file
script = f"""
YADM_TEST=1 source {yadm}
possible_alt_targets=({link})
alt_linked=({alt_linked})
function rm() {{ echo rm "$@"; }}
remove_stale_links
"""
run = runner(command=["bash"], inp=script)
assert run.err == ""
if kind == "symlink" and not linked:
assert f"rm -f {link}" in run.out
else:
assert run.out == ""

View File

@@ -39,13 +39,11 @@ CONDITION = {
TEMPLATE_LABELS = ["t", "template", "yadm"]
def calculate_score(filename):
def calculate_score(conditions):
"""Calculate the expected score"""
# pylint: disable=too-many-branches
score = 0
_, conditions = filename.split("##", 1)
for condition in conditions.split(","):
label = condition
value = None
@@ -111,70 +109,70 @@ def test_score_values(runner, yadm, default, arch, system, distro, cla, host, us
local_distro = "testDISTro"
local_host = "testHost"
local_user = "testUser"
filenames = {"filename##": 0}
conditions = {"": 0}
if default:
for filename in list(filenames):
for condition in list(conditions):
for label in CONDITION[default]["labels"]:
newfile = filename
if not newfile.endswith("##"):
newfile += ","
newfile += label
filenames[newfile] = calculate_score(newfile)
newcond = condition
if newcond:
newcond += ","
newcond += label
conditions[newcond] = calculate_score(newcond)
if arch:
for filename in list(filenames):
for condition in list(conditions):
for match in [True, False]:
for label in CONDITION[arch]["labels"]:
newfile = filename
if not newfile.endswith("##"):
newfile += ","
newfile += ".".join([label, local_arch if match else "badarch"])
filenames[newfile] = calculate_score(newfile)
newcond = condition
if newcond:
newcond += ","
newcond += ".".join([label, local_arch if match else "badarch"])
conditions[newcond] = calculate_score(newcond)
if system:
for filename in list(filenames):
for condition in list(conditions):
for match in [True, False]:
for label in CONDITION[system]["labels"]:
newfile = filename
if not newfile.endswith("##"):
newfile += ","
newfile += ".".join([label, local_system if match else "badsys"])
filenames[newfile] = calculate_score(newfile)
newcond = condition
if newcond:
newcond += ","
newcond += ".".join([label, local_system if match else "badsys"])
conditions[newcond] = calculate_score(newcond)
if distro:
for filename in list(filenames):
for condition in list(conditions):
for match in [True, False]:
for label in CONDITION[distro]["labels"]:
newfile = filename
if not newfile.endswith("##"):
newfile += ","
newfile += ".".join([label, local_distro if match else "baddistro"])
filenames[newfile] = calculate_score(newfile)
newcond = condition
if newcond:
newcond += ","
newcond += ".".join([label, local_distro if match else "baddistro"])
conditions[newcond] = calculate_score(newcond)
if cla:
for filename in list(filenames):
for condition in list(conditions):
for match in [True, False]:
for label in CONDITION[cla]["labels"]:
newfile = filename
if not newfile.endswith("##"):
newfile += ","
newfile += ".".join([label, local_class if match else "badclass"])
filenames[newfile] = calculate_score(newfile)
newcond = condition
if newcond:
newcond += ","
newcond += ".".join([label, local_class if match else "badclass"])
conditions[newcond] = calculate_score(newcond)
if host:
for filename in list(filenames):
for condition in list(conditions):
for match in [True, False]:
for label in CONDITION[host]["labels"]:
newfile = filename
if not newfile.endswith("##"):
newfile += ","
newfile += ".".join([label, local_host if match else "badhost"])
filenames[newfile] = calculate_score(newfile)
newcond = condition
if newcond:
newcond += ","
newcond += ".".join([label, local_host if match else "badhost"])
conditions[newcond] = calculate_score(newcond)
if user:
for filename in list(filenames):
for condition in list(conditions):
for match in [True, False]:
for label in CONDITION[user]["labels"]:
newfile = filename
if not newfile.endswith("##"):
newfile += ","
newfile += ".".join([label, local_user if match else "baduser"])
filenames[newfile] = calculate_score(newfile)
newcond = condition
if newcond:
newcond += ","
newcond += ".".join([label, local_user if match else "baduser"])
conditions[newcond] = calculate_score(newcond)
script = f"""
YADM_TEST=1 source {yadm}
@@ -187,15 +185,15 @@ def test_score_values(runner, yadm, default, arch, system, distro, cla, host, us
local_host={local_host}
local_user={local_user}
"""
expected = ""
for filename, score in filenames.items():
expected = []
for condition, score in conditions.items():
script += f"""
score_file "{filename}" "dest"
echo "{filename}"
echo "$score"
score_file "source" "target" "{condition}"
echo "{condition}=$score"
"""
expected += filename + "\n"
expected += str(score) + "\n"
expected.append(f"{condition}={score}")
expected.append("")
expected = "\n".join(expected)
run = runner(command=["bash"], inp=script)
assert run.success
assert run.err == ""
@@ -206,15 +204,15 @@ def test_score_values(runner, yadm, default, arch, system, distro, cla, host, us
def test_extensions(runner, yadm, ext):
"""Verify extensions do not effect scores"""
local_user = "testuser"
filename = f"filename##u.{local_user}"
condition = f"u.{local_user}"
if ext:
filename += f",{ext}.xyz"
condition += f",{ext}.xyz"
expected = ""
script = f"""
YADM_TEST=1 source {yadm}
score=0
local_user={local_user}
score_file "{filename}"
score_file "source" "target" "{condition}"
echo "$score"
"""
expected = f'{1000 + CONDITION["user"]["modifier"]}\n'
@@ -232,15 +230,15 @@ def test_score_values_templates(runner, yadm):
local_distro = "testdistro"
local_host = "testhost"
local_user = "testuser"
filenames = {"filename##": 0}
conditions = {"": 0}
for filename in list(filenames):
for condition in list(conditions):
for label in TEMPLATE_LABELS:
newfile = filename
if not newfile.endswith("##"):
newfile += ","
newfile += ".".join([label, "testtemplate"])
filenames[newfile] = calculate_score(newfile)
newcond = condition
if newcond:
newcond += ","
newcond += ".".join([label, "testtemplate"])
conditions[newcond] = calculate_score(newcond)
script = f"""
YADM_TEST=1 source {yadm}
@@ -252,15 +250,15 @@ def test_score_values_templates(runner, yadm):
local_host={local_host}
local_user={local_user}
"""
expected = ""
for filename, score in filenames.items():
expected = []
for condition, score in conditions.items():
script += f"""
score_file "{filename}" "dest"
echo "{filename}"
echo "$score"
score_file "source" "target" "{condition}"
echo "{condition}=$score"
"""
expected += filename + "\n"
expected += str(score) + "\n"
expected.append(f"{condition}={score}")
expected.append("")
expected = "\n".join(expected)
run = runner(command=["bash"], inp=script)
assert run.success
assert run.err == ""
@@ -281,7 +279,7 @@ def test_template_recording(runner, yadm, processor_generated):
YADM_TEST=1 source {yadm}
function record_score() {{ [ -n "$4" ] && echo "template recorded"; }}
{mock}
score_file "testfile##template.kind"
score_file "source" "target" "template.kind"
"""
run = runner(command=["bash"], inp=script)
assert run.success
@@ -293,13 +291,13 @@ def test_underscores_and_upper_case_in_distro_and_family(runner, yadm):
"""Test replacing spaces with underscores and lowering case in distro / distro_family"""
local_distro = "test distro"
local_distro_family = "test family"
filenames = {
"filename##distro.Test Distro": 1004,
"filename##distro.test-distro": 0,
"filename##distro.test_distro": 1004,
"filename##distro_family.test FAMILY": 1008,
"filename##distro_family.test-family": 0,
"filename##distro_family.test_family": 1008,
conditions = {
"distro.Test Distro": 1004,
"distro.test-distro": 0,
"distro.test_distro": 1004,
"distro_family.test FAMILY": 1008,
"distro_family.test-family": 0,
"distro_family.test_family": 1008,
}
script = f"""
@@ -308,15 +306,15 @@ def test_underscores_and_upper_case_in_distro_and_family(runner, yadm):
local_distro="{local_distro}"
local_distro_family="{local_distro_family}"
"""
expected = ""
for filename, score in filenames.items():
expected = []
for condition, score in conditions.items():
script += f"""
score_file "{filename}"
echo "{filename}"
echo "$score"
score_file "source" "target" "{condition}"
echo "{condition}=$score"
"""
expected += filename + "\n"
expected += str(score) + "\n"
expected.append(f"{condition}={score}")
expected.append("")
expected = "\n".join(expected)
run = runner(command=["bash"], inp=script)
assert run.success
assert run.err == ""
@@ -332,17 +330,17 @@ def test_negative_class_condition(runner, yadm):
# 0
score=0
score_file "filename##~class.testclass" "dest"
score_file "source" "target" "~class.testclass"
echo "score: $score"
# 16
score=0
score_file "filename##~class.badclass" "dest"
score_file "source" "target" "~class.badclass"
echo "score2: $score"
# 16
score=0
score_file "filename##~c.badclass" "dest"
score_file "source" "target" "~c.badclass"
echo "score3: $score"
"""
run = runner(command=["bash"], inp=script)
@@ -363,27 +361,27 @@ def test_negative_combined_conditions(runner, yadm):
# (0) + (0) = 0
score=0
score_file "filename##~class.testclass,~distro.testdistro" "dest"
score_file "source" "target" "~class.testclass,~distro.testdistro"
echo "score: $score"
# (1000 + 16) + (1000 + 4) = 2020
score=0
score_file "filename##class.testclass,distro.testdistro" "dest"
score_file "source" "target" "class.testclass,distro.testdistro"
echo "score2: $score"
# 0 (negated class condition)
score=0
score_file "filename##~class.badclass,~distro.testdistro" "dest"
score_file "source" "target" "~class.badclass,~distro.testdistro"
echo "score3: $score"
# (1000 + 16) + (4) = 1020
score=0
score_file "filename##class.testclass,~distro.baddistro" "dest"
score_file "source" "target" "class.testclass,~distro.baddistro"
echo "score4: $score"
# (1000 + 16) + (16) = 1032
score=0
score_file "filename##class.testclass,~class.badclass" "dest"
score_file "source" "target" "class.testclass,~class.badclass"
echo "score5: $score"
"""
run = runner(command=["bash"], inp=script)

View File

@@ -61,12 +61,8 @@ def create_alt_files(
new_dir = basepath.join(ALT_DIR + suffix).join(CONTAINED)
new_dir.write(ALT_DIR + suffix, ensure=True)
# Do not test directory support for jinja alternates
test_paths = [new_file1, new_file2]
test_names = [ALT_FILE1, ALT_FILE2]
if not re.match(r"##(t$|t\.|template|yadm)", suffix):
test_paths += [new_dir]
test_names += [ALT_DIR]
test_paths = [new_file1, new_file2, new_dir]
test_names = [ALT_FILE1, ALT_FILE2, ALT_DIR]
for test_path in test_paths:
if content: