diff --git a/flake.nix b/flake.nix index e33a3e9..361dd9b 100644 --- a/flake.nix +++ b/flake.nix @@ -19,6 +19,7 @@ pytest pytest-datadir pytest-mock + pytest-subprocess black mypy pylsp-mypy diff --git a/selfprivacy_api/actions/system.py b/selfprivacy_api/actions/system.py index 13c3708..9b52497 100644 --- a/selfprivacy_api/actions/system.py +++ b/selfprivacy_api/actions/system.py @@ -4,6 +4,8 @@ import subprocess import pytz from typing import Optional, List from pydantic import BaseModel +from selfprivacy_api.jobs import Job, JobStatus, Jobs +from selfprivacy_api.jobs.upgrade_system import rebuild_system_task from selfprivacy_api.utils import WriteUserData, ReadUserData @@ -87,10 +89,16 @@ def run_blocking(cmd: List[str], new_session: bool = False) -> str: return stdout -def rebuild_system() -> int: +def rebuild_system() -> Job: """Rebuild the system""" - run_blocking(["systemctl", "start", "sp-nixos-rebuild.service"], new_session=True) - return 0 + job = Jobs.add( + type_id="system.nixos.rebuild", + name="Rebuild system", + description="Applying the new system configuration by building the new NixOS generation.", + status=JobStatus.CREATED, + ) + rebuild_system_task(job) + return job def rollback_system() -> int: @@ -99,10 +107,16 @@ def rollback_system() -> int: return 0 -def upgrade_system() -> int: +def upgrade_system() -> Job: """Upgrade the system""" - run_blocking(["systemctl", "start", "sp-nixos-upgrade.service"], new_session=True) - return 0 + job = Jobs.add( + type_id="system.nixos.upgrade", + name="Upgrade system", + description="Upgrading the system to the latest version.", + status=JobStatus.CREATED, + ) + rebuild_system_task(job, upgrade=True) + return job def reboot_system() -> None: diff --git a/selfprivacy_api/graphql/mutations/system_mutations.py b/selfprivacy_api/graphql/mutations/system_mutations.py index 13ac16b..5740a0d 100644 --- a/selfprivacy_api/graphql/mutations/system_mutations.py +++ b/selfprivacy_api/graphql/mutations/system_mutations.py @@ -3,7 +3,9 @@ import typing import strawberry from selfprivacy_api.graphql import IsAuthenticated +from selfprivacy_api.graphql.common_types.jobs import job_to_api_job from selfprivacy_api.graphql.mutations.mutation_interface import ( + GenericJobMutationReturn, GenericMutationReturn, MutationReturnInterface, ) @@ -114,16 +116,17 @@ class SystemMutations: ) @strawberry.mutation(permission_classes=[IsAuthenticated]) - def run_system_rebuild(self) -> GenericMutationReturn: + def run_system_rebuild(self) -> GenericJobMutationReturn: try: - system_actions.rebuild_system() - return GenericMutationReturn( + job = system_actions.rebuild_system() + return GenericJobMutationReturn( success=True, - message="Starting rebuild system", + message="Starting system rebuild", code=200, + job=job_to_api_job(job), ) except system_actions.ShellException as e: - return GenericMutationReturn( + return GenericJobMutationReturn( success=False, message=str(e), code=500, @@ -135,7 +138,7 @@ class SystemMutations: try: return GenericMutationReturn( success=True, - message="Starting rebuild system", + message="Starting system rollback", code=200, ) except system_actions.ShellException as e: @@ -146,16 +149,17 @@ class SystemMutations: ) @strawberry.mutation(permission_classes=[IsAuthenticated]) - def run_system_upgrade(self) -> GenericMutationReturn: - system_actions.upgrade_system() + def run_system_upgrade(self) -> GenericJobMutationReturn: try: - return GenericMutationReturn( + job = system_actions.upgrade_system() + return GenericJobMutationReturn( success=True, - message="Starting rebuild system", + message="Starting system upgrade", code=200, + job=job_to_api_job(job), ) except system_actions.ShellException as e: - return GenericMutationReturn( + return GenericJobMutationReturn( success=False, message=str(e), code=500, diff --git a/selfprivacy_api/jobs/upgrade_system.py b/selfprivacy_api/jobs/upgrade_system.py new file mode 100644 index 0000000..940efdb --- /dev/null +++ b/selfprivacy_api/jobs/upgrade_system.py @@ -0,0 +1,126 @@ +""" +A task to start the system upgrade or rebuild by starting a systemd unit. +After starting, track the status of the systemd unit and update the Job +status accordingly. +""" +import subprocess +from selfprivacy_api.utils.huey import huey +from selfprivacy_api.jobs import JobStatus, Jobs, Job +from selfprivacy_api.utils.waitloop import wait_until_true +from selfprivacy_api.utils.systemd import ( + get_service_status, + get_last_log_lines, + ServiceStatus, +) + +START_TIMEOUT = 60 * 5 +START_INTERVAL = 1 +RUN_TIMEOUT = 60 * 60 +RUN_INTERVAL = 5 + + +def check_if_started(unit_name: str): + """Check if the systemd unit has started""" + try: + status = get_service_status(unit_name) + if status == ServiceStatus.ACTIVE: + return True + return False + except subprocess.CalledProcessError: + return False + + +def check_running_status(job: Job, unit_name: str): + """Check if the systemd unit is running""" + try: + status = get_service_status(unit_name) + if status == ServiceStatus.INACTIVE: + Jobs.update( + job=job, + status=JobStatus.FINISHED, + result="System rebuilt.", + progress=100, + ) + return True + if status == ServiceStatus.FAILED: + log_lines = get_last_log_lines(unit_name, 10) + Jobs.update( + job=job, + status=JobStatus.ERROR, + error="System rebuild failed. Last log lines:\n" + "\n".join(log_lines), + ) + return True + if status == ServiceStatus.ACTIVE: + log_lines = get_last_log_lines(unit_name, 1) + Jobs.update( + job=job, + status=JobStatus.RUNNING, + status_text=log_lines[0] if len(log_lines) > 0 else "", + ) + return False + return False + except subprocess.CalledProcessError: + return False + + +@huey.task() +def rebuild_system_task(job: Job, upgrade: bool = False): + """Rebuild the system""" + unit_name = "sp-nixos-upgrade.service" if upgrade else "sp-nixos-rebuild.service" + try: + command = ["systemctl", "start", unit_name] + subprocess.run( + command, + check=True, + start_new_session=True, + shell=False, + ) + Jobs.update( + job=job, + status=JobStatus.RUNNING, + status_text="Starting the system rebuild...", + ) + # Wait for the systemd unit to start + try: + wait_until_true( + lambda: check_if_started(unit_name), + timeout_sec=START_TIMEOUT, + interval=START_INTERVAL, + ) + except TimeoutError: + log_lines = get_last_log_lines(unit_name, 10) + Jobs.update( + job=job, + status=JobStatus.ERROR, + error="System rebuild timed out. Last log lines:\n" + + "\n".join(log_lines), + ) + return + Jobs.update( + job=job, + status=JobStatus.RUNNING, + status_text="Rebuilding the system...", + ) + # Wait for the systemd unit to finish + try: + wait_until_true( + lambda: check_running_status(job, unit_name), + timeout_sec=RUN_TIMEOUT, + interval=RUN_INTERVAL, + ) + except TimeoutError: + log_lines = get_last_log_lines(unit_name, 10) + Jobs.update( + job=job, + status=JobStatus.ERROR, + error="System rebuild timed out. Last log lines:\n" + + "\n".join(log_lines), + ) + return + + except subprocess.CalledProcessError as e: + Jobs.update( + job=job, + status=JobStatus.ERROR, + status_text=str(e), + ) diff --git a/selfprivacy_api/migrations/__init__.py b/selfprivacy_api/migrations/__init__.py index 5e05b2d..2a2cbaa 100644 --- a/selfprivacy_api/migrations/__init__.py +++ b/selfprivacy_api/migrations/__init__.py @@ -11,9 +11,13 @@ Adding DISABLE_ALL to that array disables the migrations module entirely. from selfprivacy_api.utils import ReadUserData, UserDataFiles from selfprivacy_api.migrations.write_token_to_redis import WriteTokenToRedis +from selfprivacy_api.migrations.check_for_system_rebuild_jobs import ( + CheckForSystemRebuildJobs, +) migrations = [ WriteTokenToRedis(), + CheckForSystemRebuildJobs(), ] diff --git a/selfprivacy_api/migrations/check_for_system_rebuild_jobs.py b/selfprivacy_api/migrations/check_for_system_rebuild_jobs.py new file mode 100644 index 0000000..9bbac8a --- /dev/null +++ b/selfprivacy_api/migrations/check_for_system_rebuild_jobs.py @@ -0,0 +1,47 @@ +from selfprivacy_api.migrations.migration import Migration +from selfprivacy_api.jobs import JobStatus, Jobs + + +class CheckForSystemRebuildJobs(Migration): + """Check if there are unfinished system rebuild jobs and finish them""" + + def get_migration_name(self): + return "check_for_system_rebuild_jobs" + + def get_migration_description(self): + return "Check if there are unfinished system rebuild jobs and finish them" + + def is_migration_needed(self): + # Check if there are any unfinished system rebuild jobs + for job in Jobs.get_jobs(): + if ( + job.type_id + in [ + "system.nixos.rebuild", + "system.nixos.upgrade", + ] + ) and job.status in [ + JobStatus.CREATED, + JobStatus.RUNNING, + ]: + return True + + def migrate(self): + # As the API is restarted, we assume that the jobs are finished + for job in Jobs.get_jobs(): + if ( + job.type_id + in [ + "system.nixos.rebuild", + "system.nixos.upgrade", + ] + ) and job.status in [ + JobStatus.CREATED, + JobStatus.RUNNING, + ]: + Jobs.update( + job=job, + status=JobStatus.FINISHED, + result="System rebuilt.", + progress=100, + ) diff --git a/selfprivacy_api/services/bitwarden/__init__.py b/selfprivacy_api/services/bitwarden/__init__.py index 0734115..52f1466 100644 --- a/selfprivacy_api/services/bitwarden/__init__.py +++ b/selfprivacy_api/services/bitwarden/__init__.py @@ -5,7 +5,7 @@ from typing import Optional, List from selfprivacy_api.utils import get_domain -from selfprivacy_api.services.generic_status_getter import get_service_status +from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.bitwarden.icon import BITWARDEN_ICON diff --git a/selfprivacy_api/services/gitea/__init__.py b/selfprivacy_api/services/gitea/__init__.py index 26a0fd9..311d59e 100644 --- a/selfprivacy_api/services/gitea/__init__.py +++ b/selfprivacy_api/services/gitea/__init__.py @@ -5,7 +5,7 @@ from typing import Optional, List from selfprivacy_api.utils import get_domain -from selfprivacy_api.services.generic_status_getter import get_service_status +from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.gitea.icon import GITEA_ICON diff --git a/selfprivacy_api/services/jitsimeet/__init__.py b/selfprivacy_api/services/jitsimeet/__init__.py index 3531181..53d572c 100644 --- a/selfprivacy_api/services/jitsimeet/__init__.py +++ b/selfprivacy_api/services/jitsimeet/__init__.py @@ -4,7 +4,7 @@ import subprocess from typing import Optional, List from selfprivacy_api.jobs import Job -from selfprivacy_api.services.generic_status_getter import ( +from selfprivacy_api.utils.systemd import ( get_service_status_from_several_units, ) from selfprivacy_api.services.service import Service, ServiceStatus diff --git a/selfprivacy_api/services/mailserver/__init__.py b/selfprivacy_api/services/mailserver/__init__.py index 492cc55..d2e9b5d 100644 --- a/selfprivacy_api/services/mailserver/__init__.py +++ b/selfprivacy_api/services/mailserver/__init__.py @@ -4,7 +4,7 @@ import base64 import subprocess from typing import Optional, List -from selfprivacy_api.services.generic_status_getter import ( +from selfprivacy_api.utils.systemd import ( get_service_status_from_several_units, ) from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus diff --git a/selfprivacy_api/services/nextcloud/__init__.py b/selfprivacy_api/services/nextcloud/__init__.py index 9a7aaec..3e5b8d3 100644 --- a/selfprivacy_api/services/nextcloud/__init__.py +++ b/selfprivacy_api/services/nextcloud/__init__.py @@ -6,7 +6,7 @@ from typing import Optional, List from selfprivacy_api.utils import get_domain from selfprivacy_api.jobs import Job, Jobs -from selfprivacy_api.services.generic_status_getter import get_service_status +from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.nextcloud.icon import NEXTCLOUD_ICON diff --git a/selfprivacy_api/services/ocserv/__init__.py b/selfprivacy_api/services/ocserv/__init__.py index a7cfa06..4dd802f 100644 --- a/selfprivacy_api/services/ocserv/__init__.py +++ b/selfprivacy_api/services/ocserv/__init__.py @@ -3,7 +3,7 @@ import base64 import subprocess import typing from selfprivacy_api.jobs import Job -from selfprivacy_api.services.generic_status_getter import get_service_status +from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.utils.block_devices import BlockDevice from selfprivacy_api.services.ocserv.icon import OCSERV_ICON diff --git a/selfprivacy_api/services/pleroma/__init__.py b/selfprivacy_api/services/pleroma/__init__.py index 84eca59..44a9be8 100644 --- a/selfprivacy_api/services/pleroma/__init__.py +++ b/selfprivacy_api/services/pleroma/__init__.py @@ -6,7 +6,7 @@ from typing import Optional, List from selfprivacy_api.utils import get_domain from selfprivacy_api.services.owned_path import OwnedPath -from selfprivacy_api.services.generic_status_getter import get_service_status +from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.pleroma.icon import PLEROMA_ICON diff --git a/selfprivacy_api/task_registry.py b/selfprivacy_api/task_registry.py index dfd329c..a492e23 100644 --- a/selfprivacy_api/task_registry.py +++ b/selfprivacy_api/task_registry.py @@ -1,4 +1,5 @@ from selfprivacy_api.utils.huey import huey from selfprivacy_api.jobs.test import test_job from selfprivacy_api.backup.tasks import * -from selfprivacy_api.services.generic_service_mover import move_service +from selfprivacy_api.services.tasks import move_service +from selfprivacy_api.jobs.upgrade_system import rebuild_system_task diff --git a/selfprivacy_api/services/generic_status_getter.py b/selfprivacy_api/utils/systemd.py similarity index 78% rename from selfprivacy_api/services/generic_status_getter.py rename to selfprivacy_api/utils/systemd.py index 46720af..f8b6244 100644 --- a/selfprivacy_api/services/generic_status_getter.py +++ b/selfprivacy_api/utils/systemd.py @@ -1,5 +1,6 @@ """Generic service status fetcher using systemctl""" import subprocess +from typing import List from selfprivacy_api.services.service import ServiceStatus @@ -58,3 +59,24 @@ def get_service_status_from_several_units(services: list[str]) -> ServiceStatus: if ServiceStatus.ACTIVE in service_statuses: return ServiceStatus.ACTIVE return ServiceStatus.OFF + + +def get_last_log_lines(service: str, lines_count: int) -> List[str]: + if lines_count < 1: + raise ValueError("lines_count must be greater than 0") + try: + logs = subprocess.check_output( + [ + "journalctl", + "-u", + service, + "-n", + str(lines_count), + "-o", + "cat", + ], + shell=False, + ).decode("utf-8") + return logs.splitlines() + except subprocess.CalledProcessError: + return [] diff --git a/tests/test_graphql/test_system_nixos_tasks.py b/tests/test_graphql/test_system_nixos_tasks.py index 4a750c4..b50223e 100644 --- a/tests/test_graphql/test_system_nixos_tasks.py +++ b/tests/test_graphql/test_system_nixos_tasks.py @@ -3,6 +3,9 @@ # pylint: disable=missing-function-docstring import pytest +from selfprivacy_api.jobs import JobStatus, Jobs +from tests.test_graphql.common import assert_empty, assert_ok, get_data + class ProcessMock: """Mock subprocess.Popen""" @@ -37,6 +40,13 @@ def mock_subprocess_check_output(mocker): return mock +@pytest.fixture +def mock_sleep_intervals(mocker): + mock_start = mocker.patch("selfprivacy_api.jobs.upgrade_system.START_INTERVAL", 0) + mock_run = mocker.patch("selfprivacy_api.jobs.upgrade_system.RUN_INTERVAL", 0) + return (mock_start, mock_run) + + API_REBUILD_SYSTEM_MUTATION = """ mutation rebuildSystem { system { @@ -44,46 +54,14 @@ mutation rebuildSystem { success message code + job { + uid + } } } } """ - -def test_graphql_system_rebuild_unauthorized(client, mock_subprocess_popen): - """Test system rebuild without authorization""" - response = client.post( - "/graphql", - json={ - "query": API_REBUILD_SYSTEM_MUTATION, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is None - assert mock_subprocess_popen.call_count == 0 - - -def test_graphql_system_rebuild(authorized_client, mock_subprocess_popen): - """Test system rebuild""" - response = authorized_client.post( - "/graphql", - json={ - "query": API_REBUILD_SYSTEM_MUTATION, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["system"]["runSystemRebuild"]["success"] is True - assert response.json()["data"]["system"]["runSystemRebuild"]["message"] is not None - assert response.json()["data"]["system"]["runSystemRebuild"]["code"] == 200 - assert mock_subprocess_popen.call_count == 1 - assert mock_subprocess_popen.call_args[0][0] == [ - "systemctl", - "start", - "sp-nixos-rebuild.service", - ] - - API_UPGRADE_SYSTEM_MUTATION = """ mutation upgradeSystem { system { @@ -91,44 +69,140 @@ mutation upgradeSystem { success message code + job { + uid + } } } } """ -def test_graphql_system_upgrade_unauthorized(client, mock_subprocess_popen): - """Test system upgrade without authorization""" +@pytest.mark.parametrize("action", ["rebuild", "upgrade"]) +def test_graphql_system_rebuild_unauthorized(client, fp, action): + """Test system rebuild without authorization""" + query = ( + API_REBUILD_SYSTEM_MUTATION + if action == "rebuild" + else API_UPGRADE_SYSTEM_MUTATION + ) + response = client.post( "/graphql", json={ - "query": API_UPGRADE_SYSTEM_MUTATION, + "query": query, }, ) - assert response.status_code == 200 - assert response.json().get("data") is None - assert mock_subprocess_popen.call_count == 0 + assert_empty(response) + assert fp.call_count([fp.any()]) == 0 -def test_graphql_system_upgrade(authorized_client, mock_subprocess_popen): - """Test system upgrade""" +@pytest.mark.parametrize("action", ["rebuild", "upgrade"]) +def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals): + """Test system rebuild""" + unit_name = f"sp-nixos-{action}.service" + query = ( + API_REBUILD_SYSTEM_MUTATION + if action == "rebuild" + else API_UPGRADE_SYSTEM_MUTATION + ) + + # Start the unit + fp.register(["systemctl", "start", unit_name]) + + # Wait for it to start + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive") + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive") + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active") + + # Check its exectution + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active") + fp.register( + ["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], + stdout="Starting rebuild...", + ) + + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active") + fp.register( + ["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..." + ) + + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive") + response = authorized_client.post( "/graphql", json={ - "query": API_UPGRADE_SYSTEM_MUTATION, + "query": query, }, ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["system"]["runSystemUpgrade"]["success"] is True - assert response.json()["data"]["system"]["runSystemUpgrade"]["message"] is not None - assert response.json()["data"]["system"]["runSystemUpgrade"]["code"] == 200 - assert mock_subprocess_popen.call_count == 1 - assert mock_subprocess_popen.call_args[0][0] == [ - "systemctl", - "start", - "sp-nixos-upgrade.service", - ] + data = get_data(response)["system"][f"runSystem{action.capitalize()}"] + assert_ok(data) + + assert fp.call_count(["systemctl", "start", unit_name]) == 1 + assert fp.call_count(["systemctl", "show", unit_name]) == 6 + + job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][ + "job" + ]["uid"] + assert Jobs.get_job(job_id).status == JobStatus.FINISHED + assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}" + + +@pytest.mark.parametrize("action", ["rebuild", "upgrade"]) +def test_graphql_system_rebuild_failed( + authorized_client, fp, action, mock_sleep_intervals +): + """Test system rebuild""" + unit_name = f"sp-nixos-{action}.service" + query = ( + API_REBUILD_SYSTEM_MUTATION + if action == "rebuild" + else API_UPGRADE_SYSTEM_MUTATION + ) + + # Start the unit + fp.register(["systemctl", "start", unit_name]) + + # Wait for it to start + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive") + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive") + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active") + + # Check its exectution + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active") + fp.register( + ["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], + stdout="Starting rebuild...", + ) + + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active") + fp.register( + ["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..." + ) + + fp.register(["systemctl", "show", unit_name], stdout="ActiveState=failed") + + fp.register( + ["journalctl", "-u", unit_name, "-n", "10", "-o", "cat"], stdout="Some error" + ) + + response = authorized_client.post( + "/graphql", + json={ + "query": query, + }, + ) + data = get_data(response)["system"][f"runSystem{action.capitalize()}"] + assert_ok(data) + + assert fp.call_count(["systemctl", "start", unit_name]) == 1 + assert fp.call_count(["systemctl", "show", unit_name]) == 6 + + job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][ + "job" + ]["uid"] + assert Jobs.get_job(job_id).status == JobStatus.ERROR + assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}" API_ROLLBACK_SYSTEM_MUTATION = """