tests: delete old, fix
continuous-integration/drone/push Build is failing Details
continuous-integration/drone/pr Build is failing Details

pull/21/head
dettlaff 2023-08-03 05:47:57 +04:00
parent 66970e6375
commit 7c68f05040
2 changed files with 29 additions and 132 deletions

BIN
dump.rdb

Binary file not shown.

View File

@ -8,17 +8,15 @@ import strawberry
from selfprivacy_api.jobs import JobStatus, Jobs
from selfprivacy_api.graphql import schema
# from selfprivacy_api.graphql.schema import Subscription
# from selfprivacy_api.jobs.nix_collect_garbage import (
# get_dead_packages,
# nix_collect_garbage,
# parse_line,
# CLEAR_COMPLETED,
# COMPLETED_WITH_ERROR,
# stream_process,
# RESULT_WAS_NOT_FOUND_ERROR,
# )
from selfprivacy_api.jobs.nix_collect_garbage import (
get_dead_packages,
parse_line,
CLEAR_COMPLETED,
COMPLETED_WITH_ERROR,
stream_process,
RESULT_WAS_NOT_FOUND_ERROR,
)
pytest_plugins = ("pytest_asyncio",)
@ -66,19 +64,6 @@ determining live/dead paths...
log_event = []
def set_job_status(status="", progress="", status_text="", result=""):
log_event.append((status, progress, status_text, result))
@pytest.fixture
def mock_run_nix_store_print_dead_zero_trash(mocker):
mock = mocker.patch(
"selfprivacy_api.jobs.nix_collect_garbage.run_nix_store_print_dead",
autospec=True,
return_value=OUTPUT_RUN_NIX_STORE_PRINT_DEAD_ZERO_TRASH.split("\n"),
)
@pytest.fixture
def mock_set_job_status(mocker):
mock = mocker.patch(
@ -89,36 +74,6 @@ def mock_set_job_status(mocker):
return mock
@pytest.fixture
def mock_run_nix_collect_garbage(mocker):
mock = mocker.patch(
"selfprivacy_api.jobs.nix_collect_garbage.run_nix_collect_garbage",
autospec=True,
return_value=OUTPUT_COLLECT_GARBAGE.split("\n"),
)
return mock
@pytest.fixture
def mock_run_nix_collect_garbage_zero_trash(mocker):
mock = mocker.patch(
"selfprivacy_api.jobs.nix_collect_garbage.run_nix_collect_garbage",
autospec=True,
return_value=OUTPUT_COLLECT_GARBAGE_ZERO_TRASH.split("\n"),
)
return mock
@pytest.fixture
def mock_run_nix_store_print_dead(mocker):
mock = mocker.patch(
"selfprivacy_api.jobs.nix_collect_garbage.run_nix_store_print_dead",
autospec=True,
return_value="",
)
return mock
@pytest.fixture
def job_reset():
Jobs.reset()
@ -154,88 +109,30 @@ def test_get_dead_packages(job_reset):
def test_get_dead_packages_zero(job_reset):
assert get_dead_packages("") == (0, None)
assert get_dead_packages("") == (0, 0)
def test_stream_process(job_reset):
log_event = []
reference = [
(JobStatus.RUNNING, 20, "Cleaning...", ""),
(JobStatus.RUNNING, 40, "Cleaning...", ""),
(JobStatus.RUNNING, 60, "Cleaning...", ""),
(JobStatus.RUNNING, 80, "Cleaning...", ""),
(JobStatus.RUNNING, 100, "Cleaning...", ""),
(
JobStatus.FINISHED,
100,
"Cleaning completed.",
"425.51 MiB have been cleared",
),
]
def set_job_status(status, progress, status_text, result=""):
log_event.append((status, progress, status_text, result))
stream_process(OUTPUT_COLLECT_GARBAGE.split("\n"), 5, set_job_status)
assert log_event == reference
RUN_NIX_COLLECT_GARBAGE_QUERY = """
mutation system {
nixCollectGarbage {
success
message
code
}
}
"""
def test_nix_collect_garbage(
mock_set_job_status, mock_run_nix_collect_garbage, job_reset
):
log_event = []
reference = [
(JobStatus.RUNNING, 0, "Calculate the number of dead packages...", ""),
(JobStatus.RUNNING, 0, "Found 5 packages to remove!", ""),
(JobStatus.RUNNING, 5, "Cleaning...", ""),
(JobStatus.RUNNING, 10, "Cleaning...", ""),
(JobStatus.RUNNING, 15, "Cleaning...", ""),
(JobStatus.RUNNING, 20, "Cleaning...", ""),
(JobStatus.RUNNING, 25, "Cleaning...", ""),
(
JobStatus.FINISHED,
100,
"Cleaning completed.",
"425.51 MiB have been cleared",
),
]
nix_collect_garbage(None)
assert log_event == reference
def test_nix_collect_garbage_zero_trash(
mock_set_job_status,
mock_run_nix_collect_garbage_zero_trash,
mock_run_nix_store_print_dead,
job_reset,
mock_run_nix_store_print_dead_zero_trash,
):
reference = [
(JobStatus.RUNNING, 0, "Calculate the number of dead packages...", ""),
(JobStatus.FINISHED, 100, "Nothing to clear", "System is clear"),
]
nix_collect_garbage(None)
assert log_event == reference
# андр конcтракнш
@pytest.mark.asyncio
async def test_graphql_nix_collect_garbage():
query = """
subscription {
nixCollectGarbage()
}
"""
schema_for_garbage = strawberry.Schema(
query=schema.Query, mutation=schema.Mutation, subscription=schema.Subscription
def test_graphql_nix_collect_garbage(authorized_client):
response = authorized_client.post(
"/graphql",
json={
"query": RUN_NIX_COLLECT_GARBAGE_QUERY,
},
)
sub = await schema_for_garbage.subscribe(query)
async for result in sub:
assert not result.errors
assert result.data == {}
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["nixCollectGarbage"]["success"] is True
assert response.json()["data"]["nixCollectGarbage"]["message"] is not None
assert response.json()["data"]["nixCollectGarbage"]["code"] == 200