Compare commits

...

23 Commits

Author SHA1 Message Date
Houkime 6f38b2309f fix(huey): adapt to new VM test environment
continuous-integration/drone/push Build is failing Details
2024-03-18 12:18:55 +00:00
Houkime baf7843349 test(huey): only import test task if it is a test 2024-03-18 12:18:55 +00:00
Houkime 8e48a5ad5f test(huey): add a scheduling test (expected-fails for now) 2024-03-18 12:18:55 +00:00
Houkime fde461b4b9 test(huey): test that redis socket connection works 2024-03-18 12:18:55 +00:00
Houkime 9954737791 use kill() instead of terminate in huey tests 2024-03-18 12:18:55 +00:00
Houkime 2b19633cbd test(huey): break out preparing the environment vars
I did it for testing redis socketing too, but I guess this will wait for
another time. Somehow it worked even without an actual redis socket and it was
creepy. Idk yet how one can best make redis to make sockets at arbitrary
temporary dirs without starting another redis.
2024-03-18 12:18:55 +00:00
Houkime 83592b7bf4 feature(huey): use RedisHuey 2024-03-18 12:18:55 +00:00
houkime efc6b47cfe Merge pull request 'rebuild-when-moving' (#101) from rebuild-when-moving into master
continuous-integration/drone/push Build is passing Details
Reviewed-on: #101
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
2024-03-18 14:14:08 +02:00
Houkime b2edfe784a refactor(service): add return typing to DNSrecord conversion and comments
continuous-integration/drone/push Build is passing Details
2024-03-18 11:44:53 +00:00
Houkime 6e29da4a4f test(service): test moving with rebuilding via fp 2024-03-18 11:32:02 +00:00
Houkime 12b2153b7c test(service): do not call bash needlessly (it screwed up with fp) 2024-03-18 11:32:02 +00:00
Houkime 8c8c9a51cc refactor(service): visually break down the move function a bit 2024-03-18 11:32:02 +00:00
Houkime fed5735b24 refactor(service): break out DNS records into a separate resolver field 2024-03-18 11:32:02 +00:00
Houkime b257d7f39e fix(service): FAILING TESTS, rebuild when moving 2024-03-18 11:32:02 +00:00
Houkime 70a0287794 refactor(service): move finishing the job out of moving function 2024-03-18 11:32:02 +00:00
Houkime 534d965cab refactor(service): break out sync rebuilding 2024-03-18 11:32:02 +00:00
Houkime f333e791e1 refactor(service): break out ServiceStatus and ServiceDNSRecord 2024-03-18 11:32:02 +00:00
houkime 962e8d5ca7 Merge pull request 'CI: run pytest and coverage tests inside ephemeral VM in the "builder" VM (nested)' (#103) from ci-vm-for-pytest into master
continuous-integration/drone/push Build is passing Details
Reviewed-on: #103
Reviewed-by: houkime <houkime@protonmail.com>
2024-03-18 12:07:54 +02:00
Alexander 5e29816c84 ci: delete USE_REDIS_PORT environment variable
continuous-integration/drone/push Build is passing Details
2024-03-16 00:18:01 +04:00
Alexander 53ec774c90 flake: VM test: remove Redis service port number setting
continuous-integration/drone/push Build is passing Details
2024-03-15 16:23:21 +04:00
Inex Code bda21b7507 fix: Mark md5 as not used for security 2024-03-15 16:14:31 +04:00
Inex Code 2d5ac51c06 fix: future mock are now more in the future 2024-03-15 16:14:31 +04:00
Alexander 61b9a00cea ci: run pytest and coverage as part of nix flake check in VM 2024-03-15 16:14:31 +04:00
20 changed files with 332 additions and 151 deletions

View File

@ -5,18 +5,11 @@ name: default
steps:
- name: Run Tests and Generate Coverage Report
commands:
- kill $(ps aux | grep 'redis-server 127.0.0.1:6389' | awk '{print $2}') || true
- redis-server --bind 127.0.0.1 --port 6389 >/dev/null &
# We do not care about persistance on CI
- sleep 10
- redis-cli -h 127.0.0.1 -p 6389 config set stop-writes-on-bgsave-error no
- coverage run -m pytest -q
- coverage xml
- nix flake check -L
- sonar-scanner -Dsonar.projectKey=SelfPrivacy-REST-API -Dsonar.sources=. -Dsonar.host.url=http://analyzer.lan:9000 -Dsonar.login="$SONARQUBE_TOKEN"
environment:
SONARQUBE_TOKEN:
from_secret: SONARQUBE_TOKEN
USE_REDIS_PORT: 6389
- name: Run Bandit Checks

View File

@ -27,6 +27,7 @@
python-lsp-server
pyflakes
typer # for strawberry
types-redis # for mypy
] ++ strawberry-graphql.optional-dependencies.cli));
vmtest-src-dir = "/root/source";
@ -135,7 +136,6 @@
services.redis.servers.sp-api = {
enable = true;
save = [ ];
port = 6379; # FIXME
settings.notify-keyspace-events = "KEA";
};
environment.systemPackages = with pkgs; [

View File

@ -2,6 +2,7 @@ import typing
import strawberry
# TODO: use https://strawberry.rocks/docs/integrations/pydantic when it is stable
@strawberry.type
class DnsRecord:
"""DNS record"""

View File

@ -1,14 +1,17 @@
from enum import Enum
import typing
import strawberry
from typing import Optional, List
import datetime
import strawberry
from selfprivacy_api.graphql.common_types.backup import BackupReason
from selfprivacy_api.graphql.common_types.dns import DnsRecord
from selfprivacy_api.services import get_service_by_id, get_services_by_location
from selfprivacy_api.services import Service as ServiceInterface
from selfprivacy_api.services import ServiceDnsRecord
from selfprivacy_api.utils.block_devices import BlockDevices
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.utils.network import get_ip4, get_ip6
def get_usages(root: "StorageVolume") -> list["StorageUsageInterface"]:
@ -33,8 +36,8 @@ class StorageVolume:
used_space: str
root: bool
name: str
model: typing.Optional[str]
serial: typing.Optional[str]
model: Optional[str]
serial: Optional[str]
type: str
@strawberry.field
@ -46,7 +49,7 @@ class StorageVolume:
@strawberry.interface
class StorageUsageInterface:
used_space: str
volume: typing.Optional[StorageVolume]
volume: Optional[StorageVolume]
title: str
@ -54,7 +57,7 @@ class StorageUsageInterface:
class ServiceStorageUsage(StorageUsageInterface):
"""Storage usage for a service"""
service: typing.Optional["Service"]
service: Optional["Service"]
@strawberry.enum
@ -86,6 +89,20 @@ def get_storage_usage(root: "Service") -> ServiceStorageUsage:
)
# TODO: This won't be needed when deriving DnsRecord via strawberry pydantic integration
# https://strawberry.rocks/docs/integrations/pydantic
# Remove when the link above says it got stable.
def service_dns_to_graphql(record: ServiceDnsRecord) -> DnsRecord:
return DnsRecord(
record_type=record.type,
name=record.name,
content=record.content,
ttl=record.ttl,
priority=record.priority,
display_name=record.display_name,
)
@strawberry.type
class Service:
id: str
@ -98,16 +115,26 @@ class Service:
can_be_backed_up: bool
backup_description: str
status: ServiceStatusEnum
url: typing.Optional[str]
dns_records: typing.Optional[typing.List[DnsRecord]]
url: Optional[str]
@strawberry.field
def dns_records(self) -> Optional[List[DnsRecord]]:
service = get_service_by_id(self.id)
if service is None:
raise LookupError(f"no service {self.id}. Should be unreachable")
raw_records = service.get_dns_records(get_ip4(), get_ip6())
dns_records = [service_dns_to_graphql(record) for record in raw_records]
return dns_records
@strawberry.field
def storage_usage(self) -> ServiceStorageUsage:
"""Get storage usage for a service"""
return get_storage_usage(self)
# TODO: fill this
@strawberry.field
def backup_snapshots(self) -> typing.Optional[typing.List["SnapshotInfo"]]:
def backup_snapshots(self) -> Optional[List["SnapshotInfo"]]:
return None
@ -133,23 +160,10 @@ def service_to_graphql_service(service: ServiceInterface) -> Service:
backup_description=service.get_backup_description(),
status=ServiceStatusEnum(service.get_status().value),
url=service.get_url(),
dns_records=[
DnsRecord(
record_type=record.type,
name=record.name,
content=record.content,
ttl=record.ttl,
priority=record.priority,
display_name=record.display_name,
)
for record in service.get_dns_records(
network_utils.get_ip4(), network_utils.get_ip6()
)
],
)
def get_volume_by_id(volume_id: str) -> typing.Optional[StorageVolume]:
def get_volume_by_id(volume_id: str) -> Optional[StorageVolume]:
"""Get volume by id"""
volume = BlockDevices().get_block_device(volume_id)
if volume is None:

View File

@ -63,9 +63,13 @@ def check_running_status(job: Job, unit_name: str):
return False
@huey.task()
def rebuild_system_task(job: Job, upgrade: bool = False):
"""Rebuild the system"""
def rebuild_system(job: Job, upgrade: bool = False):
"""
Broken out to allow calling it synchronously.
We cannot just block until task is done because it will require a second worker
Which we do not have
"""
unit_name = "sp-nixos-upgrade.service" if upgrade else "sp-nixos-rebuild.service"
try:
command = ["systemctl", "start", unit_name]
@ -124,3 +128,9 @@ def rebuild_system_task(job: Job, upgrade: bool = False):
status=JobStatus.ERROR,
status_text=str(e),
)
@huey.task()
def rebuild_system_task(job: Job, upgrade: bool = False):
"""Rebuild the system"""
rebuild_system(job, upgrade)

View File

@ -0,0 +1,24 @@
from enum import Enum
from typing import Optional
from pydantic import BaseModel
class ServiceStatus(Enum):
"""Enum for service status"""
ACTIVE = "ACTIVE"
RELOADING = "RELOADING"
INACTIVE = "INACTIVE"
FAILED = "FAILED"
ACTIVATING = "ACTIVATING"
DEACTIVATING = "DEACTIVATING"
OFF = "OFF"
class ServiceDnsRecord(BaseModel):
type: str
name: str
content: str
ttl: int
display_name: str
priority: Optional[int] = None

View File

@ -30,7 +30,7 @@ class RedisTokensRepository(AbstractTokensRepository):
@staticmethod
def token_key_for_device(device_name: str):
md5_hash = md5()
md5_hash = md5(usedforsecurity=False)
md5_hash.update(bytes(device_name, "utf-8"))
digest = md5_hash.hexdigest()
return TOKENS_PREFIX + digest

View File

@ -1,13 +1,16 @@
"""Abstract class for a service running on a server"""
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel
from selfprivacy_api.jobs import Job, Jobs, JobStatus, report_progress
from selfprivacy_api import utils
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices
from selfprivacy_api.jobs import Job, Jobs, JobStatus, report_progress
from selfprivacy_api.jobs.upgrade_system import rebuild_system
from selfprivacy_api.models.services import ServiceStatus, ServiceDnsRecord
from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.owned_path import OwnedPath, Bind
from selfprivacy_api.services.moving import (
@ -20,34 +23,10 @@ from selfprivacy_api.services.moving import (
move_data_to_volume,
)
from selfprivacy_api import utils
from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils import ReadUserData, WriteUserData
DEFAULT_START_STOP_TIMEOUT = 5 * 60
class ServiceStatus(Enum):
"""Enum for service status"""
ACTIVE = "ACTIVE"
RELOADING = "RELOADING"
INACTIVE = "INACTIVE"
FAILED = "FAILED"
ACTIVATING = "ACTIVATING"
DEACTIVATING = "DEACTIVATING"
OFF = "OFF"
class ServiceDnsRecord(BaseModel):
type: str
name: str
content: str
ttl: int
display_name: str
priority: Optional[int] = None
class Service(ABC):
"""
Service here is some software that is hosted on the server and
@ -387,14 +366,6 @@ class Service(ABC):
report_progress(95, job, f"Finishing moving {service_name}...")
self.set_location(new_volume)
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result=f"{service_name} moved successfully.",
status_text=f"Starting {service_name}...",
progress=100,
)
def move_to_volume(self, volume: BlockDevice, job: Job) -> Job:
service_name = self.get_display_name()
@ -407,6 +378,17 @@ class Service(ABC):
report_progress(9, job, "Stopped service, starting the move...")
self.do_move_to_volume(volume, job)
report_progress(98, job, "Move complete, rebuilding...")
rebuild_system(job, upgrade=False)
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result=f"{service_name} moved successfully.",
status_text=f"Starting {service_name}...",
progress=100,
)
return job
@classmethod

View File

@ -8,10 +8,9 @@ from os import path
# from enum import Enum
from selfprivacy_api.jobs import Job, Jobs, JobStatus
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.jobs import Job
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.test_service.icon import BITWARDEN_ICON
@ -89,7 +88,7 @@ class DummyService(Service):
@classmethod
def set_status(cls, status: ServiceStatus):
with open(cls.status_file(), "w") as file:
status_string = file.write(status.value)
file.write(status.value)
@classmethod
def get_status(cls) -> ServiceStatus:
@ -102,16 +101,17 @@ class DummyService(Service):
cls, new_status: ServiceStatus, delay_sec: float
):
"""simulating a delay on systemd side"""
status_file = cls.status_file()
if delay_sec == 0:
cls.set_status(new_status)
return
status_file = cls.status_file()
command = [
"bash",
"-c",
f" sleep {delay_sec} && echo {new_status.value} > {status_file}",
]
handle = subprocess.Popen(command)
if delay_sec == 0:
handle.communicate()
subprocess.Popen(command)
@classmethod
def set_backuppable(cls, new_value: bool) -> None:
@ -192,6 +192,5 @@ class DummyService(Service):
if self.simulate_moving is False:
return super(DummyService, self).do_move_to_volume(volume, job)
else:
Jobs.update(job, status=JobStatus.FINISHED)
self.set_drive(volume.name)
return job

View File

@ -1,5 +1,12 @@
from os import environ
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs.test import test_job
from selfprivacy_api.backup.tasks import *
from selfprivacy_api.services.tasks import move_service
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task
from selfprivacy_api.jobs.test import test_job
if environ.get("TEST_MODE"):
from tests.test_huey import sum

View File

@ -1,16 +1,24 @@
"""MiniHuey singleton."""
import os
from huey import SqliteHuey
from os import environ
from huey import RedisHuey
from selfprivacy_api.utils.redis_pool import RedisPool
HUEY_DATABASE_NUMBER = 10
def immediate() -> bool:
if environ.get("HUEY_QUEUES_FOR_TESTS"):
return False
if environ.get("TEST_MODE"):
return True
return False
HUEY_DATABASE = "/etc/selfprivacy/tasks.db"
# Singleton instance containing the huey database.
test_mode = os.environ.get("TEST_MODE")
huey = SqliteHuey(
huey = RedisHuey(
"selfprivacy-api",
filename=HUEY_DATABASE if not test_mode else None,
immediate=test_mode == "true",
url=RedisPool.connection_url(dbnumber=HUEY_DATABASE_NUMBER),
immediate=immediate(),
utc=True,
)

View File

@ -1,8 +1,8 @@
"""
Redis pool module for selfprivacy_api
"""
from os import environ
import redis
from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
REDIS_SOCKET = "/run/redis-sp-api/redis.sock"
@ -14,20 +14,20 @@ class RedisPool(metaclass=SingletonMetaclass):
"""
def __init__(self):
if "USE_REDIS_PORT" in environ:
self._pool = redis.ConnectionPool(
host="127.0.0.1",
port=int(environ["USE_REDIS_PORT"]),
decode_responses=True,
)
else:
self._pool = redis.ConnectionPool.from_url(
f"unix://{REDIS_SOCKET}",
decode_responses=True,
)
self._pool = redis.ConnectionPool.from_url(
RedisPool.connection_url(dbnumber=0),
decode_responses=True,
)
self._pubsub_connection = self.get_connection()
@staticmethod
def connection_url(dbnumber: int) -> str:
"""
redis://[[username]:[password]]@localhost:6379/0
unix://[username@]/path/to/socket.sock?db=0[&password=password]
"""
return f"unix://{REDIS_SOCKET}?db={dbnumber}"
def get_connection(self):
"""
Get a connection from the pool.

View File

@ -2,16 +2,16 @@
import subprocess
from typing import List
from selfprivacy_api.services.service import ServiceStatus
from selfprivacy_api.models.services import ServiceStatus
def get_service_status(service: str) -> ServiceStatus:
def get_service_status(unit: str) -> ServiceStatus:
"""
Return service status from systemd.
Use systemctl show to get the status of a service.
Get ActiveState from the output.
"""
service_status = subprocess.check_output(["systemctl", "show", service])
service_status = subprocess.check_output(["systemctl", "show", unit])
if b"LoadState=not-found" in service_status:
return ServiceStatus.OFF
if b"ActiveState=active" in service_status:

View File

@ -7,16 +7,16 @@ RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.time.datetime"
DEVICE_KEY_VALIDATION_DATETIME = RECOVERY_KEY_VALIDATION_DATETIME
def ten_minutes_into_future_naive():
return datetime.now() + timedelta(minutes=10)
def ten_hours_into_future_naive():
return datetime.now() + timedelta(hours=10)
def ten_minutes_into_future_naive_utc():
return datetime.utcnow() + timedelta(minutes=10)
def ten_hours_into_future_naive_utc():
return datetime.utcnow() + timedelta(hours=10)
def ten_minutes_into_future():
return datetime.now(timezone.utc) + timedelta(minutes=10)
def ten_hours_into_future():
return datetime.now(timezone.utc) + timedelta(hours=10)
def ten_minutes_into_past_naive():
@ -34,11 +34,11 @@ def ten_minutes_into_past():
class NearFuture(datetime):
@classmethod
def now(cls, tz=None):
return datetime.now(tz) + timedelta(minutes=13)
return datetime.now(tz) + timedelta(hours=13)
@classmethod
def utcnow(cls):
return datetime.utcnow() + timedelta(minutes=13)
return datetime.utcnow() + timedelta(hours=13)
def read_json(file_path):

View File

@ -99,23 +99,14 @@ def generic_userdata(mocker, tmpdir):
@pytest.fixture
def huey_database(mocker, shared_datadir):
"""Mock huey database."""
mock = mocker.patch(
"selfprivacy_api.utils.huey.HUEY_DATABASE", shared_datadir / "huey.db"
)
return mock
@pytest.fixture
def client(huey_database, redis_repo_with_tokens):
def client(redis_repo_with_tokens):
from selfprivacy_api.app import app
return TestClient(app)
@pytest.fixture
def authorized_client(huey_database, redis_repo_with_tokens):
def authorized_client(redis_repo_with_tokens):
"""Authorized test client fixture."""
from selfprivacy_api.app import app
@ -127,7 +118,7 @@ def authorized_client(huey_database, redis_repo_with_tokens):
@pytest.fixture
def wrong_auth_client(huey_database, redis_repo_with_tokens):
def wrong_auth_client(redis_repo_with_tokens):
"""Wrong token test client fixture."""
from selfprivacy_api.app import app

View File

@ -14,9 +14,9 @@ from tests.common import (
)
# Graphql API's output should be timezone-naive
from tests.common import ten_minutes_into_future_naive_utc as ten_minutes_into_future
from tests.common import ten_minutes_into_future as ten_minutes_into_future_tz
from tests.common import ten_minutes_into_past_naive_utc as ten_minutes_into_past
from tests.common import ten_hours_into_future_naive_utc as ten_hours_into_future
from tests.common import ten_hours_into_future as ten_hours_into_future_tz
from tests.common import ten_minutes_into_past_naive_utc as ten_hours_into_past
from tests.test_graphql.common import (
assert_empty,
@ -168,7 +168,7 @@ def test_graphql_generate_recovery_key(client, authorized_client):
@pytest.mark.parametrize(
"expiration_date", [ten_minutes_into_future(), ten_minutes_into_future_tz()]
"expiration_date", [ten_hours_into_future(), ten_hours_into_future_tz()]
)
def test_graphql_generate_recovery_key_with_expiration_date(
client, authorized_client, expiration_date: datetime
@ -193,7 +193,7 @@ def test_graphql_generate_recovery_key_with_expiration_date(
def test_graphql_use_recovery_key_after_expiration(client, authorized_client, mocker):
expiration_date = ten_minutes_into_future()
expiration_date = ten_hours_into_future()
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
# Timewarp to after it expires
@ -219,7 +219,7 @@ def test_graphql_use_recovery_key_after_expiration(client, authorized_client, mo
def test_graphql_generate_recovery_key_with_expiration_in_the_past(authorized_client):
expiration_date = ten_minutes_into_past()
expiration_date = ten_hours_into_past()
response = request_make_new_recovery_key(
authorized_client, expires_at=expiration_date
)

View File

@ -13,8 +13,7 @@ from selfprivacy_api.services.test_service import DummyService
from tests.common import generate_service_query
from tests.test_graphql.common import assert_empty, assert_ok, get_data
from tests.test_block_device_utils import lsblk_singular_mock
from tests.test_graphql.test_system_nixos_tasks import prepare_nixos_rebuild_calls
LSBLK_BLOCKDEVICES_DICTS = [
{
@ -618,10 +617,7 @@ def test_graphql_move_service_without_folders_on_old_volume(
def test_graphql_move_service(
authorized_client,
generic_userdata,
mock_check_volume,
dummy_service_with_binds,
authorized_client, generic_userdata, mock_check_volume, dummy_service_with_binds, fp
):
dummy_service = dummy_service_with_binds
@ -633,10 +629,30 @@ def test_graphql_move_service(
dummy_service.set_drive(origin)
dummy_service.set_simulated_moves(False)
unit_name = "sp-nixos-rebuild.service"
rebuild_command = ["systemctl", "start", unit_name]
prepare_nixos_rebuild_calls(fp, unit_name)
# We will be mounting and remounting folders
mount_command = ["mount", fp.any()]
unmount_command = ["umount", fp.any()]
fp.pass_command(mount_command, 2)
fp.pass_command(unmount_command, 2)
# We will be changing ownership
chown_command = ["chown", fp.any()]
fp.pass_command(chown_command, 2)
mutation_response = api_move(authorized_client, dummy_service, target)
data = get_data(mutation_response)["services"]["moveService"]
assert_ok(data)
assert data["service"] is not None
assert fp.call_count(rebuild_command) == 1
assert fp.call_count(mount_command) == 2
assert fp.call_count(unmount_command) == 2
assert fp.call_count(chown_command) == 2
def test_mailservice_cannot_enable_disable(authorized_client):

View File

@ -97,16 +97,7 @@ def test_graphql_system_rebuild_unauthorized(client, fp, action):
assert fp.call_count([fp.any()]) == 0
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
"""Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
def prepare_nixos_rebuild_calls(fp, unit_name):
# Start the unit
fp.register(["systemctl", "start", unit_name])
@ -129,6 +120,19 @@ def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_interv
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
"""Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
prepare_nixos_rebuild_calls(fp, unit_name)
response = authorized_client.post(
"/graphql",
json={

132
tests/test_huey.py Normal file
View File

@ -0,0 +1,132 @@
import pytest
import redis
from typing import List
import subprocess
from subprocess import Popen, check_output, TimeoutExpired
from os import environ, path, set_blocking
from io import BufferedReader
from huey.exceptions import HueyException
from selfprivacy_api.utils.huey import huey, immediate, HUEY_DATABASE_NUMBER
from selfprivacy_api.utils.redis_pool import RedisPool, REDIS_SOCKET
@huey.task()
def sum(a: int, b: int) -> int:
return a + b
def reset_huey_storage():
huey.storage = huey.create_storage()
def flush_huey_redis_forcefully():
url = RedisPool.connection_url(HUEY_DATABASE_NUMBER)
pool = redis.ConnectionPool.from_url(url, decode_responses=True)
connection = redis.Redis(connection_pool=pool)
connection.flushdb()
# TODO: may be useful in other places too, move to utils/ tests common if using it somewhere
def read_all_ready_output(stream: BufferedReader) -> str:
set_blocking(stream.fileno(), False)
output: List[bytes] = []
while True:
line = stream.readline()
raise ValueError(line)
if line == b"":
break
else:
output.append(line)
set_blocking(stream.fileno(), True)
result = b"".join(output)
return result.decode("utf-8")
@pytest.fixture()
def not_immediate():
assert environ["TEST_MODE"] == "true"
old_immediate = huey.immediate
environ["HUEY_QUEUES_FOR_TESTS"] = "Yes"
huey.immediate = False
assert huey.immediate is False
yield
del environ["HUEY_QUEUES_FOR_TESTS"]
huey.immediate = old_immediate
assert huey.immediate == old_immediate
@pytest.fixture()
def huey_socket_consumer(not_immediate):
"""
Same as above, but with socketed redis
"""
flush_huey_redis_forcefully()
command = ["huey_consumer.py", "selfprivacy_api.task_registry.huey"]
# First assert that consumer does not fail by itself
# Idk yet how to do it more elegantly
try:
check_output(command, timeout=2)
except TimeoutExpired:
pass
# Then open it for real
consumer_handle = Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert path.exists(REDIS_SOCKET)
yield consumer_handle
consumer_handle.kill()
def test_huey_over_redis_socket(huey_socket_consumer):
assert huey.immediate is False
assert immediate() is False
assert "unix" in RedisPool.connection_url(HUEY_DATABASE_NUMBER)
try:
assert (
RedisPool.connection_url(HUEY_DATABASE_NUMBER)
in huey.storage_kwargs.values()
)
except AssertionError:
raise ValueError(
"our test-side huey does not connect over socket: ", huey.storage_kwargs
)
result = sum(2, 5)
try:
assert result(blocking=True, timeout=10) == 7
except HueyException as error:
if "timed out" in str(error):
output = read_all_ready_output(huey_socket_consumer.stdout)
errorstream = read_all_ready_output(huey_socket_consumer.stderr)
raise TimeoutError(
f"Huey timed out: {str(error)}",
f"Consumer output: {output}",
f"Consumer errorstream: {errorstream}",
)
else:
raise error
@pytest.mark.xfail(reason="cannot yet schedule with sockets for some reason")
def test_huey_schedule(huey_queues_socket):
# We do not schedule tasks anywhere, but concerning that it fails.
sum.schedule((2, 5), delay=10)
try:
assert len(huey.scheduled()) == 1
except AssertionError:
raise ValueError("have wrong amount of scheduled tasks", huey.scheduled())

View File

@ -24,7 +24,7 @@ from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from tests.common import ten_minutes_into_past, ten_minutes_into_future
from tests.common import ten_minutes_into_past, ten_hours_into_future
ORIGINAL_DEVICE_NAMES = [