Compare commits


No commits in common. "master" and "vm-disk" have entirely different histories.

38 changed files with 186 additions and 904 deletions

View File

@@ -5,11 +5,18 @@ name: default
steps:
- name: Run Tests and Generate Coverage Report
commands:
- nix flake check -L
- kill $(ps aux | grep 'redis-server 127.0.0.1:6389' | awk '{print $2}') || true
- redis-server --bind 127.0.0.1 --port 6389 >/dev/null &
# We do not care about persistence on CI
- sleep 10
- redis-cli -h 127.0.0.1 -p 6389 config set stop-writes-on-bgsave-error no
- coverage run -m pytest -q
- coverage xml
- sonar-scanner -Dsonar.projectKey=SelfPrivacy-REST-API -Dsonar.sources=. -Dsonar.host.url=http://analyzer.lan:9000 -Dsonar.login="$SONARQUBE_TOKEN"
environment:
SONARQUBE_TOKEN:
from_secret: SONARQUBE_TOKEN
USE_REDIS_PORT: 6389
- name: Run Bandit Checks

.gitignore vendored Normal file → Executable file
View File

View File

@@ -1,2 +0,0 @@
[mypy]
plugins = pydantic.mypy

View File

@@ -1,8 +1,6 @@
# SelfPrivacy GraphQL API which allows the app to control your server
![CI status](https://ci.selfprivacy.org/api/badges/SelfPrivacy/selfprivacy-rest-api/status.svg)
## Build
## build
```console
$ nix build
@@ -10,7 +8,7 @@ $ nix build
In case of successful build, you should get the `./result` symlink to a folder (in `/nix/store`) with build contents.
## Develop
## develop
```console
$ nix develop
@@ -23,10 +21,10 @@ Type "help", "copyright", "credits" or "license" for more information.
If you don't have experimental flakes enabled, you can use the following command:
```console
$ nix --extra-experimental-features nix-command --extra-experimental-features flakes develop
nix --extra-experimental-features nix-command --extra-experimental-features flakes develop
```
## Testing
## testing
Run the test suite by running coverage with pytest inside an ephemeral NixOS VM with redis service enabled:
```console
@@ -63,7 +61,7 @@ $ TMPDIR=".nixos-vm-tmp-dir" nix run .#checks.x86_64-linux.default.driverInterac
Option `-L`/`--print-build-logs` is optional for all nix commands. It tells nix to print each log line one after another instead of overwriting a single one.
## Dependencies and Dependent Modules
## dependencies and dependent modules
This flake depends on a single Nix flake input - the nixpkgs repository, which provides all software packages used to build and run the API service, tests, etc.
@@ -87,6 +85,6 @@ $ nix flake metadata git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nix
Nix code for NixOS service module for API is located in NixOS configuration repository.
## Troubleshooting
## troubleshooting
Sometimes commands inside `nix develop` refuse to work properly if the calling shell lacks `LANG` environment variable. Try to set it before entering `nix develop`.

View File

@@ -2,11 +2,11 @@
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1709677081,
"narHash": "sha256-tix36Y7u0rkn6mTm0lA45b45oab2cFLqAzDbJxeXS+c=",
"lastModified": 1702780907,
"narHash": "sha256-blbrBBXjjZt6OKTcYX1jpe9SRof2P9ZYWPzq22tzXAA=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "880992dcc006a5e00dd0591446fdf723e6a51a64",
"rev": "1e2e384c5b7c50dbf8e9c441a9e58d85f408b01f",
"type": "github"
},
"original": {

View File

@@ -27,7 +27,6 @@
python-lsp-server
pyflakes
typer # for strawberry
types-redis # for mypy
] ++ strawberry-graphql.optional-dependencies.cli));
vmtest-src-dir = "/root/source";
@@ -136,6 +135,7 @@
services.redis.servers.sp-api = {
enable = true;
save = [ ];
port = 6379; # FIXME
settings.notify-keyspace-events = "KEA";
};
environment.systemPackages = with pkgs; [

View File

@@ -259,7 +259,7 @@ class Backups:
Backups._prune_auto_snaps(service)
service.post_restore()
except Exception as error:
Jobs.update(job, status=JobStatus.ERROR, error=str(error))
Jobs.update(job, status=JobStatus.ERROR, status_text=str(error))
raise error
Jobs.update(job, status=JobStatus.FINISHED)
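On the master side of this hunk, failure details go into the job's dedicated `error` field rather than the human-readable `status_text`. A minimal self-contained sketch of that pattern, with a hypothetical `Job` stand-in rather than the real model:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Job:  # hypothetical stand-in for selfprivacy_api.jobs.Job
    status: str = "RUNNING"
    status_text: Optional[str] = None
    error: Optional[str] = None

def do_restore() -> None:
    raise ValueError("snapshot not found")  # simulated failure

def restore(job: Job) -> None:
    try:
        do_restore()
    except Exception as err:
        job.status = "ERROR"
        job.error = str(err)  # machine-readable error field, not status_text
        raise
    job.status = "FINISHED"

job = Job()
try:
    restore(job)
except ValueError:
    pass
assert job.error == "snapshot not found" and job.status_text is None
```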

View File

@@ -172,21 +172,6 @@ class ResticBackupper(AbstractBackupper):
return messages
@staticmethod
def _replace_in_array(array: List[str], target, replacement) -> None:
if target == "":
return
for i, value in enumerate(array):
if target in value:
array[i] = array[i].replace(target, replacement)
def _censor_command(self, command: List[str]) -> List[str]:
result = command.copy()
ResticBackupper._replace_in_array(result, self.key, "CENSORED")
ResticBackupper._replace_in_array(result, LocalBackupSecret.get(), "CENSORED")
return result
@staticmethod
def _get_backup_job(service_name: str) -> Optional[Job]:
service = get_service_by_id(service_name)
@@ -233,7 +218,7 @@ class ResticBackupper(AbstractBackupper):
"Could not create a snapshot: ",
str(error),
"command: ",
self._censor_command(backup_command),
backup_command,
) from error
@staticmethod
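For reference, the censoring that master applies before logging a failed command can be exercised standalone. A minimal sketch, assuming secrets appear verbatim inside command arguments; the sample command and secret are hypothetical:

```python
from typing import List

def replace_in_array(array: List[str], target: str, replacement: str) -> None:
    """In-place substring replacement across a command's argument list."""
    if target == "":  # an empty target would match every argument
        return
    for i, value in enumerate(array):
        if target in value:
            array[i] = array[i].replace(target, replacement)

command = ["restic", "-r", "b2:bucket", "--password-command", "echo hunter2"]
replace_in_array(command, "hunter2", "CENSORED")
assert command == ["restic", "-r", "b2:bucket", "--password-command", "echo CENSORED"]
```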

View File

@@ -21,8 +21,6 @@ PROVIDER_MAPPING: dict[BackupProviderEnum, Type[AbstractBackupProvider]] = {
def get_provider(
provider_type: BackupProviderEnum,
) -> Type[AbstractBackupProvider]:
if provider_type not in PROVIDER_MAPPING.keys():
raise LookupError("could not look up provider", provider_type)
return PROVIDER_MAPPING[provider_type]

View File

@@ -27,4 +27,4 @@ async def get_token_header(
def get_api_version() -> str:
"""Get API version"""
return "3.2.1"
return "3.1.0"

View File

@@ -2,7 +2,6 @@ import typing
import strawberry
# TODO: use https://strawberry.rocks/docs/integrations/pydantic when it is stable
@strawberry.type
class DnsRecord:
"""DNS record"""

View File

@@ -1,17 +1,14 @@
from enum import Enum
from typing import Optional, List
import datetime
import typing
import strawberry
import datetime
from selfprivacy_api.graphql.common_types.backup import BackupReason
from selfprivacy_api.graphql.common_types.dns import DnsRecord
from selfprivacy_api.services import get_service_by_id, get_services_by_location
from selfprivacy_api.services import Service as ServiceInterface
from selfprivacy_api.services import ServiceDnsRecord
from selfprivacy_api.utils.block_devices import BlockDevices
from selfprivacy_api.utils.network import get_ip4, get_ip6
import selfprivacy_api.utils.network as network_utils
def get_usages(root: "StorageVolume") -> list["StorageUsageInterface"]:
@@ -36,8 +33,8 @@ class StorageVolume:
used_space: str
root: bool
name: str
model: Optional[str]
serial: Optional[str]
model: typing.Optional[str]
serial: typing.Optional[str]
type: str
@strawberry.field
@@ -49,7 +46,7 @@ class StorageVolume:
@strawberry.interface
class StorageUsageInterface:
used_space: str
volume: Optional[StorageVolume]
volume: typing.Optional[StorageVolume]
title: str
@@ -57,7 +54,7 @@
class ServiceStorageUsage(StorageUsageInterface):
"""Storage usage for a service"""
service: Optional["Service"]
service: typing.Optional["Service"]
@strawberry.enum
@@ -89,20 +86,6 @@ def get_storage_usage(root: "Service") -> ServiceStorageUsage:
)
# TODO: This won't be needed when deriving DnsRecord via strawberry pydantic integration
# https://strawberry.rocks/docs/integrations/pydantic
# Remove when the link above says it got stable.
def service_dns_to_graphql(record: ServiceDnsRecord) -> DnsRecord:
return DnsRecord(
record_type=record.type,
name=record.name,
content=record.content,
ttl=record.ttl,
priority=record.priority,
display_name=record.display_name,
)
@strawberry.type
class Service:
id: str
@@ -115,26 +98,16 @@ class Service:
can_be_backed_up: bool
backup_description: str
status: ServiceStatusEnum
url: Optional[str]
@strawberry.field
def dns_records(self) -> Optional[List[DnsRecord]]:
service = get_service_by_id(self.id)
if service is None:
raise LookupError(f"no service {self.id}. Should be unreachable")
raw_records = service.get_dns_records(get_ip4(), get_ip6())
dns_records = [service_dns_to_graphql(record) for record in raw_records]
return dns_records
url: typing.Optional[str]
dns_records: typing.Optional[typing.List[DnsRecord]]
@strawberry.field
def storage_usage(self) -> ServiceStorageUsage:
"""Get storage usage for a service"""
return get_storage_usage(self)
# TODO: fill this
@strawberry.field
def backup_snapshots(self) -> Optional[List["SnapshotInfo"]]:
def backup_snapshots(self) -> typing.Optional[typing.List["SnapshotInfo"]]:
return None
@@ -160,10 +133,23 @@ def service_to_graphql_service(service: ServiceInterface) -> Service:
backup_description=service.get_backup_description(),
status=ServiceStatusEnum(service.get_status().value),
url=service.get_url(),
dns_records=[
DnsRecord(
record_type=record.type,
name=record.name,
content=record.content,
ttl=record.ttl,
priority=record.priority,
display_name=record.display_name,
)
for record in service.get_dns_records(
network_utils.get_ip4(), network_utils.get_ip6()
)
],
)
def get_volume_by_id(volume_id: str) -> Optional[StorageVolume]:
def get_volume_by_id(volume_id: str) -> typing.Optional[StorageVolume]:
"""Get volume by id"""
volume = BlockDevices().get_block_device(volume_id)
if volume is None:

View File

@@ -8,12 +8,9 @@ from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericJobMutationReturn,
GenericMutationReturn,
MutationReturnInterface,
GenericJobMutationReturn,
)
import selfprivacy_api.actions.system as system_actions
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.jobs.nix_collect_garbage import start_nix_collect_garbage
import selfprivacy_api.actions.ssh as ssh_actions
@@ -198,14 +195,3 @@ class SystemMutations:
message=f"Failed to pull repository changes:\n{result.data}",
code=500,
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def nix_collect_garbage(self) -> GenericJobMutationReturn:
job = start_nix_collect_garbage()
return GenericJobMutationReturn(
success=True,
code=200,
message="Garbage collector started...",
job=job_to_api_job(job),
)

View File

@@ -34,24 +34,6 @@ class BackupConfiguration:
location_id: typing.Optional[str]
# TODO: Ideally this should not be done in API but making an internal Service requires more work
# than to make an API record about a service
def tombstone_service(service_id: str) -> Service:
return Service(
id=service_id,
display_name=f"{service_id} (Orphaned)",
description="",
svg_icon="",
is_movable=False,
is_required=False,
is_enabled=False,
status=ServiceStatusEnum.OFF,
url=None,
can_be_backed_up=False,
backup_description="",
)
@strawberry.type
class Backup:
@strawberry.field
@@ -73,21 +55,27 @@ class Backup:
result = []
snapshots = Backups.get_all_snapshots()
for snap in snapshots:
api_service = None
service = get_service_by_id(snap.service_name)
if service is None:
api_service = tombstone_service(snap.service_name)
else:
api_service = service_to_graphql_service(service)
if api_service is None:
raise NotImplementedError(
f"Could not construct API Service record for:{snap.service_name}. This should be unreachable and is a bug if you see it."
service = Service(
id=snap.service_name,
display_name=f"{snap.service_name} (Orphaned)",
description="",
svg_icon="",
is_movable=False,
is_required=False,
is_enabled=False,
status=ServiceStatusEnum.OFF,
url=None,
dns_records=None,
can_be_backed_up=False,
backup_description="",
)
else:
service = service_to_graphql_service(service)
graphql_snap = SnapshotInfo(
id=snap.id,
service=api_service,
service=service,
created_at=snap.created_at,
reason=snap.reason,
)

View File

@@ -14,7 +14,6 @@ class DnsProvider(Enum):
class ServerProvider(Enum):
HETZNER = "HETZNER"
DIGITALOCEAN = "DIGITALOCEAN"
OTHER = "OTHER"
@strawberry.enum

View File

@@ -67,8 +67,8 @@ def move_folder(
try:
data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
except Exception as error:
print(f"Error creating data path: {error}")
except Exception as e:
print(f"Error creating data path: {e}")
return
try:

View File

@@ -1,147 +0,0 @@
import re
import subprocess
from typing import Tuple, Iterable
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs import JobStatus, Jobs, Job
class ShellException(Exception):
"""Shell-related errors"""
COMPLETED_WITH_ERROR = "Error occurred, please report this to the support chat."
RESULT_WAS_NOT_FOUND_ERROR = (
"We are sorry, garbage collection result was not found. "
"Something went wrong, please report this to the support chat."
)
CLEAR_COMPLETED = "Garbage collection completed."
def delete_old_gens_and_return_dead_report() -> str:
subprocess.run(
["nix-env", "-p", "/nix/var/nix/profiles/system", "--delete-generations old"],
check=False,
)
result = subprocess.check_output(["nix-store", "--gc", "--print-dead"]).decode(
"utf-8"
)
return " " if result is None else result
def run_nix_collect_garbage() -> Iterable[bytes]:
process = subprocess.Popen(
["nix-store", "--gc"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
return process.stdout if process.stdout else iter([])
def parse_line(job: Job, line: str) -> Job:
"""
We parse the string for the presence of a final line,
with the final amount of space cleared.
Simply put, we're just looking for a similar string:
"1537 store paths deleted, 339.84 MiB freed".
"""
pattern = re.compile(r"[+-]?\d+\.\d+ \w+(?= freed)")
match = re.search(pattern, line)
if match is None:
raise ShellException("nix returned gibberish output")
else:
Jobs.update(
job=job,
status=JobStatus.FINISHED,
status_text=CLEAR_COMPLETED,
result=f"{match.group(0)} have been cleared",
)
return job
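The lookahead regex above is easy to sanity-check in isolation, using the sample line from the docstring:

```python
import re

pattern = re.compile(r"[+-]?\d+\.\d+ \w+(?= freed)")
match = pattern.search("1537 store paths deleted, 339.84 MiB freed")
assert match is not None
assert match.group(0) == "339.84 MiB"  # the amount reported as cleared
```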
def process_stream(job: Job, stream: Iterable[bytes], total_dead_packages: int) -> None:
completed_packages = 0
prev_progress = 0
for line in stream:
line = line.decode("utf-8")
if "deleting '/nix/store/" in line:
completed_packages += 1
percent = int((completed_packages / total_dead_packages) * 100)
if percent - prev_progress >= 5:
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=percent,
status_text="Cleaning...",
)
prev_progress = percent
elif "store paths deleted," in line:
parse_line(job, line)
def get_dead_packages(output) -> Tuple[int, float]:
dead = len(re.findall("/nix/store/", output))
percent = 0
if dead != 0:
percent = 100 / dead
return dead, percent
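The returned float is the progress increment that each deleted path contributes. A standalone worked example, duplicating the logic above:

```python
import re
from typing import Tuple

def get_dead_packages(output: str) -> Tuple[int, float]:
    # One "/nix/store/" occurrence per dead path, as in the function above.
    dead = len(re.findall("/nix/store/", output))
    percent = 100 / dead if dead != 0 else 0
    return dead, percent

# Five dead paths: each deletion advances progress by 100 / 5 = 20.0 percent.
sample = "\n".join(f"/nix/store/{c}-pkg" for c in "abcde")
assert get_dead_packages(sample) == (5, 20.0)
```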
@huey.task()
def calculate_and_clear_dead_paths(job: Job):
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=0,
status_text="Calculate the number of dead packages...",
)
dead_packages, package_equal_to_percent = get_dead_packages(
delete_old_gens_and_return_dead_report()
)
if dead_packages == 0:
Jobs.update(
job=job,
status=JobStatus.FINISHED,
status_text="Nothing to clear",
result="System is clear",
)
return True
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=0,
status_text=f"Found {dead_packages} packages to remove!",
)
stream = run_nix_collect_garbage()
try:
process_stream(job, stream, dead_packages)
except ShellException as error:
Jobs.update(
job=job,
status=JobStatus.ERROR,
status_text=COMPLETED_WITH_ERROR,
error=RESULT_WAS_NOT_FOUND_ERROR,
)
def start_nix_collect_garbage() -> Job:
job = Jobs.add(
type_id="maintenance.collect_nix_garbage",
name="Collect garbage",
description="Cleaning up unused packages",
)
calculate_and_clear_dead_paths(job=job)
return job

View File

@@ -63,13 +63,9 @@ def check_running_status(job: Job, unit_name: str):
return False
def rebuild_system(job: Job, upgrade: bool = False):
"""
Broken out to allow calling it synchronously.
We cannot just block until task is done because it will require a second worker
Which we do not have
"""
@huey.task()
def rebuild_system_task(job: Job, upgrade: bool = False):
"""Rebuild the system"""
unit_name = "sp-nixos-upgrade.service" if upgrade else "sp-nixos-rebuild.service"
try:
command = ["systemctl", "start", unit_name]
@@ -128,9 +124,3 @@ def rebuild_system(job: Job, upgrade: bool = False):
status=JobStatus.ERROR,
status_text=str(e),
)
@huey.task()
def rebuild_system_task(job: Job, upgrade: bool = False):
"""Rebuild the system"""
rebuild_system(job, upgrade)
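This split lets code that is already running inside a task (for example a service move) call `rebuild_system()` synchronously, since enqueueing `rebuild_system_task` would stall a single-worker deployment. A minimal sketch of the pattern with an in-memory huey; all names here are illustrative:

```python
from huey import MemoryHuey

huey = MemoryHuey(immediate=True)

def rebuild(upgrade: bool = False) -> str:
    """Plain function: callable synchronously from inside another task."""
    return "upgraded" if upgrade else "rebuilt"

@huey.task()
def rebuild_task(upgrade: bool = False) -> str:
    """Queue entry point that only delegates to the synchronous function."""
    return rebuild(upgrade)

@huey.task()
def move_service_task() -> str:
    # No second worker needed: the rebuild happens inline in this task.
    return rebuild(upgrade=False)
```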

View File

@@ -1,24 +0,0 @@
from enum import Enum
from typing import Optional
from pydantic import BaseModel
class ServiceStatus(Enum):
"""Enum for service status"""
ACTIVE = "ACTIVE"
RELOADING = "RELOADING"
INACTIVE = "INACTIVE"
FAILED = "FAILED"
ACTIVATING = "ACTIVATING"
DEACTIVATING = "DEACTIVATING"
OFF = "OFF"
class ServiceDnsRecord(BaseModel):
type: str
name: str
content: str
ttl: int
display_name: str
priority: Optional[int] = None

View File

@@ -30,7 +30,7 @@ class RedisTokensRepository(AbstractTokensRepository):
@staticmethod
def token_key_for_device(device_name: str):
md5_hash = md5(usedforsecurity=False)
md5_hash = md5()
md5_hash.update(bytes(device_name, "utf-8"))
digest = md5_hash.hexdigest()
return TOKENS_PREFIX + digest
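For context: `usedforsecurity=False` (Python 3.9+) marks the digest as non-cryptographic, which keeps it usable on FIPS-restricted interpreters; MD5 here only derives a stable Redis key from a device name. A quick sketch, with an assumed prefix value for illustration:

```python
from hashlib import md5

TOKENS_PREFIX = "token_repo:token:"  # assumed value, for illustration only

def token_key_for_device(device_name: str) -> str:
    digest = md5(device_name.encode("utf-8"), usedforsecurity=False).hexdigest()
    return TOKENS_PREFIX + digest

assert token_key_for_device("laptop") == token_key_for_device("laptop")  # stable
```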

View File

@@ -1,16 +1,13 @@
"""Abstract class for a service running on a server"""
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Optional
from selfprivacy_api import utils
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.utils.waitloop import wait_until_true
from pydantic import BaseModel
from selfprivacy_api.jobs import Job, Jobs, JobStatus, report_progress
from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices
from selfprivacy_api.jobs import Job, Jobs, JobStatus, report_progress
from selfprivacy_api.jobs.upgrade_system import rebuild_system
from selfprivacy_api.models.services import ServiceStatus, ServiceDnsRecord
from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.owned_path import OwnedPath, Bind
from selfprivacy_api.services.moving import (
@@ -23,10 +20,34 @@ from selfprivacy_api.services.moving import (
move_data_to_volume,
)
from selfprivacy_api import utils
from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils import ReadUserData, WriteUserData
DEFAULT_START_STOP_TIMEOUT = 5 * 60
class ServiceStatus(Enum):
"""Enum for service status"""
ACTIVE = "ACTIVE"
RELOADING = "RELOADING"
INACTIVE = "INACTIVE"
FAILED = "FAILED"
ACTIVATING = "ACTIVATING"
DEACTIVATING = "DEACTIVATING"
OFF = "OFF"
class ServiceDnsRecord(BaseModel):
type: str
name: str
content: str
ttl: int
display_name: str
priority: Optional[int] = None
class Service(ABC):
"""
Service here is some software that is hosted on the server and
@@ -366,6 +387,14 @@ class Service(ABC):
report_progress(95, job, f"Finishing moving {service_name}...")
self.set_location(new_volume)
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result=f"{service_name} moved successfully.",
status_text=f"Starting {service_name}...",
progress=100,
)
def move_to_volume(self, volume: BlockDevice, job: Job) -> Job:
service_name = self.get_display_name()
@@ -378,17 +407,6 @@
report_progress(9, job, "Stopped service, starting the move...")
self.do_move_to_volume(volume, job)
report_progress(98, job, "Move complete, rebuilding...")
rebuild_system(job, upgrade=False)
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result=f"{service_name} moved successfully.",
status_text=f"Starting {service_name}...",
progress=100,
)
return job
@classmethod

View File

@@ -8,9 +8,10 @@ from os import path
# from enum import Enum
from selfprivacy_api.jobs import Job
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.jobs import Job, Jobs, JobStatus
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.test_service.icon import BITWARDEN_ICON
@@ -88,7 +89,7 @@ class DummyService(Service):
@classmethod
def set_status(cls, status: ServiceStatus):
with open(cls.status_file(), "w") as file:
file.write(status.value)
status_string = file.write(status.value)
@classmethod
def get_status(cls) -> ServiceStatus:
@@ -101,17 +102,16 @@
cls, new_status: ServiceStatus, delay_sec: float
):
"""simulating a delay on systemd side"""
if delay_sec == 0:
cls.set_status(new_status)
return
status_file = cls.status_file()
command = [
"bash",
"-c",
f" sleep {delay_sec} && echo {new_status.value} > {status_file}",
]
subprocess.Popen(command)
handle = subprocess.Popen(command)
if delay_sec == 0:
handle.communicate()
@classmethod
def set_backuppable(cls, new_value: bool) -> None:
@@ -192,5 +192,6 @@ class DummyService(Service):
if self.simulate_moving is False:
return super(DummyService, self).do_move_to_volume(volume, job)
else:
Jobs.update(job, status=JobStatus.FINISHED)
self.set_drive(volume.name)
return job

View File

@@ -1,14 +1,5 @@
from os import environ
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs.test import test_job
from selfprivacy_api.backup.tasks import *
from selfprivacy_api.services.tasks import move_service
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task
from selfprivacy_api.jobs.test import test_job
from selfprivacy_api.jobs.nix_collect_garbage import calculate_and_clear_dead_paths
if environ.get("TEST_MODE"):
from tests.test_huey import sum

View File

@@ -1,24 +1,16 @@
"""MiniHuey singleton."""
from os import environ
from huey import RedisHuey
from selfprivacy_api.utils.redis_pool import RedisPool
HUEY_DATABASE_NUMBER = 10
def immediate() -> bool:
if environ.get("HUEY_QUEUES_FOR_TESTS"):
return False
if environ.get("TEST_MODE"):
return True
return False
import os
from huey import SqliteHuey
HUEY_DATABASE = "/etc/selfprivacy/tasks.db"
# Singleton instance containing the huey database.
huey = RedisHuey(
test_mode = os.environ.get("TEST_MODE")
huey = SqliteHuey(
"selfprivacy-api",
url=RedisPool.connection_url(dbnumber=HUEY_DATABASE_NUMBER),
immediate=immediate(),
filename=HUEY_DATABASE if not test_mode else None,
immediate=test_mode == "true",
utc=True,
)
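The `immediate()` helper's precedence is worth spelling out: an explicit opt-in to real queues for tests beats plain test mode. A standalone copy of the logic shown above:

```python
from os import environ

def immediate() -> bool:
    # Real queues requested for tests take priority over plain test mode.
    if environ.get("HUEY_QUEUES_FOR_TESTS"):
        return False
    if environ.get("TEST_MODE"):
        return True
    return False

environ["TEST_MODE"] = "true"
assert immediate() is True
environ["HUEY_QUEUES_FOR_TESTS"] = "Yes"
assert immediate() is False  # queue opt-in overrides test mode
```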

View File

@@ -1,8 +1,8 @@
"""
Redis pool module for selfprivacy_api
"""
from os import environ
import redis
from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
REDIS_SOCKET = "/run/redis-sp-api/redis.sock"
@@ -14,19 +14,19 @@ class RedisPool(metaclass=SingletonMetaclass):
"""
def __init__(self):
self._pool = redis.ConnectionPool.from_url(
RedisPool.connection_url(dbnumber=0),
decode_responses=True,
)
self._pubsub_connection = self.get_connection()
if "USE_REDIS_PORT" in environ:
self._pool = redis.ConnectionPool(
host="127.0.0.1",
port=int(environ["USE_REDIS_PORT"]),
decode_responses=True,
)
@staticmethod
def connection_url(dbnumber: int) -> str:
"""
redis://[[username]:[password]]@localhost:6379/0
unix://[username@]/path/to/socket.sock?db=0[&password=password]
"""
return f"unix://{REDIS_SOCKET}?db={dbnumber}"
else:
self._pool = redis.ConnectionPool.from_url(
f"unix://{REDIS_SOCKET}",
decode_responses=True,
)
self._pubsub_connection = self.get_connection()
def get_connection(self):
"""

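The socket URL format documented in `connection_url` above is straightforward to verify standalone:

```python
REDIS_SOCKET = "/run/redis-sp-api/redis.sock"

def connection_url(dbnumber: int) -> str:
    # Same format as RedisPool.connection_url on the master side.
    return f"unix://{REDIS_SOCKET}?db={dbnumber}"

assert connection_url(0) == "unix:///run/redis-sp-api/redis.sock?db=0"
```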
View File

@@ -2,16 +2,16 @@
import subprocess
from typing import List
from selfprivacy_api.models.services import ServiceStatus
from selfprivacy_api.services.service import ServiceStatus
def get_service_status(unit: str) -> ServiceStatus:
def get_service_status(service: str) -> ServiceStatus:
"""
Return service status from systemd.
Use systemctl show to get the status of a service.
Get ActiveState from the output.
"""
service_status = subprocess.check_output(["systemctl", "show", unit])
service_status = subprocess.check_output(["systemctl", "show", service])
if b"LoadState=not-found" in service_status:
return ServiceStatus.OFF
if b"ActiveState=active" in service_status:

setup.py Normal file → Executable file
View File

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name="selfprivacy_api",
version="3.2.1",
version="3.1.0",
packages=find_packages(),
scripts=[
"selfprivacy_api/app.py",

View File

@@ -1,4 +0,0 @@
#!/usr/bin/bash
# sync the version of nixpkgs used in the repo with one set in nixos-config
nix flake lock --override-input nixpkgs nixpkgs --inputs-from 'git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes'

View File

@@ -2,23 +2,21 @@ import json
from datetime import datetime, timezone, timedelta
from mnemonic import Mnemonic
from selfprivacy_api.jobs import Job, JobStatus
# for expiration tests. If headache, consider freezegun
RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.time.datetime"
DEVICE_KEY_VALIDATION_DATETIME = RECOVERY_KEY_VALIDATION_DATETIME
def ten_hours_into_future_naive():
return datetime.now() + timedelta(hours=10)
def ten_minutes_into_future_naive():
return datetime.now() + timedelta(minutes=10)
def ten_hours_into_future_naive_utc():
return datetime.utcnow() + timedelta(hours=10)
def ten_minutes_into_future_naive_utc():
return datetime.utcnow() + timedelta(minutes=10)
def ten_hours_into_future():
return datetime.now(timezone.utc) + timedelta(hours=10)
def ten_minutes_into_future():
return datetime.now(timezone.utc) + timedelta(minutes=10)
def ten_minutes_into_past_naive():
@@ -36,11 +34,11 @@ def ten_minutes_into_past():
class NearFuture(datetime):
@classmethod
def now(cls, tz=None):
return datetime.now(tz) + timedelta(hours=13)
return datetime.now(tz) + timedelta(minutes=13)
@classmethod
def utcnow(cls):
return datetime.utcnow() + timedelta(hours=13)
return datetime.utcnow() + timedelta(minutes=13)
def read_json(file_path):
@@ -81,12 +79,3 @@ def assert_recovery_recent(time_generated: str):
assert datetime.fromisoformat(time_generated) - timedelta(seconds=5) < datetime.now(
timezone.utc
)
def assert_job_errored(job: Job):
assert job is not None
assert job.status == JobStatus.ERROR
# consider adding a useful error message to an errored-out job
assert job.error is not None
assert job.error != ""

View File

@@ -99,14 +99,23 @@ def generic_userdata(mocker, tmpdir):
@pytest.fixture
def client(redis_repo_with_tokens):
def huey_database(mocker, shared_datadir):
"""Mock huey database."""
mock = mocker.patch(
"selfprivacy_api.utils.huey.HUEY_DATABASE", shared_datadir / "huey.db"
)
return mock
@pytest.fixture
def client(huey_database, redis_repo_with_tokens):
from selfprivacy_api.app import app
return TestClient(app)
@pytest.fixture
def authorized_client(redis_repo_with_tokens):
def authorized_client(huey_database, redis_repo_with_tokens):
"""Authorized test client fixture."""
from selfprivacy_api.app import app
@@ -118,7 +127,7 @@ def authorized_client(redis_repo_with_tokens):
@pytest.fixture
def wrong_auth_client(redis_repo_with_tokens):
def wrong_auth_client(huey_database, redis_repo_with_tokens):
"""Wrong token test client fixture."""
from selfprivacy_api.app import app
@@ -208,7 +217,5 @@ def dummy_service(
service.enable()
yield service
# Cleanup because apparently it matters wrt tasks
# Some tests may remove it from the list intentionally, this is fine
if service in services.services:
services.services.remove(service)
# cleanup because apparently it matters wrt tasks
services.services.remove(service)

View File

@@ -14,14 +14,13 @@ from selfprivacy_api.utils.huey import huey
from selfprivacy_api.services.service import ServiceStatus
from selfprivacy_api.graphql.queries.providers import BackupProvider as ProviderEnum
from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.graphql.common_types.backup import (
RestoreStrategy,
BackupReason,
)
from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.jobs import Job, Jobs, JobStatus
from selfprivacy_api.jobs import Jobs, JobStatus
from selfprivacy_api.models.backup.snapshot import Snapshot
@@ -39,10 +38,6 @@ from selfprivacy_api.backup.tasks import (
reload_snapshot_cache,
)
from selfprivacy_api.backup.storage import Storage
from selfprivacy_api.backup.local_secret import LocalBackupSecret
from selfprivacy_api.backup.jobs import get_backup_fail
from tests.common import assert_job_errored
REPO_NAME = "test_backup"
@@ -193,78 +188,6 @@ def test_backup_service(dummy_service, backups):
assert_job_finished(f"services.{id}.backup", count=1)
def all_job_text(job: Job) -> str:
# Use when we update to pydantic 2.xxx
# return Job.model_dump_json()
result = ""
if job.status_text is not None:
result += job.status_text
if job.description is not None:
result += job.description
if job.error is not None:
result += job.error
return result
def test_error_censoring_encryptionkey(dummy_service, backups):
# Discard our key to inject a failure
old_key = LocalBackupSecret.get()
LocalBackupSecret.reset()
new_key = LocalBackupSecret.get()
with pytest.raises(ValueError):
# Should fail without correct key
Backups.back_up(dummy_service)
job = get_backup_fail(dummy_service)
assert_job_errored(job)
job_text = all_job_text(job)
assert old_key not in job_text
assert new_key not in job_text
# local backups do not have login key
# assert Backups.provider().key not in job_text
assert "CENSORED" in job_text
def test_error_censoring_loginkey(dummy_service, backups, fp):
# We do not want to screw up our teardown
old_provider = Backups.provider()
secret = "aSecretNYA"
Backups.set_provider(
ProviderEnum.BACKBLAZE, login="meow", key=secret, location="moon"
)
assert Backups.provider().key == secret
# We could have called real backblaze but it is kind of not privacy so.
fp.allow_unregistered(True)
fp.register(
["restic", fp.any()],
returncode=1,
stdout="only real cats are allowed",
# We do not want to suddenly call real backblaze even if code changes
occurrences=100,
)
with pytest.raises(ValueError):
Backups.back_up(dummy_service)
job = get_backup_fail(dummy_service)
assert_job_errored(job)
job_text = all_job_text(job)
assert secret not in job_text
assert job_text.count("CENSORED") == 2
# We do not want to screw up our teardown
Storage.store_provider(old_provider)
def test_no_repo(memory_backup):
with pytest.raises(ValueError):
assert memory_backup.backupper.get_snapshots() == []

View File

@@ -3,8 +3,6 @@ from tests.test_backup import backups
from tests.common import generate_backup_query
import selfprivacy_api.services as all_services
from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.graphql.common_types.service import service_to_graphql_service
from selfprivacy_api.graphql.common_types.backup import (
_AutobackupQuotas,
@@ -145,7 +143,6 @@ allSnapshots {
id
service {
id
displayName
}
createdAt
reason
@@ -309,20 +306,6 @@ def test_snapshots_empty(authorized_client, dummy_service, backups):
assert snaps == []
def test_snapshots_orphaned_service(authorized_client, dummy_service, backups):
api_backup(authorized_client, dummy_service)
snaps = api_snapshots(authorized_client)
assert len(snaps) == 1
all_services.services.remove(dummy_service)
assert get_service_by_id(dummy_service.get_id()) is None
snaps = api_snapshots(authorized_client)
assert len(snaps) == 1
assert "Orphaned" in snaps[0]["service"]["displayName"]
assert dummy_service.get_id() in snaps[0]["service"]["displayName"]
def test_start_backup(authorized_client, dummy_service, backups):
response = api_backup(authorized_client, dummy_service)
data = get_data(response)["backup"]["startBackup"]

View File

@@ -14,9 +14,9 @@ from tests.common import (
)
# Graphql API's output should be timezone-naive
from tests.common import ten_hours_into_future_naive_utc as ten_hours_into_future
from tests.common import ten_hours_into_future as ten_hours_into_future_tz
from tests.common import ten_minutes_into_past_naive_utc as ten_hours_into_past
from tests.common import ten_minutes_into_future_naive_utc as ten_minutes_into_future
from tests.common import ten_minutes_into_future as ten_minutes_into_future_tz
from tests.common import ten_minutes_into_past_naive_utc as ten_minutes_into_past
from tests.test_graphql.common import (
assert_empty,
@@ -168,7 +168,7 @@ def test_graphql_generate_recovery_key(client, authorized_client):
@pytest.mark.parametrize(
"expiration_date", [ten_hours_into_future(), ten_hours_into_future_tz()]
"expiration_date", [ten_minutes_into_future(), ten_minutes_into_future_tz()]
)
def test_graphql_generate_recovery_key_with_expiration_date(
client, authorized_client, expiration_date: datetime
@@ -193,7 +193,7 @@ def test_graphql_generate_recovery_key_with_expiration_date(
def test_graphql_use_recovery_key_after_expiration(client, authorized_client, mocker):
expiration_date = ten_hours_into_future()
expiration_date = ten_minutes_into_future()
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
# Timewarp to after it expires
@@ -219,7 +219,7 @@ def test_graphql_use_recovery_key_after_expiration(client, authorized_client, mo
def test_graphql_generate_recovery_key_with_expiration_in_the_past(authorized_client):
expiration_date = ten_hours_into_past()
expiration_date = ten_minutes_into_past()
response = request_make_new_recovery_key(
authorized_client, expires_at=expiration_date
)

View File

@@ -1,229 +0,0 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import pytest
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs import JobStatus, Jobs
from tests.test_graphql.common import (
get_data,
assert_ok,
assert_empty,
)
from selfprivacy_api.jobs.nix_collect_garbage import (
get_dead_packages,
parse_line,
ShellException,
)
OUTPUT_PRINT_DEAD = """
finding garbage collector roots...
determining live/dead paths...
/nix/store/02k8pmw00p7p7mf2dg3n057771w7liia-python3.10-cchardet-2.1.7
/nix/store/03vc6dznx8njbvyd3gfhfa4n5j4lvhbl-python3.10-async-timeout-4.0.2
/nix/store/03ybv2dvfk7c3cpb527y5kzf6i35ch41-python3.10-pycparser-2.21
/nix/store/04dn9slfqwhqisn1j3jv531lms9w5wlj-python3.10-hypothesis-6.50.1.drv
/nix/store/04hhx2z1iyi3b48hxykiw1g03lp46jk7-python-remove-bin-bytecode-hook
"""
OUTPUT_COLLECT_GARBAGE = """
removing old generations of profile /nix/var/nix/profiles/per-user/def/channels
finding garbage collector roots...
deleting garbage...
deleting '/nix/store/02k8pmw00p7p7mf2dg3n057771w7liia-python3.10-cchardet-2.1.7'
deleting '/nix/store/03vc6dznx8njbvyd3gfhfa4n5j4lvhbl-python3.10-async-timeout-4.0.2'
deleting '/nix/store/03ybv2dvfk7c3cpb527y5kzf6i35ch41-python3.10-pycparser-2.21'
deleting '/nix/store/04dn9slfqwhqisn1j3jv531lms9w5wlj-python3.10-hypothesis-6.50.1.drv'
deleting '/nix/store/04hhx2z1iyi3b48hxykiw1g03lp46jk7-python-remove-bin-bytecode-hook'
deleting unused links...
note: currently hard linking saves -0.00 MiB
190 store paths deleted, 425.51 MiB freed
"""
OUTPUT_COLLECT_GARBAGE_ZERO_TRASH = """
removing old generations of profile /nix/var/nix/profiles/per-user/def/profile
removing old generations of profile /nix/var/nix/profiles/per-user/def/channels
finding garbage collector roots...
deleting garbage...
deleting unused links...
note: currently hard linking saves 0.00 MiB
0 store paths deleted, 0.00 MiB freed
"""
# ---
def test_parse_line():
txt = "note: currently hard linking saves -0.00 MiB 190 store paths deleted, 425.51 MiB freed"
job = Jobs.add(
name="name",
type_id="parse_line",
description="description",
)
output = parse_line(job, txt)
assert output.result == "425.51 MiB have been cleared"
assert output.status == JobStatus.FINISHED
assert output.error is None
def test_parse_line_with_blank_line():
txt = ""
job = Jobs.add(
name="name",
type_id="parse_line",
description="description",
)
with pytest.raises(ShellException):
output = parse_line(job, txt)
def test_get_dead_packages():
assert get_dead_packages(OUTPUT_PRINT_DEAD) == (5, 20.0)
def test_get_dead_packages_zero():
assert get_dead_packages("") == (0, 0)
RUN_NIX_COLLECT_GARBAGE_MUTATION = """
mutation CollectGarbage {
system {
nixCollectGarbage {
success
message
code
job {
uid,
typeId,
name,
description,
status,
statusText,
progress,
createdAt,
updatedAt,
finishedAt,
error,
result,
}
}
}
}
"""
def test_graphql_nix_collect_garbage(authorized_client, fp):
assert huey.immediate is True
fp.register(
["nix-env", "-p", "/nix/var/nix/profiles/system", "--delete-generations old"],
stdout="",
)
fp.register(["nix-store", "--gc", "--print-dead"], stdout=OUTPUT_PRINT_DEAD)
fp.register(["nix-store", "--gc"], stdout=OUTPUT_COLLECT_GARBAGE)
response = authorized_client.post(
"/graphql",
json={
"query": RUN_NIX_COLLECT_GARBAGE_MUTATION,
},
)
output = get_data(response)["system"]["nixCollectGarbage"]
assert_ok(output)
assert output["job"] is not None
assert output["job"]["status"] == "FINISHED"
assert output["job"]["error"] is None
assert (
fp.call_count(
[
"nix-env",
"-p",
"/nix/var/nix/profiles/system",
"--delete-generations old",
]
)
== 1
)
assert fp.call_count(["nix-store", "--gc", "--print-dead"]) == 1
assert fp.call_count(["nix-store", "--gc"]) == 1
def test_graphql_nix_collect_garbage_return_zero_trash(authorized_client, fp):
assert huey.immediate is True
fp.register(
["nix-env", "-p", "/nix/var/nix/profiles/system", "--delete-generations old"],
stdout="",
)
fp.register(["nix-store", "--gc", "--print-dead"], stdout=OUTPUT_PRINT_DEAD)
fp.register(["nix-store", "--gc"], stdout=OUTPUT_COLLECT_GARBAGE_ZERO_TRASH)
response = authorized_client.post(
"/graphql",
json={
"query": RUN_NIX_COLLECT_GARBAGE_MUTATION,
},
)
output = get_data(response)["system"]["nixCollectGarbage"]
assert_ok(output)
assert output["job"] is not None
assert output["job"]["status"] == "FINISHED"
assert output["job"]["error"] is None
assert (
fp.call_count(
[
"nix-env",
"-p",
"/nix/var/nix/profiles/system",
"--delete-generations old",
]
)
== 1
)
assert fp.call_count(["nix-store", "--gc", "--print-dead"]) == 1
assert fp.call_count(["nix-store", "--gc"]) == 1
def test_graphql_nix_collect_garbage_not_authorized_client(client, fp):
assert huey.immediate is True
fp.register(
["nix-env", "-p", "/nix/var/nix/profiles/system", "--delete-generations old"],
stdout="",
)
fp.register(["nix-store", "--gc", "--print-dead"], stdout=OUTPUT_PRINT_DEAD)
fp.register(["nix-store", "--gc"], stdout=OUTPUT_COLLECT_GARBAGE)
response = client.post(
"/graphql",
json={
"query": RUN_NIX_COLLECT_GARBAGE_MUTATION,
},
)
assert_empty(response)
assert (
fp.call_count(
[
"nix-env",
"-p",
"/nix/var/nix/profiles/system",
"--delete-generations old",
]
)
== 0
)
assert fp.call_count(["nix-store", "--gc", "--print-dead"]) == 0
assert fp.call_count(["nix-store", "--gc"]) == 0

View File

@@ -13,7 +13,8 @@ from selfprivacy_api.services.test_service import DummyService
from tests.common import generate_service_query
from tests.test_graphql.common import assert_empty, assert_ok, get_data
from tests.test_graphql.test_system_nixos_tasks import prepare_nixos_rebuild_calls
from tests.test_block_device_utils import lsblk_singular_mock
LSBLK_BLOCKDEVICES_DICTS = [
{
@@ -617,7 +618,10 @@ def test_graphql_move_service_without_folders_on_old_volume(
def test_graphql_move_service(
authorized_client, generic_userdata, mock_check_volume, dummy_service_with_binds, fp
authorized_client,
generic_userdata,
mock_check_volume,
dummy_service_with_binds,
):
dummy_service = dummy_service_with_binds
@@ -629,30 +633,10 @@
dummy_service.set_drive(origin)
dummy_service.set_simulated_moves(False)
unit_name = "sp-nixos-rebuild.service"
rebuild_command = ["systemctl", "start", unit_name]
prepare_nixos_rebuild_calls(fp, unit_name)
# We will be mounting and remounting folders
mount_command = ["mount", fp.any()]
unmount_command = ["umount", fp.any()]
fp.pass_command(mount_command, 2)
fp.pass_command(unmount_command, 2)
# We will be changing ownership
chown_command = ["chown", fp.any()]
fp.pass_command(chown_command, 2)
mutation_response = api_move(authorized_client, dummy_service, target)
data = get_data(mutation_response)["services"]["moveService"]
assert_ok(data)
assert data["service"] is not None
assert fp.call_count(rebuild_command) == 1
assert fp.call_count(mount_command) == 2
assert fp.call_count(unmount_command) == 2
assert fp.call_count(chown_command) == 2
def test_mailservice_cannot_enable_disable(authorized_client):

View File

@@ -97,7 +97,16 @@ def test_graphql_system_rebuild_unauthorized(client, fp, action):
assert fp.call_count([fp.any()]) == 0
def prepare_nixos_rebuild_calls(fp, unit_name):
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
"""Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
# Start the unit
fp.register(["systemctl", "start", unit_name])
@@ -120,19 +129,6 @@ def prepare_nixos_rebuild_calls(fp, unit_name):
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
"""Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = (
API_REBUILD_SYSTEM_MUTATION
if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION
)
prepare_nixos_rebuild_calls(fp, unit_name)
response = authorized_client.post(
"/graphql",
json={

View File

@@ -1,132 +0,0 @@
import pytest
import redis
from typing import List
import subprocess
from subprocess import Popen, check_output, TimeoutExpired
from os import environ, path, set_blocking
from io import BufferedReader
from huey.exceptions import HueyException
from selfprivacy_api.utils.huey import huey, immediate, HUEY_DATABASE_NUMBER
from selfprivacy_api.utils.redis_pool import RedisPool, REDIS_SOCKET
@huey.task()
def sum(a: int, b: int) -> int:
return a + b
def reset_huey_storage():
huey.storage = huey.create_storage()
def flush_huey_redis_forcefully():
url = RedisPool.connection_url(HUEY_DATABASE_NUMBER)
pool = redis.ConnectionPool.from_url(url, decode_responses=True)
connection = redis.Redis(connection_pool=pool)
connection.flushdb()
# TODO: may be useful in other places too, move to utils/ tests common if using it somewhere
def read_all_ready_output(stream: BufferedReader) -> str:
set_blocking(stream.fileno(), False)
output: List[bytes] = []
while True:
line = stream.readline()
if line == b"":
break
else:
output.append(line)
set_blocking(stream.fileno(), True)
result = b"".join(output)
return result.decode("utf-8")
@pytest.fixture()
def not_immediate():
assert environ["TEST_MODE"] == "true"
old_immediate = huey.immediate
environ["HUEY_QUEUES_FOR_TESTS"] = "Yes"
huey.immediate = False
assert huey.immediate is False
yield
del environ["HUEY_QUEUES_FOR_TESTS"]
huey.immediate = old_immediate
assert huey.immediate == old_immediate
@pytest.fixture()
def huey_socket_consumer(not_immediate):
"""
Same as above, but with socketed redis
"""
flush_huey_redis_forcefully()
command = ["huey_consumer.py", "selfprivacy_api.task_registry.huey"]
# First assert that consumer does not fail by itself
# Idk yet how to do it more elegantly
try:
check_output(command, timeout=2)
except TimeoutExpired:
pass
# Then open it for real
consumer_handle = Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert path.exists(REDIS_SOCKET)
yield consumer_handle
consumer_handle.kill()
def test_huey_over_redis_socket(huey_socket_consumer):
assert huey.immediate is False
assert immediate() is False
assert "unix" in RedisPool.connection_url(HUEY_DATABASE_NUMBER)
try:
assert (
RedisPool.connection_url(HUEY_DATABASE_NUMBER)
in huey.storage_kwargs.values()
)
except AssertionError:
raise ValueError(
"our test-side huey does not connect over socket: ", huey.storage_kwargs
)
result = sum(2, 5)
try:
assert result(blocking=True, timeout=100) == 7
except HueyException as error:
if "timed out" in str(error):
output = read_all_ready_output(huey_socket_consumer.stdout)
errorstream = read_all_ready_output(huey_socket_consumer.stderr)
raise TimeoutError(
f"Huey timed out: {str(error)}",
f"Consumer output: {output}",
f"Consumer errorstream: {errorstream}",
)
else:
raise error
@pytest.mark.xfail(reason="cannot yet schedule with sockets for some reason")
def test_huey_schedule(huey_queues_socket):
# We do not schedule tasks anywhere, but it is concerning that this fails.
sum.schedule((2, 5), delay=10)
try:
assert len(huey.scheduled()) == 1
except AssertionError:
raise ValueError("have wrong amount of scheduled tasks", huey.scheduled())

View File

@@ -24,7 +24,7 @@ from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from tests.common import ten_minutes_into_past, ten_hours_into_future
from tests.common import ten_minutes_into_past, ten_minutes_into_future
ORIGINAL_DEVICE_NAMES = [