Compare commits

..

16 Commits

Author SHA1 Message Date
Inex Code 0a09a338b8 Register migration 2022-09-22 19:41:48 +03:00
Inex Code 7a1e8af8fe Catch error during binds migration and delete the job if it is stuck during restart 2022-09-22 19:38:59 +03:00
Inex Code e387e30983 Fix handling of FileNotFoundError during size calculation 2022-09-22 18:34:33 +03:00
Inex Code 582e38452d Fix Gitea moving 2022-09-19 03:50:43 +03:00
Inex Code 6bbceca917 Fix ownership issue 2022-09-19 03:04:57 +03:00
Inex Code 9a339729b7 Fix Generic service mover success output 2022-09-19 02:57:22 +03:00
Inex Code a7208c1a91 Fix generic service mover being unable to move 2022-09-19 02:43:06 +03:00
Inex Code 49571b6ef2 Fix generic service mover being unable to move to sda1 2022-09-19 02:32:29 +03:00
Alya Sirko a3260aadc3 Add SonarQube to Pipeline (#15)
Co-authored-by: Alya Sirko <alya@selfprivacy.org>
Reviewed-on: SelfPrivacy/selfprivacy-rest-api#15
Co-authored-by: Alya Sirko <alya.sirko@tuta.io>
Co-committed-by: Alya Sirko <alya.sirko@tuta.io>
2022-09-16 10:31:51 +03:00
Inex Code 9489180363 Fix job deletion 2022-09-09 17:51:41 +03:00
Inex Code 97acae189f Bump API version 2022-09-09 17:46:58 +03:00
Inex Code d7cba49c4a Fix job uid generation 2022-09-09 17:42:40 +03:00
Inex Code 32278e9063 Bind to 127.0.0.1 instead of 0.0.0.0 2022-08-26 21:01:14 +04:00
Inex Code 4f2332f8a0 Add permission check for deleting job 2022-08-25 22:42:37 +04:00
Inex Code 0e68ef1386 Merge pull request 'SelfPrivacy API 2.0' (#14) from graphql into master
Reviewed-on: SelfPrivacy/selfprivacy-rest-api#14
2022-08-25 20:25:03 +03:00
Inex Code 7935de0fe1 Migrate to FastAPI (#13)
Co-authored-by: inexcode <inex.code@selfprivacy.org>
Reviewed-on: SelfPrivacy/selfprivacy-rest-api#13
2022-08-25 20:03:56 +03:00
14 changed files with 179 additions and 1144 deletions

View File

@ -2,18 +2,23 @@ kind: pipeline
type: exec
name: default
platform:
os: linux
arch: amd64
steps:
- name: test
- name: Run Tests and Generate Coverage Report
commands:
- coverage run -m pytest -q
- coverage xml
- name: bandit
- sonar-scanner -Dsonar.projectKey=SelfPrivacy-REST-API -Dsonar.sources=. -Dsonar.host.url=http://analyzer.lan:9000 -Dsonar.login="$SONARQUBE_TOKEN"
environment:
SONARQUBE_TOKEN:
from_secret: SONARQUBE_TOKEN
- name: Run Bandit Checks
commands:
- bandit -ll -r selfprivacy_api
- name: formatting
- name: Run Code Formatting Checks
commands:
- black --check .
node:
server: builder

View File

@ -53,4 +53,6 @@ async def startup():
if __name__ == "__main__":
uvicorn.run("selfprivacy_api.app:app", host="0.0.0.0", port=5050, log_level="info")
uvicorn.run(
"selfprivacy_api.app:app", host="127.0.0.1", port=5050, log_level="info"
)

View File

@ -27,4 +27,4 @@ async def get_token_header(
def get_api_version() -> str:
"""Get API version"""
return "2.0.0"
return "2.0.9"

View File

@ -3,6 +3,7 @@
import strawberry
from selfprivacy_api.graphql.mutations.mutation_interface import GenericMutationReturn
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.jobs import Jobs
@ -10,10 +11,10 @@ from selfprivacy_api.jobs import Jobs
class JobMutations:
"""Mutations related to jobs"""
@strawberry.mutation
@strawberry.mutation(permission_classes=[IsAuthenticated])
def remove_job(self, job_id: str) -> GenericMutationReturn:
"""Remove a job from the queue"""
result = Jobs().remove_by_uuid(job_id)
result = Jobs.get_instance().remove_by_uid(job_id)
if result:
return GenericMutationReturn(
success=True,

View File

@ -17,7 +17,10 @@ A job is a dictionary with the following keys:
import typing
import datetime
from uuid import UUID
import asyncio
import json
import os
import time
import uuid
from enum import Enum
@ -42,7 +45,7 @@ class Job(BaseModel):
Job class.
"""
uid: UUID = uuid.uuid4()
uid: UUID
type_id: str
name: str
description: str
@ -105,6 +108,7 @@ class Jobs:
Add a job to the jobs list.
"""
job = Job(
uid=uuid.uuid4(),
name=name,
type_id=type_id,
description=description,
@ -130,9 +134,9 @@ class Jobs:
"""
Remove a job from the jobs list.
"""
self.remove_by_uuid(str(job.uid))
self.remove_by_uid(str(job.uid))
def remove_by_uuid(self, job_uuid: str) -> bool:
def remove_by_uid(self, job_uuid: str) -> bool:
"""
Remove a job from the jobs list.
"""

View File

@ -65,14 +65,27 @@ def move_folder(
else:
return
data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
try:
data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
except Exception as e:
print(f"Error creating data path: {e}")
return
shutil.chown(str(bind_path), user=user, group=group)
shutil.chown(str(data_path), user=user, group=group)
try:
shutil.chown(str(bind_path), user=user, group=group)
shutil.chown(str(data_path), user=user, group=group)
except LookupError:
pass
subprocess.run(["mount", "--bind", str(bind_path), str(data_path)], check=True)
try:
subprocess.run(["mount", "--bind", str(bind_path), str(data_path)], check=True)
except subprocess.CalledProcessError as error:
print(error)
subprocess.run(["chown", "-R", f"{user}:{group}", str(data_path)], check=True)
try:
subprocess.run(["chown", "-R", f"{user}:{group}", str(data_path)], check=True)
except subprocess.CalledProcessError as error:
print(error)
@huey.task()
@ -157,12 +170,17 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Nextcloud().stop()
move_folder(
data_path=pathlib.Path("/var/lib/nextcloud"),
bind_path=pathlib.Path(f"/volumes/{config.nextcloud_block_device}/nextcloud"),
user="nextcloud",
group="nextcloud",
)
# If /volumes/sda1/nextcloud or /volumes/sdb/nextcloud exists, skip it.
if not pathlib.Path("/volumes/sda1/nextcloud").exists():
if not pathlib.Path("/volumes/sdb/nextcloud").exists():
move_folder(
data_path=pathlib.Path("/var/lib/nextcloud"),
bind_path=pathlib.Path(
f"/volumes/{config.nextcloud_block_device}/nextcloud"
),
user="nextcloud",
group="nextcloud",
)
# Start Nextcloud
Nextcloud().start()
@ -178,21 +196,27 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Bitwarden().stop()
move_folder(
data_path=pathlib.Path("/var/lib/bitwarden"),
bind_path=pathlib.Path(f"/volumes/{config.bitwarden_block_device}/bitwarden"),
user="vaultwarden",
group="vaultwarden",
)
if not pathlib.Path("/volumes/sda1/bitwarden").exists():
if not pathlib.Path("/volumes/sdb/bitwarden").exists():
move_folder(
data_path=pathlib.Path("/var/lib/bitwarden"),
bind_path=pathlib.Path(
f"/volumes/{config.bitwarden_block_device}/bitwarden"
),
user="vaultwarden",
group="vaultwarden",
)
move_folder(
data_path=pathlib.Path("/var/lib/bitwarden_rs"),
bind_path=pathlib.Path(
f"/volumes/{config.bitwarden_block_device}/bitwarden_rs"
),
user="vaultwarden",
group="vaultwarden",
)
if not pathlib.Path("/volumes/sda1/bitwarden_rs").exists():
if not pathlib.Path("/volumes/sdb/bitwarden_rs").exists():
move_folder(
data_path=pathlib.Path("/var/lib/bitwarden_rs"),
bind_path=pathlib.Path(
f"/volumes/{config.bitwarden_block_device}/bitwarden_rs"
),
user="vaultwarden",
group="vaultwarden",
)
# Start Bitwarden
Bitwarden().start()
@ -208,12 +232,14 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Gitea().stop()
move_folder(
data_path=pathlib.Path("/var/lib/gitea"),
bind_path=pathlib.Path(f"/volumes/{config.gitea_block_device}/gitea"),
user="gitea",
group="gitea",
)
if not pathlib.Path("/volumes/sda1/gitea").exists():
if not pathlib.Path("/volumes/sdb/gitea").exists():
move_folder(
data_path=pathlib.Path("/var/lib/gitea"),
bind_path=pathlib.Path(f"/volumes/{config.gitea_block_device}/gitea"),
user="gitea",
group="gitea",
)
Gitea().start()
@ -228,19 +254,23 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
MailServer().stop()
move_folder(
data_path=pathlib.Path("/var/vmail"),
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/vmail"),
user="virtualMail",
group="virtualMail",
)
if not pathlib.Path("/volumes/sda1/vmail").exists():
if not pathlib.Path("/volumes/sdb/vmail").exists():
move_folder(
data_path=pathlib.Path("/var/vmail"),
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/vmail"),
user="virtualMail",
group="virtualMail",
)
move_folder(
data_path=pathlib.Path("/var/sieve"),
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/sieve"),
user="virtualMail",
group="virtualMail",
)
if not pathlib.Path("/volumes/sda1/sieve").exists():
if not pathlib.Path("/volumes/sdb/sieve").exists():
move_folder(
data_path=pathlib.Path("/var/sieve"),
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/sieve"),
user="virtualMail",
group="virtualMail",
)
MailServer().start()
@ -255,19 +285,27 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Pleroma().stop()
move_folder(
data_path=pathlib.Path("/var/lib/pleroma"),
bind_path=pathlib.Path(f"/volumes/{config.pleroma_block_device}/pleroma"),
user="pleroma",
group="pleroma",
)
if not pathlib.Path("/volumes/sda1/pleroma").exists():
if not pathlib.Path("/volumes/sdb/pleroma").exists():
move_folder(
data_path=pathlib.Path("/var/lib/pleroma"),
bind_path=pathlib.Path(
f"/volumes/{config.pleroma_block_device}/pleroma"
),
user="pleroma",
group="pleroma",
)
move_folder(
data_path=pathlib.Path("/var/lib/postgresql"),
bind_path=pathlib.Path(f"/volumes/{config.pleroma_block_device}/postgresql"),
user="postgres",
group="postgres",
)
if not pathlib.Path("/volumes/sda1/postgresql").exists():
if not pathlib.Path("/volumes/sdb/postgresql").exists():
move_folder(
data_path=pathlib.Path("/var/lib/postgresql"),
bind_path=pathlib.Path(
f"/volumes/{config.pleroma_block_device}/postgresql"
),
user="postgres",
group="postgres",
)
Pleroma().start()

View File

@ -8,6 +8,7 @@ at api.skippedMigrations in userdata.json and populating it
with IDs of the migrations to skip.
Adding DISABLE_ALL to that array disables the migrations module entirely.
"""
from selfprivacy_api.migrations.check_for_failed_binds_migration import CheckForFailedBindsMigration
from selfprivacy_api.utils import ReadUserData
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
@ -21,6 +22,7 @@ migrations = [
CreateTokensJson(),
MigrateToSelfprivacyChannel(),
MountVolume(),
CheckForFailedBindsMigration(),
]

View File

@ -0,0 +1,48 @@
from selfprivacy_api.jobs import JobStatus, Jobs
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.utils import WriteUserData
class CheckForFailedBindsMigration(Migration):
"""Mount volume."""
def get_migration_name(self):
return "check_for_failed_binds_migration"
def get_migration_description(self):
return "If binds migration failed, try again."
def is_migration_needed(self):
try:
jobs = Jobs.get_instance().get_jobs()
# If there is a job with type_id "migrations.migrate_to_binds" and status is not "FINISHED",
# then migration is needed and job is deleted
for job in jobs:
if (
job.type_id == "migrations.migrate_to_binds"
and job.status != JobStatus.FINISHED
):
return True
return False
except Exception as e:
print(e)
return False
def migrate(self):
# Get info about existing volumes
# Write info about volumes to userdata.json
try:
jobs = Jobs.get_instance().get_jobs()
for job in jobs:
if (
job.type_id == "migrations.migrate_to_binds"
and job.status != JobStatus.FINISHED
):
Jobs.get_instance().remove(job)
with WriteUserData() as userdata:
userdata["useBinds"] = False
print("Done")
except Exception as e:
print(e)
print("Error mounting volume")

View File

@ -61,7 +61,7 @@ def move_service(
)
return
# Make sure the volume is mounted
if f"/volumes/{volume.name}" not in volume.mountpoints:
if volume.name != "sda1" and f"/volumes/{volume.name}" not in volume.mountpoints:
Jobs.get_instance().update(
job=job,
status=JobStatus.ERROR,
@ -173,7 +173,7 @@ def move_service(
[
"chown",
"-R",
f"{folder.owner}:f{folder.group}",
f"{folder.owner}:{folder.group}",
f"/volumes/{volume.name}/{folder.name}",
],
check=True,
@ -185,7 +185,6 @@ def move_service(
status=JobStatus.RUNNING,
error=f"Unable to set ownership of new volume. {service_name} may not be able to access its files. Continuing anyway.",
)
return
# Mount new volume
Jobs.get_instance().update(
@ -232,6 +231,6 @@ def move_service(
job=job,
status=JobStatus.FINISHED,
result=f"{service_name} moved successfully.",
status_text=f"Starting {service}...",
status_text=f"Starting {service_name}...",
progress=100,
)

View File

@ -12,5 +12,10 @@ def get_storage_usage(path: str) -> int:
for iter_path in pathlib.Path(path).rglob("**/*"):
if iter_path.is_dir():
continue
storage_usage += iter_path.stat().st_size
try:
storage_usage += iter_path.stat().st_size
except FileNotFoundError:
pass
except Exception as error:
print(error)
return storage_usage

View File

@ -159,7 +159,7 @@ class Gitea(Service):
owner="gitea",
),
],
"bitwarden",
"gitea",
)
return job

View File

@ -1,598 +0,0 @@
import pytest
def get_service_by_id_return_none_mock():
return None
def get_service_by_id_mock():
return "nextcloud"
def service_to_graphql_service_mock():
pass
class BlockDevicesMock:
def get_block_device(self, name: str):
pass
class BlockDevicesReturnNoneMock:
def get_block_device(self, name: str):
return None
class NextcloudMock:
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def enable(self):
pass
def disable(self):
pass
def stop(self):
pass
def is_movable(self):
return True
def move_to_volume(self):
pass
returncode = 0
class NextcloudReturnFalseMock:
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def enable(self):
pass
def disable(self):
pass
def stop(self):
pass
def is_movable(self):
return False
def move_to_volume(self):
pass
returncode = 0
@pytest.fixture
def mock_service_to_graphql_service(mocker):
mock = mocker.patch(
"selfprivacy_api.graphql.common_types.service.service_to_graphql_service",
autospec=True,
return_value=service_to_graphql_service_mock,
)
return mock
@pytest.fixture
def mock_nextcloud(mocker):
mock = mocker.patch(
"selfprivacy_api.services.nextcloud.__init__.Nextcloud",
autospec=True,
return_value=NextcloudMock,
)
return mock
@pytest.fixture
def mock_block_devices_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesReturnNoneMock,
)
return mock
@pytest.fixture
def mock_block_devices(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMock,
)
return mock
@pytest.fixture
def mock_nextcloud_return_false(mocker):
mock = mocker.patch(
"selfprivacy_api.services.nextcloud.__init__.Nextcloud",
autospec=True,
return_value=NextcloudReturnFalseMock,
)
return mock
@pytest.fixture
def mock_get_service_by_id_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.services.__init__.get_service_by_id",
autospec=True,
return_value=mock_get_service_by_id_return_none,
)
return mock
@pytest.fixture
def mock_get_service_by_id(mocker):
mock = mocker.patch(
"selfprivacy_api.services.__init__.get_service_by_id",
autospec=True,
return_value=mock_get_service_by_id,
)
return mock
####################################################################
API_ENABLE_SERVICE_MUTATION = """
mutation enableService($service_id: String!) {
enableService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_enable_service_unathorized_client(
client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_enable_not_found_service(
authorized_client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["enableService"]["code"] == 404
assert response.json()["data"]["enableService"]["message"] is not None
assert response.json()["data"]["enableService"]["success"] is False
def test_graphql_enable_service(
authorized_client, mock_get_service_by_id, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["enableService"]["code"] == 200
assert response.json()["data"]["enableService"]["message"] is not None
assert response.json()["data"]["enableService"]["success"] is True
API_DISABLE_SERVICE_MUTATION = """
mutation disableService($service_id: String!) {
disableService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_disable_service_unathorized_client(
client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_disable_not_found_service(
authorized_client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["disableService"]["code"] == 404
assert response.json()["data"]["disableService"]["message"] is not None
assert response.json()["data"]["disableService"]["success"] is False
def test_graphql_disable_services(
authorized_client, mock_get_service_by_id, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["disableService"]["code"] == 200
assert response.json()["data"]["disableService"]["message"] is not None
assert response.json()["data"]["disableService"]["success"] is True
API_STOP_SERVICE_MUTATION = """
mutation stopService($service_id: String!) {
stopService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_stop_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_stop_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["stopService"]["code"] == 404
assert response.json()["data"]["stopService"]["message"] is not None
assert response.json()["data"]["stopService"]["success"] is False
def test_graphql_stop_services(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["stopService"]["code"] == 200
assert response.json()["data"]["stopService"]["message"] is not None
assert response.json()["data"]["stopService"]["success"] is True
API_START_SERVICE_MUTATION = """
mutation startService($service_id: String!) {
startService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_start_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_start_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["startService"]["code"] == 404
assert response.json()["data"]["startService"]["message"] is not None
assert response.json()["data"]["startService"]["success"] is False
def test_graphql_start_services(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["startService"]["code"] == 200
assert response.json()["data"]["startService"]["message"] is not None
assert response.json()["data"]["startService"]["success"] is True
API_RESTART_SERVICE_MUTATION = """
mutation restartService($service_id: String!) {
restartService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_restart_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_restart_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["restartService"]["code"] == 404
assert response.json()["data"]["restartService"]["message"] is not None
assert response.json()["data"]["restartService"]["success"] is False
def test_graphql_restart_service(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["restartService"]["code"] == 200
assert response.json()["data"]["restartService"]["message"] is not None
assert response.json()["data"]["restartService"]["success"] is True
API_MOVE_SERVICE_MUTATION = """
mutation moveService($input: MoveServiceInput!) {
moveService(input: $input) {
success
message
code
}
}
"""
def test_graphql_move_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_move_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 404
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_not_moveble_service(
authorized_client,
mock_get_service_by_id,
mock_nextcloud_return_false,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 400
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_service_volume_not_found(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
mock_block_devices_return_none,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 400
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_service(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
mock_block_devices,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 200
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is True

View File

@ -1,342 +0,0 @@
import pytest
class BlockDeviceMockReturnNone:
"""Mock BlockDevices"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def mount(self):
return None
def unmount(self):
return None
def resize(self):
return None
returncode = 0
class BlockDeviceMockReturnTrue:
"""Mock BlockDevices"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def mount(self):
return True
def unmount(self):
return True
def resize(self):
return True
returncode = 0
class BlockDeviceMockReturnFalse:
"""Mock BlockDevices"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def mount(self):
return False
def unmount(self):
return False
def resize(self):
return False
returncode = 0
class BlockDevicesMockReturnTrue:
def get_block_device(name: str): # type: ignore
return BlockDeviceMockReturnTrue()
def __new__(cls, *args, **kwargs):
pass
def __init__(self):
pass
class BlockDevicesMockReturnNone:
def get_block_device(name: str): # type: ignore
return None
def __new__(cls, *args, **kwargs):
pass
def __init__(self):
pass
@pytest.fixture
def mock_block_devices_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.graphql.mutations.storage_mutations.BlockDevices",
# "selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMockReturnTrue,
)
return mock
@pytest.fixture
def mock_block_devices_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMockReturnNone,
)
return mock
@pytest.fixture
def mock_block_device_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevice",
autospec=True,
return_value=BlockDeviceMockReturnNone,
)
return mock
@pytest.fixture
def mock_block_device_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevice",
autospec=True,
return_value=BlockDeviceMockReturnTrue,
)
return mock
@pytest.fixture
def mock_block_device_return_false(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevice",
autospec=True,
return_value=BlockDeviceMockReturnFalse,
)
return mock
API_RESIZE_VOLUME_MUTATION = """
mutation resizeVolume($name: String!) {
resizeVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_resize_volumea_unathorized_client(
client, mock_block_devices_return_true
):
response = client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_resize_volume_nonexistent_block_device(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["resizeVolume"]["code"] == 404
assert response.json()["data"]["resizeVolume"]["message"] is not None
assert response.json()["data"]["resizeVolume"]["success"] is False
def test_graphql_resize_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["resizeVolume"]["code"] == 200
assert response.json()["data"]["resizeVolume"]["message"] is not None
assert response.json()["data"]["resizeVolume"]["success"] is True
API_MOUNT_VOLUME_MUTATION = """
mutation mountVolume($name: String!) {
mountVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_mount_volume_unathorized_client(client, mock_block_device_return_true):
response = client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_mount_already_mounted_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 404
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is False
def test_graphql_mount_not_found_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 404
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is False
def test_graphql_mount_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 200
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is True
API_UNMOUNT_VOLUME_MUTATION = """
mutation unmountVolume($name: String!) {
unmountVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_unmount_volume_unathorized_client(
client, mock_block_devices_return_true
):
response = client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_unmount_not_fount_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 404
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is False
def test_graphql_unmount_volume_false(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 404
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is False
def test_graphql_unmount_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 200
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is True

View File

@ -1,129 +0,0 @@
import pytest
from selfprivacy_api.jobs.__init__ import Job
class ProcessMock:
"""Mock subprocess.Popen"""
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def communicate(): # pylint: disable=no-method-argument
return (b"NEW_HASHED", None)
returncode = 0
class JobsMockReturnTrue:
def __init__(self):
pass
def remove_by_uuid(self, job_uuid: str):
return True
class JobsMockReturnFalse:
def __init__(self):
pass
def remove_by_uuid(self, job_uuid: str):
return False
@pytest.fixture
def mock_subprocess_popen(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
return mock
@pytest.fixture
def mock_jobs_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.jobs.__init__.Jobs",
autospec=True,
return_value=JobsMockReturnTrue,
)
return mock
@pytest.fixture
def mock_jobs_return_false(mocker):
mock = mocker.patch(
"selfprivacy_api.jobs.__init__.Jobs",
autospec=True,
return_value=JobsMockReturnTrue,
)
return mock
API_REMOVE_JOB_MUTATION = """
mutation removeJob($job: Job!) {
removeJob(job: $job) {
success
message
code
}
}
"""
def test_graphql_remove_job_unauthorized(
client, mock_subprocess_popen, mock_jobs_return_true
):
response = client.post(
"/graphql",
json={
"query": API_REMOVE_JOB_MUTATION,
"variables": {
"Job": {
"uid": "12345",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_remove_job(
authorized_client, mock_subprocess_popen, mock_jobs_return_true
):
response = authorized_client.post(
"/graphql",
json={
"query": API_REMOVE_JOB_MUTATION,
"variables": {
"jobId": "12345",
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["removeJob"]["code"] == 200
assert response.json()["data"]["removeJob"]["message"] is not None
assert response.json()["data"]["removeJob"]["success"] is True
def test_graphql_remove_job_not_found(
authorized_client, mock_subprocess_popen, mock_jobs_return_false
):
response = authorized_client.post(
"/graphql",
json={
"query": API_REMOVE_JOB_MUTATION,
"variables": {
"job_id": "3301",
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["removeJob"]["code"] == 404
assert response.json()["data"]["removeJob"]["message"] is not None
assert response.json()["data"]["removeJob"]["success"] is False