diff --git a/selfprivacy_api/resources/services/restic.py b/selfprivacy_api/resources/services/restic.py index 863d090..282488d 100644 --- a/selfprivacy_api/resources/services/restic.py +++ b/selfprivacy_api/resources/services/restic.py @@ -147,23 +147,26 @@ class AsyncRestoreBackup(Resource): 401: description: Unauthorized """ - backup_restoration_command = ["restic", "-r", "rclone:backblaze:sfbackup", "var", "--json"] + backup_restoration_command = [ + "restic", + "-r", + "rclone:backblaze:sfbackup", + "var", + "--json", + ] - with open("/tmp/backup.log", "w", encoding="utf-8") as backup_log_file_descriptor: + with open( + "/tmp/backup.log", "w", encoding="utf-8" + ) as backup_log_file_descriptor: with subprocess.Popen( backup_restoration_command, shell=False, stdout=subprocess.PIPE, stderr=backup_log_file_descriptor, ) as backup_restoration_process_descriptor: - backup_restoration_status = ( - "Backup restoration procedure started" - ) - - return { - "status": 0, - "message": backup_restoration_status - } + backup_restoration_status = "Backup restoration procedure started" + + return {"status": 0, "message": backup_restoration_status} api.add_resource(ListAllBackups, "/restic/backup/list") diff --git a/selfprivacy_api/resources/services/update.py b/selfprivacy_api/resources/services/update.py index 1d15fbe..73698d5 100644 --- a/selfprivacy_api/resources/services/update.py +++ b/selfprivacy_api/resources/services/update.py @@ -32,28 +32,24 @@ class PullRepositoryChanges(Resource): current_working_directory = os.getcwd() os.chdir("/etc/nixos") - git_pull_process_descriptor = subprocess.Popen( git_pull_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - shell=False + shell=False, ) - git_pull_process_descriptor.communicate()[0] os.chdir(current_working_directory) if git_pull_process_descriptor.returncode == 0: - return { - "status": 0, - "message": "Update completed successfully" - } + return {"status": 0, "message": "Update completed successfully"} elif git_pull_process_descriptor.returncode > 0: return { "status": git_pull_process_descriptor.returncode, - "message": "Something went wrong" + "message": "Something went wrong", }, 500 + api.add_resource(PullRepositoryChanges, "/update") diff --git a/selfprivacy_api/utils.py b/selfprivacy_api/utils.py index 4970db0..b0a7686 100644 --- a/selfprivacy_api/utils.py +++ b/selfprivacy_api/utils.py @@ -7,6 +7,7 @@ from flask import current_app USERDATA_FILE = "/etc/nixos/userdata/userdata.json" + def get_domain(): """Get domain from /var/domain without trailing new line""" with open("/var/domain", "r", encoding="utf-8") as domain_file: @@ -18,9 +19,7 @@ class WriteUserData(object): """Write userdata.json with lock""" def __init__(self): - self.userdata_file = open( - USERDATA_FILE, "r+", encoding="utf-8" - ) + self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8") portalocker.lock(self.userdata_file, portalocker.LOCK_EX) self.data = json.load(self.userdata_file) @@ -40,9 +39,7 @@ class ReadUserData(object): """Read userdata.json with lock""" def __init__(self): - self.userdata_file = open( - USERDATA_FILE, "r", encoding="utf-8" - ) + self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8") portalocker.lock(self.userdata_file, portalocker.LOCK_SH) self.data = json.load(self.userdata_file) @@ -60,4 +57,3 @@ def validate_ssh_public_key(key): if not key.startswith("ssh-rsa"): return False return True - \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 72fd132..e963224 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,10 +5,12 @@ from selfprivacy_api.app import create_app @pytest.fixture def app(): - app = create_app({ - "AUTH_TOKEN": "TEST_TOKEN", - "ENABLE_SWAGGER": "0", - }) + app = create_app( + { + "AUTH_TOKEN": "TEST_TOKEN", + "ENABLE_SWAGGER": "0", + } + ) yield app @@ -17,6 +19,7 @@ def app(): def client(app): return app.test_client() + class AuthorizedClient(testing.FlaskClient): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -28,11 +31,13 @@ class AuthorizedClient(testing.FlaskClient): kwargs["headers"]["Authorization"] = f"Bearer {self.token}" return super().open(*args, **kwargs) + @pytest.fixture def authorized_client(app): app.test_client_class = AuthorizedClient return app.test_client() + @pytest.fixture def runner(app): - return app.test_cli_runner() \ No newline at end of file + return app.test_cli_runner() diff --git a/tests/services/test_bitwarden.py b/tests/services/test_bitwarden.py index 7e009a4..3977253 100644 --- a/tests/services/test_bitwarden.py +++ b/tests/services/test_bitwarden.py @@ -1,43 +1,54 @@ import json import pytest + def read_json(file_path): with open(file_path, "r") as f: return json.load(f) + ############################################################################### + @pytest.fixture def bitwarden_off(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") assert read_json(datadir / "turned_off.json")["bitwarden"]["enable"] == False return datadir + @pytest.fixture def bitwarden_on(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") assert read_json(datadir / "turned_on.json")["bitwarden"]["enable"] == True return datadir + @pytest.fixture def bitwarden_enable_undefined(mocker, datadir): - mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json") + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json" + ) assert "enable" not in read_json(datadir / "enable_undefined.json")["bitwarden"] return datadir + @pytest.fixture def bitwarden_undefined(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") assert "bitwarden" not in read_json(datadir / "undefined.json") return datadir + ############################################################################### + @pytest.mark.parametrize("endpoint", ["enable", "disable"]) def test_unauthorized(client, bitwarden_off, endpoint): response = client.post(f"/services/bitwarden/{endpoint}") assert response.status_code == 401 + @pytest.mark.parametrize("endpoint", ["enable", "disable"]) def test_illegal_methods(authorized_client, bitwarden_off, endpoint): response = authorized_client.get(f"/services/bitwarden/{endpoint}") @@ -47,34 +58,68 @@ def test_illegal_methods(authorized_client, bitwarden_off, endpoint): response = authorized_client.delete(f"/services/bitwarden/{endpoint}") assert response.status_code == 405 -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_from_off(authorized_client, bitwarden_off, endpoint, target_file): response = authorized_client.post(f"/services/bitwarden/{endpoint}") assert response.status_code == 200 - assert read_json(bitwarden_off / "turned_off.json") == read_json(bitwarden_off / target_file) + assert read_json(bitwarden_off / "turned_off.json") == read_json( + bitwarden_off / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_from_on(authorized_client, bitwarden_on, endpoint, target_file): response = authorized_client.post(f"/services/bitwarden/{endpoint}") assert response.status_code == 200 - assert read_json(bitwarden_on / "turned_on.json") == read_json(bitwarden_on / target_file) + assert read_json(bitwarden_on / "turned_on.json") == read_json( + bitwarden_on / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_twice(authorized_client, bitwarden_off, endpoint, target_file): response = authorized_client.post(f"/services/bitwarden/{endpoint}") assert response.status_code == 200 response = authorized_client.post(f"/services/bitwarden/{endpoint}") assert response.status_code == 200 - assert read_json(bitwarden_off / "turned_off.json") == read_json(bitwarden_off / target_file) + assert read_json(bitwarden_off / "turned_off.json") == read_json( + bitwarden_off / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) -def test_on_attribute_deleted(authorized_client, bitwarden_enable_undefined, endpoint, target_file): + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_attribute_deleted( + authorized_client, bitwarden_enable_undefined, endpoint, target_file +): response = authorized_client.post(f"/services/bitwarden/{endpoint}") assert response.status_code == 200 - assert read_json(bitwarden_enable_undefined / "enable_undefined.json") == read_json(bitwarden_enable_undefined / target_file) + assert read_json(bitwarden_enable_undefined / "enable_undefined.json") == read_json( + bitwarden_enable_undefined / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) -def test_on_bitwarden_undefined(authorized_client, bitwarden_undefined, endpoint, target_file): + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_bitwarden_undefined( + authorized_client, bitwarden_undefined, endpoint, target_file +): response = authorized_client.post(f"/services/bitwarden/{endpoint}") assert response.status_code == 200 - assert read_json(bitwarden_undefined / "undefined.json") == read_json(bitwarden_undefined / target_file) + assert read_json(bitwarden_undefined / "undefined.json") == read_json( + bitwarden_undefined / target_file + ) diff --git a/tests/services/test_gitea.py b/tests/services/test_gitea.py index b2d57b9..0a50c19 100644 --- a/tests/services/test_gitea.py +++ b/tests/services/test_gitea.py @@ -1,43 +1,54 @@ import json import pytest + def read_json(file_path): with open(file_path, "r") as f: return json.load(f) + ############################################################################### + @pytest.fixture def gitea_off(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") assert read_json(datadir / "turned_off.json")["gitea"]["enable"] == False return datadir + @pytest.fixture def gitea_on(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") assert read_json(datadir / "turned_on.json")["gitea"]["enable"] == True return datadir + @pytest.fixture def gitea_enable_undefined(mocker, datadir): - mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json") + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json" + ) assert "enable" not in read_json(datadir / "enable_undefined.json")["gitea"] return datadir + @pytest.fixture def gitea_undefined(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") assert "gitea" not in read_json(datadir / "undefined.json") return datadir + ############################################################################### + @pytest.mark.parametrize("endpoint", ["enable", "disable"]) def test_unauthorized(client, gitea_off, endpoint): response = client.post(f"/services/gitea/{endpoint}") assert response.status_code == 401 + @pytest.mark.parametrize("endpoint", ["enable", "disable"]) def test_illegal_methods(authorized_client, gitea_off, endpoint): response = authorized_client.get(f"/services/gitea/{endpoint}") @@ -47,34 +58,64 @@ def test_illegal_methods(authorized_client, gitea_off, endpoint): response = authorized_client.delete(f"/services/gitea/{endpoint}") assert response.status_code == 405 -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_from_off(authorized_client, gitea_off, endpoint, target_file): response = authorized_client.post(f"/services/gitea/{endpoint}") assert response.status_code == 200 - assert read_json(gitea_off / "turned_off.json") == read_json(gitea_off / target_file) + assert read_json(gitea_off / "turned_off.json") == read_json( + gitea_off / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_from_on(authorized_client, gitea_on, endpoint, target_file): response = authorized_client.post(f"/services/gitea/{endpoint}") assert response.status_code == 200 assert read_json(gitea_on / "turned_on.json") == read_json(gitea_on / target_file) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_twice(authorized_client, gitea_off, endpoint, target_file): response = authorized_client.post(f"/services/gitea/{endpoint}") assert response.status_code == 200 response = authorized_client.post(f"/services/gitea/{endpoint}") assert response.status_code == 200 - assert read_json(gitea_off / "turned_off.json") == read_json(gitea_off / target_file) + assert read_json(gitea_off / "turned_off.json") == read_json( + gitea_off / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) -def test_on_attribute_deleted(authorized_client, gitea_enable_undefined, endpoint, target_file): + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_attribute_deleted( + authorized_client, gitea_enable_undefined, endpoint, target_file +): response = authorized_client.post(f"/services/gitea/{endpoint}") assert response.status_code == 200 - assert read_json(gitea_enable_undefined / "enable_undefined.json") == read_json(gitea_enable_undefined / target_file) + assert read_json(gitea_enable_undefined / "enable_undefined.json") == read_json( + gitea_enable_undefined / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_on_gitea_undefined(authorized_client, gitea_undefined, endpoint, target_file): response = authorized_client.post(f"/services/gitea/{endpoint}") assert response.status_code == 200 - assert read_json(gitea_undefined / "undefined.json") == read_json(gitea_undefined / target_file) + assert read_json(gitea_undefined / "undefined.json") == read_json( + gitea_undefined / target_file + ) diff --git a/tests/services/test_mailserver.py b/tests/services/test_mailserver.py index aa008c1..f45ad1e 100644 --- a/tests/services/test_mailserver.py +++ b/tests/services/test_mailserver.py @@ -2,14 +2,18 @@ import base64 import json import pytest + def read_json(file_path): with open(file_path, "r", encoding="utf-8") as f: return json.load(f) + ############################################################################### -class ProcessMock(): + +class ProcessMock: """Mock subprocess.Popen""" + def __init__(self, args, **kwargs): self.args = args self.kwargs = kwargs @@ -17,6 +21,7 @@ class ProcessMock(): def communicate(): return (b"I am a DKIM key", None) + class NoFileMock(ProcessMock): def communicate(): return (b"", None) @@ -25,24 +30,36 @@ class NoFileMock(ProcessMock): @pytest.fixture def mock_subproccess_popen(mocker): mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock) - mocker.patch("selfprivacy_api.resources.services.mailserver.get_domain", autospec=True, return_value="example.com") + mocker.patch( + "selfprivacy_api.resources.services.mailserver.get_domain", + autospec=True, + return_value="example.com", + ) mocker.patch("os.path.exists", autospec=True, return_value=True) - return mock + return mock + @pytest.fixture def mock_no_file(mocker): mock = mocker.patch("subprocess.Popen", autospec=True, return_value=NoFileMock) - mocker.patch("selfprivacy_api.resources.services.mailserver.get_domain", autospec=True, return_value="example.com") + mocker.patch( + "selfprivacy_api.resources.services.mailserver.get_domain", + autospec=True, + return_value="example.com", + ) mocker.patch("os.path.exists", autospec=True, return_value=False) return mock + ############################################################################### + def test_unauthorized(client, mock_subproccess_popen): """Test unauthorized""" response = client.get("/services/mailserver/dkim") assert response.status_code == 401 + def test_illegal_methods(authorized_client, mock_subproccess_popen): response = authorized_client.post("/services/mailserver/dkim") assert response.status_code == 405 @@ -51,15 +68,20 @@ def test_illegal_methods(authorized_client, mock_subproccess_popen): response = authorized_client.delete("/services/mailserver/dkim") assert response.status_code == 405 + def test_dkim_key(authorized_client, mock_subproccess_popen): """Test DKIM key""" response = authorized_client.get("/services/mailserver/dkim") assert response.status_code == 200 assert base64.b64decode(response.data) == b"I am a DKIM key" - assert mock_subproccess_popen.call_args[0][0] == ["cat", "/var/dkim/example.com.selector.txt"] + assert mock_subproccess_popen.call_args[0][0] == [ + "cat", + "/var/dkim/example.com.selector.txt", + ] + def test_no_dkim_key(authorized_client, mock_no_file): """Test no DKIM key""" response = authorized_client.get("/services/mailserver/dkim") assert response.status_code == 404 - assert mock_no_file.called == False \ No newline at end of file + assert mock_no_file.called == False diff --git a/tests/services/test_nextcloud.py b/tests/services/test_nextcloud.py index 031e0f2..b05c363 100644 --- a/tests/services/test_nextcloud.py +++ b/tests/services/test_nextcloud.py @@ -1,43 +1,54 @@ import json import pytest + def read_json(file_path): with open(file_path, "r") as f: return json.load(f) + ############################################################################### + @pytest.fixture def nextcloud_off(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") assert read_json(datadir / "turned_off.json")["nextcloud"]["enable"] == False return datadir + @pytest.fixture def nextcloud_on(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") assert read_json(datadir / "turned_on.json")["nextcloud"]["enable"] == True return datadir + @pytest.fixture def nextcloud_enable_undefined(mocker, datadir): - mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json") + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json" + ) assert "enable" not in read_json(datadir / "enable_undefined.json")["nextcloud"] return datadir + @pytest.fixture def nextcloud_undefined(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") assert "nextcloud" not in read_json(datadir / "undefined.json") return datadir + ############################################################################### + @pytest.mark.parametrize("endpoint", ["enable", "disable"]) def test_unauthorized(client, nextcloud_off, endpoint): response = client.post(f"/services/nextcloud/{endpoint}") assert response.status_code == 401 + @pytest.mark.parametrize("endpoint", ["enable", "disable"]) def test_illegal_methods(authorized_client, nextcloud_off, endpoint): response = authorized_client.get(f"/services/nextcloud/{endpoint}") @@ -47,34 +58,66 @@ def test_illegal_methods(authorized_client, nextcloud_off, endpoint): response = authorized_client.delete(f"/services/nextcloud/{endpoint}") assert response.status_code == 405 -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_from_off(authorized_client, nextcloud_off, endpoint, target_file): response = authorized_client.post(f"/services/nextcloud/{endpoint}") assert response.status_code == 200 - assert read_json(nextcloud_off / "turned_off.json") == read_json(nextcloud_off / target_file) + assert read_json(nextcloud_off / "turned_off.json") == read_json( + nextcloud_off / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_from_on(authorized_client, nextcloud_on, endpoint, target_file): response = authorized_client.post(f"/services/nextcloud/{endpoint}") assert response.status_code == 200 - assert read_json(nextcloud_on / "turned_on.json") == read_json(nextcloud_on / target_file) + assert read_json(nextcloud_on / "turned_on.json") == read_json( + nextcloud_on / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_twice(authorized_client, nextcloud_off, endpoint, target_file): response = authorized_client.post(f"/services/nextcloud/{endpoint}") assert response.status_code == 200 response = authorized_client.post(f"/services/nextcloud/{endpoint}") assert response.status_code == 200 - assert read_json(nextcloud_off / "turned_off.json") == read_json(nextcloud_off / target_file) + assert read_json(nextcloud_off / "turned_off.json") == read_json( + nextcloud_off / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) -def test_on_attribute_deleted(authorized_client, nextcloud_enable_undefined, endpoint, target_file): + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_attribute_deleted( + authorized_client, nextcloud_enable_undefined, endpoint, target_file +): response = authorized_client.post(f"/services/nextcloud/{endpoint}") assert response.status_code == 200 - assert read_json(nextcloud_enable_undefined / "enable_undefined.json") == read_json(nextcloud_enable_undefined / target_file) + assert read_json(nextcloud_enable_undefined / "enable_undefined.json") == read_json( + nextcloud_enable_undefined / target_file + ) + @pytest.mark.parametrize("endpoint,target", [("enable", True), ("disable", False)]) -def test_on_nextcloud_undefined(authorized_client, nextcloud_undefined, endpoint, target): +def test_on_nextcloud_undefined( + authorized_client, nextcloud_undefined, endpoint, target +): response = authorized_client.post(f"/services/nextcloud/{endpoint}") assert response.status_code == 200 - assert read_json(nextcloud_undefined / "undefined.json")["nextcloud"]["enable"] == target + assert ( + read_json(nextcloud_undefined / "undefined.json")["nextcloud"]["enable"] + == target + ) diff --git a/tests/services/test_ocserv.py b/tests/services/test_ocserv.py index 2d658ea..8f43e70 100644 --- a/tests/services/test_ocserv.py +++ b/tests/services/test_ocserv.py @@ -1,43 +1,54 @@ import json import pytest + def read_json(file_path): with open(file_path, "r") as f: return json.load(f) + ############################################################################### + @pytest.fixture def ocserv_off(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") assert read_json(datadir / "turned_off.json")["ocserv"]["enable"] == False return datadir + @pytest.fixture def ocserv_on(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") assert read_json(datadir / "turned_on.json")["ocserv"]["enable"] == True return datadir + @pytest.fixture def ocserv_enable_undefined(mocker, datadir): - mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json") + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json" + ) assert "enable" not in read_json(datadir / "enable_undefined.json")["ocserv"] return datadir + @pytest.fixture def ocserv_undefined(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") assert "ocserv" not in read_json(datadir / "undefined.json") return datadir + ############################################################################### + @pytest.mark.parametrize("endpoint", ["enable", "disable"]) def test_unauthorized(client, ocserv_off, endpoint): response = client.post(f"/services/ocserv/{endpoint}") assert response.status_code == 401 + @pytest.mark.parametrize("endpoint", ["enable", "disable"]) def test_illegal_methods(authorized_client, ocserv_off, endpoint): response = authorized_client.get(f"/services/ocserv/{endpoint}") @@ -47,34 +58,66 @@ def test_illegal_methods(authorized_client, ocserv_off, endpoint): response = authorized_client.delete(f"/services/ocserv/{endpoint}") assert response.status_code == 405 -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_from_off(authorized_client, ocserv_off, endpoint, target_file): response = authorized_client.post(f"/services/ocserv/{endpoint}") assert response.status_code == 200 - assert read_json(ocserv_off / "turned_off.json") == read_json(ocserv_off / target_file) + assert read_json(ocserv_off / "turned_off.json") == read_json( + ocserv_off / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_from_on(authorized_client, ocserv_on, endpoint, target_file): response = authorized_client.post(f"/services/ocserv/{endpoint}") assert response.status_code == 200 assert read_json(ocserv_on / "turned_on.json") == read_json(ocserv_on / target_file) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_twice(authorized_client, ocserv_off, endpoint, target_file): response = authorized_client.post(f"/services/ocserv/{endpoint}") assert response.status_code == 200 response = authorized_client.post(f"/services/ocserv/{endpoint}") assert response.status_code == 200 - assert read_json(ocserv_off / "turned_off.json") == read_json(ocserv_off / target_file) + assert read_json(ocserv_off / "turned_off.json") == read_json( + ocserv_off / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) -def test_on_attribute_deleted(authorized_client, ocserv_enable_undefined, endpoint, target_file): + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_attribute_deleted( + authorized_client, ocserv_enable_undefined, endpoint, target_file +): response = authorized_client.post(f"/services/ocserv/{endpoint}") assert response.status_code == 200 - assert read_json(ocserv_enable_undefined / "enable_undefined.json") == read_json(ocserv_enable_undefined / target_file) + assert read_json(ocserv_enable_undefined / "enable_undefined.json") == read_json( + ocserv_enable_undefined / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) -def test_on_ocserv_undefined(authorized_client, ocserv_undefined, endpoint, target_file): + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_ocserv_undefined( + authorized_client, ocserv_undefined, endpoint, target_file +): response = authorized_client.post(f"/services/ocserv/{endpoint}") assert response.status_code == 200 - assert read_json(ocserv_undefined / "undefined.json") == read_json(ocserv_undefined / target_file) + assert read_json(ocserv_undefined / "undefined.json") == read_json( + ocserv_undefined / target_file + ) diff --git a/tests/services/test_pleroma.py b/tests/services/test_pleroma.py index 8b7a877..0d7f149 100644 --- a/tests/services/test_pleroma.py +++ b/tests/services/test_pleroma.py @@ -1,43 +1,54 @@ import json import pytest + def read_json(file_path): with open(file_path, "r") as f: return json.load(f) + ############################################################################### + @pytest.fixture def pleroma_off(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") assert read_json(datadir / "turned_off.json")["pleroma"]["enable"] == False return datadir + @pytest.fixture def pleroma_on(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") assert read_json(datadir / "turned_on.json")["pleroma"]["enable"] == True return datadir + @pytest.fixture def pleroma_enable_undefined(mocker, datadir): - mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json") + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json" + ) assert "enable" not in read_json(datadir / "enable_undefined.json")["pleroma"] return datadir + @pytest.fixture def pleroma_undefined(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") assert "pleroma" not in read_json(datadir / "undefined.json") return datadir + ############################################################################### + @pytest.mark.parametrize("endpoint", ["enable", "disable"]) def test_unauthorized(client, pleroma_off, endpoint): response = client.post(f"/services/pleroma/{endpoint}") assert response.status_code == 401 + @pytest.mark.parametrize("endpoint", ["enable", "disable"]) def test_illegal_methods(authorized_client, pleroma_off, endpoint): response = authorized_client.get(f"/services/pleroma/{endpoint}") @@ -47,34 +58,68 @@ def test_illegal_methods(authorized_client, pleroma_off, endpoint): response = authorized_client.delete(f"/services/pleroma/{endpoint}") assert response.status_code == 405 -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_from_off(authorized_client, pleroma_off, endpoint, target_file): response = authorized_client.post(f"/services/pleroma/{endpoint}") assert response.status_code == 200 - assert read_json(pleroma_off / "turned_off.json") == read_json(pleroma_off / target_file) + assert read_json(pleroma_off / "turned_off.json") == read_json( + pleroma_off / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_from_on(authorized_client, pleroma_on, endpoint, target_file): response = authorized_client.post(f"/services/pleroma/{endpoint}") assert response.status_code == 200 - assert read_json(pleroma_on / "turned_on.json") == read_json(pleroma_on / target_file) + assert read_json(pleroma_on / "turned_on.json") == read_json( + pleroma_on / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) def test_switch_twice(authorized_client, pleroma_off, endpoint, target_file): response = authorized_client.post(f"/services/pleroma/{endpoint}") assert response.status_code == 200 response = authorized_client.post(f"/services/pleroma/{endpoint}") assert response.status_code == 200 - assert read_json(pleroma_off / "turned_off.json") == read_json(pleroma_off / target_file) + assert read_json(pleroma_off / "turned_off.json") == read_json( + pleroma_off / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) -def test_on_attribute_deleted(authorized_client, pleroma_enable_undefined, endpoint, target_file): + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_attribute_deleted( + authorized_client, pleroma_enable_undefined, endpoint, target_file +): response = authorized_client.post(f"/services/pleroma/{endpoint}") assert response.status_code == 200 - assert read_json(pleroma_enable_undefined / "enable_undefined.json") == read_json(pleroma_enable_undefined / target_file) + assert read_json(pleroma_enable_undefined / "enable_undefined.json") == read_json( + pleroma_enable_undefined / target_file + ) -@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")]) -def test_on_pleroma_undefined(authorized_client, pleroma_undefined, endpoint, target_file): + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_pleroma_undefined( + authorized_client, pleroma_undefined, endpoint, target_file +): response = authorized_client.post(f"/services/pleroma/{endpoint}") assert response.status_code == 200 - assert read_json(pleroma_undefined / "undefined.json") == read_json(pleroma_undefined / target_file) + assert read_json(pleroma_undefined / "undefined.json") == read_json( + pleroma_undefined / target_file + ) diff --git a/tests/services/test_services.py b/tests/services/test_services.py index 0516c2d..859da5a 100644 --- a/tests/services/test_services.py +++ b/tests/services/test_services.py @@ -7,6 +7,7 @@ def read_json(file_path): with open(file_path, "r", encoding="utf-8") as f: return json.load(f) + def call_args_asserts(mocked_object): assert mocked_object.call_count == 8 assert mocked_object.call_args_list[0][0][0] == [ @@ -50,6 +51,7 @@ def call_args_asserts(mocked_object): "pleroma.service", ] + class ProcessMock: """Mock subprocess.Popen""" diff --git a/tests/services/test_ssh.py b/tests/services/test_ssh.py index c140123..7233a53 100644 --- a/tests/services/test_ssh.py +++ b/tests/services/test_ssh.py @@ -172,91 +172,143 @@ def test_set_settings_undefined(authorized_client, undefined_settings, settings) if "passwordAuthentication" in settings: assert data["passwordAuthentication"] == settings["passwordAuthentication"] + def test_add_root_key(authorized_client, ssh_on): - response = authorized_client.put(f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}) + response = authorized_client.put( + f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} + ) assert response.status_code == 201 assert read_json(ssh_on / "turned_on.json")["ssh"]["rootKeys"] == [ "ssh-rsa KEY test@pc", ] + def test_add_root_key_one_more(authorized_client, root_and_admin_have_keys): - response = authorized_client.put(f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}) + response = authorized_client.put( + f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} + ) assert response.status_code == 201 - assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"]["rootKeys"] == [ + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ + "rootKeys" + ] == [ "ssh-ed25519 KEY test@pc", "ssh-rsa KEY test@pc", ] + def test_add_existing_root_key(authorized_client, root_and_admin_have_keys): - response = authorized_client.put(f"/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"}) + response = authorized_client.put( + f"/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"} + ) assert response.status_code == 409 - assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"]["rootKeys"] == [ + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ + "rootKeys" + ] == [ "ssh-ed25519 KEY test@pc", ] + def test_add_invalid_root_key(authorized_client, ssh_on): - response = authorized_client.put(f"/services/ssh/key/send", json={"public_key": "INVALID KEY test@pc"}) + response = authorized_client.put( + f"/services/ssh/key/send", json={"public_key": "INVALID KEY test@pc"} + ) assert response.status_code == 400 + def test_add_root_key_via_wrong_endpoint(authorized_client, ssh_on): - response = authorized_client.post(f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"}) + response = authorized_client.post( + f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} + ) assert response.status_code == 400 + def test_get_root_key(authorized_client, root_and_admin_have_keys): response = authorized_client.get(f"/services/ssh/keys/root") assert response.status_code == 200 assert response.json == ["ssh-ed25519 KEY test@pc"] + def test_get_root_key_when_none(authorized_client, ssh_on): response = authorized_client.get(f"/services/ssh/keys/root") assert response.status_code == 200 assert response.json == [] + def test_delete_root_key(authorized_client, root_and_admin_have_keys): - response = authorized_client.delete(f"/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"}) + response = authorized_client.delete( + f"/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"} + ) assert response.status_code == 200 - assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"]["rootKeys"] == [] + assert ( + read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ + "rootKeys" + ] + == [] + ) + def test_delete_root_nonexistent_key(authorized_client, root_and_admin_have_keys): - response = authorized_client.delete(f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"}) + response = authorized_client.delete( + f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} + ) assert response.status_code == 404 - assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"]["rootKeys"] == [ + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ + "rootKeys" + ] == [ "ssh-ed25519 KEY test@pc", ] + def test_get_admin_key(authorized_client, root_and_admin_have_keys): response = authorized_client.get(f"/services/ssh/keys/tester") assert response.status_code == 200 assert response.json == ["ssh-rsa KEY test@pc"] + def test_get_admin_key_when_none(authorized_client, ssh_on): response = authorized_client.get(f"/services/ssh/keys/tester") assert response.status_code == 200 assert response.json == [] + def test_delete_admin_key(authorized_client, root_and_admin_have_keys): - response = authorized_client.delete(f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}) + response = authorized_client.delete( + f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + ) assert response.status_code == 200 - assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["sshKeys"] == [] + assert ( + read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["sshKeys"] + == [] + ) + def test_add_admin_key(authorized_client, ssh_on): - response = authorized_client.post(f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}) + response = authorized_client.post( + f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + ) assert response.status_code == 201 assert read_json(ssh_on / "turned_on.json")["sshKeys"] == [ "ssh-rsa KEY test@pc", ] + def test_add_admin_key_one_more(authorized_client, root_and_admin_have_keys): - response = authorized_client.post(f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY_2 test@pc"}) + response = authorized_client.post( + f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY_2 test@pc"} + ) assert response.status_code == 201 - assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["sshKeys"] == [ - "ssh-rsa KEY test@pc", - "ssh-rsa KEY_2 test@pc" - ] + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[ + "sshKeys" + ] == ["ssh-rsa KEY test@pc", "ssh-rsa KEY_2 test@pc"] + def test_add_existing_admin_key(authorized_client, root_and_admin_have_keys): - response = authorized_client.post(f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}) + response = authorized_client.post( + f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + ) assert response.status_code == 409 - assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["sshKeys"] == [ + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[ + "sshKeys" + ] == [ "ssh-rsa KEY test@pc", - ] \ No newline at end of file + ]