feat: Basic tracking of the NixOS rebuilds #98
|
@ -19,6 +19,7 @@
|
|||
pytest
|
||||
pytest-datadir
|
||||
pytest-mock
|
||||
pytest-subprocess
|
||||
black
|
||||
mypy
|
||||
pylsp-mypy
|
||||
|
|
|
@ -4,6 +4,8 @@ import subprocess
|
|||
import pytz
|
||||
from typing import Optional, List
|
||||
from pydantic import BaseModel
|
||||
from selfprivacy_api.jobs import Job, JobStatus, Jobs
|
||||
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task
|
||||
|
||||
from selfprivacy_api.utils import WriteUserData, ReadUserData
|
||||
|
||||
|
@ -87,10 +89,16 @@ def run_blocking(cmd: List[str], new_session: bool = False) -> str:
|
|||
return stdout
|
||||
|
||||
|
||||
def rebuild_system() -> int:
|
||||
def rebuild_system() -> Job:
|
||||
"""Rebuild the system"""
|
||||
run_blocking(["systemctl", "start", "sp-nixos-rebuild.service"], new_session=True)
|
||||
return 0
|
||||
job = Jobs.add(
|
||||
type_id="system.nixos.rebuild",
|
||||
name="Rebuild system",
|
||||
description="Applying the new system configuration by building the new NixOS generation.",
|
||||
status=JobStatus.CREATED,
|
||||
)
|
||||
rebuild_system_task(job)
|
||||
return job
|
||||
|
||||
|
||||
def rollback_system() -> int:
|
||||
|
@ -99,10 +107,16 @@ def rollback_system() -> int:
|
|||
return 0
|
||||
|
||||
|
||||
def upgrade_system() -> int:
|
||||
def upgrade_system() -> Job:
|
||||
"""Upgrade the system"""
|
||||
run_blocking(["systemctl", "start", "sp-nixos-upgrade.service"], new_session=True)
|
||||
return 0
|
||||
job = Jobs.add(
|
||||
type_id="system.nixos.upgrade",
|
||||
name="Upgrade system",
|
||||
description="Upgrading the system to the latest version.",
|
||||
status=JobStatus.CREATED,
|
||||
)
|
||||
rebuild_system_task(job, upgrade=True)
|
||||
return job
|
||||
|
||||
|
||||
def reboot_system() -> None:
|
||||
|
|
|
@ -3,7 +3,9 @@
|
|||
import typing
|
||||
import strawberry
|
||||
from selfprivacy_api.graphql import IsAuthenticated
|
||||
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
|
||||
from selfprivacy_api.graphql.mutations.mutation_interface import (
|
||||
GenericJobMutationReturn,
|
||||
GenericMutationReturn,
|
||||
MutationReturnInterface,
|
||||
)
|
||||
|
@ -114,16 +116,17 @@ class SystemMutations:
|
|||
)
|
||||
|
||||
@strawberry.mutation(permission_classes=[IsAuthenticated])
|
||||
def run_system_rebuild(self) -> GenericMutationReturn:
|
||||
def run_system_rebuild(self) -> GenericJobMutationReturn:
|
||||
try:
|
||||
system_actions.rebuild_system()
|
||||
return GenericMutationReturn(
|
||||
job = system_actions.rebuild_system()
|
||||
return GenericJobMutationReturn(
|
||||
success=True,
|
||||
message="Starting rebuild system",
|
||||
message="Starting system rebuild",
|
||||
code=200,
|
||||
job=job_to_api_job(job),
|
||||
)
|
||||
except system_actions.ShellException as e:
|
||||
return GenericMutationReturn(
|
||||
return GenericJobMutationReturn(
|
||||
success=False,
|
||||
message=str(e),
|
||||
code=500,
|
||||
|
@ -135,7 +138,7 @@ class SystemMutations:
|
|||
try:
|
||||
return GenericMutationReturn(
|
||||
success=True,
|
||||
message="Starting rebuild system",
|
||||
message="Starting system rollback",
|
||||
code=200,
|
||||
)
|
||||
except system_actions.ShellException as e:
|
||||
|
@ -146,16 +149,17 @@ class SystemMutations:
|
|||
)
|
||||
|
||||
@strawberry.mutation(permission_classes=[IsAuthenticated])
|
||||
def run_system_upgrade(self) -> GenericMutationReturn:
|
||||
system_actions.upgrade_system()
|
||||
def run_system_upgrade(self) -> GenericJobMutationReturn:
|
||||
try:
|
||||
return GenericMutationReturn(
|
||||
job = system_actions.upgrade_system()
|
||||
return GenericJobMutationReturn(
|
||||
success=True,
|
||||
message="Starting rebuild system",
|
||||
message="Starting system upgrade",
|
||||
code=200,
|
||||
job=job_to_api_job(job),
|
||||
)
|
||||
except system_actions.ShellException as e:
|
||||
return GenericMutationReturn(
|
||||
return GenericJobMutationReturn(
|
||||
success=False,
|
||||
message=str(e),
|
||||
code=500,
|
||||
|
|
|
@ -0,0 +1,126 @@
|
|||
"""
|
||||
A task to start the system upgrade or rebuild by starting a systemd unit.
|
||||
After starting, track the status of the systemd unit and update the Job
|
||||
status accordingly.
|
||||
"""
|
||||
import subprocess
|
||||
from selfprivacy_api.utils.huey import huey
|
||||
from selfprivacy_api.jobs import JobStatus, Jobs, Job
|
||||
from selfprivacy_api.utils.waitloop import wait_until_true
|
||||
from selfprivacy_api.utils.systemd import (
|
||||
get_service_status,
|
||||
get_last_log_lines,
|
||||
ServiceStatus,
|
||||
)
|
||||
|
||||
START_TIMEOUT = 60 * 5
|
||||
START_INTERVAL = 1
|
||||
RUN_TIMEOUT = 60 * 60
|
||||
RUN_INTERVAL = 5
|
||||
|
||||
|
||||
def check_if_started(unit_name: str):
|
||||
"""Check if the systemd unit has started"""
|
||||
try:
|
||||
status = get_service_status(unit_name)
|
||||
if status == ServiceStatus.ACTIVE:
|
||||
return True
|
||||
return False
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
|
||||
|
||||
def check_running_status(job: Job, unit_name: str):
|
||||
"""Check if the systemd unit is running"""
|
||||
try:
|
||||
status = get_service_status(unit_name)
|
||||
if status == ServiceStatus.INACTIVE:
|
||||
Jobs.update(
|
||||
inex marked this conversation as resolved
Outdated
|
||||
job=job,
|
||||
status=JobStatus.FINISHED,
|
||||
result="System rebuilt.",
|
||||
inex marked this conversation as resolved
Outdated
houkime
commented
Outdated
Review
Use selfprivacy_api.services.generic_status_getter Use selfprivacy_api.services.generic_status_getter
Also move that file to utils and name something like systemd.py
|
||||
progress=100,
|
||||
)
|
||||
return True
|
||||
if status == ServiceStatus.FAILED:
|
||||
log_lines = get_last_log_lines(unit_name, 10)
|
||||
Jobs.update(
|
||||
job=job,
|
||||
status=JobStatus.ERROR,
|
||||
inex marked this conversation as resolved
houkime
commented
Review
Try using function to get last log lines to populate this error Try using function to get last log lines to populate this error
|
||||
error="System rebuild failed. Last log lines:\n" + "\n".join(log_lines),
|
||||
)
|
||||
return True
|
||||
if status == ServiceStatus.ACTIVE:
|
||||
log_lines = get_last_log_lines(unit_name, 1)
|
||||
Jobs.update(
|
||||
job=job,
|
||||
status=JobStatus.RUNNING,
|
||||
status_text=log_lines[0] if len(log_lines) > 0 else "",
|
||||
)
|
||||
return False
|
||||
return False
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
|
||||
|
||||
@huey.task()
|
||||
def rebuild_system_task(job: Job, upgrade: bool = False):
|
||||
inex marked this conversation as resolved
Outdated
houkime
commented
Outdated
Review
same, use generic_status_getter (which hopefully becomes systemd.py) same, use generic_status_getter (which hopefully becomes systemd.py)
|
||||
"""Rebuild the system"""
|
||||
unit_name = "sp-nixos-upgrade.service" if upgrade else "sp-nixos-rebuild.service"
|
||||
try:
|
||||
command = ["systemctl", "start", unit_name]
|
||||
subprocess.run(
|
||||
command,
|
||||
check=True,
|
||||
start_new_session=True,
|
||||
shell=False,
|
||||
)
|
||||
Jobs.update(
|
||||
job=job,
|
||||
status=JobStatus.RUNNING,
|
||||
status_text="Starting the system rebuild...",
|
||||
)
|
||||
# Wait for the systemd unit to start
|
||||
try:
|
||||
wait_until_true(
|
||||
lambda: check_if_started(unit_name),
|
||||
timeout_sec=START_TIMEOUT,
|
||||
interval=START_INTERVAL,
|
||||
inex marked this conversation as resolved
Outdated
houkime
commented
Outdated
Review
Add service output lookup also to utils.systemd Add service output lookup also to utils.systemd
|
||||
)
|
||||
except TimeoutError:
|
||||
log_lines = get_last_log_lines(unit_name, 10)
|
||||
Jobs.update(
|
||||
job=job,
|
||||
inex marked this conversation as resolved
houkime
commented
Review
same, include unit output same, include unit output
|
||||
status=JobStatus.ERROR,
|
||||
error="System rebuild timed out. Last log lines:\n"
|
||||
+ "\n".join(log_lines),
|
||||
)
|
||||
return
|
||||
Jobs.update(
|
||||
job=job,
|
||||
status=JobStatus.RUNNING,
|
||||
status_text="Rebuilding the system...",
|
||||
)
|
||||
# Wait for the systemd unit to finish
|
||||
try:
|
||||
wait_until_true(
|
||||
lambda: check_running_status(job, unit_name),
|
||||
timeout_sec=RUN_TIMEOUT,
|
||||
interval=RUN_INTERVAL,
|
||||
)
|
||||
except TimeoutError:
|
||||
log_lines = get_last_log_lines(unit_name, 10)
|
||||
inex marked this conversation as resolved
Outdated
houkime
commented
Outdated
Review
include unit output if any include unit output if any
|
||||
Jobs.update(
|
||||
job=job,
|
||||
status=JobStatus.ERROR,
|
||||
error="System rebuild timed out. Last log lines:\n"
|
||||
+ "\n".join(log_lines),
|
||||
)
|
||||
return
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
Jobs.update(
|
||||
job=job,
|
||||
status=JobStatus.ERROR,
|
||||
status_text=str(e),
|
||||
)
|
|
@ -11,9 +11,13 @@ Adding DISABLE_ALL to that array disables the migrations module entirely.
|
|||
|
||||
from selfprivacy_api.utils import ReadUserData, UserDataFiles
|
||||
from selfprivacy_api.migrations.write_token_to_redis import WriteTokenToRedis
|
||||
from selfprivacy_api.migrations.check_for_system_rebuild_jobs import (
|
||||
CheckForSystemRebuildJobs,
|
||||
)
|
||||
|
||||
migrations = [
|
||||
WriteTokenToRedis(),
|
||||
CheckForSystemRebuildJobs(),
|
||||
]
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
from selfprivacy_api.migrations.migration import Migration
|
||||
from selfprivacy_api.jobs import JobStatus, Jobs
|
||||
|
||||
|
||||
class CheckForSystemRebuildJobs(Migration):
|
||||
"""Check if there are unfinished system rebuild jobs and finish them"""
|
||||
|
||||
def get_migration_name(self):
|
||||
return "check_for_system_rebuild_jobs"
|
||||
|
||||
def get_migration_description(self):
|
||||
return "Check if there are unfinished system rebuild jobs and finish them"
|
||||
|
||||
def is_migration_needed(self):
|
||||
# Check if there are any unfinished system rebuild jobs
|
||||
for job in Jobs.get_jobs():
|
||||
if (
|
||||
job.type_id
|
||||
in [
|
||||
"system.nixos.rebuild",
|
||||
"system.nixos.upgrade",
|
||||
]
|
||||
) and job.status in [
|
||||
JobStatus.CREATED,
|
||||
JobStatus.RUNNING,
|
||||
]:
|
||||
return True
|
||||
|
||||
def migrate(self):
|
||||
# As the API is restarted, we assume that the jobs are finished
|
||||
for job in Jobs.get_jobs():
|
||||
if (
|
||||
job.type_id
|
||||
in [
|
||||
"system.nixos.rebuild",
|
||||
"system.nixos.upgrade",
|
||||
]
|
||||
) and job.status in [
|
||||
JobStatus.CREATED,
|
||||
JobStatus.RUNNING,
|
||||
]:
|
||||
Jobs.update(
|
||||
job=job,
|
||||
status=JobStatus.FINISHED,
|
||||
result="System rebuilt.",
|
||||
progress=100,
|
||||
)
|
|
@ -5,7 +5,7 @@ from typing import Optional, List
|
|||
|
||||
from selfprivacy_api.utils import get_domain
|
||||
|
||||
from selfprivacy_api.services.generic_status_getter import get_service_status
|
||||
from selfprivacy_api.utils.systemd import get_service_status
|
||||
from selfprivacy_api.services.service import Service, ServiceStatus
|
||||
from selfprivacy_api.services.bitwarden.icon import BITWARDEN_ICON
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ from typing import Optional, List
|
|||
|
||||
from selfprivacy_api.utils import get_domain
|
||||
|
||||
from selfprivacy_api.services.generic_status_getter import get_service_status
|
||||
from selfprivacy_api.utils.systemd import get_service_status
|
||||
from selfprivacy_api.services.service import Service, ServiceStatus
|
||||
from selfprivacy_api.services.gitea.icon import GITEA_ICON
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ import subprocess
|
|||
from typing import Optional, List
|
||||
|
||||
from selfprivacy_api.jobs import Job
|
||||
from selfprivacy_api.services.generic_status_getter import (
|
||||
from selfprivacy_api.utils.systemd import (
|
||||
get_service_status_from_several_units,
|
||||
)
|
||||
from selfprivacy_api.services.service import Service, ServiceStatus
|
||||
|
|
|
@ -4,7 +4,7 @@ import base64
|
|||
import subprocess
|
||||
from typing import Optional, List
|
||||
|
||||
from selfprivacy_api.services.generic_status_getter import (
|
||||
from selfprivacy_api.utils.systemd import (
|
||||
get_service_status_from_several_units,
|
||||
)
|
||||
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
|
||||
|
|
|
@ -6,7 +6,7 @@ from typing import Optional, List
|
|||
from selfprivacy_api.utils import get_domain
|
||||
from selfprivacy_api.jobs import Job, Jobs
|
||||
|
||||
from selfprivacy_api.services.generic_status_getter import get_service_status
|
||||
from selfprivacy_api.utils.systemd import get_service_status
|
||||
from selfprivacy_api.services.service import Service, ServiceStatus
|
||||
|
||||
from selfprivacy_api.services.nextcloud.icon import NEXTCLOUD_ICON
|
||||
|
|
|
@ -3,7 +3,7 @@ import base64
|
|||
import subprocess
|
||||
import typing
|
||||
from selfprivacy_api.jobs import Job
|
||||
from selfprivacy_api.services.generic_status_getter import get_service_status
|
||||
from selfprivacy_api.utils.systemd import get_service_status
|
||||
from selfprivacy_api.services.service import Service, ServiceStatus
|
||||
from selfprivacy_api.utils.block_devices import BlockDevice
|
||||
from selfprivacy_api.services.ocserv.icon import OCSERV_ICON
|
||||
|
|
|
@ -6,7 +6,7 @@ from typing import Optional, List
|
|||
from selfprivacy_api.utils import get_domain
|
||||
|
||||
from selfprivacy_api.services.owned_path import OwnedPath
|
||||
from selfprivacy_api.services.generic_status_getter import get_service_status
|
||||
from selfprivacy_api.utils.systemd import get_service_status
|
||||
from selfprivacy_api.services.service import Service, ServiceStatus
|
||||
|
||||
from selfprivacy_api.services.pleroma.icon import PLEROMA_ICON
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
from selfprivacy_api.utils.huey import huey
|
||||
from selfprivacy_api.jobs.test import test_job
|
||||
from selfprivacy_api.backup.tasks import *
|
||||
from selfprivacy_api.services.generic_service_mover import move_service
|
||||
from selfprivacy_api.services.tasks import move_service
|
||||
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
"""Generic service status fetcher using systemctl"""
|
||||
import subprocess
|
||||
from typing import List
|
||||
|
||||
from selfprivacy_api.services.service import ServiceStatus
|
||||
|
||||
|
@ -58,3 +59,24 @@ def get_service_status_from_several_units(services: list[str]) -> ServiceStatus:
|
|||
if ServiceStatus.ACTIVE in service_statuses:
|
||||
return ServiceStatus.ACTIVE
|
||||
return ServiceStatus.OFF
|
||||
|
||||
|
||||
def get_last_log_lines(service: str, lines_count: int) -> List[str]:
|
||||
if lines_count < 1:
|
||||
raise ValueError("lines_count must be greater than 0")
|
||||
try:
|
||||
logs = subprocess.check_output(
|
||||
[
|
||||
"journalctl",
|
||||
"-u",
|
||||
service,
|
||||
"-n",
|
||||
str(lines_count),
|
||||
"-o",
|
||||
"cat",
|
||||
],
|
||||
shell=False,
|
||||
).decode("utf-8")
|
||||
return logs.splitlines()
|
||||
except subprocess.CalledProcessError:
|
||||
return []
|
|
@ -3,6 +3,9 @@
|
|||
# pylint: disable=missing-function-docstring
|
||||
import pytest
|
||||
|
||||
from selfprivacy_api.jobs import JobStatus, Jobs
|
||||
from tests.test_graphql.common import assert_empty, assert_ok, get_data
|
||||
|
||||
|
||||
class ProcessMock:
|
||||
"""Mock subprocess.Popen"""
|
||||
|
@ -37,6 +40,13 @@ def mock_subprocess_check_output(mocker):
|
|||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_sleep_intervals(mocker):
|
||||
mock_start = mocker.patch("selfprivacy_api.jobs.upgrade_system.START_INTERVAL", 0)
|
||||
mock_run = mocker.patch("selfprivacy_api.jobs.upgrade_system.RUN_INTERVAL", 0)
|
||||
return (mock_start, mock_run)
|
||||
|
||||
|
||||
API_REBUILD_SYSTEM_MUTATION = """
|
||||
mutation rebuildSystem {
|
||||
system {
|
||||
|
@ -44,46 +54,14 @@ mutation rebuildSystem {
|
|||
success
|
||||
message
|
||||
code
|
||||
job {
|
||||
uid
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_system_rebuild_unauthorized(client, mock_subprocess_popen):
|
||||
"""Test system rebuild without authorization"""
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_REBUILD_SYSTEM_MUTATION,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
assert mock_subprocess_popen.call_count == 0
|
||||
|
||||
|
||||
def test_graphql_system_rebuild(authorized_client, mock_subprocess_popen):
|
||||
"""Test system rebuild"""
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_REBUILD_SYSTEM_MUTATION,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
assert response.json()["data"]["system"]["runSystemRebuild"]["success"] is True
|
||||
assert response.json()["data"]["system"]["runSystemRebuild"]["message"] is not None
|
||||
assert response.json()["data"]["system"]["runSystemRebuild"]["code"] == 200
|
||||
assert mock_subprocess_popen.call_count == 1
|
||||
assert mock_subprocess_popen.call_args[0][0] == [
|
||||
"systemctl",
|
||||
"start",
|
||||
"sp-nixos-rebuild.service",
|
||||
]
|
||||
|
||||
|
||||
API_UPGRADE_SYSTEM_MUTATION = """
|
||||
mutation upgradeSystem {
|
||||
system {
|
||||
|
@ -91,44 +69,140 @@ mutation upgradeSystem {
|
|||
success
|
||||
message
|
||||
code
|
||||
job {
|
||||
uid
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_system_upgrade_unauthorized(client, mock_subprocess_popen):
|
||||
"""Test system upgrade without authorization"""
|
||||
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
|
||||
def test_graphql_system_rebuild_unauthorized(client, fp, action):
|
||||
"""Test system rebuild without authorization"""
|
||||
query = (
|
||||
API_REBUILD_SYSTEM_MUTATION
|
||||
if action == "rebuild"
|
||||
else API_UPGRADE_SYSTEM_MUTATION
|
||||
)
|
||||
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_UPGRADE_SYSTEM_MUTATION,
|
||||
"query": query,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
assert mock_subprocess_popen.call_count == 0
|
||||
assert_empty(response)
|
||||
assert fp.call_count([fp.any()]) == 0
|
||||
|
||||
|
||||
def test_graphql_system_upgrade(authorized_client, mock_subprocess_popen):
|
||||
"""Test system upgrade"""
|
||||
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
|
||||
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
|
||||
"""Test system rebuild"""
|
||||
unit_name = f"sp-nixos-{action}.service"
|
||||
query = (
|
||||
API_REBUILD_SYSTEM_MUTATION
|
||||
if action == "rebuild"
|
||||
else API_UPGRADE_SYSTEM_MUTATION
|
||||
)
|
||||
|
||||
# Start the unit
|
||||
fp.register(["systemctl", "start", unit_name])
|
||||
|
||||
# Wait for it to start
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
|
||||
# Check its exectution
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
fp.register(
|
||||
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
|
||||
stdout="Starting rebuild...",
|
||||
)
|
||||
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
fp.register(
|
||||
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
|
||||
)
|
||||
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_UPGRADE_SYSTEM_MUTATION,
|
||||
"query": query,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
assert response.json()["data"]["system"]["runSystemUpgrade"]["success"] is True
|
||||
assert response.json()["data"]["system"]["runSystemUpgrade"]["message"] is not None
|
||||
assert response.json()["data"]["system"]["runSystemUpgrade"]["code"] == 200
|
||||
assert mock_subprocess_popen.call_count == 1
|
||||
assert mock_subprocess_popen.call_args[0][0] == [
|
||||
"systemctl",
|
||||
"start",
|
||||
"sp-nixos-upgrade.service",
|
||||
]
|
||||
data = get_data(response)["system"][f"runSystem{action.capitalize()}"]
|
||||
assert_ok(data)
|
||||
inex marked this conversation as resolved
Outdated
houkime
commented
Outdated
Review
use tests.test_graphql.common get_data and other functions from that file please (it makes things much shorter and more readable). use tests.test_graphql.common get_data and other functions from that file please (it makes things much shorter and more readable).
For reference consult test_graphql/test_api_backup
|
||||
|
||||
assert fp.call_count(["systemctl", "start", unit_name]) == 1
|
||||
assert fp.call_count(["systemctl", "show", unit_name]) == 6
|
||||
|
||||
job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][
|
||||
"job"
|
||||
]["uid"]
|
||||
assert Jobs.get_job(job_id).status == JobStatus.FINISHED
|
||||
assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
|
||||
def test_graphql_system_rebuild_failed(
|
||||
authorized_client, fp, action, mock_sleep_intervals
|
||||
):
|
||||
"""Test system rebuild"""
|
||||
unit_name = f"sp-nixos-{action}.service"
|
||||
query = (
|
||||
API_REBUILD_SYSTEM_MUTATION
|
||||
if action == "rebuild"
|
||||
else API_UPGRADE_SYSTEM_MUTATION
|
||||
)
|
||||
|
||||
# Start the unit
|
||||
fp.register(["systemctl", "start", unit_name])
|
||||
|
||||
# Wait for it to start
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
|
||||
# Check its exectution
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
fp.register(
|
||||
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
|
||||
stdout="Starting rebuild...",
|
||||
)
|
||||
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
fp.register(
|
||||
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
|
||||
)
|
||||
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=failed")
|
||||
|
||||
fp.register(
|
||||
["journalctl", "-u", unit_name, "-n", "10", "-o", "cat"], stdout="Some error"
|
||||
)
|
||||
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": query,
|
||||
},
|
||||
)
|
||||
data = get_data(response)["system"][f"runSystem{action.capitalize()}"]
|
||||
assert_ok(data)
|
||||
|
||||
assert fp.call_count(["systemctl", "start", unit_name]) == 1
|
||||
assert fp.call_count(["systemctl", "show", unit_name]) == 6
|
||||
|
||||
job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][
|
||||
"job"
|
||||
]["uid"]
|
||||
assert Jobs.get_job(job_id).status == JobStatus.ERROR
|
||||
assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}"
|
||||
|
||||
|
||||
API_ROLLBACK_SYSTEM_MUTATION = """
|
||||
|
|
Loading…
Reference in New Issue
use utils.waitloop.wait_until_true