Browse Source

SSH coverage and bug fixes

pull/7/head
Inex Code 12 months ago
parent
commit
103cb22e88
  1. 4
      selfprivacy_api/resources/services/ssh.py
  2. 285
      tests/services/test_ssh.py
  3. 71
      tests/services/test_ssh/some_users.json
  4. 5
      tests/services/test_ssh/undefined.json
  5. 46
      tests/services/test_ssh/undefined_values.json

4
selfprivacy_api/resources/services/ssh.py

@ -216,10 +216,10 @@ class SSHKeys(Resource):
if "users" not in data:
data["users"] = []
for user in data["users"]:
if user["name"] == username:
if user["username"] == username:
if "sshKeys" not in user:
user["sshKeys"] = []
return user["ssh"]["sshKeys"]
return user["sshKeys"]
return {
"error": "User not found",
}, 404

285
tests/services/test_ssh.py

@ -1,41 +1,39 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
import json
from os import read
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
with open(file_path, "r", encoding="utf-8") as file:
return json.load(file)
###############################################################################
## FIXTURES ###################################################
@pytest.fixture
def ssh_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["ssh"]["enable"] == False
assert (
read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True
)
assert not read_json(datadir / "turned_off.json")["ssh"]["enable"]
assert read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"]
return datadir
@pytest.fixture
def ssh_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert (
read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True
)
assert read_json(datadir / "turned_on.json")["ssh"]["enable"] == True
assert read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"]
assert read_json(datadir / "turned_on.json")["ssh"]["enable"]
return datadir
@pytest.fixture
def all_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "all_off.json")
assert read_json(datadir / "all_off.json")["ssh"]["passwordAuthentication"] == False
assert read_json(datadir / "all_off.json")["ssh"]["enable"] == False
assert not read_json(datadir / "all_off.json")["ssh"]["passwordAuthentication"]
assert not read_json(datadir / "all_off.json")["ssh"]["enable"]
return datadir
@ -46,19 +44,30 @@ def undefined_settings(mocker, datadir):
return datadir
@pytest.fixture
def undefined_values(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined_values.json"
)
assert "ssh" in read_json(datadir / "undefined_values.json")
assert "enable" not in read_json(datadir / "undefined_values.json")["ssh"]
assert (
"passwordAuthentication"
not in read_json(datadir / "undefined_values.json")["ssh"]
)
return datadir
@pytest.fixture
def root_and_admin_have_keys(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE",
new=datadir / "root_and_admin_have_keys.json",
)
assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["enable"] == True
assert (
read_json(datadir / "root_and_admin_have_keys.json")["ssh"][
"passwordAuthentication"
]
== True
)
assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["enable"]
assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"][
"passwordAuthentication"
]
assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["rootKeys"] == [
"ssh-ed25519 KEY test@pc"
]
@ -68,7 +77,23 @@ def root_and_admin_have_keys(mocker, datadir):
return datadir
###############################################################################
@pytest.fixture
def some_users(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "some_users.json")
assert "users" in read_json(datadir / "some_users.json")
assert read_json(datadir / "some_users.json")["users"] == [
{
"username": "user1",
"hashedPassword": "HASHED_PASSWORD_1",
"sshKeys": ["ssh-rsa KEY user1@pc"],
},
{"username": "user2", "hashedPassword": "HASHED_PASSWORD_2", "sshKeys": []},
{"username": "user3", "hashedPassword": "HASHED_PASSWORD_3"},
]
return datadir
## TEST 401 ######################################################
@pytest.mark.parametrize(
@ -79,20 +104,33 @@ def test_unauthorized(client, ssh_off, endpoint):
assert response.status_code == 401
## TEST ENABLE ######################################################
def test_legacy_enable(authorized_client, ssh_off):
response = authorized_client.post(f"/services/ssh/enable")
response = authorized_client.post("/services/ssh/enable")
assert response.status_code == 200
assert read_json(ssh_off / "turned_off.json") == read_json(
ssh_off / "turned_on.json"
)
def test_legacy_on_undefined(authorized_client, undefined_settings):
response = authorized_client.post("/services/ssh/enable")
assert response.status_code == 200
data = read_json(undefined_settings / "undefined.json")
assert data["ssh"]["enable"] == True
def test_legacy_enable_when_enabled(authorized_client, ssh_on):
response = authorized_client.post(f"/services/ssh/enable")
response = authorized_client.post("/services/ssh/enable")
assert response.status_code == 200
assert read_json(ssh_on / "turned_on.json") == read_json(ssh_on / "turned_on.json")
## GET ON /ssh ######################################################
def test_get_current_settings_ssh_off(authorized_client, ssh_off):
response = authorized_client.get("/services/ssh")
assert response.status_code == 200
@ -117,6 +155,14 @@ def test_get_current_settings_undefined(authorized_client, undefined_settings):
assert response.json == {"enable": True, "passwordAuthentication": True}
def test_get_current_settings_mostly_undefined(authorized_client, undefined_values):
response = authorized_client.get("/services/ssh")
assert response.status_code == 200
assert response.json == {"enable": True, "passwordAuthentication": True}
## PUT ON /ssh ######################################################
available_settings = [
{"enable": True, "passwordAuthentication": True},
{"enable": True, "passwordAuthentication": False},
@ -131,7 +177,7 @@ available_settings = [
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_ssh_off(authorized_client, ssh_off, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
response = authorized_client.put("/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(ssh_off / "turned_off.json")["ssh"]
if "enable" in settings:
@ -142,7 +188,7 @@ def test_set_settings_ssh_off(authorized_client, ssh_off, settings):
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_ssh_on(authorized_client, ssh_on, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
response = authorized_client.put("/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(ssh_on / "turned_on.json")["ssh"]
if "enable" in settings:
@ -153,7 +199,7 @@ def test_set_settings_ssh_on(authorized_client, ssh_on, settings):
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_all_off(authorized_client, all_off, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
response = authorized_client.put("/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(all_off / "all_off.json")["ssh"]
if "enable" in settings:
@ -164,7 +210,7 @@ def test_set_settings_all_off(authorized_client, all_off, settings):
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_undefined(authorized_client, undefined_settings, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
response = authorized_client.put("/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(undefined_settings / "undefined.json")["ssh"]
if "enable" in settings:
@ -173,9 +219,12 @@ def test_set_settings_undefined(authorized_client, undefined_settings, settings)
assert data["passwordAuthentication"] == settings["passwordAuthentication"]
## PUT ON /ssh/key/send ######################################################
def test_add_root_key(authorized_client, ssh_on):
response = authorized_client.put(
f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 201
assert read_json(ssh_on / "turned_on.json")["ssh"]["rootKeys"] == [
@ -183,9 +232,18 @@ def test_add_root_key(authorized_client, ssh_on):
]
def test_add_root_key_on_undefined(authorized_client, undefined_settings):
response = authorized_client.put(
"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 201
data = read_json(undefined_settings / "undefined.json")
assert data["ssh"]["rootKeys"] == ["ssh-rsa KEY test@pc"]
def test_add_root_key_one_more(authorized_client, root_and_admin_have_keys):
response = authorized_client.put(
f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 201
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][
@ -198,7 +256,7 @@ def test_add_root_key_one_more(authorized_client, root_and_admin_have_keys):
def test_add_existing_root_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.put(
f"/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"}
"/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"}
)
assert response.status_code == 409
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][
@ -210,33 +268,42 @@ def test_add_existing_root_key(authorized_client, root_and_admin_have_keys):
def test_add_invalid_root_key(authorized_client, ssh_on):
response = authorized_client.put(
f"/services/ssh/key/send", json={"public_key": "INVALID KEY test@pc"}
"/services/ssh/key/send", json={"public_key": "INVALID KEY test@pc"}
)
assert response.status_code == 400
## /ssh/keys/{user} ######################################################
def test_add_root_key_via_wrong_endpoint(authorized_client, ssh_on):
response = authorized_client.post(
f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"}
"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 400
def test_get_root_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.get(f"/services/ssh/keys/root")
response = authorized_client.get("/services/ssh/keys/root")
assert response.status_code == 200
assert response.json == ["ssh-ed25519 KEY test@pc"]
def test_get_root_key_when_none(authorized_client, ssh_on):
response = authorized_client.get(f"/services/ssh/keys/root")
response = authorized_client.get("/services/ssh/keys/root")
assert response.status_code == 200
assert response.json == []
def test_get_root_key_on_undefined(authorized_client, undefined_settings):
response = authorized_client.get("/services/ssh/keys/root")
assert response.status_code == 200
assert response.json == []
def test_delete_root_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.delete(
f"/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"}
"/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"}
)
assert response.status_code == 200
assert (
@ -249,7 +316,7 @@ def test_delete_root_key(authorized_client, root_and_admin_have_keys):
def test_delete_root_nonexistent_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.delete(
f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"}
"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 404
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][
@ -259,21 +326,29 @@ def test_delete_root_nonexistent_key(authorized_client, root_and_admin_have_keys
]
def test_delete_root_key_on_undefined(authorized_client, undefined_settings):
response = authorized_client.delete(
"/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"}
)
assert response.status_code == 404
assert read_json(undefined_settings / "undefined.json")["ssh"]["rootKeys"] == []
def test_get_admin_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.get(f"/services/ssh/keys/tester")
response = authorized_client.get("/services/ssh/keys/tester")
assert response.status_code == 200
assert response.json == ["ssh-rsa KEY test@pc"]
def test_get_admin_key_when_none(authorized_client, ssh_on):
response = authorized_client.get(f"/services/ssh/keys/tester")
response = authorized_client.get("/services/ssh/keys/tester")
assert response.status_code == 200
assert response.json == []
def test_delete_admin_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.delete(
f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}
"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 200
assert (
@ -282,9 +357,27 @@ def test_delete_admin_key(authorized_client, root_and_admin_have_keys):
)
def test_delete_nonexistent_admin_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.delete(
"/services/ssh/keys/tester", json={"public_key": "ssh-rsa NO KEY test@pc"}
)
assert response.status_code == 404
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[
"sshKeys"
] == ["ssh-rsa KEY test@pc"]
def test_delete_admin_key_on_undefined(authorized_client, undefined_settings):
response = authorized_client.delete(
"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 404
assert read_json(undefined_settings / "undefined.json")["sshKeys"] == []
def test_add_admin_key(authorized_client, ssh_on):
response = authorized_client.post(
f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}
"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 201
assert read_json(ssh_on / "turned_on.json")["sshKeys"] == [
@ -294,7 +387,7 @@ def test_add_admin_key(authorized_client, ssh_on):
def test_add_admin_key_one_more(authorized_client, root_and_admin_have_keys):
response = authorized_client.post(
f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY_2 test@pc"}
"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY_2 test@pc"}
)
assert response.status_code == 201
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[
@ -304,7 +397,7 @@ def test_add_admin_key_one_more(authorized_client, root_and_admin_have_keys):
def test_add_existing_admin_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.post(
f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}
"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 409
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[
@ -312,3 +405,111 @@ def test_add_existing_admin_key(authorized_client, root_and_admin_have_keys):
] == [
"ssh-rsa KEY test@pc",
]
def test_add_invalid_admin_key(authorized_client, ssh_on):
response = authorized_client.post(
"/services/ssh/keys/tester", json={"public_key": "INVALID KEY test@pc"}
)
assert response.status_code == 400
@pytest.mark.parametrize("user", [1, 2, 3])
def test_get_user_key(authorized_client, some_users, user):
response = authorized_client.get(f"/services/ssh/keys/user{user}")
assert response.status_code == 200
if user == 1:
assert response.json == ["ssh-rsa KEY user1@pc"]
else:
assert response.json == []
def test_get_keys_of_nonexistent_user(authorized_client, some_users):
response = authorized_client.get("/services/ssh/keys/user4")
assert response.status_code == 404
def test_get_keys_of_undefined_users(authorized_client, undefined_settings):
response = authorized_client.get("/services/ssh/keys/user1")
assert response.status_code == 404
@pytest.mark.parametrize("user", [1, 2, 3])
def test_add_user_key(authorized_client, some_users, user):
response = authorized_client.post(
f"/services/ssh/keys/user{user}", json={"public_key": "ssh-ed25519 KEY test@pc"}
)
assert response.status_code == 201
if user == 1:
assert read_json(some_users / "some_users.json")["users"][user - 1][
"sshKeys"
] == [
"ssh-rsa KEY user1@pc",
"ssh-ed25519 KEY test@pc",
]
else:
assert read_json(some_users / "some_users.json")["users"][user - 1][
"sshKeys"
] == ["ssh-ed25519 KEY test@pc"]
def test_add_existing_user_key(authorized_client, some_users):
response = authorized_client.post(
"/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user1@pc"}
)
assert response.status_code == 409
assert read_json(some_users / "some_users.json")["users"][0]["sshKeys"] == [
"ssh-rsa KEY user1@pc",
]
def test_add_invalid_user_key(authorized_client, some_users):
response = authorized_client.post(
"/services/ssh/keys/user1", json={"public_key": "INVALID KEY user1@pc"}
)
assert response.status_code == 400
def test_delete_user_key(authorized_client, some_users):
response = authorized_client.delete(
"/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user1@pc"}
)
assert response.status_code == 200
assert read_json(some_users / "some_users.json")["users"][0]["sshKeys"] == []
@pytest.mark.parametrize("user", [2, 3])
def test_delete_nonexistent_user_key(authorized_client, some_users, user):
response = authorized_client.delete(
f"/services/ssh/keys/user{user}", json={"public_key": "ssh-rsa KEY user1@pc"}
)
assert response.status_code == 404
assert read_json(some_users / "some_users.json")["users"][user - 1]["sshKeys"] == []
def test_add_keys_of_nonexistent_user(authorized_client, some_users):
response = authorized_client.post(
"/services/ssh/keys/user4", json={"public_key": "ssh-rsa KEY user4@pc"}
)
assert response.status_code == 404
def test_add_key_on_undefined_users(authorized_client, undefined_settings):
response = authorized_client.post(
"/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user4@pc"}
)
assert response.status_code == 404
def test_delete_keys_of_nonexistent_user(authorized_client, some_users):
response = authorized_client.delete(
"/services/ssh/keys/user4", json={"public_key": "ssh-rsa KEY user4@pc"}
)
assert response.status_code == 404
def test_delete_key_when_undefined_users(authorized_client, undefined_settings):
response = authorized_client.delete(
"/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user1@pc"}
)
assert response.status_code == 404

71
tests/services/test_ssh/some_users.json

@ -0,0 +1,71 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [
{
"username": "user1",
"hashedPassword": "HASHED_PASSWORD_1",
"sshKeys": [
"ssh-rsa KEY user1@pc"
]
},
{
"username": "user2",
"hashedPassword": "HASHED_PASSWORD_2",
"sshKeys": [
]
},
{
"username": "user3",
"hashedPassword": "HASHED_PASSWORD_3"
}
]
}

5
tests/services/test_ssh/undefined.json

@ -38,8 +38,5 @@
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
"timezone": "Europe/Moscow"
}

46
tests/services/test_ssh/undefined_values.json

@ -0,0 +1,46 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}
Loading…
Cancel
Save