Compare commits

..

64 Commits

Author SHA1 Message Date
def 3c63ab2657 fix tests 2022-10-12 18:07:39 +04:00
def 8d21cdcb2b update tests 2022-09-10 23:50:29 +02:00
def 6fd63ac0ea fix some tests 2022-09-08 12:55:34 +02:00
def 1cb35ac48a fix job tests 2022-09-01 21:04:48 +02:00
def 4b9c8efc4b add job tests 2022-09-01 20:46:48 +02:00
def ed8d9b4c8d add more services tests 2022-08-31 23:56:26 +02:00
def 39caea8b5c add service tests 2022-08-31 07:58:55 +02:00
def 13368dcfe3 update storage tests 2022-08-30 02:45:50 +02:00
def 9ea6867edb add return_value 2022-08-27 18:05:19 +02:00
def 8821ec834b fix mock BlockDevicesMockReturnTrue 2022-08-27 17:18:23 +02:00
def da92d6f835 add more tests 2022-08-27 17:10:36 +02:00
def 6b557d8e63 fix tests 2022-08-26 05:07:55 +02:00
def 241753b6f8 fix one 2022-08-25 17:41:31 +02:00
def 62672d133f try to mock 2022-08-25 17:37:32 +02:00
Inex Code 15a900d009 Add Jitsi to services 2022-08-24 03:05:06 +04:00
Inex Code ba434f4fb5 Allow using query params to fetch GraphQL 2022-08-23 00:06:12 +04:00
Inex Code aafc77dce3 Add vscode launch dotflile 2022-08-22 23:49:30 +04:00
Inex Code 28c6d983b9 Fix ws auth 2022-08-22 23:49:14 +04:00
Inex Code ab9e8d81e5 Add auth to service mutations 2022-08-22 23:32:37 +04:00
Inex Code 8c878ea898 Add service mutation endpoints to GraphQL 2022-08-22 22:28:12 +04:00
Inex Code cb5e04567d Add job mutations to GraphQL schema 2022-08-22 21:48:50 +04:00
Inex Code 50a309e2a2 Add remove job by uid endpoint 2022-08-22 21:45:00 +04:00
Inex Code 2e22ad7219 Implement Pull repository changes in GraphQL 2022-08-20 22:50:42 +04:00
Inex Code bf1cd32895 Change the ServiceStatus to match systemctl show 2022-08-20 22:50:25 +04:00
Inex Code 3a5d4d5e86 Implement DNS records getter 2022-08-20 22:49:51 +04:00
Inex Code 07f4da2f23 Reorganize tests 2022-08-20 22:48:44 +04:00
Inex Code c92294350f Make Huey run immediately when testing 2022-08-20 22:47:32 +04:00
Inex Code cd5ae80931 Fix postgresql not being migrated during migration to binds 2022-08-20 22:46:39 +04:00
Inex Code 807df0c1cc Fix Bind migration check 2022-08-19 05:48:26 +04:00
Inex Code 130ab61d4b Register volume migration mutation 2022-08-18 03:35:06 +04:00
Inex Code bb99c2ba58 Fix graphql field 2022-08-18 03:27:10 +04:00
Inex Code a67a0b3de2 Fix type cast 2022-08-18 01:43:02 +04:00
Inex Code 19168dfdaf Add jobs mocking to tests explicitly 2022-08-18 01:38:38 +04:00
Inex Code e5584e0e1c use exists() instead of isFile 2022-08-18 01:13:06 +04:00
Inex Code 87c036de7f Add GraphQL endpoints related to binds 2022-08-18 00:58:56 +04:00
Inex Code 7fe51eb665 Fix update function 2022-08-16 01:44:22 +04:00
Inex Code 039dd2f80e Fix jobs file 2022-08-16 01:31:24 +04:00
Inex Code 64425ac443 fix uid compare 2022-08-15 23:48:43 +04:00
Inex Code 5f34337fb4 Serialize custom types 2022-08-15 23:37:34 +04:00
Inex Code af902923ab replace uuid with str 2022-08-15 23:12:50 +04:00
Inex Code f940a23e7e Make sure jobs file exists on read 2022-08-15 22:51:01 +04:00
Inex Code 1b1bb4966a Use jobs file to transfer data between threads 2022-08-15 22:37:02 +04:00
Inex Code 69557fcf50 add task registry 2022-08-13 03:59:48 +04:00
Inex Code d8a8b2ec29 fixes 2022-08-13 03:39:37 +04:00
Inex Code 79bc2668e1 fix 2022-08-13 02:32:08 +04:00
Inex Code bb14adb8bc Remove infinite recursion 2022-08-13 02:26:47 +04:00
Inex Code f750056ad8 fix circular import 2022-08-13 02:18:13 +04:00
Inex Code 9abc11f187 Add services to volumes 2022-08-13 02:12:28 +04:00
Inex Code 19b5c06fc6 Fix mountpoints bug 2022-08-13 01:52:36 +04:00
Inex Code 5e6c51a8bc add shell module 2022-08-13 01:42:25 +04:00
Inex Code 1ed2b73ec8 Make icons python modules 2022-08-13 01:42:11 +04:00
Inex Code 1e901d1fcb add get working directory endpoint 2022-08-13 01:34:48 +04:00
Inex Code 00badfbbf8 Add some services endpoints 2022-08-13 01:29:18 +04:00
Inex Code e7df559787 Change uvicorn run expression 2022-08-12 22:04:20 +04:00
Inex Code 43675b2d1d Programmatical uvicorn start 2022-08-12 17:43:04 +04:00
Inex Code a96f6bd067 LInting 2022-08-11 23:11:00 +04:00
Inex Code dfd28ad0cd Move to fastapi 2022-08-11 03:36:36 +04:00
Inex Code 9132b70e70 Fix mountpoints 2022-08-03 14:06:18 +03:00
Inex Code 5e62798fde Test 2022-08-02 23:30:03 +03:00
Inex Code b965ffd96a Trying out 2022-08-02 23:12:48 +03:00
Inex Code 8f940e64fd uh 2022-08-02 23:08:32 +03:00
Inex Code 8ea0d89d71 Fix 2022-08-02 22:58:39 +03:00
Inex Code d8d3cd2068 Register subscription 2022-08-02 22:53:35 +03:00
Inex Code 52a58d94e7 Test subscription 2022-08-02 22:50:16 +03:00
14 changed files with 1144 additions and 179 deletions

View File

@ -2,23 +2,18 @@ kind: pipeline
type: exec
name: default
platform:
os: linux
arch: amd64
steps:
- name: Run Tests and Generate Coverage Report
- name: test
commands:
- coverage run -m pytest -q
- coverage xml
- sonar-scanner -Dsonar.projectKey=SelfPrivacy-REST-API -Dsonar.sources=. -Dsonar.host.url=http://analyzer.lan:9000 -Dsonar.login="$SONARQUBE_TOKEN"
environment:
SONARQUBE_TOKEN:
from_secret: SONARQUBE_TOKEN
- name: Run Bandit Checks
- name: bandit
commands:
- bandit -ll -r selfprivacy_api
- name: Run Code Formatting Checks
- name: formatting
commands:
- black --check .
node:
server: builder

View File

@ -53,6 +53,4 @@ async def startup():
if __name__ == "__main__":
uvicorn.run(
"selfprivacy_api.app:app", host="127.0.0.1", port=5050, log_level="info"
)
uvicorn.run("selfprivacy_api.app:app", host="0.0.0.0", port=5050, log_level="info")

View File

@ -27,4 +27,4 @@ async def get_token_header(
def get_api_version() -> str:
"""Get API version"""
return "2.0.9"
return "2.0.0"

View File

@ -3,7 +3,6 @@
import strawberry
from selfprivacy_api.graphql.mutations.mutation_interface import GenericMutationReturn
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.jobs import Jobs
@ -11,10 +10,10 @@ from selfprivacy_api.jobs import Jobs
class JobMutations:
"""Mutations related to jobs"""
@strawberry.mutation(permission_classes=[IsAuthenticated])
@strawberry.mutation
def remove_job(self, job_id: str) -> GenericMutationReturn:
"""Remove a job from the queue"""
result = Jobs.get_instance().remove_by_uid(job_id)
result = Jobs().remove_by_uuid(job_id)
if result:
return GenericMutationReturn(
success=True,

View File

@ -17,10 +17,7 @@ A job is a dictionary with the following keys:
import typing
import datetime
from uuid import UUID
import asyncio
import json
import os
import time
import uuid
from enum import Enum
@ -45,7 +42,7 @@ class Job(BaseModel):
Job class.
"""
uid: UUID
uid: UUID = uuid.uuid4()
type_id: str
name: str
description: str
@ -108,7 +105,6 @@ class Jobs:
Add a job to the jobs list.
"""
job = Job(
uid=uuid.uuid4(),
name=name,
type_id=type_id,
description=description,
@ -134,9 +130,9 @@ class Jobs:
"""
Remove a job from the jobs list.
"""
self.remove_by_uid(str(job.uid))
self.remove_by_uuid(str(job.uid))
def remove_by_uid(self, job_uuid: str) -> bool:
def remove_by_uuid(self, job_uuid: str) -> bool:
"""
Remove a job from the jobs list.
"""

View File

@ -65,27 +65,14 @@ def move_folder(
else:
return
try:
data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
except Exception as e:
print(f"Error creating data path: {e}")
return
data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
try:
shutil.chown(str(bind_path), user=user, group=group)
shutil.chown(str(data_path), user=user, group=group)
except LookupError:
pass
shutil.chown(str(bind_path), user=user, group=group)
shutil.chown(str(data_path), user=user, group=group)
try:
subprocess.run(["mount", "--bind", str(bind_path), str(data_path)], check=True)
except subprocess.CalledProcessError as error:
print(error)
subprocess.run(["mount", "--bind", str(bind_path), str(data_path)], check=True)
try:
subprocess.run(["chown", "-R", f"{user}:{group}", str(data_path)], check=True)
except subprocess.CalledProcessError as error:
print(error)
subprocess.run(["chown", "-R", f"{user}:{group}", str(data_path)], check=True)
@huey.task()
@ -170,17 +157,12 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Nextcloud().stop()
# If /volumes/sda1/nextcloud or /volumes/sdb/nextcloud exists, skip it.
if not pathlib.Path("/volumes/sda1/nextcloud").exists():
if not pathlib.Path("/volumes/sdb/nextcloud").exists():
move_folder(
data_path=pathlib.Path("/var/lib/nextcloud"),
bind_path=pathlib.Path(
f"/volumes/{config.nextcloud_block_device}/nextcloud"
),
user="nextcloud",
group="nextcloud",
)
move_folder(
data_path=pathlib.Path("/var/lib/nextcloud"),
bind_path=pathlib.Path(f"/volumes/{config.nextcloud_block_device}/nextcloud"),
user="nextcloud",
group="nextcloud",
)
# Start Nextcloud
Nextcloud().start()
@ -196,27 +178,21 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Bitwarden().stop()
if not pathlib.Path("/volumes/sda1/bitwarden").exists():
if not pathlib.Path("/volumes/sdb/bitwarden").exists():
move_folder(
data_path=pathlib.Path("/var/lib/bitwarden"),
bind_path=pathlib.Path(
f"/volumes/{config.bitwarden_block_device}/bitwarden"
),
user="vaultwarden",
group="vaultwarden",
)
move_folder(
data_path=pathlib.Path("/var/lib/bitwarden"),
bind_path=pathlib.Path(f"/volumes/{config.bitwarden_block_device}/bitwarden"),
user="vaultwarden",
group="vaultwarden",
)
if not pathlib.Path("/volumes/sda1/bitwarden_rs").exists():
if not pathlib.Path("/volumes/sdb/bitwarden_rs").exists():
move_folder(
data_path=pathlib.Path("/var/lib/bitwarden_rs"),
bind_path=pathlib.Path(
f"/volumes/{config.bitwarden_block_device}/bitwarden_rs"
),
user="vaultwarden",
group="vaultwarden",
)
move_folder(
data_path=pathlib.Path("/var/lib/bitwarden_rs"),
bind_path=pathlib.Path(
f"/volumes/{config.bitwarden_block_device}/bitwarden_rs"
),
user="vaultwarden",
group="vaultwarden",
)
# Start Bitwarden
Bitwarden().start()
@ -232,14 +208,12 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Gitea().stop()
if not pathlib.Path("/volumes/sda1/gitea").exists():
if not pathlib.Path("/volumes/sdb/gitea").exists():
move_folder(
data_path=pathlib.Path("/var/lib/gitea"),
bind_path=pathlib.Path(f"/volumes/{config.gitea_block_device}/gitea"),
user="gitea",
group="gitea",
)
move_folder(
data_path=pathlib.Path("/var/lib/gitea"),
bind_path=pathlib.Path(f"/volumes/{config.gitea_block_device}/gitea"),
user="gitea",
group="gitea",
)
Gitea().start()
@ -254,23 +228,19 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
MailServer().stop()
if not pathlib.Path("/volumes/sda1/vmail").exists():
if not pathlib.Path("/volumes/sdb/vmail").exists():
move_folder(
data_path=pathlib.Path("/var/vmail"),
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/vmail"),
user="virtualMail",
group="virtualMail",
)
move_folder(
data_path=pathlib.Path("/var/vmail"),
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/vmail"),
user="virtualMail",
group="virtualMail",
)
if not pathlib.Path("/volumes/sda1/sieve").exists():
if not pathlib.Path("/volumes/sdb/sieve").exists():
move_folder(
data_path=pathlib.Path("/var/sieve"),
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/sieve"),
user="virtualMail",
group="virtualMail",
)
move_folder(
data_path=pathlib.Path("/var/sieve"),
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/sieve"),
user="virtualMail",
group="virtualMail",
)
MailServer().start()
@ -285,27 +255,19 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
Pleroma().stop()
if not pathlib.Path("/volumes/sda1/pleroma").exists():
if not pathlib.Path("/volumes/sdb/pleroma").exists():
move_folder(
data_path=pathlib.Path("/var/lib/pleroma"),
bind_path=pathlib.Path(
f"/volumes/{config.pleroma_block_device}/pleroma"
),
user="pleroma",
group="pleroma",
)
move_folder(
data_path=pathlib.Path("/var/lib/pleroma"),
bind_path=pathlib.Path(f"/volumes/{config.pleroma_block_device}/pleroma"),
user="pleroma",
group="pleroma",
)
if not pathlib.Path("/volumes/sda1/postgresql").exists():
if not pathlib.Path("/volumes/sdb/postgresql").exists():
move_folder(
data_path=pathlib.Path("/var/lib/postgresql"),
bind_path=pathlib.Path(
f"/volumes/{config.pleroma_block_device}/postgresql"
),
user="postgres",
group="postgres",
)
move_folder(
data_path=pathlib.Path("/var/lib/postgresql"),
bind_path=pathlib.Path(f"/volumes/{config.pleroma_block_device}/postgresql"),
user="postgres",
group="postgres",
)
Pleroma().start()

View File

@ -8,7 +8,6 @@ at api.skippedMigrations in userdata.json and populating it
with IDs of the migrations to skip.
Adding DISABLE_ALL to that array disables the migrations module entirely.
"""
from selfprivacy_api.migrations.check_for_failed_binds_migration import CheckForFailedBindsMigration
from selfprivacy_api.utils import ReadUserData
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
@ -22,7 +21,6 @@ migrations = [
CreateTokensJson(),
MigrateToSelfprivacyChannel(),
MountVolume(),
CheckForFailedBindsMigration(),
]

View File

@ -1,48 +0,0 @@
from selfprivacy_api.jobs import JobStatus, Jobs
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.utils import WriteUserData
class CheckForFailedBindsMigration(Migration):
"""Mount volume."""
def get_migration_name(self):
return "check_for_failed_binds_migration"
def get_migration_description(self):
return "If binds migration failed, try again."
def is_migration_needed(self):
try:
jobs = Jobs.get_instance().get_jobs()
# If there is a job with type_id "migrations.migrate_to_binds" and status is not "FINISHED",
# then migration is needed and job is deleted
for job in jobs:
if (
job.type_id == "migrations.migrate_to_binds"
and job.status != JobStatus.FINISHED
):
return True
return False
except Exception as e:
print(e)
return False
def migrate(self):
# Get info about existing volumes
# Write info about volumes to userdata.json
try:
jobs = Jobs.get_instance().get_jobs()
for job in jobs:
if (
job.type_id == "migrations.migrate_to_binds"
and job.status != JobStatus.FINISHED
):
Jobs.get_instance().remove(job)
with WriteUserData() as userdata:
userdata["useBinds"] = False
print("Done")
except Exception as e:
print(e)
print("Error mounting volume")

View File

@ -61,7 +61,7 @@ def move_service(
)
return
# Make sure the volume is mounted
if volume.name != "sda1" and f"/volumes/{volume.name}" not in volume.mountpoints:
if f"/volumes/{volume.name}" not in volume.mountpoints:
Jobs.get_instance().update(
job=job,
status=JobStatus.ERROR,
@ -173,7 +173,7 @@ def move_service(
[
"chown",
"-R",
f"{folder.owner}:{folder.group}",
f"{folder.owner}:f{folder.group}",
f"/volumes/{volume.name}/{folder.name}",
],
check=True,
@ -185,6 +185,7 @@ def move_service(
status=JobStatus.RUNNING,
error=f"Unable to set ownership of new volume. {service_name} may not be able to access its files. Continuing anyway.",
)
return
# Mount new volume
Jobs.get_instance().update(
@ -231,6 +232,6 @@ def move_service(
job=job,
status=JobStatus.FINISHED,
result=f"{service_name} moved successfully.",
status_text=f"Starting {service_name}...",
status_text=f"Starting {service}...",
progress=100,
)

View File

@ -12,10 +12,5 @@ def get_storage_usage(path: str) -> int:
for iter_path in pathlib.Path(path).rglob("**/*"):
if iter_path.is_dir():
continue
try:
storage_usage += iter_path.stat().st_size
except FileNotFoundError:
pass
except Exception as error:
print(error)
storage_usage += iter_path.stat().st_size
return storage_usage

View File

@ -159,7 +159,7 @@ class Gitea(Service):
owner="gitea",
),
],
"gitea",
"bitwarden",
)
return job

View File

@ -0,0 +1,598 @@
import pytest
def get_service_by_id_return_none_mock():
return None
def get_service_by_id_mock():
return "nextcloud"
def service_to_graphql_service_mock():
pass
class BlockDevicesMock:
def get_block_device(self, name: str):
pass
class BlockDevicesReturnNoneMock:
def get_block_device(self, name: str):
return None
class NextcloudMock:
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def enable(self):
pass
def disable(self):
pass
def stop(self):
pass
def is_movable(self):
return True
def move_to_volume(self):
pass
returncode = 0
class NextcloudReturnFalseMock:
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def enable(self):
pass
def disable(self):
pass
def stop(self):
pass
def is_movable(self):
return False
def move_to_volume(self):
pass
returncode = 0
@pytest.fixture
def mock_service_to_graphql_service(mocker):
mock = mocker.patch(
"selfprivacy_api.graphql.common_types.service.service_to_graphql_service",
autospec=True,
return_value=service_to_graphql_service_mock,
)
return mock
@pytest.fixture
def mock_nextcloud(mocker):
mock = mocker.patch(
"selfprivacy_api.services.nextcloud.__init__.Nextcloud",
autospec=True,
return_value=NextcloudMock,
)
return mock
@pytest.fixture
def mock_block_devices_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesReturnNoneMock,
)
return mock
@pytest.fixture
def mock_block_devices(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMock,
)
return mock
@pytest.fixture
def mock_nextcloud_return_false(mocker):
mock = mocker.patch(
"selfprivacy_api.services.nextcloud.__init__.Nextcloud",
autospec=True,
return_value=NextcloudReturnFalseMock,
)
return mock
@pytest.fixture
def mock_get_service_by_id_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.services.__init__.get_service_by_id",
autospec=True,
return_value=mock_get_service_by_id_return_none,
)
return mock
@pytest.fixture
def mock_get_service_by_id(mocker):
mock = mocker.patch(
"selfprivacy_api.services.__init__.get_service_by_id",
autospec=True,
return_value=mock_get_service_by_id,
)
return mock
####################################################################
API_ENABLE_SERVICE_MUTATION = """
mutation enableService($service_id: String!) {
enableService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_enable_service_unathorized_client(
client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_enable_not_found_service(
authorized_client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["enableService"]["code"] == 404
assert response.json()["data"]["enableService"]["message"] is not None
assert response.json()["data"]["enableService"]["success"] is False
def test_graphql_enable_service(
authorized_client, mock_get_service_by_id, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["enableService"]["code"] == 200
assert response.json()["data"]["enableService"]["message"] is not None
assert response.json()["data"]["enableService"]["success"] is True
API_DISABLE_SERVICE_MUTATION = """
mutation disableService($service_id: String!) {
disableService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_disable_service_unathorized_client(
client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_disable_not_found_service(
authorized_client, mock_get_service_by_id_return_none, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["disableService"]["code"] == 404
assert response.json()["data"]["disableService"]["message"] is not None
assert response.json()["data"]["disableService"]["success"] is False
def test_graphql_disable_services(
authorized_client, mock_get_service_by_id, mock_nextcloud
):
response = authorized_client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["disableService"]["code"] == 200
assert response.json()["data"]["disableService"]["message"] is not None
assert response.json()["data"]["disableService"]["success"] is True
API_STOP_SERVICE_MUTATION = """
mutation stopService($service_id: String!) {
stopService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_stop_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_stop_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["stopService"]["code"] == 404
assert response.json()["data"]["stopService"]["message"] is not None
assert response.json()["data"]["stopService"]["success"] is False
def test_graphql_stop_services(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["stopService"]["code"] == 200
assert response.json()["data"]["stopService"]["message"] is not None
assert response.json()["data"]["stopService"]["success"] is True
API_START_SERVICE_MUTATION = """
mutation startService($service_id: String!) {
startService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_start_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_start_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["startService"]["code"] == 404
assert response.json()["data"]["startService"]["message"] is not None
assert response.json()["data"]["startService"]["success"] is False
def test_graphql_start_services(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["startService"]["code"] == 200
assert response.json()["data"]["startService"]["message"] is not None
assert response.json()["data"]["startService"]["success"] is True
API_RESTART_SERVICE_MUTATION = """
mutation restartService($service_id: String!) {
restartService(service_id: $service_id) {
success
message
code
}
}
"""
def test_graphql_restart_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_restart_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["restartService"]["code"] == 404
assert response.json()["data"]["restartService"]["message"] is not None
assert response.json()["data"]["restartService"]["success"] is False
def test_graphql_restart_service(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
"variables": {"service_id": "nextcloud"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["restartService"]["code"] == 200
assert response.json()["data"]["restartService"]["message"] is not None
assert response.json()["data"]["restartService"]["success"] is True
API_MOVE_SERVICE_MUTATION = """
mutation moveService($input: MoveServiceInput!) {
moveService(input: $input) {
success
message
code
}
}
"""
def test_graphql_move_service_unathorized_client(
client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_move_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_nextcloud,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 404
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_not_moveble_service(
authorized_client,
mock_get_service_by_id,
mock_nextcloud_return_false,
mock_service_to_graphql_service,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 400
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_service_volume_not_found(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
mock_block_devices_return_none,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 400
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_service(
authorized_client,
mock_get_service_by_id,
mock_nextcloud,
mock_service_to_graphql_service,
mock_block_devices,
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
"input": {"service_id": "nextcloud", "location": "sdx"},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 200
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is True

View File

@ -0,0 +1,342 @@
import pytest
class BlockDeviceMockReturnNone:
"""Mock BlockDevices"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def mount(self):
return None
def unmount(self):
return None
def resize(self):
return None
returncode = 0
class BlockDeviceMockReturnTrue:
"""Mock BlockDevices"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def mount(self):
return True
def unmount(self):
return True
def resize(self):
return True
returncode = 0
class BlockDeviceMockReturnFalse:
"""Mock BlockDevices"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def mount(self):
return False
def unmount(self):
return False
def resize(self):
return False
returncode = 0
class BlockDevicesMockReturnTrue:
def get_block_device(name: str): # type: ignore
return BlockDeviceMockReturnTrue()
def __new__(cls, *args, **kwargs):
pass
def __init__(self):
pass
class BlockDevicesMockReturnNone:
def get_block_device(name: str): # type: ignore
return None
def __new__(cls, *args, **kwargs):
pass
def __init__(self):
pass
@pytest.fixture
def mock_block_devices_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.graphql.mutations.storage_mutations.BlockDevices",
# "selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMockReturnTrue,
)
return mock
@pytest.fixture
def mock_block_devices_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMockReturnNone,
)
return mock
@pytest.fixture
def mock_block_device_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevice",
autospec=True,
return_value=BlockDeviceMockReturnNone,
)
return mock
@pytest.fixture
def mock_block_device_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevice",
autospec=True,
return_value=BlockDeviceMockReturnTrue,
)
return mock
@pytest.fixture
def mock_block_device_return_false(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevice",
autospec=True,
return_value=BlockDeviceMockReturnFalse,
)
return mock
API_RESIZE_VOLUME_MUTATION = """
mutation resizeVolume($name: String!) {
resizeVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_resize_volumea_unathorized_client(
client, mock_block_devices_return_true
):
response = client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_resize_volume_nonexistent_block_device(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["resizeVolume"]["code"] == 404
assert response.json()["data"]["resizeVolume"]["message"] is not None
assert response.json()["data"]["resizeVolume"]["success"] is False
def test_graphql_resize_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["resizeVolume"]["code"] == 200
assert response.json()["data"]["resizeVolume"]["message"] is not None
assert response.json()["data"]["resizeVolume"]["success"] is True
API_MOUNT_VOLUME_MUTATION = """
mutation mountVolume($name: String!) {
mountVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_mount_volume_unathorized_client(client, mock_block_device_return_true):
response = client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_mount_already_mounted_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 404
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is False
def test_graphql_mount_not_found_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 404
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is False
def test_graphql_mount_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 200
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is True
API_UNMOUNT_VOLUME_MUTATION = """
mutation unmountVolume($name: String!) {
unmountVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_unmount_volume_unathorized_client(
client, mock_block_devices_return_true
):
response = client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_unmount_not_fount_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 404
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is False
def test_graphql_unmount_volume_false(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 404
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is False
def test_graphql_unmount_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 200
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is True

View File

@ -0,0 +1,129 @@
import pytest
from selfprivacy_api.jobs.__init__ import Job
class ProcessMock:
"""Mock subprocess.Popen"""
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def communicate(): # pylint: disable=no-method-argument
return (b"NEW_HASHED", None)
returncode = 0
class JobsMockReturnTrue:
def __init__(self):
pass
def remove_by_uuid(self, job_uuid: str):
return True
class JobsMockReturnFalse:
def __init__(self):
pass
def remove_by_uuid(self, job_uuid: str):
return False
@pytest.fixture
def mock_subprocess_popen(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
return mock
@pytest.fixture
def mock_jobs_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.jobs.__init__.Jobs",
autospec=True,
return_value=JobsMockReturnTrue,
)
return mock
@pytest.fixture
def mock_jobs_return_false(mocker):
mock = mocker.patch(
"selfprivacy_api.jobs.__init__.Jobs",
autospec=True,
return_value=JobsMockReturnTrue,
)
return mock
API_REMOVE_JOB_MUTATION = """
mutation removeJob($job: Job!) {
removeJob(job: $job) {
success
message
code
}
}
"""
def test_graphql_remove_job_unauthorized(
client, mock_subprocess_popen, mock_jobs_return_true
):
response = client.post(
"/graphql",
json={
"query": API_REMOVE_JOB_MUTATION,
"variables": {
"Job": {
"uid": "12345",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_remove_job(
authorized_client, mock_subprocess_popen, mock_jobs_return_true
):
response = authorized_client.post(
"/graphql",
json={
"query": API_REMOVE_JOB_MUTATION,
"variables": {
"jobId": "12345",
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["removeJob"]["code"] == 200
assert response.json()["data"]["removeJob"]["message"] is not None
assert response.json()["data"]["removeJob"]["success"] is True
def test_graphql_remove_job_not_found(
authorized_client, mock_subprocess_popen, mock_jobs_return_false
):
response = authorized_client.post(
"/graphql",
json={
"query": API_REMOVE_JOB_MUTATION,
"variables": {
"job_id": "3301",
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["removeJob"]["code"] == 404
assert response.json()["data"]["removeJob"]["message"] is not None
assert response.json()["data"]["removeJob"]["success"] is False