test: fix nix collect garbage, add tests

pull/21/head
def 2022-12-19 06:27:44 +04:00 committed by dettlaff
parent 7e3adffb68
commit 7553202a69
2 changed files with 199 additions and 60 deletions

View File

@ -1,105 +1,128 @@
import re import re
import subprocess import subprocess
from selfprivacy_api.jobs import Job, JobStatus, Jobs from selfprivacy_api.jobs import JobStatus, Jobs
from selfprivacy_api.utils.huey import huey
COMPLETED_WITH_ERROR = "Completed with an error"
RESULT_WAAS_NOT_FOUND_ERROR = "We are sorry, result was not found :("
CLEAR_COMPLETED = "Сleaning completed."
def run_nix_store_print_dead(): def run_nix_store_print_dead():
return subprocess.check_output(["nix-store", "--gc", "--print-dead"]) return subprocess.check_output(["nix-store", "--gc", "--print-dead"]).decode(
"utf-8"
)
def run_nix_collect_garbage(): def run_nix_collect_garbage():
return subprocess.Popen( return subprocess.Popen(
["nix-collect-garbage", "-d"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT ["nix-collect-garbage", "-d"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
) ).stdout
def parse_line(line, job: Job): def set_job_status_wrapper(Jobs, job):
pattern = re.compile(r"[+-]?\d+\.\d+ \w+ freed") def set_job_status(status, progress, status_text, result="Default result"):
Jobs.update(
job=job,
status=status,
progress=progress,
status_text=status_text,
result=result,
)
return set_job_status
def parse_line(line):
pattern = re.compile(r"[+-]?\d+\.\d+ \w+(?= freed)")
match = re.search( match = re.search(
pattern, pattern,
line, line,
) )
if match is None: if match is None:
Jobs.update( return (
job=job, JobStatus.FINISHED,
status=JobStatus.FINISHED, 100,
progress=100, COMPLETED_WITH_ERROR,
status_text="Completed with an error", RESULT_WAAS_NOT_FOUND_ERROR,
result="We are sorry, result was not found :(",
) )
else: else:
Jobs.update( return (
job=job, JobStatus.FINISHED,
status=JobStatus.FINISHED, 100,
progress=100, CLEAR_COMPLETED,
status_text="Сleaning completed.", f"{match.group(0)} have been cleared",
result=f"{match.group(0)} have been cleared",
) )
def stream_process( def stream_process(
process, stream,
package_equal_to_percent, package_equal_to_percent,
job: Job, set_job_status,
): ):
go = process.poll() is None
percent = 0 percent = 0
for line in process.stdout: for line in stream:
if "deleting '/nix/store/" in line: if "deleting '/nix/store/" in line:
percent += package_equal_to_percent percent += package_equal_to_percent
Jobs.update( set_job_status(
job=job,
status=JobStatus.RUNNING, status=JobStatus.RUNNING,
progress=int(percent), progress=int(percent),
status_text="Сleaning...", status_text="Сleaning...",
) )
elif "store paths deleted," in line: elif "store paths deleted," in line:
parse_line(line, job) status = parse_line(line)
set_job_status(
return go status=status[0],
progress=status[1],
status_text=status[2],
result=status[3],
)
def get_dead_packages(output):
dead = len(re.findall("/nix/store/", output))
percent = None
if dead != 0:
percent = 100 / dead
return dead, percent
@huey.task()
def nix_collect_garbage( def nix_collect_garbage(
job: Job, job,
jobs=Jobs,
run_nix_store=run_nix_store_print_dead, run_nix_store=run_nix_store_print_dead,
run_nix_collect=run_nix_collect_garbage, run_nix_collect=run_nix_collect_garbage,
set_job_status=None,
): # innocent as a pure function ): # innocent as a pure function
set_job_status = set_job_status or set_job_status_wrapper(jobs, job)
Jobs.update( set_job_status(
job=job,
status=JobStatus.RUNNING, status=JobStatus.RUNNING,
progress=0, progress=0,
status_text="Сalculate the number of dead packages...", status_text="Сalculate the number of dead packages...",
) )
output = run_nix_store() dead_packages, package_equal_to_percent = get_dead_packages(run_nix_store())
dead_packages = len(re.findall("/nix/store/", output.decode("utf-8")))
if dead_packages == 0: if dead_packages == 0:
Jobs.update( set_job_status(
job=job,
status=JobStatus.FINISHED, status=JobStatus.FINISHED,
progress=100, progress=100,
status_text="Nothing to clear", status_text="Nothing to clear",
result="System is clear", result="System is clear",
) )
return
package_equal_to_percent = 100 / dead_packages set_job_status(
Jobs.update(
job=job,
status=JobStatus.RUNNING, status=JobStatus.RUNNING,
progress=0, progress=0,
status_text=f"Found {dead_packages} packages to remove!", status_text=f"Found {dead_packages} packages to remove!",
) )
stream_process(run_nix_collect, package_equal_to_percent, job) stream_process(run_nix_collect(), package_equal_to_percent, set_job_status)

View File

@ -3,30 +3,146 @@
# pylint: disable=missing-function-docstring # pylint: disable=missing-function-docstring
import pytest import pytest
from selfprivacy_api.jobs import JobStatus
from selfprivacy_api.jobs.nix_collect_garbage import nix_collect_garbage from selfprivacy_api.jobs.nix_collect_garbage import (
get_dead_packages,
nix_collect_garbage,
parse_line,
CLEAR_COMPLETED,
COMPLETED_WITH_ERROR,
stream_process,
RESULT_WAAS_NOT_FOUND_ERROR,
)
output_print_dead = """
finding garbage collector roots...
determining live/dead paths...
/nix/store/02k8pmw00p7p7mf2dg3n057771w7liia-python3.10-cchardet-2.1.7
/nix/store/03vc6dznx8njbvyd3gfhfa4n5j4lvhbl-python3.10-async-timeout-4.0.2
/nix/store/03ybv2dvfk7c3cpb527y5kzf6i35ch41-python3.10-pycparser-2.21
/nix/store/04dn9slfqwhqisn1j3jv531lms9w5wlj-python3.10-hypothesis-6.50.1.drv
/nix/store/04hhx2z1iyi3b48hxykiw1g03lp46jk7-python-remove-bin-bytecode-hook
"""
output_collect_garbage = """
created_at: datetime.datetime removing old generations of profile /nix/var/nix/profiles/per-user/def/channels
updated_at: datetime.datetime finding garbage collector roots...
uid: UUID deleting garbage...
type_id: str deleting '/nix/store/02k8pmw00p7p7mf2dg3n057771w7liia-python3.10-cchardet-2.1.7'
name: str deleting '/nix/store/03vc6dznx8njbvyd3gfhfa4n5j4lvhbl-python3.10-async-timeout-4.0.2'
description: str deleting '/nix/store/03ybv2dvfk7c3cpb527y5kzf6i35ch41-python3.10-pycparser-2.21'
status: JobStatus deleting '/nix/store/04dn9slfqwhqisn1j3jv531lms9w5wlj-python3.10-hypothesis-6.50.1.drv'
deleting '/nix/store/04hhx2z1iyi3b48hxykiw1g03lp46jk7-python-remove-bin-bytecode-hook'
deleting unused links...
note: currently hard linking saves -0.00 MiB
190 store paths deleted, 425.51 MiB freed
"""
def test_nix_collect_garbage(job( def test_parse_line():
created_at = "2019-12-04", txt = "190 store paths deleted, 425.51 MiB freed"
updated_at = "2019-12-04", output = (
uid = UUID, JobStatus.FINISHED,
type_id = "typeid", 100,
name = "name", CLEAR_COMPLETED,
description: "desc", "425.51 MiB have been cleared",
status = status(CREATED = "CREATED"), )
)): assert parse_line(txt) == output
assert nix_collect_garbage() is not None
def test_parse_line_with_blank_line():
txt = ""
output = (
JobStatus.FINISHED,
100,
COMPLETED_WITH_ERROR,
RESULT_WAAS_NOT_FOUND_ERROR,
)
assert parse_line(txt) == output
def test_get_dead_packages():
assert get_dead_packages(output_print_dead) == (5, 20.0)
def test_get_dead_packages_zero():
assert get_dead_packages("") == (0, None)
def test_stream_process():
log_event = []
reference = [
(JobStatus.RUNNING, 20, "Сleaning...", ""),
(JobStatus.RUNNING, 40, "Сleaning...", ""),
(JobStatus.RUNNING, 60, "Сleaning...", ""),
(JobStatus.RUNNING, 80, "Сleaning...", ""),
(JobStatus.RUNNING, 100, "Сleaning...", ""),
(
JobStatus.FINISHED,
100,
"Сleaning completed.",
"425.51 MiB have been cleared",
),
]
def set_job_status(status, progress, status_text, result=""):
log_event.append((status, progress, status_text, result))
stream_process(output_collect_garbage.split("\n"), 20.0, set_job_status)
assert log_event == reference
def test_nix_collect_garbage():
log_event = []
reference = [
(JobStatus.RUNNING, 0, "Сalculate the number of dead packages...", ""),
(JobStatus.RUNNING, 0, "Found 5 packages to remove!", ""),
(JobStatus.RUNNING, 20, "Сleaning...", ""),
(JobStatus.RUNNING, 40, "Сleaning...", ""),
(JobStatus.RUNNING, 60, "Сleaning...", ""),
(JobStatus.RUNNING, 80, "Сleaning...", ""),
(JobStatus.RUNNING, 100, "Сleaning...", ""),
(
JobStatus.FINISHED,
100,
"Сleaning completed.",
"425.51 MiB have been cleared",
),
]
def set_job_status(status="", progress="", status_text="", result=""):
log_event.append((status, progress, status_text, result))
nix_collect_garbage(
None,
None,
lambda: output_print_dead,
lambda: output_collect_garbage.split("\n"),
set_job_status,
)
assert log_event == reference
def test_nix_collect_garbage_zero_trash():
log_event = []
reference = [
(JobStatus.RUNNING, 0, "Сalculate the number of dead packages...", ""),
(JobStatus.FINISHED, 100, "Nothing to clear", "System is clear"),
]
def set_job_status(status="", progress="", status_text="", result=""):
log_event.append((status, progress, status_text, result))
nix_collect_garbage(
None,
None,
lambda: "",
lambda: output_collect_garbage.split("\n"),
set_job_status,
)
assert log_event == reference