diff --git a/selfprivacy_api/resources/services/ssh.py b/selfprivacy_api/resources/services/ssh.py index 2b90087..1665751 100644 --- a/selfprivacy_api/resources/services/ssh.py +++ b/selfprivacy_api/resources/services/ssh.py @@ -216,10 +216,10 @@ class SSHKeys(Resource): if "users" not in data: data["users"] = [] for user in data["users"]: - if user["name"] == username: + if user["username"] == username: if "sshKeys" not in user: user["sshKeys"] = [] - return user["ssh"]["sshKeys"] + return user["sshKeys"] return { "error": "User not found", }, 404 diff --git a/tests/services/test_ssh.py b/tests/services/test_ssh.py index 7233a53..db83a6b 100644 --- a/tests/services/test_ssh.py +++ b/tests/services/test_ssh.py @@ -1,41 +1,39 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument import json from os import read import pytest def read_json(file_path): - with open(file_path, "r") as f: - return json.load(f) + with open(file_path, "r", encoding="utf-8") as file: + return json.load(file) -############################################################################### +## FIXTURES ################################################### @pytest.fixture def ssh_off(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") - assert read_json(datadir / "turned_off.json")["ssh"]["enable"] == False - assert ( - read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True - ) + assert not read_json(datadir / "turned_off.json")["ssh"]["enable"] + assert read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] return datadir @pytest.fixture def ssh_on(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") - assert ( - read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True - ) - assert read_json(datadir / "turned_on.json")["ssh"]["enable"] == True + assert read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] + assert read_json(datadir / "turned_on.json")["ssh"]["enable"] return datadir @pytest.fixture def all_off(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "all_off.json") - assert read_json(datadir / "all_off.json")["ssh"]["passwordAuthentication"] == False - assert read_json(datadir / "all_off.json")["ssh"]["enable"] == False + assert not read_json(datadir / "all_off.json")["ssh"]["passwordAuthentication"] + assert not read_json(datadir / "all_off.json")["ssh"]["enable"] return datadir @@ -46,19 +44,30 @@ def undefined_settings(mocker, datadir): return datadir +@pytest.fixture +def undefined_values(mocker, datadir): + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined_values.json" + ) + assert "ssh" in read_json(datadir / "undefined_values.json") + assert "enable" not in read_json(datadir / "undefined_values.json")["ssh"] + assert ( + "passwordAuthentication" + not in read_json(datadir / "undefined_values.json")["ssh"] + ) + return datadir + + @pytest.fixture def root_and_admin_have_keys(mocker, datadir): mocker.patch( "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "root_and_admin_have_keys.json", ) - assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["enable"] == True - assert ( - read_json(datadir / "root_and_admin_have_keys.json")["ssh"][ - "passwordAuthentication" - ] - == True - ) + assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["enable"] + assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"][ + "passwordAuthentication" + ] assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["rootKeys"] == [ "ssh-ed25519 KEY test@pc" ] @@ -68,7 +77,23 @@ def root_and_admin_have_keys(mocker, datadir): return datadir -############################################################################### +@pytest.fixture +def some_users(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "some_users.json") + assert "users" in read_json(datadir / "some_users.json") + assert read_json(datadir / "some_users.json")["users"] == [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": ["ssh-rsa KEY user1@pc"], + }, + {"username": "user2", "hashedPassword": "HASHED_PASSWORD_2", "sshKeys": []}, + {"username": "user3", "hashedPassword": "HASHED_PASSWORD_3"}, + ] + return datadir + + +## TEST 401 ###################################################### @pytest.mark.parametrize( @@ -79,20 +104,33 @@ def test_unauthorized(client, ssh_off, endpoint): assert response.status_code == 401 +## TEST ENABLE ###################################################### + + def test_legacy_enable(authorized_client, ssh_off): - response = authorized_client.post(f"/services/ssh/enable") + response = authorized_client.post("/services/ssh/enable") assert response.status_code == 200 assert read_json(ssh_off / "turned_off.json") == read_json( ssh_off / "turned_on.json" ) +def test_legacy_on_undefined(authorized_client, undefined_settings): + response = authorized_client.post("/services/ssh/enable") + assert response.status_code == 200 + data = read_json(undefined_settings / "undefined.json") + assert data["ssh"]["enable"] == True + + def test_legacy_enable_when_enabled(authorized_client, ssh_on): - response = authorized_client.post(f"/services/ssh/enable") + response = authorized_client.post("/services/ssh/enable") assert response.status_code == 200 assert read_json(ssh_on / "turned_on.json") == read_json(ssh_on / "turned_on.json") +## GET ON /ssh ###################################################### + + def test_get_current_settings_ssh_off(authorized_client, ssh_off): response = authorized_client.get("/services/ssh") assert response.status_code == 200 @@ -117,6 +155,14 @@ def test_get_current_settings_undefined(authorized_client, undefined_settings): assert response.json == {"enable": True, "passwordAuthentication": True} +def test_get_current_settings_mostly_undefined(authorized_client, undefined_values): + response = authorized_client.get("/services/ssh") + assert response.status_code == 200 + assert response.json == {"enable": True, "passwordAuthentication": True} + + +## PUT ON /ssh ###################################################### + available_settings = [ {"enable": True, "passwordAuthentication": True}, {"enable": True, "passwordAuthentication": False}, @@ -131,7 +177,7 @@ available_settings = [ @pytest.mark.parametrize("settings", available_settings) def test_set_settings_ssh_off(authorized_client, ssh_off, settings): - response = authorized_client.put(f"/services/ssh", json=settings) + response = authorized_client.put("/services/ssh", json=settings) assert response.status_code == 200 data = read_json(ssh_off / "turned_off.json")["ssh"] if "enable" in settings: @@ -142,7 +188,7 @@ def test_set_settings_ssh_off(authorized_client, ssh_off, settings): @pytest.mark.parametrize("settings", available_settings) def test_set_settings_ssh_on(authorized_client, ssh_on, settings): - response = authorized_client.put(f"/services/ssh", json=settings) + response = authorized_client.put("/services/ssh", json=settings) assert response.status_code == 200 data = read_json(ssh_on / "turned_on.json")["ssh"] if "enable" in settings: @@ -153,7 +199,7 @@ def test_set_settings_ssh_on(authorized_client, ssh_on, settings): @pytest.mark.parametrize("settings", available_settings) def test_set_settings_all_off(authorized_client, all_off, settings): - response = authorized_client.put(f"/services/ssh", json=settings) + response = authorized_client.put("/services/ssh", json=settings) assert response.status_code == 200 data = read_json(all_off / "all_off.json")["ssh"] if "enable" in settings: @@ -164,7 +210,7 @@ def test_set_settings_all_off(authorized_client, all_off, settings): @pytest.mark.parametrize("settings", available_settings) def test_set_settings_undefined(authorized_client, undefined_settings, settings): - response = authorized_client.put(f"/services/ssh", json=settings) + response = authorized_client.put("/services/ssh", json=settings) assert response.status_code == 200 data = read_json(undefined_settings / "undefined.json")["ssh"] if "enable" in settings: @@ -173,9 +219,12 @@ def test_set_settings_undefined(authorized_client, undefined_settings, settings) assert data["passwordAuthentication"] == settings["passwordAuthentication"] +## PUT ON /ssh/key/send ###################################################### + + def test_add_root_key(authorized_client, ssh_on): response = authorized_client.put( - f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 201 assert read_json(ssh_on / "turned_on.json")["ssh"]["rootKeys"] == [ @@ -183,9 +232,18 @@ def test_add_root_key(authorized_client, ssh_on): ] +def test_add_root_key_on_undefined(authorized_client, undefined_settings): + response = authorized_client.put( + "/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} + ) + assert response.status_code == 201 + data = read_json(undefined_settings / "undefined.json") + assert data["ssh"]["rootKeys"] == ["ssh-rsa KEY test@pc"] + + def test_add_root_key_one_more(authorized_client, root_and_admin_have_keys): response = authorized_client.put( - f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 201 assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ @@ -198,7 +256,7 @@ def test_add_root_key_one_more(authorized_client, root_and_admin_have_keys): def test_add_existing_root_key(authorized_client, root_and_admin_have_keys): response = authorized_client.put( - f"/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"} + "/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"} ) assert response.status_code == 409 assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ @@ -210,33 +268,42 @@ def test_add_existing_root_key(authorized_client, root_and_admin_have_keys): def test_add_invalid_root_key(authorized_client, ssh_on): response = authorized_client.put( - f"/services/ssh/key/send", json={"public_key": "INVALID KEY test@pc"} + "/services/ssh/key/send", json={"public_key": "INVALID KEY test@pc"} ) assert response.status_code == 400 +## /ssh/keys/{user} ###################################################### + + def test_add_root_key_via_wrong_endpoint(authorized_client, ssh_on): response = authorized_client.post( - f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 400 def test_get_root_key(authorized_client, root_and_admin_have_keys): - response = authorized_client.get(f"/services/ssh/keys/root") + response = authorized_client.get("/services/ssh/keys/root") assert response.status_code == 200 assert response.json == ["ssh-ed25519 KEY test@pc"] def test_get_root_key_when_none(authorized_client, ssh_on): - response = authorized_client.get(f"/services/ssh/keys/root") + response = authorized_client.get("/services/ssh/keys/root") + assert response.status_code == 200 + assert response.json == [] + + +def test_get_root_key_on_undefined(authorized_client, undefined_settings): + response = authorized_client.get("/services/ssh/keys/root") assert response.status_code == 200 assert response.json == [] def test_delete_root_key(authorized_client, root_and_admin_have_keys): response = authorized_client.delete( - f"/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"} + "/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"} ) assert response.status_code == 200 assert ( @@ -249,7 +316,7 @@ def test_delete_root_key(authorized_client, root_and_admin_have_keys): def test_delete_root_nonexistent_key(authorized_client, root_and_admin_have_keys): response = authorized_client.delete( - f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 404 assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ @@ -259,21 +326,29 @@ def test_delete_root_nonexistent_key(authorized_client, root_and_admin_have_keys ] +def test_delete_root_key_on_undefined(authorized_client, undefined_settings): + response = authorized_client.delete( + "/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"} + ) + assert response.status_code == 404 + assert read_json(undefined_settings / "undefined.json")["ssh"]["rootKeys"] == [] + + def test_get_admin_key(authorized_client, root_and_admin_have_keys): - response = authorized_client.get(f"/services/ssh/keys/tester") + response = authorized_client.get("/services/ssh/keys/tester") assert response.status_code == 200 assert response.json == ["ssh-rsa KEY test@pc"] def test_get_admin_key_when_none(authorized_client, ssh_on): - response = authorized_client.get(f"/services/ssh/keys/tester") + response = authorized_client.get("/services/ssh/keys/tester") assert response.status_code == 200 assert response.json == [] def test_delete_admin_key(authorized_client, root_and_admin_have_keys): response = authorized_client.delete( - f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 200 assert ( @@ -282,9 +357,27 @@ def test_delete_admin_key(authorized_client, root_and_admin_have_keys): ) +def test_delete_nonexistent_admin_key(authorized_client, root_and_admin_have_keys): + response = authorized_client.delete( + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa NO KEY test@pc"} + ) + assert response.status_code == 404 + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[ + "sshKeys" + ] == ["ssh-rsa KEY test@pc"] + + +def test_delete_admin_key_on_undefined(authorized_client, undefined_settings): + response = authorized_client.delete( + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + ) + assert response.status_code == 404 + assert read_json(undefined_settings / "undefined.json")["sshKeys"] == [] + + def test_add_admin_key(authorized_client, ssh_on): response = authorized_client.post( - f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 201 assert read_json(ssh_on / "turned_on.json")["sshKeys"] == [ @@ -294,7 +387,7 @@ def test_add_admin_key(authorized_client, ssh_on): def test_add_admin_key_one_more(authorized_client, root_and_admin_have_keys): response = authorized_client.post( - f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY_2 test@pc"} + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY_2 test@pc"} ) assert response.status_code == 201 assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[ @@ -304,7 +397,7 @@ def test_add_admin_key_one_more(authorized_client, root_and_admin_have_keys): def test_add_existing_admin_key(authorized_client, root_and_admin_have_keys): response = authorized_client.post( - f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 409 assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[ @@ -312,3 +405,111 @@ def test_add_existing_admin_key(authorized_client, root_and_admin_have_keys): ] == [ "ssh-rsa KEY test@pc", ] + + +def test_add_invalid_admin_key(authorized_client, ssh_on): + response = authorized_client.post( + "/services/ssh/keys/tester", json={"public_key": "INVALID KEY test@pc"} + ) + assert response.status_code == 400 + + +@pytest.mark.parametrize("user", [1, 2, 3]) +def test_get_user_key(authorized_client, some_users, user): + response = authorized_client.get(f"/services/ssh/keys/user{user}") + assert response.status_code == 200 + if user == 1: + assert response.json == ["ssh-rsa KEY user1@pc"] + else: + assert response.json == [] + + +def test_get_keys_of_nonexistent_user(authorized_client, some_users): + response = authorized_client.get("/services/ssh/keys/user4") + assert response.status_code == 404 + + +def test_get_keys_of_undefined_users(authorized_client, undefined_settings): + response = authorized_client.get("/services/ssh/keys/user1") + assert response.status_code == 404 + + +@pytest.mark.parametrize("user", [1, 2, 3]) +def test_add_user_key(authorized_client, some_users, user): + response = authorized_client.post( + f"/services/ssh/keys/user{user}", json={"public_key": "ssh-ed25519 KEY test@pc"} + ) + assert response.status_code == 201 + if user == 1: + assert read_json(some_users / "some_users.json")["users"][user - 1][ + "sshKeys" + ] == [ + "ssh-rsa KEY user1@pc", + "ssh-ed25519 KEY test@pc", + ] + else: + assert read_json(some_users / "some_users.json")["users"][user - 1][ + "sshKeys" + ] == ["ssh-ed25519 KEY test@pc"] + + +def test_add_existing_user_key(authorized_client, some_users): + response = authorized_client.post( + "/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user1@pc"} + ) + assert response.status_code == 409 + assert read_json(some_users / "some_users.json")["users"][0]["sshKeys"] == [ + "ssh-rsa KEY user1@pc", + ] + + +def test_add_invalid_user_key(authorized_client, some_users): + response = authorized_client.post( + "/services/ssh/keys/user1", json={"public_key": "INVALID KEY user1@pc"} + ) + assert response.status_code == 400 + + +def test_delete_user_key(authorized_client, some_users): + response = authorized_client.delete( + "/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user1@pc"} + ) + assert response.status_code == 200 + assert read_json(some_users / "some_users.json")["users"][0]["sshKeys"] == [] + + +@pytest.mark.parametrize("user", [2, 3]) +def test_delete_nonexistent_user_key(authorized_client, some_users, user): + response = authorized_client.delete( + f"/services/ssh/keys/user{user}", json={"public_key": "ssh-rsa KEY user1@pc"} + ) + assert response.status_code == 404 + assert read_json(some_users / "some_users.json")["users"][user - 1]["sshKeys"] == [] + + +def test_add_keys_of_nonexistent_user(authorized_client, some_users): + response = authorized_client.post( + "/services/ssh/keys/user4", json={"public_key": "ssh-rsa KEY user4@pc"} + ) + assert response.status_code == 404 + + +def test_add_key_on_undefined_users(authorized_client, undefined_settings): + response = authorized_client.post( + "/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user4@pc"} + ) + assert response.status_code == 404 + + +def test_delete_keys_of_nonexistent_user(authorized_client, some_users): + response = authorized_client.delete( + "/services/ssh/keys/user4", json={"public_key": "ssh-rsa KEY user4@pc"} + ) + assert response.status_code == 404 + + +def test_delete_key_when_undefined_users(authorized_client, undefined_settings): + response = authorized_client.delete( + "/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user1@pc"} + ) + assert response.status_code == 404 diff --git a/tests/services/test_ssh/some_users.json b/tests/services/test_ssh/some_users.json new file mode 100644 index 0000000..569253a --- /dev/null +++ b/tests/services/test_ssh/some_users.json @@ -0,0 +1,71 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ], + "users": [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": [ + "ssh-rsa KEY user1@pc" + ] + }, + { + "username": "user2", + "hashedPassword": "HASHED_PASSWORD_2", + "sshKeys": [ + ] + }, + { + "username": "user3", + "hashedPassword": "HASHED_PASSWORD_3" + } + ] +} \ No newline at end of file diff --git a/tests/services/test_ssh/undefined.json b/tests/services/test_ssh/undefined.json index 3f5545f..a214cc3 100644 --- a/tests/services/test_ssh/undefined.json +++ b/tests/services/test_ssh/undefined.json @@ -38,8 +38,5 @@ "enable": true, "allowReboot": true }, - "timezone": "Europe/Moscow", - "sshKeys": [ - "ssh-rsa KEY test@pc" - ] + "timezone": "Europe/Moscow" } \ No newline at end of file diff --git a/tests/services/test_ssh/undefined_values.json b/tests/services/test_ssh/undefined_values.json new file mode 100644 index 0000000..235a220 --- /dev/null +++ b/tests/services/test_ssh/undefined_values.json @@ -0,0 +1,46 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": {}, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file