selfprivacy-rest-api/tests/test_graphql/test_api_services.py

627 lines
16 KiB
Python
Raw Normal View History

2022-10-21 19:37:32 +03:00
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
2022-10-15 18:43:05 +03:00
import pytest
2022-10-21 19:37:32 +03:00
from tests.common import read_json
2022-10-15 18:43:05 +03:00
2022-10-21 19:37:32 +03:00
class NextcloudMockReturnTrue:
2022-10-15 18:43:05 +03:00
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
2022-10-21 19:37:32 +03:00
def enable():
2022-10-15 18:43:05 +03:00
pass
2022-10-21 19:37:32 +03:00
def disable():
2022-10-15 18:43:05 +03:00
pass
2022-10-21 19:37:32 +03:00
def stop():
2022-10-15 18:43:05 +03:00
pass
2022-10-21 19:37:32 +03:00
def is_movable():
2022-10-15 18:43:05 +03:00
return True
2022-10-21 19:37:32 +03:00
def move_to_volume(what):
return None
def start():
pass
def restart():
2022-10-15 18:43:05 +03:00
pass
returncode = 0
2022-10-21 19:37:32 +03:00
class BlockDevices:
def get_block_device(location):
return True
2022-10-21 19:37:32 +03:00
class ProcessMock:
"""Mock subprocess.Popen"""
2022-10-15 18:43:05 +03:00
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
2022-10-21 19:37:32 +03:00
def communicate(): # pylint: disable=no-method-argument
return (b"", None)
2022-10-15 18:43:05 +03:00
2022-10-21 19:37:32 +03:00
returncode = 0
2022-10-15 18:43:05 +03:00
2022-10-21 19:37:32 +03:00
@pytest.fixture
def mock_subprocess_popen(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
return mock
2022-10-15 18:43:05 +03:00
2022-10-21 19:37:32 +03:00
@pytest.fixture
def one_user(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "one_user.json")
assert read_json(datadir / "one_user.json")["users"] == [
{
"username": "user1",
"hashedPassword": "HASHED_PASSWORD_1",
"sshKeys": ["ssh-rsa KEY user1@pc"],
}
]
return datadir
2022-10-15 18:43:05 +03:00
@pytest.fixture
def mock_service_to_graphql_service(mocker):
mock = mocker.patch(
2022-10-21 19:37:32 +03:00
"selfprivacy_api.graphql.mutations.services_mutations.service_to_graphql_service",
2022-10-15 18:43:05 +03:00
autospec=True,
2022-10-21 19:37:32 +03:00
return_value=None,
2022-10-15 18:43:05 +03:00
)
return mock
2022-10-15 18:43:05 +03:00
@pytest.fixture
2022-10-21 19:37:32 +03:00
def mock_job_to_api_job(mocker):
2022-10-15 18:43:05 +03:00
mock = mocker.patch(
2022-10-21 19:37:32 +03:00
"selfprivacy_api.graphql.mutations.services_mutations.job_to_api_job",
2022-10-15 18:43:05 +03:00
autospec=True,
2022-10-21 19:37:32 +03:00
return_value=None,
2022-10-15 18:43:05 +03:00
)
return mock
2022-10-15 18:43:05 +03:00
@pytest.fixture
def mock_block_devices_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
2022-10-21 19:37:32 +03:00
return_value=None,
2022-10-15 18:43:05 +03:00
)
return mock
@pytest.fixture
def mock_block_devices(mocker):
mock = mocker.patch(
2022-10-21 19:37:32 +03:00
"selfprivacy_api.graphql.mutations.services_mutations.BlockDevices",
2022-10-15 18:43:05 +03:00
autospec=True,
2022-10-21 19:37:32 +03:00
return_value=BlockDevices,
2022-10-15 18:43:05 +03:00
)
return mock
@pytest.fixture
def mock_get_service_by_id_return_none(mocker):
mock = mocker.patch(
2022-10-21 19:37:32 +03:00
"selfprivacy_api.graphql.mutations.services_mutations.get_service_by_id",
2022-10-15 18:43:05 +03:00
autospec=True,
2022-10-21 19:37:32 +03:00
return_value=None,
2022-10-15 18:43:05 +03:00
)
return mock
@pytest.fixture
def mock_get_service_by_id(mocker):
mock = mocker.patch(
2022-10-21 19:37:32 +03:00
"selfprivacy_api.graphql.mutations.services_mutations.get_service_by_id",
2022-10-15 18:43:05 +03:00
autospec=True,
2022-10-21 19:37:32 +03:00
return_value=NextcloudMockReturnTrue,
2022-10-15 18:43:05 +03:00
)
return mock
####################################################################
API_ENABLE_SERVICE_MUTATION = """
2022-10-21 19:37:32 +03:00
mutation enableService($serviceId: String!) {
enableService(serviceId: $serviceId) {
2022-10-15 18:43:05 +03:00
success
message
code
}
}
"""
def test_graphql_enable_service_unauthorized_client(
2022-10-21 19:37:32 +03:00
client, mock_get_service_by_id_return_none, mock_subprocess_popen
2022-10-15 18:43:05 +03:00
):
response = client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_enable_not_found_service(
2022-10-21 19:37:32 +03:00
authorized_client,
mock_get_service_by_id_return_none,
mock_subprocess_popen,
one_user,
mock_service_to_graphql_service,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["enableService"]["code"] == 404
assert response.json()["data"]["enableService"]["message"] is not None
assert response.json()["data"]["enableService"]["success"] is False
def test_graphql_enable_service(
2022-10-21 19:37:32 +03:00
authorized_client,
mock_get_service_by_id,
mock_subprocess_popen,
one_user,
mock_service_to_graphql_service,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_ENABLE_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["enableService"]["code"] == 200
assert response.json()["data"]["enableService"]["message"] is not None
assert response.json()["data"]["enableService"]["success"] is True
API_DISABLE_SERVICE_MUTATION = """
2022-10-21 19:37:32 +03:00
mutation disableService($serviceId: String!) {
disableService(serviceId: $serviceId) {
2022-10-15 18:43:05 +03:00
success
message
code
}
}
"""
def test_graphql_disable_service_unauthorized_client(
2022-10-21 19:37:32 +03:00
client,
mock_get_service_by_id_return_none,
mock_subprocess_popen,
one_user,
mock_service_to_graphql_service,
2022-10-15 18:43:05 +03:00
):
response = client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_disable_not_found_service(
2022-10-21 19:37:32 +03:00
authorized_client,
mock_get_service_by_id_return_none,
mock_subprocess_popen,
one_user,
mock_service_to_graphql_service,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["disableService"]["code"] == 404
assert response.json()["data"]["disableService"]["message"] is not None
assert response.json()["data"]["disableService"]["success"] is False
def test_graphql_disable_services(
2022-10-21 19:37:32 +03:00
authorized_client,
mock_get_service_by_id,
mock_subprocess_popen,
one_user,
mock_service_to_graphql_service,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_DISABLE_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["disableService"]["code"] == 200
assert response.json()["data"]["disableService"]["message"] is not None
assert response.json()["data"]["disableService"]["success"] is True
API_STOP_SERVICE_MUTATION = """
2022-10-21 19:37:32 +03:00
mutation stopService($serviceId: String!) {
stopService(serviceId: $serviceId) {
2022-10-15 18:43:05 +03:00
success
message
code
}
}
"""
def test_graphql_stop_service_unauthorized_client(
2022-10-15 18:43:05 +03:00
client,
mock_get_service_by_id_return_none,
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_stop_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["stopService"]["code"] == 404
assert response.json()["data"]["stopService"]["message"] is not None
assert response.json()["data"]["stopService"]["success"] is False
def test_graphql_stop_service(
2022-10-15 18:43:05 +03:00
authorized_client,
mock_get_service_by_id,
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_STOP_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["stopService"]["code"] == 200
assert response.json()["data"]["stopService"]["message"] is not None
assert response.json()["data"]["stopService"]["success"] is True
API_START_SERVICE_MUTATION = """
2022-10-21 19:37:32 +03:00
mutation startService($serviceId: String!) {
startService(serviceId: $serviceId) {
2022-10-15 18:43:05 +03:00
success
message
code
}
}
"""
def test_graphql_start_service_unauthorized_client(
2022-10-15 18:43:05 +03:00
client,
mock_get_service_by_id_return_none,
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_start_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["startService"]["code"] == 404
assert response.json()["data"]["startService"]["message"] is not None
assert response.json()["data"]["startService"]["success"] is False
def test_graphql_start_service(
2022-10-15 18:43:05 +03:00
authorized_client,
mock_get_service_by_id,
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_START_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["startService"]["code"] == 200
assert response.json()["data"]["startService"]["message"] is not None
assert response.json()["data"]["startService"]["success"] is True
API_RESTART_SERVICE_MUTATION = """
2022-10-21 19:37:32 +03:00
mutation restartService($serviceId: String!) {
restartService(serviceId: $serviceId) {
2022-10-15 18:43:05 +03:00
success
message
code
}
}
"""
def test_graphql_restart_service_unauthorized_client(
2022-10-15 18:43:05 +03:00
client,
mock_get_service_by_id_return_none,
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_restart_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["restartService"]["code"] == 404
assert response.json()["data"]["restartService"]["message"] is not None
assert response.json()["data"]["restartService"]["success"] is False
def test_graphql_restart_service(
authorized_client,
mock_get_service_by_id,
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESTART_SERVICE_MUTATION,
2022-10-21 19:37:32 +03:00
"variables": {"serviceId": "nextcloud"},
2022-10-15 18:43:05 +03:00
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["restartService"]["code"] == 200
assert response.json()["data"]["restartService"]["message"] is not None
assert response.json()["data"]["restartService"]["success"] is True
API_MOVE_SERVICE_MUTATION = """
mutation moveService($input: MoveServiceInput!) {
moveService(input: $input) {
success
message
code
}
}
"""
def test_graphql_move_service_unauthorized_client(
2022-10-15 18:43:05 +03:00
client,
mock_get_service_by_id_return_none,
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
2022-10-21 19:37:32 +03:00
"input": {"serviceId": "nextcloud", "location": "sdx"},
2022-10-15 18:43:05 +03:00
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_move_not_found_service(
authorized_client,
mock_get_service_by_id_return_none,
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
2022-10-21 19:37:32 +03:00
"input": {"serviceId": "nextcloud", "location": "sdx"},
2022-10-15 18:43:05 +03:00
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 404
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_not_movable_service(
2022-10-15 18:43:05 +03:00
authorized_client,
2022-10-21 19:37:32 +03:00
mock_get_service_by_id_return_none,
2022-10-15 18:43:05 +03:00
mock_service_to_graphql_service,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
2022-10-21 19:37:32 +03:00
"input": {"serviceId": "nextcloud", "location": "sdx"},
2022-10-15 18:43:05 +03:00
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
2022-10-21 19:37:32 +03:00
assert response.json()["data"]["moveService"]["code"] == 404
2022-10-15 18:43:05 +03:00
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_service_volume_not_found(
authorized_client,
2022-10-21 19:37:32 +03:00
mock_get_service_by_id_return_none,
2022-10-15 18:43:05 +03:00
mock_service_to_graphql_service,
mock_block_devices_return_none,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
2022-10-21 19:37:32 +03:00
"input": {"serviceId": "nextcloud", "location": "sdx"},
2022-10-15 18:43:05 +03:00
},
},
)
2022-10-21 19:37:32 +03:00
2022-10-15 18:43:05 +03:00
assert response.status_code == 200
assert response.json().get("data") is not None
2022-10-21 19:37:32 +03:00
assert response.json()["data"]["moveService"]["code"] == 404
2022-10-15 18:43:05 +03:00
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is False
def test_graphql_move_service(
authorized_client,
mock_get_service_by_id,
mock_service_to_graphql_service,
mock_block_devices,
2022-10-21 19:37:32 +03:00
mock_subprocess_popen,
one_user,
mock_job_to_api_job,
2022-10-15 18:43:05 +03:00
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOVE_SERVICE_MUTATION,
"variables": {
2022-10-21 19:37:32 +03:00
"input": {"serviceId": "nextcloud", "location": "sdx"},
2022-10-15 18:43:05 +03:00
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["moveService"]["code"] == 200
assert response.json()["data"]["moveService"]["message"] is not None
assert response.json()["data"]["moveService"]["success"] is True