diff --git a/dump.rdb b/dump.rdb index 2f3b490..7a274db 100644 Binary files a/dump.rdb and b/dump.rdb differ diff --git a/selfprivacy_api/jobs/nix_collect_garbage.py b/selfprivacy_api/jobs/nix_collect_garbage.py index ec894fd..82e00e2 100644 --- a/selfprivacy_api/jobs/nix_collect_garbage.py +++ b/selfprivacy_api/jobs/nix_collect_garbage.py @@ -98,9 +98,8 @@ def get_dead_packages(output): # @huey.task() def nix_collect_garbage( job, - jobs=Jobs, ): - set_job_status = set_job_status_wrapper(jobs, job) + set_job_status = set_job_status_wrapper(Jobs, job) set_job_status( status=JobStatus.RUNNING, @@ -108,7 +107,9 @@ def nix_collect_garbage( status_text="Сalculate the number of dead packages...", ) - dead_packages, package_equal_to_percent = get_dead_packages(run_nix_store_print_dead()) + dead_packages, package_equal_to_percent = get_dead_packages( + run_nix_store_print_dead() + ) if dead_packages == 0: set_job_status( diff --git a/tests/test_graphql/test_nix_collect_garbage.py b/tests/test_graphql/test_nix_collect_garbage.py index 0db620a..98848b4 100644 --- a/tests/test_graphql/test_nix_collect_garbage.py +++ b/tests/test_graphql/test_nix_collect_garbage.py @@ -3,11 +3,11 @@ # pylint: disable=missing-function-docstring import pytest -from selfprivacy_api.jobs import JobStatus -from selfprivacy_api.graphql import schema -import asyncio import strawberry +from selfprivacy_api.jobs import JobStatus, Jobs +from selfprivacy_api.graphql import schema + # from selfprivacy_api.graphql.schema import Subscription from selfprivacy_api.jobs.nix_collect_garbage import ( @@ -21,7 +21,7 @@ from selfprivacy_api.jobs.nix_collect_garbage import ( ) -output_print_dead = """ +OUTPUT_PRINT_DEAD = """ finding garbage collector roots... determining live/dead paths... /nix/store/02k8pmw00p7p7mf2dg3n057771w7liia-python3.10-cchardet-2.1.7 @@ -32,7 +32,7 @@ determining live/dead paths... """ -output_collect_garbage = """ +OUTPUT_COLLECT_GARBAGE = """ removing old generations of profile /nix/var/nix/profiles/per-user/def/channels finding garbage collector roots... deleting garbage... @@ -46,8 +46,53 @@ note: currently hard linking saves -0.00 MiB 190 store paths deleted, 425.51 MiB freed """ +log_event = [] + + +def set_job_status(status="", progress="", status_text="", result=""): + log_event.append((status, progress, status_text, result)) + + +@pytest.fixture +def mock_set_job_status(mocker): + mock = mocker.patch( + "selfprivacy_api.jobs.nix_collect_garbage.set_job_status_wrapper", + autospec=True, + return_value=set_job_status, + ) + return mock + + +@pytest.fixture +def mock_run_nix_collect_garbage(mocker): + mock = mocker.patch( + "selfprivacy_api.jobs.nix_collect_garbage.run_nix_collect_garbage", + autospec=True, + return_value=OUTPUT_COLLECT_GARBAGE.split("\n"), + ) + return mock + + +@pytest.fixture +def mock_run_nix_store_print_dead(mocker): + mock = mocker.patch( + "selfprivacy_api.jobs.nix_collect_garbage.run_nix_store_print_dead", + autospec=True, + return_value="", + ) + return mock + + +@pytest.fixture +def job_reset(): + Jobs.reset() + + +# --- + + +def test_parse_line(job_reset): -def test_parse_line(): txt = "190 store paths deleted, 425.51 MiB freed" output = ( JobStatus.FINISHED, @@ -58,7 +103,7 @@ def test_parse_line(): assert parse_line(txt) == output -def test_parse_line_with_blank_line(): +def test_parse_line_with_blank_line(job_reset): txt = "" output = ( JobStatus.FINISHED, @@ -69,11 +114,11 @@ def test_parse_line_with_blank_line(): assert parse_line(txt) == output -def test_get_dead_packages(): - assert get_dead_packages(output_print_dead) == (5, 20.0) +def test_get_dead_packages(job_resetм): + assert get_dead_packages(OUTPUT_PRINT_DEAD) == (5, 20.0) -def test_get_dead_packages_zero(): +def test_get_dead_packages_zero(job_reset): assert get_dead_packages("") == (0, None) @@ -96,59 +141,52 @@ def test_stream_process(): def set_job_status(status, progress, status_text, result=""): log_event.append((status, progress, status_text, result)) - stream_process(output_collect_garbage.split("\n"), 5, set_job_status) + stream_process(OUTPUT_COLLECT_GARBAGE.split("\n"), 5, set_job_status) assert log_event == reference -def test_nix_collect_garbage(): +def test_nix_collect_garbage( + mock_set_job_status, mock_run_nix_collect_garbage, job_reset +): log_event = [] reference = [ - (JobStatus.RUNNING, 0, 'Сalculate the number of dead packages...', ''), - (JobStatus.RUNNING, 0, 'Found 5 packages to remove!', ''), - (JobStatus.RUNNING, 5, 'Сleaning...', ''), - (JobStatus.RUNNING, 10, 'Сleaning...', ''), - (JobStatus.RUNNING, 15, 'Сleaning...', ''), - (JobStatus.RUNNING, 20, 'Сleaning...', ''), - (JobStatus.RUNNING, 25, 'Сleaning...', ''), - (JobStatus.FINISHED, 100, 'Сleaning completed.', '425.51 MiB have been cleared'), + (JobStatus.RUNNING, 0, "Сalculate the number of dead packages...", ""), + (JobStatus.RUNNING, 0, "Found 5 packages to remove!", ""), + (JobStatus.RUNNING, 5, "Сleaning...", ""), + (JobStatus.RUNNING, 10, "Сleaning...", ""), + (JobStatus.RUNNING, 15, "Сleaning...", ""), + (JobStatus.RUNNING, 20, "Сleaning...", ""), + (JobStatus.RUNNING, 25, "Сleaning...", ""), + ( + JobStatus.FINISHED, + 100, + "Сleaning completed.", + "425.51 MiB have been cleared", + ), ] - def set_job_status(status="", progress="", status_text="", result=""): - log_event.append((status, progress, status_text, result)) - - nix_collect_garbage( - None, - None, - lambda: output_print_dead, - lambda: output_collect_garbage.split("\n"), - set_job_status, - ) - print("log_event:", log_event) - print("reference:", reference) + nix_collect_garbage(None) assert log_event == reference -def test_nix_collect_garbage_zero_trash(): - log_event = [] +def test_nix_collect_garbage_zero_trash( + mock_set_job_status, + mock_run_nix_collect_garbage, + mock_run_nix_store_print_dead, + job_reset, +): + reference = [ (JobStatus.RUNNING, 0, "Сalculate the number of dead packages...", ""), (JobStatus.FINISHED, 100, "Nothing to clear", "System is clear"), ] - def set_job_status(status="", progress="", status_text="", result=""): - log_event.append((status, progress, status_text, result)) - - nix_collect_garbage( - None, - None, - lambda: "", - lambda: output_collect_garbage.split("\n"), - set_job_status, - ) + nix_collect_garbage(None) assert log_event == reference + # андр констракнш @pytest.mark.asyncio async def test_graphql_nix_collect_garbage():