fix: types, add tests

pull/21/head
def 2022-12-27 04:15:02 +04:00 committed by dettlaff
parent 7553202a69
commit af0129bdfe
3 changed files with 31 additions and 4 deletions

View File

@ -2,10 +2,11 @@ import re
import subprocess
from selfprivacy_api.jobs import JobStatus, Jobs
from selfprivacy_api.utils.huey import huey
COMPLETED_WITH_ERROR = "Completed with an error"
RESULT_WAAS_NOT_FOUND_ERROR = "We are sorry, result was not found :("
RESULT_WAS_NOT_FOUND_ERROR = "We are sorry, result was not found :("
CLEAR_COMPLETED = "Сleaning completed."
@ -46,7 +47,7 @@ def parse_line(line):
JobStatus.FINISHED,
100,
COMPLETED_WITH_ERROR,
RESULT_WAAS_NOT_FOUND_ERROR,
RESULT_WAS_NOT_FOUND_ERROR,
)
else:
@ -72,6 +73,7 @@ def stream_process(
set_job_status(
status=JobStatus.RUNNING,
progress=int(percent),
progress=int(percent),
status_text="Сleaning...",
)
@ -93,6 +95,7 @@ def get_dead_packages(output):
return dead, percent
@huey.task()
def nix_collect_garbage(
job,
jobs=Jobs,

View File

@ -2,3 +2,4 @@ from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs.test import test_job
from selfprivacy_api.restic_controller.tasks import *
from selfprivacy_api.services.generic_service_mover import move_service
from selfprivacy_api.jobs.nix_collect_garbage import nix_collect_garbage

View File

@ -4,6 +4,11 @@
import pytest
from selfprivacy_api.jobs import JobStatus
from selfprivacy_api.graphql import schema
import asyncio
import strawberry
# from selfprivacy_api.graphql.schema import Subscription
from selfprivacy_api.jobs.nix_collect_garbage import (
get_dead_packages,
@ -12,7 +17,7 @@ from selfprivacy_api.jobs.nix_collect_garbage import (
CLEAR_COMPLETED,
COMPLETED_WITH_ERROR,
stream_process,
RESULT_WAAS_NOT_FOUND_ERROR,
RESULT_WAS_NOT_FOUND_ERROR,
)
@ -59,7 +64,7 @@ def test_parse_line_with_blank_line():
JobStatus.FINISHED,
100,
COMPLETED_WITH_ERROR,
RESULT_WAAS_NOT_FOUND_ERROR,
RESULT_WAS_NOT_FOUND_ERROR,
)
assert parse_line(txt) == output
@ -146,3 +151,21 @@ def test_nix_collect_garbage_zero_trash():
)
assert log_event == reference
@pytest.mark.asyncio
async def test_graphql_nix_collect_garbage():
query = """
subscription {
nixCollectGarbage()
}
"""
schema_for_garbage = strawberry.Schema(
query=schema.Query, mutation=schema.Mutation, subscription=schema.Subscription
)
sub = await schema_for_garbage.subscribe(query)
for result in sub:
assert not result.errors
assert result.data == {}