Compare commits
23 Commits
3e44b3999f
...
6f38b2309f
Author | SHA1 | Date |
---|---|---|
Houkime | 6f38b2309f | |
Houkime | baf7843349 | |
Houkime | 8e48a5ad5f | |
Houkime | fde461b4b9 | |
Houkime | 9954737791 | |
Houkime | 2b19633cbd | |
Houkime | 83592b7bf4 | |
houkime | efc6b47cfe | |
Houkime | b2edfe784a | |
Houkime | 6e29da4a4f | |
Houkime | 12b2153b7c | |
Houkime | 8c8c9a51cc | |
Houkime | fed5735b24 | |
Houkime | b257d7f39e | |
Houkime | 70a0287794 | |
Houkime | 534d965cab | |
Houkime | f333e791e1 | |
houkime | 962e8d5ca7 | |
Alexander | 5e29816c84 | |
Alexander | 53ec774c90 | |
Inex Code | bda21b7507 | |
Inex Code | 2d5ac51c06 | |
Alexander | 61b9a00cea |
|
@ -5,18 +5,11 @@ name: default
|
|||
steps:
|
||||
- name: Run Tests and Generate Coverage Report
|
||||
commands:
|
||||
- kill $(ps aux | grep 'redis-server 127.0.0.1:6389' | awk '{print $2}') || true
|
||||
- redis-server --bind 127.0.0.1 --port 6389 >/dev/null &
|
||||
# We do not care about persistance on CI
|
||||
- sleep 10
|
||||
- redis-cli -h 127.0.0.1 -p 6389 config set stop-writes-on-bgsave-error no
|
||||
- coverage run -m pytest -q
|
||||
- coverage xml
|
||||
- nix flake check -L
|
||||
- sonar-scanner -Dsonar.projectKey=SelfPrivacy-REST-API -Dsonar.sources=. -Dsonar.host.url=http://analyzer.lan:9000 -Dsonar.login="$SONARQUBE_TOKEN"
|
||||
environment:
|
||||
SONARQUBE_TOKEN:
|
||||
from_secret: SONARQUBE_TOKEN
|
||||
USE_REDIS_PORT: 6389
|
||||
|
||||
|
||||
- name: Run Bandit Checks
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
python-lsp-server
|
||||
pyflakes
|
||||
typer # for strawberry
|
||||
types-redis # for mypy
|
||||
] ++ strawberry-graphql.optional-dependencies.cli));
|
||||
|
||||
vmtest-src-dir = "/root/source";
|
||||
|
@ -135,7 +136,6 @@
|
|||
services.redis.servers.sp-api = {
|
||||
enable = true;
|
||||
save = [ ];
|
||||
port = 6379; # FIXME
|
||||
settings.notify-keyspace-events = "KEA";
|
||||
};
|
||||
environment.systemPackages = with pkgs; [
|
||||
|
|
|
@ -2,6 +2,7 @@ import typing
|
|||
import strawberry
|
||||
|
||||
|
||||
# TODO: use https://strawberry.rocks/docs/integrations/pydantic when it is stable
|
||||
@strawberry.type
|
||||
class DnsRecord:
|
||||
"""DNS record"""
|
||||
|
|
|
@ -1,14 +1,17 @@
|
|||
from enum import Enum
|
||||
import typing
|
||||
import strawberry
|
||||
from typing import Optional, List
|
||||
import datetime
|
||||
import strawberry
|
||||
|
||||
from selfprivacy_api.graphql.common_types.backup import BackupReason
|
||||
from selfprivacy_api.graphql.common_types.dns import DnsRecord
|
||||
|
||||
from selfprivacy_api.services import get_service_by_id, get_services_by_location
|
||||
from selfprivacy_api.services import Service as ServiceInterface
|
||||
from selfprivacy_api.services import ServiceDnsRecord
|
||||
|
||||
from selfprivacy_api.utils.block_devices import BlockDevices
|
||||
import selfprivacy_api.utils.network as network_utils
|
||||
from selfprivacy_api.utils.network import get_ip4, get_ip6
|
||||
|
||||
|
||||
def get_usages(root: "StorageVolume") -> list["StorageUsageInterface"]:
|
||||
|
@ -33,8 +36,8 @@ class StorageVolume:
|
|||
used_space: str
|
||||
root: bool
|
||||
name: str
|
||||
model: typing.Optional[str]
|
||||
serial: typing.Optional[str]
|
||||
model: Optional[str]
|
||||
serial: Optional[str]
|
||||
type: str
|
||||
|
||||
@strawberry.field
|
||||
|
@ -46,7 +49,7 @@ class StorageVolume:
|
|||
@strawberry.interface
|
||||
class StorageUsageInterface:
|
||||
used_space: str
|
||||
volume: typing.Optional[StorageVolume]
|
||||
volume: Optional[StorageVolume]
|
||||
title: str
|
||||
|
||||
|
||||
|
@ -54,7 +57,7 @@ class StorageUsageInterface:
|
|||
class ServiceStorageUsage(StorageUsageInterface):
|
||||
"""Storage usage for a service"""
|
||||
|
||||
service: typing.Optional["Service"]
|
||||
service: Optional["Service"]
|
||||
|
||||
|
||||
@strawberry.enum
|
||||
|
@ -86,6 +89,20 @@ def get_storage_usage(root: "Service") -> ServiceStorageUsage:
|
|||
)
|
||||
|
||||
|
||||
# TODO: This won't be needed when deriving DnsRecord via strawberry pydantic integration
|
||||
# https://strawberry.rocks/docs/integrations/pydantic
|
||||
# Remove when the link above says it got stable.
|
||||
def service_dns_to_graphql(record: ServiceDnsRecord) -> DnsRecord:
|
||||
return DnsRecord(
|
||||
record_type=record.type,
|
||||
name=record.name,
|
||||
content=record.content,
|
||||
ttl=record.ttl,
|
||||
priority=record.priority,
|
||||
display_name=record.display_name,
|
||||
)
|
||||
|
||||
|
||||
@strawberry.type
|
||||
class Service:
|
||||
id: str
|
||||
|
@ -98,16 +115,26 @@ class Service:
|
|||
can_be_backed_up: bool
|
||||
backup_description: str
|
||||
status: ServiceStatusEnum
|
||||
url: typing.Optional[str]
|
||||
dns_records: typing.Optional[typing.List[DnsRecord]]
|
||||
url: Optional[str]
|
||||
|
||||
@strawberry.field
|
||||
def dns_records(self) -> Optional[List[DnsRecord]]:
|
||||
service = get_service_by_id(self.id)
|
||||
if service is None:
|
||||
raise LookupError(f"no service {self.id}. Should be unreachable")
|
||||
|
||||
raw_records = service.get_dns_records(get_ip4(), get_ip6())
|
||||
dns_records = [service_dns_to_graphql(record) for record in raw_records]
|
||||
return dns_records
|
||||
|
||||
@strawberry.field
|
||||
def storage_usage(self) -> ServiceStorageUsage:
|
||||
"""Get storage usage for a service"""
|
||||
return get_storage_usage(self)
|
||||
|
||||
# TODO: fill this
|
||||
@strawberry.field
|
||||
def backup_snapshots(self) -> typing.Optional[typing.List["SnapshotInfo"]]:
|
||||
def backup_snapshots(self) -> Optional[List["SnapshotInfo"]]:
|
||||
return None
|
||||
|
||||
|
||||
|
@ -133,23 +160,10 @@ def service_to_graphql_service(service: ServiceInterface) -> Service:
|
|||
backup_description=service.get_backup_description(),
|
||||
status=ServiceStatusEnum(service.get_status().value),
|
||||
url=service.get_url(),
|
||||
dns_records=[
|
||||
DnsRecord(
|
||||
record_type=record.type,
|
||||
name=record.name,
|
||||
content=record.content,
|
||||
ttl=record.ttl,
|
||||
priority=record.priority,
|
||||
display_name=record.display_name,
|
||||
)
|
||||
for record in service.get_dns_records(
|
||||
network_utils.get_ip4(), network_utils.get_ip6()
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
def get_volume_by_id(volume_id: str) -> typing.Optional[StorageVolume]:
|
||||
def get_volume_by_id(volume_id: str) -> Optional[StorageVolume]:
|
||||
"""Get volume by id"""
|
||||
volume = BlockDevices().get_block_device(volume_id)
|
||||
if volume is None:
|
||||
|
|
|
@ -63,9 +63,13 @@ def check_running_status(job: Job, unit_name: str):
|
|||
return False
|
||||
|
||||
|
||||
@huey.task()
|
||||
def rebuild_system_task(job: Job, upgrade: bool = False):
|
||||
"""Rebuild the system"""
|
||||
def rebuild_system(job: Job, upgrade: bool = False):
|
||||
"""
|
||||
Broken out to allow calling it synchronously.
|
||||
We cannot just block until task is done because it will require a second worker
|
||||
Which we do not have
|
||||
"""
|
||||
|
||||
unit_name = "sp-nixos-upgrade.service" if upgrade else "sp-nixos-rebuild.service"
|
||||
try:
|
||||
command = ["systemctl", "start", unit_name]
|
||||
|
@ -124,3 +128,9 @@ def rebuild_system_task(job: Job, upgrade: bool = False):
|
|||
status=JobStatus.ERROR,
|
||||
status_text=str(e),
|
||||
)
|
||||
|
||||
|
||||
@huey.task()
|
||||
def rebuild_system_task(job: Job, upgrade: bool = False):
|
||||
"""Rebuild the system"""
|
||||
rebuild_system(job, upgrade)
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
from enum import Enum
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class ServiceStatus(Enum):
|
||||
"""Enum for service status"""
|
||||
|
||||
ACTIVE = "ACTIVE"
|
||||
RELOADING = "RELOADING"
|
||||
INACTIVE = "INACTIVE"
|
||||
FAILED = "FAILED"
|
||||
ACTIVATING = "ACTIVATING"
|
||||
DEACTIVATING = "DEACTIVATING"
|
||||
OFF = "OFF"
|
||||
|
||||
|
||||
class ServiceDnsRecord(BaseModel):
|
||||
type: str
|
||||
name: str
|
||||
content: str
|
||||
ttl: int
|
||||
display_name: str
|
||||
priority: Optional[int] = None
|
|
@ -30,7 +30,7 @@ class RedisTokensRepository(AbstractTokensRepository):
|
|||
|
||||
@staticmethod
|
||||
def token_key_for_device(device_name: str):
|
||||
md5_hash = md5()
|
||||
md5_hash = md5(usedforsecurity=False)
|
||||
md5_hash.update(bytes(device_name, "utf-8"))
|
||||
digest = md5_hash.hexdigest()
|
||||
return TOKENS_PREFIX + digest
|
||||
|
|
|
@ -1,13 +1,16 @@
|
|||
"""Abstract class for a service running on a server"""
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
from selfprivacy_api.jobs import Job, Jobs, JobStatus, report_progress
|
||||
|
||||
from selfprivacy_api import utils
|
||||
from selfprivacy_api.utils import ReadUserData, WriteUserData
|
||||
from selfprivacy_api.utils.waitloop import wait_until_true
|
||||
from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices
|
||||
|
||||
from selfprivacy_api.jobs import Job, Jobs, JobStatus, report_progress
|
||||
from selfprivacy_api.jobs.upgrade_system import rebuild_system
|
||||
|
||||
from selfprivacy_api.models.services import ServiceStatus, ServiceDnsRecord
|
||||
from selfprivacy_api.services.generic_size_counter import get_storage_usage
|
||||
from selfprivacy_api.services.owned_path import OwnedPath, Bind
|
||||
from selfprivacy_api.services.moving import (
|
||||
|
@ -20,34 +23,10 @@ from selfprivacy_api.services.moving import (
|
|||
move_data_to_volume,
|
||||
)
|
||||
|
||||
from selfprivacy_api import utils
|
||||
from selfprivacy_api.utils.waitloop import wait_until_true
|
||||
from selfprivacy_api.utils import ReadUserData, WriteUserData
|
||||
|
||||
DEFAULT_START_STOP_TIMEOUT = 5 * 60
|
||||
|
||||
|
||||
class ServiceStatus(Enum):
|
||||
"""Enum for service status"""
|
||||
|
||||
ACTIVE = "ACTIVE"
|
||||
RELOADING = "RELOADING"
|
||||
INACTIVE = "INACTIVE"
|
||||
FAILED = "FAILED"
|
||||
ACTIVATING = "ACTIVATING"
|
||||
DEACTIVATING = "DEACTIVATING"
|
||||
OFF = "OFF"
|
||||
|
||||
|
||||
class ServiceDnsRecord(BaseModel):
|
||||
type: str
|
||||
name: str
|
||||
content: str
|
||||
ttl: int
|
||||
display_name: str
|
||||
priority: Optional[int] = None
|
||||
|
||||
|
||||
class Service(ABC):
|
||||
"""
|
||||
Service here is some software that is hosted on the server and
|
||||
|
@ -387,14 +366,6 @@ class Service(ABC):
|
|||
report_progress(95, job, f"Finishing moving {service_name}...")
|
||||
self.set_location(new_volume)
|
||||
|
||||
Jobs.update(
|
||||
job=job,
|
||||
status=JobStatus.FINISHED,
|
||||
result=f"{service_name} moved successfully.",
|
||||
status_text=f"Starting {service_name}...",
|
||||
progress=100,
|
||||
)
|
||||
|
||||
def move_to_volume(self, volume: BlockDevice, job: Job) -> Job:
|
||||
service_name = self.get_display_name()
|
||||
|
||||
|
@ -407,6 +378,17 @@ class Service(ABC):
|
|||
report_progress(9, job, "Stopped service, starting the move...")
|
||||
self.do_move_to_volume(volume, job)
|
||||
|
||||
report_progress(98, job, "Move complete, rebuilding...")
|
||||
rebuild_system(job, upgrade=False)
|
||||
|
||||
Jobs.update(
|
||||
job=job,
|
||||
status=JobStatus.FINISHED,
|
||||
result=f"{service_name} moved successfully.",
|
||||
status_text=f"Starting {service_name}...",
|
||||
progress=100,
|
||||
)
|
||||
|
||||
return job
|
||||
|
||||
@classmethod
|
||||
|
|
|
@ -8,10 +8,9 @@ from os import path
|
|||
|
||||
# from enum import Enum
|
||||
|
||||
from selfprivacy_api.jobs import Job, Jobs, JobStatus
|
||||
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
|
||||
from selfprivacy_api.jobs import Job
|
||||
from selfprivacy_api.services.service import Service, ServiceStatus
|
||||
from selfprivacy_api.utils.block_devices import BlockDevice
|
||||
import selfprivacy_api.utils.network as network_utils
|
||||
|
||||
from selfprivacy_api.services.test_service.icon import BITWARDEN_ICON
|
||||
|
||||
|
@ -89,7 +88,7 @@ class DummyService(Service):
|
|||
@classmethod
|
||||
def set_status(cls, status: ServiceStatus):
|
||||
with open(cls.status_file(), "w") as file:
|
||||
status_string = file.write(status.value)
|
||||
file.write(status.value)
|
||||
|
||||
@classmethod
|
||||
def get_status(cls) -> ServiceStatus:
|
||||
|
@ -102,16 +101,17 @@ class DummyService(Service):
|
|||
cls, new_status: ServiceStatus, delay_sec: float
|
||||
):
|
||||
"""simulating a delay on systemd side"""
|
||||
status_file = cls.status_file()
|
||||
if delay_sec == 0:
|
||||
cls.set_status(new_status)
|
||||
return
|
||||
|
||||
status_file = cls.status_file()
|
||||
command = [
|
||||
"bash",
|
||||
"-c",
|
||||
f" sleep {delay_sec} && echo {new_status.value} > {status_file}",
|
||||
]
|
||||
handle = subprocess.Popen(command)
|
||||
if delay_sec == 0:
|
||||
handle.communicate()
|
||||
subprocess.Popen(command)
|
||||
|
||||
@classmethod
|
||||
def set_backuppable(cls, new_value: bool) -> None:
|
||||
|
@ -192,6 +192,5 @@ class DummyService(Service):
|
|||
if self.simulate_moving is False:
|
||||
return super(DummyService, self).do_move_to_volume(volume, job)
|
||||
else:
|
||||
Jobs.update(job, status=JobStatus.FINISHED)
|
||||
self.set_drive(volume.name)
|
||||
return job
|
||||
|
|
|
@ -1,5 +1,12 @@
|
|||
from os import environ
|
||||
|
||||
from selfprivacy_api.utils.huey import huey
|
||||
from selfprivacy_api.jobs.test import test_job
|
||||
|
||||
from selfprivacy_api.backup.tasks import *
|
||||
from selfprivacy_api.services.tasks import move_service
|
||||
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task
|
||||
|
||||
from selfprivacy_api.jobs.test import test_job
|
||||
|
||||
if environ.get("TEST_MODE"):
|
||||
from tests.test_huey import sum
|
||||
|
|
|
@ -1,16 +1,24 @@
|
|||
"""MiniHuey singleton."""
|
||||
import os
|
||||
from huey import SqliteHuey
|
||||
from os import environ
|
||||
from huey import RedisHuey
|
||||
|
||||
from selfprivacy_api.utils.redis_pool import RedisPool
|
||||
|
||||
HUEY_DATABASE_NUMBER = 10
|
||||
|
||||
|
||||
def immediate() -> bool:
|
||||
if environ.get("HUEY_QUEUES_FOR_TESTS"):
|
||||
return False
|
||||
if environ.get("TEST_MODE"):
|
||||
return True
|
||||
return False
|
||||
|
||||
HUEY_DATABASE = "/etc/selfprivacy/tasks.db"
|
||||
|
||||
# Singleton instance containing the huey database.
|
||||
|
||||
test_mode = os.environ.get("TEST_MODE")
|
||||
|
||||
huey = SqliteHuey(
|
||||
huey = RedisHuey(
|
||||
"selfprivacy-api",
|
||||
filename=HUEY_DATABASE if not test_mode else None,
|
||||
immediate=test_mode == "true",
|
||||
url=RedisPool.connection_url(dbnumber=HUEY_DATABASE_NUMBER),
|
||||
immediate=immediate(),
|
||||
utc=True,
|
||||
)
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
"""
|
||||
Redis pool module for selfprivacy_api
|
||||
"""
|
||||
from os import environ
|
||||
import redis
|
||||
|
||||
from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
|
||||
|
||||
REDIS_SOCKET = "/run/redis-sp-api/redis.sock"
|
||||
|
@ -14,20 +14,20 @@ class RedisPool(metaclass=SingletonMetaclass):
|
|||
"""
|
||||
|
||||
def __init__(self):
|
||||
if "USE_REDIS_PORT" in environ:
|
||||
self._pool = redis.ConnectionPool(
|
||||
host="127.0.0.1",
|
||||
port=int(environ["USE_REDIS_PORT"]),
|
||||
decode_responses=True,
|
||||
)
|
||||
|
||||
else:
|
||||
self._pool = redis.ConnectionPool.from_url(
|
||||
f"unix://{REDIS_SOCKET}",
|
||||
decode_responses=True,
|
||||
)
|
||||
self._pool = redis.ConnectionPool.from_url(
|
||||
RedisPool.connection_url(dbnumber=0),
|
||||
decode_responses=True,
|
||||
)
|
||||
self._pubsub_connection = self.get_connection()
|
||||
|
||||
@staticmethod
|
||||
def connection_url(dbnumber: int) -> str:
|
||||
"""
|
||||
redis://[[username]:[password]]@localhost:6379/0
|
||||
unix://[username@]/path/to/socket.sock?db=0[&password=password]
|
||||
"""
|
||||
return f"unix://{REDIS_SOCKET}?db={dbnumber}"
|
||||
|
||||
def get_connection(self):
|
||||
"""
|
||||
Get a connection from the pool.
|
||||
|
|
|
@ -2,16 +2,16 @@
|
|||
import subprocess
|
||||
from typing import List
|
||||
|
||||
from selfprivacy_api.services.service import ServiceStatus
|
||||
from selfprivacy_api.models.services import ServiceStatus
|
||||
|
||||
|
||||
def get_service_status(service: str) -> ServiceStatus:
|
||||
def get_service_status(unit: str) -> ServiceStatus:
|
||||
"""
|
||||
Return service status from systemd.
|
||||
Use systemctl show to get the status of a service.
|
||||
Get ActiveState from the output.
|
||||
"""
|
||||
service_status = subprocess.check_output(["systemctl", "show", service])
|
||||
service_status = subprocess.check_output(["systemctl", "show", unit])
|
||||
if b"LoadState=not-found" in service_status:
|
||||
return ServiceStatus.OFF
|
||||
if b"ActiveState=active" in service_status:
|
||||
|
|
|
@ -7,16 +7,16 @@ RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.time.datetime"
|
|||
DEVICE_KEY_VALIDATION_DATETIME = RECOVERY_KEY_VALIDATION_DATETIME
|
||||
|
||||
|
||||
def ten_minutes_into_future_naive():
|
||||
return datetime.now() + timedelta(minutes=10)
|
||||
def ten_hours_into_future_naive():
|
||||
return datetime.now() + timedelta(hours=10)
|
||||
|
||||
|
||||
def ten_minutes_into_future_naive_utc():
|
||||
return datetime.utcnow() + timedelta(minutes=10)
|
||||
def ten_hours_into_future_naive_utc():
|
||||
return datetime.utcnow() + timedelta(hours=10)
|
||||
|
||||
|
||||
def ten_minutes_into_future():
|
||||
return datetime.now(timezone.utc) + timedelta(minutes=10)
|
||||
def ten_hours_into_future():
|
||||
return datetime.now(timezone.utc) + timedelta(hours=10)
|
||||
|
||||
|
||||
def ten_minutes_into_past_naive():
|
||||
|
@ -34,11 +34,11 @@ def ten_minutes_into_past():
|
|||
class NearFuture(datetime):
|
||||
@classmethod
|
||||
def now(cls, tz=None):
|
||||
return datetime.now(tz) + timedelta(minutes=13)
|
||||
return datetime.now(tz) + timedelta(hours=13)
|
||||
|
||||
@classmethod
|
||||
def utcnow(cls):
|
||||
return datetime.utcnow() + timedelta(minutes=13)
|
||||
return datetime.utcnow() + timedelta(hours=13)
|
||||
|
||||
|
||||
def read_json(file_path):
|
||||
|
|
|
@ -99,23 +99,14 @@ def generic_userdata(mocker, tmpdir):
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def huey_database(mocker, shared_datadir):
|
||||
"""Mock huey database."""
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.utils.huey.HUEY_DATABASE", shared_datadir / "huey.db"
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(huey_database, redis_repo_with_tokens):
|
||||
def client(redis_repo_with_tokens):
|
||||
from selfprivacy_api.app import app
|
||||
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def authorized_client(huey_database, redis_repo_with_tokens):
|
||||
def authorized_client(redis_repo_with_tokens):
|
||||
"""Authorized test client fixture."""
|
||||
from selfprivacy_api.app import app
|
||||
|
||||
|
@ -127,7 +118,7 @@ def authorized_client(huey_database, redis_repo_with_tokens):
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def wrong_auth_client(huey_database, redis_repo_with_tokens):
|
||||
def wrong_auth_client(redis_repo_with_tokens):
|
||||
"""Wrong token test client fixture."""
|
||||
from selfprivacy_api.app import app
|
||||
|
||||
|
|
|
@ -14,9 +14,9 @@ from tests.common import (
|
|||
)
|
||||
|
||||
# Graphql API's output should be timezone-naive
|
||||
from tests.common import ten_minutes_into_future_naive_utc as ten_minutes_into_future
|
||||
from tests.common import ten_minutes_into_future as ten_minutes_into_future_tz
|
||||
from tests.common import ten_minutes_into_past_naive_utc as ten_minutes_into_past
|
||||
from tests.common import ten_hours_into_future_naive_utc as ten_hours_into_future
|
||||
from tests.common import ten_hours_into_future as ten_hours_into_future_tz
|
||||
from tests.common import ten_minutes_into_past_naive_utc as ten_hours_into_past
|
||||
|
||||
from tests.test_graphql.common import (
|
||||
assert_empty,
|
||||
|
@ -168,7 +168,7 @@ def test_graphql_generate_recovery_key(client, authorized_client):
|
|||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"expiration_date", [ten_minutes_into_future(), ten_minutes_into_future_tz()]
|
||||
"expiration_date", [ten_hours_into_future(), ten_hours_into_future_tz()]
|
||||
)
|
||||
def test_graphql_generate_recovery_key_with_expiration_date(
|
||||
client, authorized_client, expiration_date: datetime
|
||||
|
@ -193,7 +193,7 @@ def test_graphql_generate_recovery_key_with_expiration_date(
|
|||
|
||||
|
||||
def test_graphql_use_recovery_key_after_expiration(client, authorized_client, mocker):
|
||||
expiration_date = ten_minutes_into_future()
|
||||
expiration_date = ten_hours_into_future()
|
||||
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
|
||||
|
||||
# Timewarp to after it expires
|
||||
|
@ -219,7 +219,7 @@ def test_graphql_use_recovery_key_after_expiration(client, authorized_client, mo
|
|||
|
||||
|
||||
def test_graphql_generate_recovery_key_with_expiration_in_the_past(authorized_client):
|
||||
expiration_date = ten_minutes_into_past()
|
||||
expiration_date = ten_hours_into_past()
|
||||
response = request_make_new_recovery_key(
|
||||
authorized_client, expires_at=expiration_date
|
||||
)
|
||||
|
|
|
@ -13,8 +13,7 @@ from selfprivacy_api.services.test_service import DummyService
|
|||
|
||||
from tests.common import generate_service_query
|
||||
from tests.test_graphql.common import assert_empty, assert_ok, get_data
|
||||
from tests.test_block_device_utils import lsblk_singular_mock
|
||||
|
||||
from tests.test_graphql.test_system_nixos_tasks import prepare_nixos_rebuild_calls
|
||||
|
||||
LSBLK_BLOCKDEVICES_DICTS = [
|
||||
{
|
||||
|
@ -618,10 +617,7 @@ def test_graphql_move_service_without_folders_on_old_volume(
|
|||
|
||||
|
||||
def test_graphql_move_service(
|
||||
authorized_client,
|
||||
generic_userdata,
|
||||
mock_check_volume,
|
||||
dummy_service_with_binds,
|
||||
authorized_client, generic_userdata, mock_check_volume, dummy_service_with_binds, fp
|
||||
):
|
||||
dummy_service = dummy_service_with_binds
|
||||
|
||||
|
@ -633,10 +629,30 @@ def test_graphql_move_service(
|
|||
dummy_service.set_drive(origin)
|
||||
dummy_service.set_simulated_moves(False)
|
||||
|
||||
unit_name = "sp-nixos-rebuild.service"
|
||||
rebuild_command = ["systemctl", "start", unit_name]
|
||||
prepare_nixos_rebuild_calls(fp, unit_name)
|
||||
|
||||
# We will be mounting and remounting folders
|
||||
mount_command = ["mount", fp.any()]
|
||||
unmount_command = ["umount", fp.any()]
|
||||
fp.pass_command(mount_command, 2)
|
||||
fp.pass_command(unmount_command, 2)
|
||||
|
||||
# We will be changing ownership
|
||||
chown_command = ["chown", fp.any()]
|
||||
fp.pass_command(chown_command, 2)
|
||||
|
||||
mutation_response = api_move(authorized_client, dummy_service, target)
|
||||
|
||||
data = get_data(mutation_response)["services"]["moveService"]
|
||||
assert_ok(data)
|
||||
assert data["service"] is not None
|
||||
|
||||
assert fp.call_count(rebuild_command) == 1
|
||||
assert fp.call_count(mount_command) == 2
|
||||
assert fp.call_count(unmount_command) == 2
|
||||
assert fp.call_count(chown_command) == 2
|
||||
|
||||
|
||||
def test_mailservice_cannot_enable_disable(authorized_client):
|
||||
|
|
|
@ -97,16 +97,7 @@ def test_graphql_system_rebuild_unauthorized(client, fp, action):
|
|||
assert fp.call_count([fp.any()]) == 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
|
||||
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
|
||||
"""Test system rebuild"""
|
||||
unit_name = f"sp-nixos-{action}.service"
|
||||
query = (
|
||||
API_REBUILD_SYSTEM_MUTATION
|
||||
if action == "rebuild"
|
||||
else API_UPGRADE_SYSTEM_MUTATION
|
||||
)
|
||||
|
||||
def prepare_nixos_rebuild_calls(fp, unit_name):
|
||||
# Start the unit
|
||||
fp.register(["systemctl", "start", unit_name])
|
||||
|
||||
|
@ -129,6 +120,19 @@ def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_interv
|
|||
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
|
||||
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
|
||||
"""Test system rebuild"""
|
||||
unit_name = f"sp-nixos-{action}.service"
|
||||
query = (
|
||||
API_REBUILD_SYSTEM_MUTATION
|
||||
if action == "rebuild"
|
||||
else API_UPGRADE_SYSTEM_MUTATION
|
||||
)
|
||||
|
||||
prepare_nixos_rebuild_calls(fp, unit_name)
|
||||
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
import pytest
|
||||
import redis
|
||||
from typing import List
|
||||
|
||||
import subprocess
|
||||
from subprocess import Popen, check_output, TimeoutExpired
|
||||
from os import environ, path, set_blocking
|
||||
from io import BufferedReader
|
||||
from huey.exceptions import HueyException
|
||||
|
||||
from selfprivacy_api.utils.huey import huey, immediate, HUEY_DATABASE_NUMBER
|
||||
from selfprivacy_api.utils.redis_pool import RedisPool, REDIS_SOCKET
|
||||
|
||||
|
||||
@huey.task()
|
||||
def sum(a: int, b: int) -> int:
|
||||
return a + b
|
||||
|
||||
|
||||
def reset_huey_storage():
|
||||
huey.storage = huey.create_storage()
|
||||
|
||||
|
||||
def flush_huey_redis_forcefully():
|
||||
url = RedisPool.connection_url(HUEY_DATABASE_NUMBER)
|
||||
|
||||
pool = redis.ConnectionPool.from_url(url, decode_responses=True)
|
||||
connection = redis.Redis(connection_pool=pool)
|
||||
connection.flushdb()
|
||||
|
||||
|
||||
# TODO: may be useful in other places too, move to utils/ tests common if using it somewhere
|
||||
def read_all_ready_output(stream: BufferedReader) -> str:
|
||||
set_blocking(stream.fileno(), False)
|
||||
output: List[bytes] = []
|
||||
while True:
|
||||
line = stream.readline()
|
||||
raise ValueError(line)
|
||||
if line == b"":
|
||||
break
|
||||
else:
|
||||
output.append(line)
|
||||
|
||||
set_blocking(stream.fileno(), True)
|
||||
|
||||
result = b"".join(output)
|
||||
return result.decode("utf-8")
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def not_immediate():
|
||||
assert environ["TEST_MODE"] == "true"
|
||||
|
||||
old_immediate = huey.immediate
|
||||
environ["HUEY_QUEUES_FOR_TESTS"] = "Yes"
|
||||
huey.immediate = False
|
||||
assert huey.immediate is False
|
||||
|
||||
yield
|
||||
|
||||
del environ["HUEY_QUEUES_FOR_TESTS"]
|
||||
huey.immediate = old_immediate
|
||||
assert huey.immediate == old_immediate
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def huey_socket_consumer(not_immediate):
|
||||
"""
|
||||
Same as above, but with socketed redis
|
||||
"""
|
||||
|
||||
flush_huey_redis_forcefully()
|
||||
command = ["huey_consumer.py", "selfprivacy_api.task_registry.huey"]
|
||||
|
||||
# First assert that consumer does not fail by itself
|
||||
# Idk yet how to do it more elegantly
|
||||
try:
|
||||
check_output(command, timeout=2)
|
||||
except TimeoutExpired:
|
||||
pass
|
||||
|
||||
# Then open it for real
|
||||
consumer_handle = Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
||||
assert path.exists(REDIS_SOCKET)
|
||||
|
||||
yield consumer_handle
|
||||
|
||||
consumer_handle.kill()
|
||||
|
||||
|
||||
def test_huey_over_redis_socket(huey_socket_consumer):
|
||||
assert huey.immediate is False
|
||||
assert immediate() is False
|
||||
|
||||
assert "unix" in RedisPool.connection_url(HUEY_DATABASE_NUMBER)
|
||||
try:
|
||||
assert (
|
||||
RedisPool.connection_url(HUEY_DATABASE_NUMBER)
|
||||
in huey.storage_kwargs.values()
|
||||
)
|
||||
except AssertionError:
|
||||
raise ValueError(
|
||||
"our test-side huey does not connect over socket: ", huey.storage_kwargs
|
||||
)
|
||||
|
||||
result = sum(2, 5)
|
||||
try:
|
||||
assert result(blocking=True, timeout=10) == 7
|
||||
|
||||
except HueyException as error:
|
||||
if "timed out" in str(error):
|
||||
output = read_all_ready_output(huey_socket_consumer.stdout)
|
||||
errorstream = read_all_ready_output(huey_socket_consumer.stderr)
|
||||
raise TimeoutError(
|
||||
f"Huey timed out: {str(error)}",
|
||||
f"Consumer output: {output}",
|
||||
f"Consumer errorstream: {errorstream}",
|
||||
)
|
||||
else:
|
||||
raise error
|
||||
|
||||
|
||||
@pytest.mark.xfail(reason="cannot yet schedule with sockets for some reason")
|
||||
def test_huey_schedule(huey_queues_socket):
|
||||
# We do not schedule tasks anywhere, but concerning that it fails.
|
||||
sum.schedule((2, 5), delay=10)
|
||||
|
||||
try:
|
||||
assert len(huey.scheduled()) == 1
|
||||
except AssertionError:
|
||||
raise ValueError("have wrong amount of scheduled tasks", huey.scheduled())
|
|
@ -24,7 +24,7 @@ from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
|
|||
AbstractTokensRepository,
|
||||
)
|
||||
|
||||
from tests.common import ten_minutes_into_past, ten_minutes_into_future
|
||||
from tests.common import ten_minutes_into_past, ten_hours_into_future
|
||||
|
||||
|
||||
ORIGINAL_DEVICE_NAMES = [
|
||||
|
|
Loading…
Reference in New Issue