tests: Cover upgrade and rebuild task
continuous-integration/drone/push Build is failing Details

pull/98/head
Inex Code 2024-02-26 22:49:32 +03:00
parent d8666fa179
commit c63552241c
3 changed files with 164 additions and 74 deletions

View File

@ -19,6 +19,7 @@
pytest
pytest-datadir
pytest-mock
pytest-subprocess
black
mypy
pylsp-mypy

View File

@ -4,9 +4,15 @@ After starting, track the status of the systemd unit and update the Job
status accordingly.
"""
import subprocess
import time
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs import JobStatus, Jobs, Job
import time
from datetime import datetime
START_TIMEOUT = 60 * 5
START_INTERVAL = 1
RUN_TIMEOUT = 60 * 60
RUN_INTERVAL = 5
@huey.task()
@ -27,7 +33,7 @@ def rebuild_system_task(job: Job, upgrade: bool = False):
status_text="Starting the system rebuild...",
)
# Get current time to handle timeout
start_time = time.time()
start_time = datetime.now()
# Wait for the systemd unit to start
while True:
try:
@ -37,18 +43,16 @@ def rebuild_system_task(job: Job, upgrade: bool = False):
capture_output=True,
text=True,
)
print(status.stdout.strip())
if status.stdout.strip() == "active":
break
# Timeount after 5 minutes
if time.time() - start_time > 300:
if (datetime.now() - start_time).total_seconds() > START_TIMEOUT:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="System rebuild timed out.",
)
return
time.sleep(1)
time.sleep(START_INTERVAL)
except subprocess.CalledProcessError:
pass
Jobs.update(
@ -56,7 +60,6 @@ def rebuild_system_task(job: Job, upgrade: bool = False):
status=JobStatus.RUNNING,
status_text="Rebuilding the system...",
)
print("Rebuilding the system...")
# Wait for the systemd unit to finish
while True:
try:
@ -66,9 +69,7 @@ def rebuild_system_task(job: Job, upgrade: bool = False):
capture_output=True,
text=True,
)
print(f"Unit status: {status.stdout.strip()}")
if status.stdout.strip() == "inactive":
print("System rebuilt.")
Jobs.update(
job=job,
status=JobStatus.FINISHED,
@ -77,7 +78,6 @@ def rebuild_system_task(job: Job, upgrade: bool = False):
)
break
elif status.stdout.strip() == "failed":
print("System rebuild failed.")
Jobs.update(
job=job,
status=JobStatus.ERROR,
@ -85,7 +85,6 @@ def rebuild_system_task(job: Job, upgrade: bool = False):
)
break
elif status.stdout.strip() == "active":
print("Geting a log line")
log_line = subprocess.run(
[
"journalctl",
@ -100,25 +99,21 @@ def rebuild_system_task(job: Job, upgrade: bool = False):
capture_output=True,
text=True,
).stdout.strip()
print(log_line)
Jobs.update(
job=job,
status=JobStatus.RUNNING,
status_text=f"{log_line}",
)
# Timeout of 60 minutes
if time.time() - start_time > 3600:
print("System rebuild timed out.")
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="System rebuild timed out.",
)
break
time.sleep(5)
except subprocess.CalledProcessError:
print("subprocess.CalledProcessError")
pass
if (datetime.now() - start_time).total_seconds() > RUN_TIMEOUT:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="System rebuild timed out.",
)
break
time.sleep(RUN_INTERVAL)
except subprocess.CalledProcessError as e:
Jobs.update(

View File

@ -3,6 +3,8 @@
# pylint: disable=missing-function-docstring
import pytest
from selfprivacy_api.jobs import JobStatus, Jobs
class ProcessMock:
"""Mock subprocess.Popen"""
@ -37,6 +39,13 @@ def mock_subprocess_check_output(mocker):
return mock
@pytest.fixture
def mock_sleep_intervals(mocker):
mock_start = mocker.patch("selfprivacy_api.jobs.upgrade_system.START_INTERVAL", 0)
mock_run = mocker.patch("selfprivacy_api.jobs.upgrade_system.RUN_INTERVAL", 0)
return (mock_start, mock_run)
API_REBUILD_SYSTEM_MUTATION = """
mutation rebuildSystem {
system {
@ -44,46 +53,14 @@ mutation rebuildSystem {
success
message
code
job {
uid
}
}
}
}
"""
def test_graphql_system_rebuild_unauthorized(client, mock_subprocess_popen):
"""Test system rebuild without authorization"""
response = client.post(
"/graphql",
json={
"query": API_REBUILD_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert mock_subprocess_popen.call_count == 0
def test_graphql_system_rebuild(authorized_client, mock_subprocess_popen):
"""Test system rebuild"""
response = authorized_client.post(
"/graphql",
json={
"query": API_REBUILD_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["system"]["runSystemRebuild"]["success"] is True
assert response.json()["data"]["system"]["runSystemRebuild"]["message"] is not None
assert response.json()["data"]["system"]["runSystemRebuild"]["code"] == 200
assert mock_subprocess_popen.call_count == 1
assert mock_subprocess_popen.call_args[0][0] == [
"systemctl",
"start",
"sp-nixos-rebuild.service",
]
API_UPGRADE_SYSTEM_MUTATION = """
mutation upgradeSystem {
system {
@ -91,44 +68,161 @@ mutation upgradeSystem {
success
message
code
job {
uid
}
}
}
}
"""
def test_graphql_system_upgrade_unauthorized(client, mock_subprocess_popen):
"""Test system upgrade without authorization"""
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild_unauthorized(client, fp, action):
"""Test system rebuild without authorization"""
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
response = client.post(
"/graphql",
json={
"query": API_UPGRADE_SYSTEM_MUTATION,
"query": query,
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert mock_subprocess_popen.call_count == 0
assert fp.call_count([fp.any()]) == 0
def test_graphql_system_upgrade(authorized_client, mock_subprocess_popen):
"""Test system upgrade"""
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
"""Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
# Start the unit
fp.register(["systemctl", "start", unit_name])
# Wait for it to start
fp.register(["systemctl", "is-active", unit_name], stdout="inactive")
fp.register(["systemctl", "is-active", unit_name], stdout="inactive")
fp.register(["systemctl", "is-active", unit_name], stdout="active")
# Check its exectution
fp.register(["systemctl", "is-active", unit_name], stdout="active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
stdout="Starting rebuild...",
)
fp.register(["systemctl", "is-active", unit_name], stdout="active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
)
fp.register(["systemctl", "is-active", unit_name], stdout="inactive")
response = authorized_client.post(
"/graphql",
json={
"query": API_UPGRADE_SYSTEM_MUTATION,
"query": query,
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["system"]["runSystemUpgrade"]["success"] is True
assert response.json()["data"]["system"]["runSystemUpgrade"]["message"] is not None
assert response.json()["data"]["system"]["runSystemUpgrade"]["code"] == 200
assert mock_subprocess_popen.call_count == 1
assert mock_subprocess_popen.call_args[0][0] == [
"systemctl",
"start",
"sp-nixos-upgrade.service",
]
assert (
response.json()["data"]["system"][f"runSystem{action.capitalize()}"]["success"]
is True
)
assert (
response.json()["data"]["system"][f"runSystem{action.capitalize()}"]["message"]
is not None
)
assert (
response.json()["data"]["system"][f"runSystem{action.capitalize()}"]["code"]
== 200
)
assert fp.call_count(["systemctl", "start", unit_name]) == 1
assert fp.call_count(["systemctl", "is-active", unit_name]) == 6
job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][
"job"
]["uid"]
assert Jobs.get_job(job_id).status == JobStatus.FINISHED
assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}"
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild_failed(
authorized_client, fp, action, mock_sleep_intervals
):
"""Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
# Start the unit
fp.register(["systemctl", "start", unit_name])
# Wait for it to start
fp.register(["systemctl", "is-active", unit_name], stdout="inactive")
fp.register(["systemctl", "is-active", unit_name], stdout="inactive")
fp.register(["systemctl", "is-active", unit_name], stdout="active")
# Check its exectution
fp.register(["systemctl", "is-active", unit_name], stdout="active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
stdout="Starting rebuild...",
)
fp.register(["systemctl", "is-active", unit_name], stdout="active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
)
fp.register(["systemctl", "is-active", unit_name], stdout="failed")
response = authorized_client.post(
"/graphql",
json={
"query": query,
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["system"][f"runSystem{action.capitalize()}"]["success"]
is True
)
assert (
response.json()["data"]["system"][f"runSystem{action.capitalize()}"]["message"]
is not None
)
assert (
response.json()["data"]["system"][f"runSystem{action.capitalize()}"]["code"]
== 200
)
assert fp.call_count(["systemctl", "start", unit_name]) == 1
assert fp.call_count(["systemctl", "is-active", unit_name]) == 6
job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][
"job"
]["uid"]
assert Jobs.get_job(job_id).status == JobStatus.ERROR
assert Jobs.get_job(job_id).type_id == f"system.nixos.{action}"
API_ROLLBACK_SYSTEM_MUTATION = """