Compare commits

..

64 Commits

Author SHA1 Message Date
def 3c63ab2657 fix tests 2022-10-12 18:07:39 +04:00
def 8d21cdcb2b update tests 2022-09-10 23:50:29 +02:00
def 6fd63ac0ea fix some tests 2022-09-08 12:55:34 +02:00
def 1cb35ac48a fix job tests 2022-09-01 21:04:48 +02:00
def 4b9c8efc4b add job tests 2022-09-01 20:46:48 +02:00
def ed8d9b4c8d add more services tests 2022-08-31 23:56:26 +02:00
def 39caea8b5c add service tests 2022-08-31 07:58:55 +02:00
def 13368dcfe3 update storage tests 2022-08-30 02:45:50 +02:00
def 9ea6867edb add return_value 2022-08-27 18:05:19 +02:00
def 8821ec834b fix mock BlockDevicesMockReturnTrue 2022-08-27 17:18:23 +02:00
def da92d6f835 add more tests 2022-08-27 17:10:36 +02:00
def 6b557d8e63 fix tests 2022-08-26 05:07:55 +02:00
def 241753b6f8 fix one 2022-08-25 17:41:31 +02:00
def 62672d133f try to mock 2022-08-25 17:37:32 +02:00
Inex Code 15a900d009 Add Jitsi to services 2022-08-24 03:05:06 +04:00
Inex Code ba434f4fb5 Allow using query params to fetch GraphQL 2022-08-23 00:06:12 +04:00
Inex Code aafc77dce3 Add vscode launch dotflile 2022-08-22 23:49:30 +04:00
Inex Code 28c6d983b9 Fix ws auth 2022-08-22 23:49:14 +04:00
Inex Code ab9e8d81e5 Add auth to service mutations 2022-08-22 23:32:37 +04:00
Inex Code 8c878ea898 Add service mutation endpoints to GraphQL 2022-08-22 22:28:12 +04:00
Inex Code cb5e04567d Add job mutations to GraphQL schema 2022-08-22 21:48:50 +04:00
Inex Code 50a309e2a2 Add remove job by uid endpoint 2022-08-22 21:45:00 +04:00
Inex Code 2e22ad7219 Implement Pull repository changes in GraphQL 2022-08-20 22:50:42 +04:00
Inex Code bf1cd32895 Change the ServiceStatus to match systemctl show 2022-08-20 22:50:25 +04:00
Inex Code 3a5d4d5e86 Implement DNS records getter 2022-08-20 22:49:51 +04:00
Inex Code 07f4da2f23 Reorganize tests 2022-08-20 22:48:44 +04:00
Inex Code c92294350f Make Huey run immediately when testing 2022-08-20 22:47:32 +04:00
Inex Code cd5ae80931 Fix postgresql not being migrated during migration to binds 2022-08-20 22:46:39 +04:00
Inex Code 807df0c1cc Fix Bind migration check 2022-08-19 05:48:26 +04:00
Inex Code 130ab61d4b Register volume migration mutation 2022-08-18 03:35:06 +04:00
Inex Code bb99c2ba58 Fix graphql field 2022-08-18 03:27:10 +04:00
Inex Code a67a0b3de2 Fix type cast 2022-08-18 01:43:02 +04:00
Inex Code 19168dfdaf Add jobs mocking to tests explicitly 2022-08-18 01:38:38 +04:00
Inex Code e5584e0e1c use exists() instead of isFile 2022-08-18 01:13:06 +04:00
Inex Code 87c036de7f Add GraphQL endpoints related to binds 2022-08-18 00:58:56 +04:00
Inex Code 7fe51eb665 Fix update function 2022-08-16 01:44:22 +04:00
Inex Code 039dd2f80e Fix jobs file 2022-08-16 01:31:24 +04:00
Inex Code 64425ac443 fix uid compare 2022-08-15 23:48:43 +04:00
Inex Code 5f34337fb4 Serialize custom types 2022-08-15 23:37:34 +04:00
Inex Code af902923ab replace uuid with str 2022-08-15 23:12:50 +04:00
Inex Code f940a23e7e Make sure jobs file exists on read 2022-08-15 22:51:01 +04:00
Inex Code 1b1bb4966a Use jobs file to transfer data between threads 2022-08-15 22:37:02 +04:00
Inex Code 69557fcf50 add task registry 2022-08-13 03:59:48 +04:00
Inex Code d8a8b2ec29 fixes 2022-08-13 03:39:37 +04:00
Inex Code 79bc2668e1 fix 2022-08-13 02:32:08 +04:00
Inex Code bb14adb8bc Remove infinite recursion 2022-08-13 02:26:47 +04:00
Inex Code f750056ad8 fix circular import 2022-08-13 02:18:13 +04:00
Inex Code 9abc11f187 Add services to volumes 2022-08-13 02:12:28 +04:00
Inex Code 19b5c06fc6 Fix mountpoints bug 2022-08-13 01:52:36 +04:00
Inex Code 5e6c51a8bc add shell module 2022-08-13 01:42:25 +04:00
Inex Code 1ed2b73ec8 Make icons python modules 2022-08-13 01:42:11 +04:00
Inex Code 1e901d1fcb add get working directory endpoint 2022-08-13 01:34:48 +04:00
Inex Code 00badfbbf8 Add some services endpoints 2022-08-13 01:29:18 +04:00
Inex Code e7df559787 Change uvicorn run expression 2022-08-12 22:04:20 +04:00
Inex Code 43675b2d1d Programmatical uvicorn start 2022-08-12 17:43:04 +04:00
Inex Code a96f6bd067 LInting 2022-08-11 23:11:00 +04:00
Inex Code dfd28ad0cd Move to fastapi 2022-08-11 03:36:36 +04:00
Inex Code 9132b70e70 Fix mountpoints 2022-08-03 14:06:18 +03:00
Inex Code 5e62798fde Test 2022-08-02 23:30:03 +03:00
Inex Code b965ffd96a Trying out 2022-08-02 23:12:48 +03:00
Inex Code 8f940e64fd uh 2022-08-02 23:08:32 +03:00
Inex Code 8ea0d89d71 Fix 2022-08-02 22:58:39 +03:00
Inex Code d8d3cd2068 Register subscription 2022-08-02 22:53:35 +03:00
Inex Code 52a58d94e7 Test subscription 2022-08-02 22:50:16 +03:00
14 changed files with 1144 additions and 179 deletions

View File

@ -2,23 +2,18 @@ kind: pipeline
type: exec type: exec
name: default name: default
platform:
os: linux
arch: amd64
steps: steps:
- name: Run Tests and Generate Coverage Report - name: test
commands: commands:
- coverage run -m pytest -q - coverage run -m pytest -q
- coverage xml - coverage xml
- sonar-scanner -Dsonar.projectKey=SelfPrivacy-REST-API -Dsonar.sources=. -Dsonar.host.url=http://analyzer.lan:9000 -Dsonar.login="$SONARQUBE_TOKEN" - name: bandit
environment:
SONARQUBE_TOKEN:
from_secret: SONARQUBE_TOKEN
- name: Run Bandit Checks
commands: commands:
- bandit -ll -r selfprivacy_api - bandit -ll -r selfprivacy_api
- name: formatting
- name: Run Code Formatting Checks
commands: commands:
- black --check . - black --check .
node:
server: builder

View File

@ -53,6 +53,4 @@ async def startup():
if __name__ == "__main__": if __name__ == "__main__":
uvicorn.run( uvicorn.run("selfprivacy_api.app:app", host="0.0.0.0", port=5050, log_level="info")
"selfprivacy_api.app:app", host="127.0.0.1", port=5050, log_level="info"
)

View File

@ -27,4 +27,4 @@ async def get_token_header(
def get_api_version() -> str: def get_api_version() -> str:
"""Get API version""" """Get API version"""
return "2.0.9" return "2.0.0"

View File

@ -3,7 +3,6 @@
import strawberry import strawberry
from selfprivacy_api.graphql.mutations.mutation_interface import GenericMutationReturn from selfprivacy_api.graphql.mutations.mutation_interface import GenericMutationReturn
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.jobs import Jobs from selfprivacy_api.jobs import Jobs
@ -11,10 +10,10 @@ from selfprivacy_api.jobs import Jobs
class JobMutations: class JobMutations:
"""Mutations related to jobs""" """Mutations related to jobs"""
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation
def remove_job(self, job_id: str) -> GenericMutationReturn: def remove_job(self, job_id: str) -> GenericMutationReturn:
"""Remove a job from the queue""" """Remove a job from the queue"""
result = Jobs.get_instance().remove_by_uid(job_id) result = Jobs().remove_by_uuid(job_id)
if result: if result:
return GenericMutationReturn( return GenericMutationReturn(
success=True, success=True,

View File

@ -17,10 +17,7 @@ A job is a dictionary with the following keys:
import typing import typing
import datetime import datetime
from uuid import UUID from uuid import UUID
import asyncio
import json import json
import os
import time
import uuid import uuid
from enum import Enum from enum import Enum
@ -45,7 +42,7 @@ class Job(BaseModel):
Job class. Job class.
""" """
uid: UUID uid: UUID = uuid.uuid4()
type_id: str type_id: str
name: str name: str
description: str description: str
@ -108,7 +105,6 @@ class Jobs:
Add a job to the jobs list. Add a job to the jobs list.
""" """
job = Job( job = Job(
uid=uuid.uuid4(),
name=name, name=name,
type_id=type_id, type_id=type_id,
description=description, description=description,
@ -134,9 +130,9 @@ class Jobs:
""" """
Remove a job from the jobs list. Remove a job from the jobs list.
""" """
self.remove_by_uid(str(job.uid)) self.remove_by_uuid(str(job.uid))
def remove_by_uid(self, job_uuid: str) -> bool: def remove_by_uuid(self, job_uuid: str) -> bool:
""" """
Remove a job from the jobs list. Remove a job from the jobs list.
""" """

View File

@ -65,27 +65,14 @@ def move_folder(
else: else:
return return
try: data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
except Exception as e:
print(f"Error creating data path: {e}")
return
try: shutil.chown(str(bind_path), user=user, group=group)
shutil.chown(str(bind_path), user=user, group=group) shutil.chown(str(data_path), user=user, group=group)
shutil.chown(str(data_path), user=user, group=group)
except LookupError:
pass
try: subprocess.run(["mount", "--bind", str(bind_path), str(data_path)], check=True)
subprocess.run(["mount", "--bind", str(bind_path), str(data_path)], check=True)
except subprocess.CalledProcessError as error:
print(error)
try: subprocess.run(["chown", "-R", f"{user}:{group}", str(data_path)], check=True)
subprocess.run(["chown", "-R", f"{user}:{group}", str(data_path)], check=True)
except subprocess.CalledProcessError as error:
print(error)
@huey.task() @huey.task()
@ -170,17 +157,12 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Nextcloud().stop() Nextcloud().stop()
# If /volumes/sda1/nextcloud or /volumes/sdb/nextcloud exists, skip it. move_folder(
if not pathlib.Path("/volumes/sda1/nextcloud").exists(): data_path=pathlib.Path("/var/lib/nextcloud"),
if not pathlib.Path("/volumes/sdb/nextcloud").exists(): bind_path=pathlib.Path(f"/volumes/{config.nextcloud_block_device}/nextcloud"),
move_folder( user="nextcloud",
data_path=pathlib.Path("/var/lib/nextcloud"), group="nextcloud",
bind_path=pathlib.Path( )
f"/volumes/{config.nextcloud_block_device}/nextcloud"
),
user="nextcloud",
group="nextcloud",
)
# Start Nextcloud # Start Nextcloud
Nextcloud().start() Nextcloud().start()
@ -196,27 +178,21 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Bitwarden().stop() Bitwarden().stop()
if not pathlib.Path("/volumes/sda1/bitwarden").exists(): move_folder(
if not pathlib.Path("/volumes/sdb/bitwarden").exists(): data_path=pathlib.Path("/var/lib/bitwarden"),
move_folder( bind_path=pathlib.Path(f"/volumes/{config.bitwarden_block_device}/bitwarden"),
data_path=pathlib.Path("/var/lib/bitwarden"), user="vaultwarden",
bind_path=pathlib.Path( group="vaultwarden",
f"/volumes/{config.bitwarden_block_device}/bitwarden" )
),
user="vaultwarden",
group="vaultwarden",
)
if not pathlib.Path("/volumes/sda1/bitwarden_rs").exists(): move_folder(
if not pathlib.Path("/volumes/sdb/bitwarden_rs").exists(): data_path=pathlib.Path("/var/lib/bitwarden_rs"),
move_folder( bind_path=pathlib.Path(
data_path=pathlib.Path("/var/lib/bitwarden_rs"), f"/volumes/{config.bitwarden_block_device}/bitwarden_rs"
bind_path=pathlib.Path( ),
f"/volumes/{config.bitwarden_block_device}/bitwarden_rs" user="vaultwarden",
), group="vaultwarden",
user="vaultwarden", )
group="vaultwarden",
)
# Start Bitwarden # Start Bitwarden
Bitwarden().start() Bitwarden().start()
@ -232,14 +208,12 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Gitea().stop() Gitea().stop()
if not pathlib.Path("/volumes/sda1/gitea").exists(): move_folder(
if not pathlib.Path("/volumes/sdb/gitea").exists(): data_path=pathlib.Path("/var/lib/gitea"),
move_folder( bind_path=pathlib.Path(f"/volumes/{config.gitea_block_device}/gitea"),
data_path=pathlib.Path("/var/lib/gitea"), user="gitea",
bind_path=pathlib.Path(f"/volumes/{config.gitea_block_device}/gitea"), group="gitea",
user="gitea", )
group="gitea",
)
Gitea().start() Gitea().start()
@ -254,23 +228,19 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
MailServer().stop() MailServer().stop()
if not pathlib.Path("/volumes/sda1/vmail").exists(): move_folder(
if not pathlib.Path("/volumes/sdb/vmail").exists(): data_path=pathlib.Path("/var/vmail"),
move_folder( bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/vmail"),
data_path=pathlib.Path("/var/vmail"), user="virtualMail",
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/vmail"), group="virtualMail",
user="virtualMail", )
group="virtualMail",
)
if not pathlib.Path("/volumes/sda1/sieve").exists(): move_folder(
if not pathlib.Path("/volumes/sdb/sieve").exists(): data_path=pathlib.Path("/var/sieve"),
move_folder( bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/sieve"),
data_path=pathlib.Path("/var/sieve"), user="virtualMail",
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/sieve"), group="virtualMail",
user="virtualMail", )
group="virtualMail",
)
MailServer().start() MailServer().start()
@ -285,27 +255,19 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Pleroma().stop() Pleroma().stop()
if not pathlib.Path("/volumes/sda1/pleroma").exists(): move_folder(
if not pathlib.Path("/volumes/sdb/pleroma").exists(): data_path=pathlib.Path("/var/lib/pleroma"),
move_folder( bind_path=pathlib.Path(f"/volumes/{config.pleroma_block_device}/pleroma"),
data_path=pathlib.Path("/var/lib/pleroma"), user="pleroma",
bind_path=pathlib.Path( group="pleroma",
f"/volumes/{config.pleroma_block_device}/pleroma" )
),
user="pleroma",
group="pleroma",
)
if not pathlib.Path("/volumes/sda1/postgresql").exists(): move_folder(
if not pathlib.Path("/volumes/sdb/postgresql").exists(): data_path=pathlib.Path("/var/lib/postgresql"),
move_folder( bind_path=pathlib.Path(f"/volumes/{config.pleroma_block_device}/postgresql"),
data_path=pathlib.Path("/var/lib/postgresql"), user="postgres",
bind_path=pathlib.Path( group="postgres",
f"/volumes/{config.pleroma_block_device}/postgresql" )
),
user="postgres",
group="postgres",
)
Pleroma().start() Pleroma().start()

View File

@ -8,7 +8,6 @@ at api.skippedMigrations in userdata.json and populating it
with IDs of the migrations to skip. with IDs of the migrations to skip.
Adding DISABLE_ALL to that array disables the migrations module entirely. Adding DISABLE_ALL to that array disables the migrations module entirely.
""" """
from selfprivacy_api.migrations.check_for_failed_binds_migration import CheckForFailedBindsMigration
from selfprivacy_api.utils import ReadUserData from selfprivacy_api.utils import ReadUserData
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
@ -22,7 +21,6 @@ migrations = [
CreateTokensJson(), CreateTokensJson(),
MigrateToSelfprivacyChannel(), MigrateToSelfprivacyChannel(),
MountVolume(), MountVolume(),
CheckForFailedBindsMigration(),
] ]

View File

@ -1,48 +0,0 @@
from selfprivacy_api.jobs import JobStatus, Jobs
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.utils import WriteUserData
class CheckForFailedBindsMigration(Migration):
"""Mount volume."""
def get_migration_name(self):
return "check_for_failed_binds_migration"
def get_migration_description(self):
return "If binds migration failed, try again."
def is_migration_needed(self):
try:
jobs = Jobs.get_instance().get_jobs()
# If there is a job with type_id "migrations.migrate_to_binds" and status is not "FINISHED",
# then migration is needed and job is deleted
for job in jobs:
if (
job.type_id == "migrations.migrate_to_binds"
and job.status != JobStatus.FINISHED
):
return True
return False
except Exception as e:
print(e)
return False
def migrate(self):
# Get info about existing volumes
# Write info about volumes to userdata.json
try:
jobs = Jobs.get_instance().get_jobs()
for job in jobs:
if (
job.type_id == "migrations.migrate_to_binds"
and job.status != JobStatus.FINISHED
):
Jobs.get_instance().remove(job)
with WriteUserData() as userdata:
userdata["useBinds"] = False
print("Done")
except Exception as e:
print(e)
print("Error mounting volume")

View File

@ -61,7 +61,7 @@ def move_service(
) )
return return
# Make sure the volume is mounted # Make sure the volume is mounted
if volume.name != "sda1" and f"/volumes/{volume.name}" not in volume.mountpoints: if f"/volumes/{volume.name}" not in volume.mountpoints:
Jobs.get_instance().update( Jobs.get_instance().update(
job=job, job=job,
status=JobStatus.ERROR, status=JobStatus.ERROR,
@ -173,7 +173,7 @@ def move_service(
[ [
"chown", "chown",
"-R", "-R",
f"{folder.owner}:{folder.group}", f"{folder.owner}:f{folder.group}",
f"/volumes/{volume.name}/{folder.name}", f"/volumes/{volume.name}/{folder.name}",
], ],
check=True, check=True,
@ -185,6 +185,7 @@ def move_service(
status=JobStatus.RUNNING, status=JobStatus.RUNNING,
error=f"Unable to set ownership of new volume. {service_name} may not be able to access its files. Continuing anyway.", error=f"Unable to set ownership of new volume. {service_name} may not be able to access its files. Continuing anyway.",
) )
return
# Mount new volume # Mount new volume
Jobs.get_instance().update( Jobs.get_instance().update(
@ -231,6 +232,6 @@ def move_service(
job=job, job=job,
status=JobStatus.FINISHED, status=JobStatus.FINISHED,
result=f"{service_name} moved successfully.", result=f"{service_name} moved successfully.",
status_text=f"Starting {service_name}...", status_text=f"Starting {service}...",
progress=100, progress=100,
) )

View File

@ -12,10 +12,5 @@ def get_storage_usage(path: str) -> int:
for iter_path in pathlib.Path(path).rglob("**/*"): for iter_path in pathlib.Path(path).rglob("**/*"):
if iter_path.is_dir(): if iter_path.is_dir():
continue continue
try: storage_usage += iter_path.stat().st_size
storage_usage += iter_path.stat().st_size
except FileNotFoundError:
pass
except Exception as error:
print(error)
return storage_usage return storage_usage

View File

@ -159,7 +159,7 @@ class Gitea(Service):
owner="gitea", owner="gitea",
), ),
], ],
"gitea", "bitwarden",
) )
return job return job

View File

@ -0,0 +1,598 @@
import pytest
def get_service_by_id_return_none_mock():
return None
def get_service_by_id_mock():
return "nextcloud"
def service_to_graphql_service_mock():
pass
class BlockDevicesMock:
def get_block_device(self, name: str):
pass
class BlockDevicesReturnNoneMock:
def get_block_device(self, name: str):
return None
class NextcloudMock:
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def enable(self):
pass
def disable(self):
pass
def stop(self):
pass
def is_movable(self):
return True
def move_to_volume(self):
pass
returncode = 0
class NextcloudReturnFalseMock:
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def enable(self):
pass
def disable(self):
pass
def stop(self):
pass
def is_movable(self):
return False
def move_to_volume(self):
pass
returncode = 0
@pytest.fixture
def mock_service_to_graphql_service(mocker):
mock = mocker.patch(
"selfprivacy_api.graphql.common_types.service.service_to_graphql_service",
autospec=True,
return_value=service_to_graphql_service_mock,
)
return mock
@pytest.fixture
def mock_nextcloud(mocker):
mock = mocker.patch(
"selfprivacy_api.services.nextcloud.__init__.Nextcloud",
autospec=True,
return_value=NextcloudMock,
)
return mock
@pytest.fixture
def mock_block_devices_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesReturnNoneMock,
)
return mock
@pytest.fixture
def mock_block_devices(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMock,
)
return mock
@pytest.fixture
def mock_nextcloud_return_false(mocker):
mock = mocker.patch(
"selfprivacy_api.services.nextcloud.__init__.Nextcloud",
autospec=True,
return_value=NextcloudReturnFalseMock,
)
return mock
@pytest.fixture
def mock_get_service_by_id_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.services.__init__.get_service_by_id",
autospec=True,
return_value=mock_get_service_by_id_return_none,
)
return mock
@pytest.fixture
def mock_get_service_by_id(mocker):
mock = mocker.patch(
"selfprivacy_api.services.__init__.get_service_by_id",
autospec=True,
return_value=mock_get_service_by_id,
)
return mock
####################################################################
API_ENABLE_SERVICE_MUTATION = """
mutation enableService($service_id: String!) {
enableService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_enable_service_unathorized_client(
client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_enable_not_found_service(
authorized_client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["enableService"]["code"] == 404
assert response.json()["data"]["enableService"]["message"] is not None
assert response.json()["data"]["enableService"]["success"] is False
def test_graphql_enable_service(
authorized_client, mock_get_service_by_id, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["enableService"]["code"] == 200
assert response.json()["data"]["enableService"]["message"] is not None
assert response.json()["data"]["enableService"]["success"] is True
API_DISABLE_SERVICE_MUTATION = """
mutation disableService($service_id: String!) {
disableService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_disable_service_unathorized_client(
client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_disable_not_found_service(
authorized_client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["disableService"]["code"] == 404
assert response.json()["data"]["disableService"]["message"] is not None
assert response.json()["data"]["disableService"]["success"] is False
def test_graphql_disable_services(
authorized_client, mock_get_service_by_id, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["disableService"]["code"] == 200
assert response.json()["data"]["disableService"]["message"] is not None
assert response.json()["data"]["disableService"]["success"] is True
API_STOP_SERVICE_MUTATION = """
mutation stopService($service_id: String!) {
stopService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_stop_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_stop_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["stopService"]["code"] == 404
assert response.json()["data"]["stopService"]["message"] is not None
assert response.json()["data"]["stopService"]["success"] is False
def test_graphql_stop_services(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["stopService"]["code"] == 200
assert response.json()["data"]["stopService"]["message"] is not None
assert response.json()["data"]["stopService"]["success"] is True
API_START_SERVICE_MUTATION = """
mutation startService($service_id: String!) {
startService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_start_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_start_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["startService"]["code"] == 404
assert response.json()["data"]["startService"]["message"] is not None
assert response.json()["data"]["startService"]["success"] is False
def test_graphql_start_services(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["startService"]["code"] == 200
assert response.json()["data"]["startService"]["message"] is not None
assert response.json()["data"]["startService"]["success"] is True
API_RESTART_SERVICE_MUTATION = """
mutation restartService($service_id: String!) {
restartService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_restart_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_restart_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["restartService"]["code"] == 404
assert response.json()["data"]["restartService"]["message"] is not None
assert response.json()["data"]["restartService"]["success"] is False
def test_graphql_restart_service(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["restartService"]["code"] == 200
assert response.json()["data"]["restartService"]["message"] is not None
assert response.json()["data"]["restartService"]["success"] is True
API_MOVE_SERVICE_MUTATION = """
mutation moveService($input: MoveServiceInput!) {
moveService(input: $input) {
success
message
code
}
}
"""
def test_graphql_move_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_move_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 404
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_not_moveble_service(
authorized_client,
mock_get_service_by_id,
mock_nextcloud_return_false,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 400
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_service_volume_not_found(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
mock_block_devices_return_none,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 400
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_service(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
mock_block_devices,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 200
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is True

View File

@ -0,0 +1,342 @@
import pytest
class BlockDeviceMockReturnNone:
"""Mock BlockDevices"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def mount(self):
return None
def unmount(self):
return None
def resize(self):
return None
returncode = 0
class BlockDeviceMockReturnTrue:
"""Mock BlockDevices"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def mount(self):
return True
def unmount(self):
return True
def resize(self):
return True
returncode = 0
class BlockDeviceMockReturnFalse:
"""Mock BlockDevices"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def mount(self):
return False
def unmount(self):
return False
def resize(self):
return False
returncode = 0
class BlockDevicesMockReturnTrue:
def get_block_device(name: str): # type: ignore
return BlockDeviceMockReturnTrue()
def __new__(cls, *args, **kwargs):
pass
def __init__(self):
pass
class BlockDevicesMockReturnNone:
def get_block_device(name: str): # type: ignore
return None
def __new__(cls, *args, **kwargs):
pass
def __init__(self):
pass
@pytest.fixture
def mock_block_devices_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.graphql.mutations.storage_mutations.BlockDevices",
# "selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMockReturnTrue,
)
return mock
@pytest.fixture
def mock_block_devices_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMockReturnNone,
)
return mock
@pytest.fixture
def mock_block_device_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevice",
autospec=True,
return_value=BlockDeviceMockReturnNone,
)
return mock
@pytest.fixture
def mock_block_device_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevice",
autospec=True,
return_value=BlockDeviceMockReturnTrue,
)
return mock
@pytest.fixture
def mock_block_device_return_false(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevice",
autospec=True,
return_value=BlockDeviceMockReturnFalse,
)
return mock
API_RESIZE_VOLUME_MUTATION = """
mutation resizeVolume($name: String!) {
resizeVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_resize_volumea_unathorized_client(
client, mock_block_devices_return_true
):
response = client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_resize_volume_nonexistent_block_device(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["resizeVolume"]["code"] == 404
assert response.json()["data"]["resizeVolume"]["message"] is not None
assert response.json()["data"]["resizeVolume"]["success"] is False
def test_graphql_resize_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["resizeVolume"]["code"] == 200
assert response.json()["data"]["resizeVolume"]["message"] is not None
assert response.json()["data"]["resizeVolume"]["success"] is True
API_MOUNT_VOLUME_MUTATION = """
mutation mountVolume($name: String!) {
mountVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_mount_volume_unathorized_client(client, mock_block_device_return_true):
response = client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_mount_already_mounted_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 404
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is False
def test_graphql_mount_not_found_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 404
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is False
def test_graphql_mount_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 200
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is True
API_UNMOUNT_VOLUME_MUTATION = """
mutation unmountVolume($name: String!) {
unmountVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_unmount_volume_unathorized_client(
client, mock_block_devices_return_true
):
response = client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_unmount_not_fount_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 404
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is False
def test_graphql_unmount_volume_false(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 404
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is False
def test_graphql_unmount_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 200
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is True

View File

@ -0,0 +1,129 @@
import pytest
from selfprivacy_api.jobs.__init__ import Job
class ProcessMock:
"""Mock subprocess.Popen"""
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def communicate(): # pylint: disable=no-method-argument
return (b"NEW_HASHED", None)
returncode = 0
class JobsMockReturnTrue:
def __init__(self):
pass
def remove_by_uuid(self, job_uuid: str):
return True
class JobsMockReturnFalse:
def __init__(self):
pass
def remove_by_uuid(self, job_uuid: str):
return False
@pytest.fixture
def mock_subprocess_popen(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
return mock
@pytest.fixture
def mock_jobs_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.jobs.__init__.Jobs",
autospec=True,
return_value=JobsMockReturnTrue,
)
return mock
@pytest.fixture
def mock_jobs_return_false(mocker):
mock = mocker.patch(
"selfprivacy_api.jobs.__init__.Jobs",
autospec=True,
return_value=JobsMockReturnTrue,
)
return mock
API_REMOVE_JOB_MUTATION = """
mutation removeJob($job: Job!) {
removeJob(job: $job) {
success
message
code
}
}
"""
def test_graphql_remove_job_unauthorized(
client, mock_subprocess_popen, mock_jobs_return_true
):
response = client.post(
"/graphql",
json={
"query": API_REMOVE_JOB_MUTATION,
"variables": {
"Job": {
"uid": "12345",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_remove_job(
authorized_client, mock_subprocess_popen, mock_jobs_return_true
):
response = authorized_client.post(
"/graphql",
json={
"query": API_REMOVE_JOB_MUTATION,
"variables": {
"jobId": "12345",
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["removeJob"]["code"] == 200
assert response.json()["data"]["removeJob"]["message"] is not None
assert response.json()["data"]["removeJob"]["success"] is True
def test_graphql_remove_job_not_found(
authorized_client, mock_subprocess_popen, mock_jobs_return_false
):
response = authorized_client.post(
"/graphql",
json={
"query": API_REMOVE_JOB_MUTATION,
"variables": {
"job_id": "3301",
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["removeJob"]["code"] == 404
assert response.json()["data"]["removeJob"]["message"] is not None
assert response.json()["data"]["removeJob"]["success"] is False