feat: Basic tracking of the NixOS rebuilds #98

Merged
inex merged 39 commits from system-rebuild-tracking into master 2024-03-06 18:12:22 +02:00
16 changed files with 373 additions and 80 deletions

View File

@ -19,6 +19,7 @@
pytest
pytest-datadir
pytest-mock
pytest-subprocess
black
mypy
pylsp-mypy

View File

@ -4,6 +4,8 @@ import subprocess
import pytz
from typing import Optional, List
from pydantic import BaseModel
from selfprivacy_api.jobs import Job, JobStatus, Jobs
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task
from selfprivacy_api.utils import WriteUserData, ReadUserData
@ -87,10 +89,16 @@ def run_blocking(cmd: List[str], new_session: bool = False) -> str:
return stdout
def rebuild_system() -> int:
def rebuild_system() -> Job:
"""Rebuild the system"""
run_blocking(["systemctl", "start", "sp-nixos-rebuild.service"], new_session=True)
return 0
job = Jobs.add(
type_id="system.nixos.rebuild",
name="Rebuild system",
description="Applying the new system configuration by building the new NixOS generation.",
status=JobStatus.CREATED,
)
rebuild_system_task(job)
return job
def rollback_system() -> int:
@ -99,10 +107,16 @@ def rollback_system() -> int:
return 0
def upgrade_system() -> int:
def upgrade_system() -> Job:
"""Upgrade the system"""
run_blocking(["systemctl", "start", "sp-nixos-upgrade.service"], new_session=True)
return 0
job = Jobs.add(
type_id="system.nixos.upgrade",
name="Upgrade system",
description="Upgrading the system to the latest version.",
status=JobStatus.CREATED,
)
rebuild_system_task(job, upgrade=True)
return job
def reboot_system() -> None:

View File

@ -3,7 +3,9 @@
import typing
import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericJobMutationReturn,
GenericMutationReturn,
MutationReturnInterface,
)
@ -114,16 +116,17 @@ class SystemMutations:
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def run_system_rebuild(self) -> GenericMutationReturn:
def run_system_rebuild(self) -> GenericJobMutationReturn:
try:
system_actions.rebuild_system()
return GenericMutationReturn(
job = system_actions.rebuild_system()
return GenericJobMutationReturn(
success=True,
message="Starting rebuild system",
message="Starting system rebuild",
code=200,
job=job_to_api_job(job),
)
except system_actions.ShellException as e:
return GenericMutationReturn(
return GenericJobMutationReturn(
success=False,
message=str(e),
code=500,
@ -135,7 +138,7 @@ class SystemMutations:
try:
return GenericMutationReturn(
success=True,
message="Starting rebuild system",
message="Starting system rollback",
code=200,
)
except system_actions.ShellException as e:
@ -146,16 +149,17 @@ class SystemMutations:
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def run_system_upgrade(self) -> GenericMutationReturn:
system_actions.upgrade_system()
def run_system_upgrade(self) -> GenericJobMutationReturn:
try:
return GenericMutationReturn(
job = system_actions.upgrade_system()
return GenericJobMutationReturn(
success=True,
message="Starting rebuild system",
message="Starting system upgrade",
code=200,
job=job_to_api_job(job),
)
except system_actions.ShellException as e:
return GenericMutationReturn(
return GenericJobMutationReturn(
success=False,
message=str(e),
code=500,

View File

@ -0,0 +1,126 @@
"""
A task to start the system upgrade or rebuild by starting a systemd unit.
After starting, track the status of the systemd unit and update the Job
status accordingly.
"""
import subprocess
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs import JobStatus, Jobs, Job
from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils.systemd import (
get_service_status,
get_last_log_lines,
ServiceStatus,
)
START_TIMEOUT = 60 * 5
START_INTERVAL = 1
RUN_TIMEOUT = 60 * 60
RUN_INTERVAL = 5
def check_if_started(unit_name: str):
    """
    Tell whether the given systemd unit currently reports an active state.

    Returns True only when the unit's status is ServiceStatus.ACTIVE.
    Any other status, or a CalledProcessError raised while querying the
    status, yields False.
    """
    try:
        return bool(get_service_status(unit_name) == ServiceStatus.ACTIVE)
    except subprocess.CalledProcessError:
        return False
def check_running_status(job: Job, unit_name: str):
    """
    Poll the systemd unit once and mirror its state into the Job.

    Returns True when polling should stop (the unit is inactive or failed)
    and False when the caller should keep waiting.

    - INACTIVE: the job is marked FINISHED with 100% progress.
    - FAILED: the job is marked ERROR with the last 10 log lines attached.
    - ACTIVE: the job stays RUNNING and its status text becomes the most
      recent log line (empty string if there is none).
    - Any other status, or a CalledProcessError: nothing is updated.
    """
    polling_done = False
    try:
        unit_state = get_service_status(unit_name)
        if unit_state == ServiceStatus.INACTIVE:
            Jobs.update(
                job=job,
                status=JobStatus.FINISHED,
                result="System rebuilt.",
                progress=100,
            )
            polling_done = True
        elif unit_state == ServiceStatus.FAILED:
            tail = get_last_log_lines(unit_name, 10)
            failure_text = "System rebuild failed. Last log lines:\n" + "\n".join(tail)
            Jobs.update(
                job=job,
                status=JobStatus.ERROR,
                error=failure_text,
            )
            polling_done = True
        elif unit_state == ServiceStatus.ACTIVE:
            tail = get_last_log_lines(unit_name, 1)
            latest_line = tail[0] if tail else ""
            Jobs.update(
                job=job,
                status=JobStatus.RUNNING,
                status_text=latest_line,
            )
    except subprocess.CalledProcessError:
        # A failed status query is treated as "not finished yet".
        return False
    return polling_done
def _fail_with_timeout(job: Job, unit_name: str) -> None:
    """Mark the job as ERROR after a timeout, attaching the unit's last 10 log lines."""
    log_lines = get_last_log_lines(unit_name, 10)
    Jobs.update(
        job=job,
        status=JobStatus.ERROR,
        error="System rebuild timed out. Last log lines:\n" + "\n".join(log_lines),
    )


@huey.task()
def rebuild_system_task(job: Job, upgrade: bool = False):
    """
    Start the NixOS rebuild (or upgrade) systemd unit and track it in the Job.

    Args:
        job: The Job whose status is updated as the unit progresses.
        upgrade: When True, start "sp-nixos-upgrade.service" instead of
            "sp-nixos-rebuild.service".

    The task starts the unit with systemctl, waits up to START_TIMEOUT
    seconds for it to become active, then polls it for up to RUN_TIMEOUT
    seconds via check_running_status, which records the final result.
    A timeout in either phase, or a failing systemctl call, puts the job
    into the ERROR state.
    """
    unit_name = "sp-nixos-upgrade.service" if upgrade else "sp-nixos-rebuild.service"
    try:
        subprocess.run(
            ["systemctl", "start", unit_name],
            check=True,
            start_new_session=True,
            shell=False,
        )
        Jobs.update(
            job=job,
            status=JobStatus.RUNNING,
            status_text="Starting the system rebuild...",
        )
        # Wait for the systemd unit to start
        try:
            wait_until_true(
                lambda: check_if_started(unit_name),
                timeout_sec=START_TIMEOUT,
                interval=START_INTERVAL,
            )
        except TimeoutError:
            _fail_with_timeout(job, unit_name)
            return
        Jobs.update(
            job=job,
            status=JobStatus.RUNNING,
            status_text="Rebuilding the system...",
        )
        # Wait for the systemd unit to finish; check_running_status records
        # the FINISHED/ERROR outcome itself, so only the timeout is handled here.
        try:
            wait_until_true(
                lambda: check_running_status(job, unit_name),
                timeout_sec=RUN_TIMEOUT,
                interval=RUN_INTERVAL,
            )
        except TimeoutError:
            _fail_with_timeout(job, unit_name)
            return
    except subprocess.CalledProcessError as e:
        failure_text = str(e)
        # Every other ERROR update in this module reports through `error`;
        # fill it here too, and keep `status_text` for existing consumers.
        Jobs.update(
            job=job,
            status=JobStatus.ERROR,
            status_text=failure_text,
            error=failure_text,
        )

View File

@ -11,9 +11,13 @@ Adding DISABLE_ALL to that array disables the migrations module entirely.
from selfprivacy_api.utils import ReadUserData, UserDataFiles
from selfprivacy_api.migrations.write_token_to_redis import WriteTokenToRedis
from selfprivacy_api.migrations.check_for_system_rebuild_jobs import (
CheckForSystemRebuildJobs,
)
migrations = [
WriteTokenToRedis(),
CheckForSystemRebuildJobs(),
]

View File

@ -0,0 +1,47 @@
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.jobs import JobStatus, Jobs
class CheckForSystemRebuildJobs(Migration):
    """Check if there are unfinished system rebuild jobs and finish them"""

    def get_migration_name(self):
        """Return the identifier of this migration."""
        return "check_for_system_rebuild_jobs"

    def get_migration_description(self):
        """Return a human-readable description of this migration."""
        return "Check if there are unfinished system rebuild jobs and finish them"

    @staticmethod
    def _is_unfinished_rebuild_job(job) -> bool:
        """Tell whether the job is a rebuild/upgrade job still CREATED or RUNNING."""
        return job.type_id in (
            "system.nixos.rebuild",
            "system.nixos.upgrade",
        ) and job.status in (
            JobStatus.CREATED,
            JobStatus.RUNNING,
        )

    def is_migration_needed(self) -> bool:
        """Return True if any system rebuild/upgrade job is still unfinished."""
        # Always return a real bool: the loop-based version implicitly
        # returned None when nothing matched.
        return any(self._is_unfinished_rebuild_job(job) for job in Jobs.get_jobs())

    def migrate(self):
        """Mark every unfinished system rebuild/upgrade job as FINISHED."""
        # As the API is restarted, we assume that the jobs are finished
        for job in Jobs.get_jobs():
            if self._is_unfinished_rebuild_job(job):
                Jobs.update(
                    job=job,
                    status=JobStatus.FINISHED,
                    result="System rebuilt.",
                    progress=100,
                )

View File

@ -5,7 +5,7 @@ from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.bitwarden.icon import BITWARDEN_ICON

View File

@ -5,7 +5,7 @@ from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.gitea.icon import GITEA_ICON

View File

@ -4,7 +4,7 @@ import subprocess
from typing import Optional, List
from selfprivacy_api.jobs import Job
from selfprivacy_api.services.generic_status_getter import (
from selfprivacy_api.utils.systemd import (
get_service_status_from_several_units,
)
from selfprivacy_api.services.service import Service, ServiceStatus

View File

@ -4,7 +4,7 @@ import base64
import subprocess
from typing import Optional, List
from selfprivacy_api.services.generic_status_getter import (
from selfprivacy_api.utils.systemd import (
get_service_status_from_several_units,
)
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus

View File

@ -6,7 +6,7 @@ from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.nextcloud.icon import NEXTCLOUD_ICON

View File

@ -3,7 +3,7 @@ import base64
import subprocess
import typing
from selfprivacy_api.jobs import Job
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.ocserv.icon import OCSERV_ICON

View File

@ -6,7 +6,7 @@ from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.pleroma.icon import PLEROMA_ICON

View File

@ -1,4 +1,5 @@
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs.test import test_job
from selfprivacy_api.backup.tasks import *
from selfprivacy_api.services.generic_service_mover import move_service
from selfprivacy_api.services.tasks import move_service
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task

View File

@ -1,5 +1,6 @@
"""Generic service status fetcher using systemctl"""
import subprocess
from typing import List
from selfprivacy_api.services.service import ServiceStatus
@ -58,3 +59,24 @@ def get_service_status_from_several_units(services: list[str]) -> ServiceStatus:
if ServiceStatus.ACTIVE in service_statuses:
return ServiceStatus.ACTIVE
return ServiceStatus.OFF
def get_last_log_lines(service: str, lines_count: int) -> List[str]:
    """
    Return the last journal lines of a systemd unit.

    Args:
        service: Name of the systemd unit passed to ``journalctl -u``.
        lines_count: How many trailing lines to request; must be at least 1.

    Returns:
        The output of journalctl split into lines, or an empty list when
        journalctl exits with a non-zero status.

    Raises:
        ValueError: If ``lines_count`` is less than 1.
    """
    if lines_count < 1:
        raise ValueError("lines_count must be greater than 0")
    command = [
        "journalctl",
        "-u",
        service,
        "-n",
        str(lines_count),
        "-o",
        "cat",
    ]
    try:
        raw_logs = subprocess.check_output(command, shell=False)
    except subprocess.CalledProcessError:
        return []
    # Unit output is not guaranteed to be valid UTF-8. Replace undecodable
    # bytes instead of raising, since callers use this while reporting errors.
    return raw_logs.decode("utf-8", errors="replace").splitlines()

View File

@ -3,6 +3,9 @@
# pylint: disable=missing-function-docstring
import pytest
from selfprivacy_api.jobs import JobStatus, Jobs
from tests.test_graphql.common import assert_empty, assert_ok, get_data
class ProcessMock:
"""Mock subprocess.Popen"""
@ -37,6 +40,13 @@ def mock_subprocess_check_output(mocker):
return mock
@pytest.fixture
def mock_sleep_intervals(mocker):
    """Patch both polling intervals of the upgrade_system module to 0.

    Returns the pair (START_INTERVAL patch, RUN_INTERVAL patch).
    """
    module_path = "selfprivacy_api.jobs.upgrade_system"
    patched_start = mocker.patch(f"{module_path}.START_INTERVAL", 0)
    patched_run = mocker.patch(f"{module_path}.RUN_INTERVAL", 0)
    return (patched_start, patched_run)
API_REBUILD_SYSTEM_MUTATION = """
mutation rebuildSystem {
system {
@ -44,46 +54,14 @@ mutation rebuildSystem {
success
message
code
job {
uid
}
}
}
}
"""
def test_graphql_system_rebuild_unauthorized(client, mock_subprocess_popen):
"""Test system rebuild without authorization"""
response = client.post(
"/graphql",
json={
"query": API_REBUILD_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert mock_subprocess_popen.call_count == 0
def test_graphql_system_rebuild(authorized_client, mock_subprocess_popen):
"""Test system rebuild"""
response = authorized_client.post(
"/graphql",
json={
"query": API_REBUILD_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["system"]["runSystemRebuild"]["success"] is True
assert response.json()["data"]["system"]["runSystemRebuild"]["message"] is not None
assert response.json()["data"]["system"]["runSystemRebuild"]["code"] == 200
assert mock_subprocess_popen.call_count == 1
assert mock_subprocess_popen.call_args[0][0] == [
"systemctl",
"start",
"sp-nixos-rebuild.service",
]
API_UPGRADE_SYSTEM_MUTATION = """
mutation upgradeSystem {
system {
@ -91,44 +69,140 @@ mutation upgradeSystem {
success
message
code
job {
uid
}
}
}
}
"""
def test_graphql_system_upgrade_unauthorized(client, mock_subprocess_popen):
"""Test system upgrade without authorization"""
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild_unauthorized(client, fp, action):
"""Test system rebuild without authorization"""
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
response = client.post(
"/graphql",
json={
"query": API_UPGRADE_SYSTEM_MUTATION,
"query": query,
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert mock_subprocess_popen.call_count == 0
assert_empty(response)
assert fp.call_count([fp.any()]) == 0
def test_graphql_system_upgrade(authorized_client, mock_subprocess_popen):
"""Test system upgrade"""
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
"""Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
# Start the unit
fp.register(["systemctl", "start", unit_name])
# Wait for it to start
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
# Check its execution
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
stdout="Starting rebuild...",
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
response = authorized_client.post(
"/graphql",
json={
"query": API_UPGRADE_SYSTEM_MUTATION,
"query": query,
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["system"]["runSystemUpgrade"]["success"] is True
assert response.json()["data"]["system"]["runSystemUpgrade"]["message"] is not None
assert response.json()["data"]["system"]["runSystemUpgrade"]["code"] == 200
assert mock_subprocess_popen.call_count == 1
assert mock_subprocess_popen.call_args[0][0] == [
"systemctl",
"start",
"sp-nixos-upgrade.service",
]
data = get_data(response)["system"][f"runSystem{action.capitalize()}"]
assert_ok(data)
assert fp.call_count(["systemctl", "start", unit_name]) == 1
assert fp.call_count(["systemctl", "show", unit_name]) == 6
job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][
"job"
]["uid"]
assert Jobs.get_job(job_id).status == JobStatus.FINISHED
assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}"
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild_failed(
    authorized_client, fp, action, mock_sleep_intervals
):
    """Test that a unit ending in the failed state leaves the job in ERROR"""
    unit_name = f"sp-nixos-{action}.service"
    mutation_field = f"runSystem{action.capitalize()}"
    if action == "rebuild":
        query = API_REBUILD_SYSTEM_MUTATION
    else:
        query = API_UPGRADE_SYSTEM_MUTATION

    def register_state(state):
        # One queued answer for the next `systemctl show` call.
        fp.register(["systemctl", "show", unit_name], stdout=f"ActiveState={state}")

    def register_journal(lines_count, output):
        # One queued answer for the next journalctl call with this line count.
        fp.register(
            ["journalctl", "-u", unit_name, "-n", lines_count, "-o", "cat"],
            stdout=output,
        )

    # Start the unit
    fp.register(["systemctl", "start", unit_name])

    # Wait for it to start
    register_state("inactive")
    register_state("inactive")
    register_state("active")

    # Check its execution
    register_state("active")
    register_journal("1", "Starting rebuild...")
    register_state("active")
    register_journal("1", "Rebuilding...")
    register_state("failed")
    register_journal("10", "Some error")

    response = authorized_client.post(
        "/graphql",
        json={
            "query": query,
        },
    )
    data = get_data(response)["system"][mutation_field]
    assert_ok(data)

    assert fp.call_count(["systemctl", "start", unit_name]) == 1
    assert fp.call_count(["systemctl", "show", unit_name]) == 6

    job_id = response.json()["data"]["system"][mutation_field]["job"]["uid"]
    assert Jobs.get_job(job_id).status == JobStatus.ERROR
    assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}"
API_ROLLBACK_SYSTEM_MUTATION = """