forked from SelfPrivacy/selfprivacy-rest-api
Compare commits
64 Commits
Author | SHA1 | Date |
---|---|---|
def | 3c63ab2657 | |
def | 8d21cdcb2b | |
def | 6fd63ac0ea | |
def | 1cb35ac48a | |
def | 4b9c8efc4b | |
def | ed8d9b4c8d | |
def | 39caea8b5c | |
def | 13368dcfe3 | |
def | 9ea6867edb | |
def | 8821ec834b | |
def | da92d6f835 | |
def | 6b557d8e63 | |
def | 241753b6f8 | |
def | 62672d133f | |
Inex Code | 15a900d009 | |
Inex Code | ba434f4fb5 | |
Inex Code | aafc77dce3 | |
Inex Code | 28c6d983b9 | |
Inex Code | ab9e8d81e5 | |
Inex Code | 8c878ea898 | |
Inex Code | cb5e04567d | |
Inex Code | 50a309e2a2 | |
Inex Code | 2e22ad7219 | |
Inex Code | bf1cd32895 | |
Inex Code | 3a5d4d5e86 | |
Inex Code | 07f4da2f23 | |
Inex Code | c92294350f | |
Inex Code | cd5ae80931 | |
Inex Code | 807df0c1cc | |
Inex Code | 130ab61d4b | |
Inex Code | bb99c2ba58 | |
Inex Code | a67a0b3de2 | |
Inex Code | 19168dfdaf | |
Inex Code | e5584e0e1c | |
Inex Code | 87c036de7f | |
Inex Code | 7fe51eb665 | |
Inex Code | 039dd2f80e | |
Inex Code | 64425ac443 | |
Inex Code | 5f34337fb4 | |
Inex Code | af902923ab | |
Inex Code | f940a23e7e | |
Inex Code | 1b1bb4966a | |
Inex Code | 69557fcf50 | |
Inex Code | d8a8b2ec29 | |
Inex Code | 79bc2668e1 | |
Inex Code | bb14adb8bc | |
Inex Code | f750056ad8 | |
Inex Code | 9abc11f187 | |
Inex Code | 19b5c06fc6 | |
Inex Code | 5e6c51a8bc | |
Inex Code | 1ed2b73ec8 | |
Inex Code | 1e901d1fcb | |
Inex Code | 00badfbbf8 | |
Inex Code | e7df559787 | |
Inex Code | 43675b2d1d | |
Inex Code | a96f6bd067 | |
Inex Code | dfd28ad0cd | |
Inex Code | 9132b70e70 | |
Inex Code | 5e62798fde | |
Inex Code | b965ffd96a | |
Inex Code | 8f940e64fd | |
Inex Code | 8ea0d89d71 | |
Inex Code | d8d3cd2068 | |
Inex Code | 52a58d94e7 |
19
.drone.yml
19
.drone.yml
|
@ -2,23 +2,18 @@ kind: pipeline
|
|||
type: exec
|
||||
name: default
|
||||
|
||||
platform:
|
||||
os: linux
|
||||
arch: amd64
|
||||
|
||||
steps:
|
||||
- name: Run Tests and Generate Coverage Report
|
||||
- name: test
|
||||
commands:
|
||||
- coverage run -m pytest -q
|
||||
- coverage xml
|
||||
- sonar-scanner -Dsonar.projectKey=SelfPrivacy-REST-API -Dsonar.sources=. -Dsonar.host.url=http://analyzer.lan:9000 -Dsonar.login="$SONARQUBE_TOKEN"
|
||||
environment:
|
||||
SONARQUBE_TOKEN:
|
||||
from_secret: SONARQUBE_TOKEN
|
||||
|
||||
- name: Run Bandit Checks
|
||||
- name: bandit
|
||||
commands:
|
||||
- bandit -ll -r selfprivacy_api
|
||||
|
||||
- name: Run Code Formatting Checks
|
||||
- name: formatting
|
||||
commands:
|
||||
- black --check .
|
||||
|
||||
node:
|
||||
server: builder
|
||||
|
|
|
@ -53,6 +53,4 @@ async def startup():
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(
|
||||
"selfprivacy_api.app:app", host="127.0.0.1", port=5050, log_level="info"
|
||||
)
|
||||
uvicorn.run("selfprivacy_api.app:app", host="0.0.0.0", port=5050, log_level="info")
|
||||
|
|
|
@ -27,4 +27,4 @@ async def get_token_header(
|
|||
|
||||
def get_api_version() -> str:
|
||||
"""Get API version"""
|
||||
return "2.0.9"
|
||||
return "2.0.0"
|
||||
|
|
|
@ -3,7 +3,6 @@
|
|||
import strawberry
|
||||
|
||||
from selfprivacy_api.graphql.mutations.mutation_interface import GenericMutationReturn
|
||||
from selfprivacy_api.graphql import IsAuthenticated
|
||||
from selfprivacy_api.jobs import Jobs
|
||||
|
||||
|
||||
|
@ -11,10 +10,10 @@ from selfprivacy_api.jobs import Jobs
|
|||
class JobMutations:
|
||||
"""Mutations related to jobs"""
|
||||
|
||||
@strawberry.mutation(permission_classes=[IsAuthenticated])
|
||||
@strawberry.mutation
|
||||
def remove_job(self, job_id: str) -> GenericMutationReturn:
|
||||
"""Remove a job from the queue"""
|
||||
result = Jobs.get_instance().remove_by_uid(job_id)
|
||||
result = Jobs().remove_by_uuid(job_id)
|
||||
if result:
|
||||
return GenericMutationReturn(
|
||||
success=True,
|
||||
|
|
|
@ -17,10 +17,7 @@ A job is a dictionary with the following keys:
|
|||
import typing
|
||||
import datetime
|
||||
from uuid import UUID
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
from enum import Enum
|
||||
|
||||
|
@ -45,7 +42,7 @@ class Job(BaseModel):
|
|||
Job class.
|
||||
"""
|
||||
|
||||
uid: UUID
|
||||
uid: UUID = uuid.uuid4()
|
||||
type_id: str
|
||||
name: str
|
||||
description: str
|
||||
|
@ -108,7 +105,6 @@ class Jobs:
|
|||
Add a job to the jobs list.
|
||||
"""
|
||||
job = Job(
|
||||
uid=uuid.uuid4(),
|
||||
name=name,
|
||||
type_id=type_id,
|
||||
description=description,
|
||||
|
@ -134,9 +130,9 @@ class Jobs:
|
|||
"""
|
||||
Remove a job from the jobs list.
|
||||
"""
|
||||
self.remove_by_uid(str(job.uid))
|
||||
self.remove_by_uuid(str(job.uid))
|
||||
|
||||
def remove_by_uid(self, job_uuid: str) -> bool:
|
||||
def remove_by_uuid(self, job_uuid: str) -> bool:
|
||||
"""
|
||||
Remove a job from the jobs list.
|
||||
"""
|
||||
|
|
|
@ -65,27 +65,14 @@ def move_folder(
|
|||
else:
|
||||
return
|
||||
|
||||
try:
|
||||
data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
|
||||
except Exception as e:
|
||||
print(f"Error creating data path: {e}")
|
||||
return
|
||||
data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
shutil.chown(str(bind_path), user=user, group=group)
|
||||
shutil.chown(str(data_path), user=user, group=group)
|
||||
except LookupError:
|
||||
pass
|
||||
shutil.chown(str(bind_path), user=user, group=group)
|
||||
shutil.chown(str(data_path), user=user, group=group)
|
||||
|
||||
try:
|
||||
subprocess.run(["mount", "--bind", str(bind_path), str(data_path)], check=True)
|
||||
except subprocess.CalledProcessError as error:
|
||||
print(error)
|
||||
subprocess.run(["mount", "--bind", str(bind_path), str(data_path)], check=True)
|
||||
|
||||
try:
|
||||
subprocess.run(["chown", "-R", f"{user}:{group}", str(data_path)], check=True)
|
||||
except subprocess.CalledProcessError as error:
|
||||
print(error)
|
||||
subprocess.run(["chown", "-R", f"{user}:{group}", str(data_path)], check=True)
|
||||
|
||||
|
||||
@huey.task()
|
||||
|
@ -170,17 +157,12 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
|
|||
|
||||
Nextcloud().stop()
|
||||
|
||||
# If /volumes/sda1/nextcloud or /volumes/sdb/nextcloud exists, skip it.
|
||||
if not pathlib.Path("/volumes/sda1/nextcloud").exists():
|
||||
if not pathlib.Path("/volumes/sdb/nextcloud").exists():
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/nextcloud"),
|
||||
bind_path=pathlib.Path(
|
||||
f"/volumes/{config.nextcloud_block_device}/nextcloud"
|
||||
),
|
||||
user="nextcloud",
|
||||
group="nextcloud",
|
||||
)
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/nextcloud"),
|
||||
bind_path=pathlib.Path(f"/volumes/{config.nextcloud_block_device}/nextcloud"),
|
||||
user="nextcloud",
|
||||
group="nextcloud",
|
||||
)
|
||||
|
||||
# Start Nextcloud
|
||||
Nextcloud().start()
|
||||
|
@ -196,27 +178,21 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
|
|||
|
||||
Bitwarden().stop()
|
||||
|
||||
if not pathlib.Path("/volumes/sda1/bitwarden").exists():
|
||||
if not pathlib.Path("/volumes/sdb/bitwarden").exists():
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/bitwarden"),
|
||||
bind_path=pathlib.Path(
|
||||
f"/volumes/{config.bitwarden_block_device}/bitwarden"
|
||||
),
|
||||
user="vaultwarden",
|
||||
group="vaultwarden",
|
||||
)
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/bitwarden"),
|
||||
bind_path=pathlib.Path(f"/volumes/{config.bitwarden_block_device}/bitwarden"),
|
||||
user="vaultwarden",
|
||||
group="vaultwarden",
|
||||
)
|
||||
|
||||
if not pathlib.Path("/volumes/sda1/bitwarden_rs").exists():
|
||||
if not pathlib.Path("/volumes/sdb/bitwarden_rs").exists():
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/bitwarden_rs"),
|
||||
bind_path=pathlib.Path(
|
||||
f"/volumes/{config.bitwarden_block_device}/bitwarden_rs"
|
||||
),
|
||||
user="vaultwarden",
|
||||
group="vaultwarden",
|
||||
)
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/bitwarden_rs"),
|
||||
bind_path=pathlib.Path(
|
||||
f"/volumes/{config.bitwarden_block_device}/bitwarden_rs"
|
||||
),
|
||||
user="vaultwarden",
|
||||
group="vaultwarden",
|
||||
)
|
||||
|
||||
# Start Bitwarden
|
||||
Bitwarden().start()
|
||||
|
@ -232,14 +208,12 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
|
|||
|
||||
Gitea().stop()
|
||||
|
||||
if not pathlib.Path("/volumes/sda1/gitea").exists():
|
||||
if not pathlib.Path("/volumes/sdb/gitea").exists():
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/gitea"),
|
||||
bind_path=pathlib.Path(f"/volumes/{config.gitea_block_device}/gitea"),
|
||||
user="gitea",
|
||||
group="gitea",
|
||||
)
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/gitea"),
|
||||
bind_path=pathlib.Path(f"/volumes/{config.gitea_block_device}/gitea"),
|
||||
user="gitea",
|
||||
group="gitea",
|
||||
)
|
||||
|
||||
Gitea().start()
|
||||
|
||||
|
@ -254,23 +228,19 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
|
|||
|
||||
MailServer().stop()
|
||||
|
||||
if not pathlib.Path("/volumes/sda1/vmail").exists():
|
||||
if not pathlib.Path("/volumes/sdb/vmail").exists():
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/vmail"),
|
||||
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/vmail"),
|
||||
user="virtualMail",
|
||||
group="virtualMail",
|
||||
)
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/vmail"),
|
||||
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/vmail"),
|
||||
user="virtualMail",
|
||||
group="virtualMail",
|
||||
)
|
||||
|
||||
if not pathlib.Path("/volumes/sda1/sieve").exists():
|
||||
if not pathlib.Path("/volumes/sdb/sieve").exists():
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/sieve"),
|
||||
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/sieve"),
|
||||
user="virtualMail",
|
||||
group="virtualMail",
|
||||
)
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/sieve"),
|
||||
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/sieve"),
|
||||
user="virtualMail",
|
||||
group="virtualMail",
|
||||
)
|
||||
|
||||
MailServer().start()
|
||||
|
||||
|
@ -285,27 +255,19 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
|
|||
|
||||
Pleroma().stop()
|
||||
|
||||
if not pathlib.Path("/volumes/sda1/pleroma").exists():
|
||||
if not pathlib.Path("/volumes/sdb/pleroma").exists():
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/pleroma"),
|
||||
bind_path=pathlib.Path(
|
||||
f"/volumes/{config.pleroma_block_device}/pleroma"
|
||||
),
|
||||
user="pleroma",
|
||||
group="pleroma",
|
||||
)
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/pleroma"),
|
||||
bind_path=pathlib.Path(f"/volumes/{config.pleroma_block_device}/pleroma"),
|
||||
user="pleroma",
|
||||
group="pleroma",
|
||||
)
|
||||
|
||||
if not pathlib.Path("/volumes/sda1/postgresql").exists():
|
||||
if not pathlib.Path("/volumes/sdb/postgresql").exists():
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/postgresql"),
|
||||
bind_path=pathlib.Path(
|
||||
f"/volumes/{config.pleroma_block_device}/postgresql"
|
||||
),
|
||||
user="postgres",
|
||||
group="postgres",
|
||||
)
|
||||
move_folder(
|
||||
data_path=pathlib.Path("/var/lib/postgresql"),
|
||||
bind_path=pathlib.Path(f"/volumes/{config.pleroma_block_device}/postgresql"),
|
||||
user="postgres",
|
||||
group="postgres",
|
||||
)
|
||||
|
||||
Pleroma().start()
|
||||
|
||||
|
|
|
@ -8,7 +8,6 @@ at api.skippedMigrations in userdata.json and populating it
|
|||
with IDs of the migrations to skip.
|
||||
Adding DISABLE_ALL to that array disables the migrations module entirely.
|
||||
"""
|
||||
from selfprivacy_api.migrations.check_for_failed_binds_migration import CheckForFailedBindsMigration
|
||||
from selfprivacy_api.utils import ReadUserData
|
||||
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
|
||||
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
|
||||
|
@ -22,7 +21,6 @@ migrations = [
|
|||
CreateTokensJson(),
|
||||
MigrateToSelfprivacyChannel(),
|
||||
MountVolume(),
|
||||
CheckForFailedBindsMigration(),
|
||||
]
|
||||
|
||||
|
||||
|
|
|
@ -1,48 +0,0 @@
|
|||
from selfprivacy_api.jobs import JobStatus, Jobs
|
||||
|
||||
from selfprivacy_api.migrations.migration import Migration
|
||||
from selfprivacy_api.utils import WriteUserData
|
||||
|
||||
|
||||
class CheckForFailedBindsMigration(Migration):
|
||||
"""Mount volume."""
|
||||
|
||||
def get_migration_name(self):
|
||||
return "check_for_failed_binds_migration"
|
||||
|
||||
def get_migration_description(self):
|
||||
return "If binds migration failed, try again."
|
||||
|
||||
def is_migration_needed(self):
|
||||
try:
|
||||
jobs = Jobs.get_instance().get_jobs()
|
||||
# If there is a job with type_id "migrations.migrate_to_binds" and status is not "FINISHED",
|
||||
# then migration is needed and job is deleted
|
||||
for job in jobs:
|
||||
if (
|
||||
job.type_id == "migrations.migrate_to_binds"
|
||||
and job.status != JobStatus.FINISHED
|
||||
):
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def migrate(self):
|
||||
# Get info about existing volumes
|
||||
# Write info about volumes to userdata.json
|
||||
try:
|
||||
jobs = Jobs.get_instance().get_jobs()
|
||||
for job in jobs:
|
||||
if (
|
||||
job.type_id == "migrations.migrate_to_binds"
|
||||
and job.status != JobStatus.FINISHED
|
||||
):
|
||||
Jobs.get_instance().remove(job)
|
||||
with WriteUserData() as userdata:
|
||||
userdata["useBinds"] = False
|
||||
print("Done")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("Error mounting volume")
|
|
@ -61,7 +61,7 @@ def move_service(
|
|||
)
|
||||
return
|
||||
# Make sure the volume is mounted
|
||||
if volume.name != "sda1" and f"/volumes/{volume.name}" not in volume.mountpoints:
|
||||
if f"/volumes/{volume.name}" not in volume.mountpoints:
|
||||
Jobs.get_instance().update(
|
||||
job=job,
|
||||
status=JobStatus.ERROR,
|
||||
|
@ -173,7 +173,7 @@ def move_service(
|
|||
[
|
||||
"chown",
|
||||
"-R",
|
||||
f"{folder.owner}:{folder.group}",
|
||||
f"{folder.owner}:f{folder.group}",
|
||||
f"/volumes/{volume.name}/{folder.name}",
|
||||
],
|
||||
check=True,
|
||||
|
@ -185,6 +185,7 @@ def move_service(
|
|||
status=JobStatus.RUNNING,
|
||||
error=f"Unable to set ownership of new volume. {service_name} may not be able to access its files. Continuing anyway.",
|
||||
)
|
||||
return
|
||||
|
||||
# Mount new volume
|
||||
Jobs.get_instance().update(
|
||||
|
@ -231,6 +232,6 @@ def move_service(
|
|||
job=job,
|
||||
status=JobStatus.FINISHED,
|
||||
result=f"{service_name} moved successfully.",
|
||||
status_text=f"Starting {service_name}...",
|
||||
status_text=f"Starting {service}...",
|
||||
progress=100,
|
||||
)
|
||||
|
|
|
@ -12,10 +12,5 @@ def get_storage_usage(path: str) -> int:
|
|||
for iter_path in pathlib.Path(path).rglob("**/*"):
|
||||
if iter_path.is_dir():
|
||||
continue
|
||||
try:
|
||||
storage_usage += iter_path.stat().st_size
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
except Exception as error:
|
||||
print(error)
|
||||
storage_usage += iter_path.stat().st_size
|
||||
return storage_usage
|
||||
|
|
|
@ -159,7 +159,7 @@ class Gitea(Service):
|
|||
owner="gitea",
|
||||
),
|
||||
],
|
||||
"gitea",
|
||||
"bitwarden",
|
||||
)
|
||||
|
||||
return job
|
||||
|
|
|
@ -0,0 +1,598 @@
|
|||
import pytest
|
||||
|
||||
|
||||
def get_service_by_id_return_none_mock():
|
||||
return None
|
||||
|
||||
|
||||
def get_service_by_id_mock():
|
||||
return "nextcloud"
|
||||
|
||||
|
||||
def service_to_graphql_service_mock():
|
||||
pass
|
||||
|
||||
|
||||
class BlockDevicesMock:
|
||||
def get_block_device(self, name: str):
|
||||
pass
|
||||
|
||||
|
||||
class BlockDevicesReturnNoneMock:
|
||||
def get_block_device(self, name: str):
|
||||
return None
|
||||
|
||||
|
||||
class NextcloudMock:
|
||||
def __init__(self, args, **kwargs):
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
def enable(self):
|
||||
pass
|
||||
|
||||
def disable(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
def is_movable(self):
|
||||
return True
|
||||
|
||||
def move_to_volume(self):
|
||||
pass
|
||||
|
||||
returncode = 0
|
||||
|
||||
|
||||
class NextcloudReturnFalseMock:
|
||||
def __init__(self, args, **kwargs):
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
def enable(self):
|
||||
pass
|
||||
|
||||
def disable(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
def is_movable(self):
|
||||
return False
|
||||
|
||||
def move_to_volume(self):
|
||||
pass
|
||||
|
||||
returncode = 0
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_service_to_graphql_service(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.graphql.common_types.service.service_to_graphql_service",
|
||||
autospec=True,
|
||||
return_value=service_to_graphql_service_mock,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_nextcloud(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.services.nextcloud.__init__.Nextcloud",
|
||||
autospec=True,
|
||||
return_value=NextcloudMock,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_block_devices_return_none(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.utils.block_devices.BlockDevices",
|
||||
autospec=True,
|
||||
return_value=BlockDevicesReturnNoneMock,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_block_devices(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.utils.block_devices.BlockDevices",
|
||||
autospec=True,
|
||||
return_value=BlockDevicesMock,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_nextcloud_return_false(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.services.nextcloud.__init__.Nextcloud",
|
||||
autospec=True,
|
||||
return_value=NextcloudReturnFalseMock,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_get_service_by_id_return_none(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.services.__init__.get_service_by_id",
|
||||
autospec=True,
|
||||
return_value=mock_get_service_by_id_return_none,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_get_service_by_id(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.services.__init__.get_service_by_id",
|
||||
autospec=True,
|
||||
return_value=mock_get_service_by_id,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
####################################################################
|
||||
|
||||
|
||||
API_ENABLE_SERVICE_MUTATION = """
|
||||
mutation enableService($service_id: String!) {
|
||||
enableService(service_id: $service_id) {
|
||||
success
|
||||
message
|
||||
code
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_enable_service_unathorized_client(
|
||||
client, mock_get_service_by_id_return_none, mock_nextcloud
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_ENABLE_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
|
||||
|
||||
def test_graphql_enable_not_found_service(
|
||||
authorized_client, mock_get_service_by_id_return_none, mock_nextcloud
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_ENABLE_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["enableService"]["code"] == 404
|
||||
assert response.json()["data"]["enableService"]["message"] is not None
|
||||
assert response.json()["data"]["enableService"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_enable_service(
|
||||
authorized_client, mock_get_service_by_id, mock_nextcloud
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_ENABLE_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["enableService"]["code"] == 200
|
||||
assert response.json()["data"]["enableService"]["message"] is not None
|
||||
assert response.json()["data"]["enableService"]["success"] is True
|
||||
|
||||
|
||||
API_DISABLE_SERVICE_MUTATION = """
|
||||
mutation disableService($service_id: String!) {
|
||||
disableService(service_id: $service_id) {
|
||||
success
|
||||
message
|
||||
code
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_disable_service_unathorized_client(
|
||||
client, mock_get_service_by_id_return_none, mock_nextcloud
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_DISABLE_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
|
||||
|
||||
def test_graphql_disable_not_found_service(
|
||||
authorized_client, mock_get_service_by_id_return_none, mock_nextcloud
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_DISABLE_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["disableService"]["code"] == 404
|
||||
assert response.json()["data"]["disableService"]["message"] is not None
|
||||
assert response.json()["data"]["disableService"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_disable_services(
|
||||
authorized_client, mock_get_service_by_id, mock_nextcloud
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_DISABLE_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["disableService"]["code"] == 200
|
||||
assert response.json()["data"]["disableService"]["message"] is not None
|
||||
assert response.json()["data"]["disableService"]["success"] is True
|
||||
|
||||
|
||||
API_STOP_SERVICE_MUTATION = """
|
||||
mutation stopService($service_id: String!) {
|
||||
stopService(service_id: $service_id) {
|
||||
success
|
||||
message
|
||||
code
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_stop_service_unathorized_client(
|
||||
client,
|
||||
mock_get_service_by_id_return_none,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_STOP_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
|
||||
|
||||
def test_graphql_stop_not_found_service(
|
||||
authorized_client,
|
||||
mock_get_service_by_id_return_none,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_STOP_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["stopService"]["code"] == 404
|
||||
assert response.json()["data"]["stopService"]["message"] is not None
|
||||
assert response.json()["data"]["stopService"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_stop_services(
|
||||
authorized_client,
|
||||
mock_get_service_by_id,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_STOP_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["stopService"]["code"] == 200
|
||||
assert response.json()["data"]["stopService"]["message"] is not None
|
||||
assert response.json()["data"]["stopService"]["success"] is True
|
||||
|
||||
|
||||
API_START_SERVICE_MUTATION = """
|
||||
mutation startService($service_id: String!) {
|
||||
startService(service_id: $service_id) {
|
||||
success
|
||||
message
|
||||
code
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_start_service_unathorized_client(
|
||||
client,
|
||||
mock_get_service_by_id_return_none,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_START_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
|
||||
|
||||
def test_graphql_start_not_found_service(
|
||||
authorized_client,
|
||||
mock_get_service_by_id_return_none,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_START_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["startService"]["code"] == 404
|
||||
assert response.json()["data"]["startService"]["message"] is not None
|
||||
assert response.json()["data"]["startService"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_start_services(
|
||||
authorized_client,
|
||||
mock_get_service_by_id,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_START_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["startService"]["code"] == 200
|
||||
assert response.json()["data"]["startService"]["message"] is not None
|
||||
assert response.json()["data"]["startService"]["success"] is True
|
||||
|
||||
|
||||
API_RESTART_SERVICE_MUTATION = """
|
||||
mutation restartService($service_id: String!) {
|
||||
restartService(service_id: $service_id) {
|
||||
success
|
||||
message
|
||||
code
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_restart_service_unathorized_client(
|
||||
client,
|
||||
mock_get_service_by_id_return_none,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_RESTART_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
|
||||
|
||||
def test_graphql_restart_not_found_service(
|
||||
authorized_client,
|
||||
mock_get_service_by_id_return_none,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_RESTART_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["restartService"]["code"] == 404
|
||||
assert response.json()["data"]["restartService"]["message"] is not None
|
||||
assert response.json()["data"]["restartService"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_restart_service(
|
||||
authorized_client,
|
||||
mock_get_service_by_id,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_RESTART_SERVICE_MUTATION,
|
||||
"variables": {"service_id": "nextcloud"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["restartService"]["code"] == 200
|
||||
assert response.json()["data"]["restartService"]["message"] is not None
|
||||
assert response.json()["data"]["restartService"]["success"] is True
|
||||
|
||||
|
||||
API_MOVE_SERVICE_MUTATION = """
|
||||
mutation moveService($input: MoveServiceInput!) {
|
||||
moveService(input: $input) {
|
||||
success
|
||||
message
|
||||
code
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_move_service_unathorized_client(
|
||||
client,
|
||||
mock_get_service_by_id_return_none,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_MOVE_SERVICE_MUTATION,
|
||||
"variables": {
|
||||
"input": {"service_id": "nextcloud", "location": "sdx"},
|
||||
},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
|
||||
|
||||
def test_graphql_move_not_found_service(
|
||||
authorized_client,
|
||||
mock_get_service_by_id_return_none,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_MOVE_SERVICE_MUTATION,
|
||||
"variables": {
|
||||
"input": {"service_id": "nextcloud", "location": "sdx"},
|
||||
},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["moveService"]["code"] == 404
|
||||
assert response.json()["data"]["moveService"]["message"] is not None
|
||||
assert response.json()["data"]["moveService"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_move_not_moveble_service(
|
||||
authorized_client,
|
||||
mock_get_service_by_id,
|
||||
mock_nextcloud_return_false,
|
||||
mock_service_to_graphql_service,
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_MOVE_SERVICE_MUTATION,
|
||||
"variables": {
|
||||
"input": {"service_id": "nextcloud", "location": "sdx"},
|
||||
},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["moveService"]["code"] == 400
|
||||
assert response.json()["data"]["moveService"]["message"] is not None
|
||||
assert response.json()["data"]["moveService"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_move_service_volume_not_found(
|
||||
authorized_client,
|
||||
mock_get_service_by_id,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
mock_block_devices_return_none,
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_MOVE_SERVICE_MUTATION,
|
||||
"variables": {
|
||||
"input": {"service_id": "nextcloud", "location": "sdx"},
|
||||
},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["moveService"]["code"] == 400
|
||||
assert response.json()["data"]["moveService"]["message"] is not None
|
||||
assert response.json()["data"]["moveService"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_move_service(
|
||||
authorized_client,
|
||||
mock_get_service_by_id,
|
||||
mock_nextcloud,
|
||||
mock_service_to_graphql_service,
|
||||
mock_block_devices,
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_MOVE_SERVICE_MUTATION,
|
||||
"variables": {
|
||||
"input": {"service_id": "nextcloud", "location": "sdx"},
|
||||
},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["moveService"]["code"] == 200
|
||||
assert response.json()["data"]["moveService"]["message"] is not None
|
||||
assert response.json()["data"]["moveService"]["success"] is True
|
|
@ -0,0 +1,342 @@
|
|||
import pytest
|
||||
|
||||
|
||||
class BlockDeviceMockReturnNone:
|
||||
"""Mock BlockDevices"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
def mount(self):
|
||||
return None
|
||||
|
||||
def unmount(self):
|
||||
return None
|
||||
|
||||
def resize(self):
|
||||
return None
|
||||
|
||||
returncode = 0
|
||||
|
||||
|
||||
class BlockDeviceMockReturnTrue:
|
||||
"""Mock BlockDevices"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
def mount(self):
|
||||
return True
|
||||
|
||||
def unmount(self):
|
||||
return True
|
||||
|
||||
def resize(self):
|
||||
return True
|
||||
|
||||
returncode = 0
|
||||
|
||||
|
||||
class BlockDeviceMockReturnFalse:
|
||||
"""Mock BlockDevices"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
def mount(self):
|
||||
return False
|
||||
|
||||
def unmount(self):
|
||||
return False
|
||||
|
||||
def resize(self):
|
||||
return False
|
||||
|
||||
returncode = 0
|
||||
|
||||
|
||||
class BlockDevicesMockReturnTrue:
|
||||
def get_block_device(name: str): # type: ignore
|
||||
return BlockDeviceMockReturnTrue()
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
|
||||
class BlockDevicesMockReturnNone:
|
||||
def get_block_device(name: str): # type: ignore
|
||||
return None
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_block_devices_return_true(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.graphql.mutations.storage_mutations.BlockDevices",
|
||||
# "selfprivacy_api.utils.block_devices.BlockDevices",
|
||||
autospec=True,
|
||||
return_value=BlockDevicesMockReturnTrue,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_block_devices_return_none(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.utils.block_devices.BlockDevices",
|
||||
autospec=True,
|
||||
return_value=BlockDevicesMockReturnNone,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_block_device_return_none(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.utils.block_devices.BlockDevice",
|
||||
autospec=True,
|
||||
return_value=BlockDeviceMockReturnNone,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_block_device_return_true(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.utils.block_devices.BlockDevice",
|
||||
autospec=True,
|
||||
return_value=BlockDeviceMockReturnTrue,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_block_device_return_false(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.utils.block_devices.BlockDevice",
|
||||
autospec=True,
|
||||
return_value=BlockDeviceMockReturnFalse,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
API_RESIZE_VOLUME_MUTATION = """
|
||||
mutation resizeVolume($name: String!) {
|
||||
resizeVolume(name: $name) {
|
||||
success
|
||||
message
|
||||
code
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_resize_volumea_unathorized_client(
|
||||
client, mock_block_devices_return_true
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_RESIZE_VOLUME_MUTATION,
|
||||
"variables": {"name": "sdx"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
|
||||
|
||||
def test_graphql_resize_volume_nonexistent_block_device(
|
||||
authorized_client, mock_block_devices_return_none
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_RESIZE_VOLUME_MUTATION,
|
||||
"variables": {"name": "sdx"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["resizeVolume"]["code"] == 404
|
||||
assert response.json()["data"]["resizeVolume"]["message"] is not None
|
||||
assert response.json()["data"]["resizeVolume"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_resize_volume(authorized_client, mock_block_devices_return_true):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_RESIZE_VOLUME_MUTATION,
|
||||
"variables": {"name": "sdx"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["resizeVolume"]["code"] == 200
|
||||
assert response.json()["data"]["resizeVolume"]["message"] is not None
|
||||
assert response.json()["data"]["resizeVolume"]["success"] is True
|
||||
|
||||
|
||||
API_MOUNT_VOLUME_MUTATION = """
|
||||
mutation mountVolume($name: String!) {
|
||||
mountVolume(name: $name) {
|
||||
success
|
||||
message
|
||||
code
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_mount_volume_unathorized_client(client, mock_block_device_return_true):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_MOUNT_VOLUME_MUTATION,
|
||||
"variables": {"name": "sdx"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
|
||||
|
||||
def test_graphql_mount_already_mounted_volume(
|
||||
authorized_client, mock_block_devices_return_none
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_MOUNT_VOLUME_MUTATION,
|
||||
"variables": {"name": "sdx"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["mountVolume"]["code"] == 404
|
||||
assert response.json()["data"]["mountVolume"]["message"] is not None
|
||||
assert response.json()["data"]["mountVolume"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_mount_not_found_volume(
|
||||
authorized_client, mock_block_devices_return_none
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_MOUNT_VOLUME_MUTATION,
|
||||
"variables": {"name": "sdx"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["mountVolume"]["code"] == 404
|
||||
assert response.json()["data"]["mountVolume"]["message"] is not None
|
||||
assert response.json()["data"]["mountVolume"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_mount_volume(authorized_client, mock_block_devices_return_true):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_MOUNT_VOLUME_MUTATION,
|
||||
"variables": {"name": "sdx"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["mountVolume"]["code"] == 200
|
||||
assert response.json()["data"]["mountVolume"]["message"] is not None
|
||||
assert response.json()["data"]["mountVolume"]["success"] is True
|
||||
|
||||
|
||||
API_UNMOUNT_VOLUME_MUTATION = """
|
||||
mutation unmountVolume($name: String!) {
|
||||
unmountVolume(name: $name) {
|
||||
success
|
||||
message
|
||||
code
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_unmount_volume_unathorized_client(
|
||||
client, mock_block_devices_return_true
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_UNMOUNT_VOLUME_MUTATION,
|
||||
"variables": {"name": "sdx"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
|
||||
|
||||
def test_graphql_unmount_not_fount_volume(
|
||||
authorized_client, mock_block_devices_return_none
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_UNMOUNT_VOLUME_MUTATION,
|
||||
"variables": {"name": "sdx"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["unmountVolume"]["code"] == 404
|
||||
assert response.json()["data"]["unmountVolume"]["message"] is not None
|
||||
assert response.json()["data"]["unmountVolume"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_unmount_volume_false(
|
||||
authorized_client, mock_block_devices_return_none
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_UNMOUNT_VOLUME_MUTATION,
|
||||
"variables": {"name": "sdx"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["unmountVolume"]["code"] == 404
|
||||
assert response.json()["data"]["unmountVolume"]["message"] is not None
|
||||
assert response.json()["data"]["unmountVolume"]["success"] is False
|
||||
|
||||
|
||||
def test_graphql_unmount_volume(authorized_client, mock_block_devices_return_true):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_UNMOUNT_VOLUME_MUTATION,
|
||||
"variables": {"name": "sdx"},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["unmountVolume"]["code"] == 200
|
||||
assert response.json()["data"]["unmountVolume"]["message"] is not None
|
||||
assert response.json()["data"]["unmountVolume"]["success"] is True
|
|
@ -0,0 +1,129 @@
|
|||
import pytest
|
||||
from selfprivacy_api.jobs.__init__ import Job
|
||||
|
||||
|
||||
class ProcessMock:
|
||||
"""Mock subprocess.Popen"""
|
||||
|
||||
def __init__(self, args, **kwargs):
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
def communicate(): # pylint: disable=no-method-argument
|
||||
return (b"NEW_HASHED", None)
|
||||
|
||||
returncode = 0
|
||||
|
||||
|
||||
class JobsMockReturnTrue:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def remove_by_uuid(self, job_uuid: str):
|
||||
return True
|
||||
|
||||
|
||||
class JobsMockReturnFalse:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def remove_by_uuid(self, job_uuid: str):
|
||||
return False
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_subprocess_popen(mocker):
|
||||
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_jobs_return_true(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.jobs.__init__.Jobs",
|
||||
autospec=True,
|
||||
return_value=JobsMockReturnTrue,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_jobs_return_false(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.jobs.__init__.Jobs",
|
||||
autospec=True,
|
||||
return_value=JobsMockReturnTrue,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
API_REMOVE_JOB_MUTATION = """
|
||||
mutation removeJob($job: Job!) {
|
||||
removeJob(job: $job) {
|
||||
success
|
||||
message
|
||||
code
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def test_graphql_remove_job_unauthorized(
|
||||
client, mock_subprocess_popen, mock_jobs_return_true
|
||||
):
|
||||
response = client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_REMOVE_JOB_MUTATION,
|
||||
"variables": {
|
||||
"Job": {
|
||||
"uid": "12345",
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
|
||||
|
||||
def test_graphql_remove_job(
|
||||
authorized_client, mock_subprocess_popen, mock_jobs_return_true
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_REMOVE_JOB_MUTATION,
|
||||
"variables": {
|
||||
"jobId": "12345",
|
||||
},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is None
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["removeJob"]["code"] == 200
|
||||
assert response.json()["data"]["removeJob"]["message"] is not None
|
||||
assert response.json()["data"]["removeJob"]["success"] is True
|
||||
|
||||
|
||||
def test_graphql_remove_job_not_found(
|
||||
authorized_client, mock_subprocess_popen, mock_jobs_return_false
|
||||
):
|
||||
response = authorized_client.post(
|
||||
"/graphql",
|
||||
json={
|
||||
"query": API_REMOVE_JOB_MUTATION,
|
||||
"variables": {
|
||||
"job_id": "3301",
|
||||
},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["removeJob"]["code"] == 404
|
||||
assert response.json()["data"]["removeJob"]["message"] is not None
|
||||
assert response.json()["data"]["removeJob"]["success"] is False
|
Loading…
Reference in New Issue