diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..f227e0f --- /dev/null +++ b/.coveragerc @@ -0,0 +1,2 @@ +[run] +source = selfprivacy_api \ No newline at end of file diff --git a/.drone.yml b/.drone.yml index 36f069c..5f89a9c 100644 --- a/.drone.yml +++ b/.drone.yml @@ -9,5 +9,11 @@ platform: steps: - name: test commands: - - pytest - \ No newline at end of file + - coverage run -m pytest -q + - coverage xml +- name: bandit + commands: + - bandit -ll -r selfprivacy_api +- name: formatting + commands: + - black --check . diff --git a/selfprivacy_api/app.py b/selfprivacy_api/app.py index f74f650..fc04aeb 100644 --- a/selfprivacy_api/app.py +++ b/selfprivacy_api/app.py @@ -10,7 +10,7 @@ from flask_swagger import swagger from flask_swagger_ui import get_swaggerui_blueprint from selfprivacy_api.resources.users import User, Users -from selfprivacy_api.resources.common import ApiVersion, DecryptDisk +from selfprivacy_api.resources.common import ApiVersion from selfprivacy_api.resources.system import api_system from selfprivacy_api.resources.services import services as api_services @@ -51,7 +51,6 @@ def create_app(test_config=None): api.add_resource(ApiVersion, "/api/version") api.add_resource(Users, "/users") api.add_resource(User, "/users/") - api.add_resource(DecryptDisk, "/decryptDisk") app.register_blueprint(api_system) app.register_blueprint(api_services) diff --git a/selfprivacy_api/resources/common.py b/selfprivacy_api/resources/common.py index a9663aa..ba7412c 100644 --- a/selfprivacy_api/resources/common.py +++ b/selfprivacy_api/resources/common.py @@ -25,55 +25,3 @@ class ApiVersion(Resource): description: Unauthorized """ return {"version": "1.1.0"} - - -class DecryptDisk(Resource): - """Decrypt disk""" - - def post(self): - """ - Decrypt /dev/sdb using cryptsetup luksOpen - --- - consumes: - - application/json - tags: - - System - security: - - bearerAuth: [] - parameters: - - in: body - name: body - required: true - description: Provide a password for decryption - schema: - type: object - required: - - password - properties: - password: - type: string - description: Decryption password. - responses: - 201: - description: OK - 400: - description: Bad request - 401: - description: Unauthorized - """ - parser = reqparse.RequestParser(bundle_errors=True) - parser.add_argument("password", type=str, required=True) - args = parser.parse_args() - - decryption_command = ["cryptsetup", "luksOpen", "/dev/sdb", "decryptedVar"] - - # TODO: Check if this works at all - - decryption_service = subprocess.Popen( - decryption_command, - shell=False, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - ) - decryption_service.communicate(input=args["password"]) - return {"status": decryption_service.returncode}, 201 diff --git a/selfprivacy_api/resources/services/restic.py b/selfprivacy_api/resources/services/restic.py index 64ce2a8..dd22c9a 100644 --- a/selfprivacy_api/resources/services/restic.py +++ b/selfprivacy_api/resources/services/restic.py @@ -222,6 +222,8 @@ class BackblazeConfig(Resource): args = parser.parse_args() with WriteUserData() as data: + if "backblaze" not in data: + data["backblaze"] = {} data["backblaze"]["accountId"] = args["accountId"] data["backblaze"]["accountKey"] = args["accountKey"] data["backblaze"]["bucket"] = args["bucket"] diff --git a/selfprivacy_api/resources/services/ssh.py b/selfprivacy_api/resources/services/ssh.py index 2b90087..1665751 100644 --- a/selfprivacy_api/resources/services/ssh.py +++ b/selfprivacy_api/resources/services/ssh.py @@ -216,10 +216,10 @@ class SSHKeys(Resource): if "users" not in data: data["users"] = [] for user in data["users"]: - if user["name"] == username: + if user["username"] == username: if "sshKeys" not in user: user["sshKeys"] = [] - return user["ssh"]["sshKeys"] + return user["sshKeys"] return { "error": "User not found", }, 404 diff --git a/selfprivacy_api/resources/users.py b/selfprivacy_api/resources/users.py index 057a5e3..747e33a 100644 --- a/selfprivacy_api/resources/users.py +++ b/selfprivacy_api/resources/users.py @@ -4,7 +4,7 @@ import subprocess import re from flask_restful import Resource, reqparse -from selfprivacy_api.utils import WriteUserData, ReadUserData +from selfprivacy_api.utils import WriteUserData, ReadUserData, is_username_forbidden class Users(Resource): @@ -26,8 +26,9 @@ class Users(Resource): """ with ReadUserData() as data: users = [] - for user in data["users"]: - users.append(user["username"]) + if "users" in data: + for user in data["users"]: + users.append(user["username"]) return users def post(self): @@ -71,7 +72,6 @@ class Users(Resource): parser.add_argument("username", type=str, required=True) parser.add_argument("password", type=str, required=True) args = parser.parse_args() - hashing_command = ["mkpasswd", "-m", "sha-512", args["password"]] password_hash_process_descriptor = subprocess.Popen( hashing_command, @@ -82,7 +82,9 @@ class Users(Resource): hashed_password = password_hash_process_descriptor.communicate()[0] hashed_password = hashed_password.decode("ascii") hashed_password = hashed_password.rstrip() - + # Check if username is forbidden + if is_username_forbidden(args["username"]): + return {"message": "Username is forbidden"}, 409 # Check is username passes regex if not re.match(r"^[a-z_][a-z0-9_]+$", args["username"]): return {"error": "username must be alphanumeric"}, 400 @@ -137,9 +139,6 @@ class User(Resource): description: User not found """ with WriteUserData() as data: - # Return 400 if username is not provided - if username is None: - return {"error": "username is required"}, 400 if username == data["username"]: return {"error": "Cannot delete root user"}, 400 # Return 400 if user does not exist diff --git a/selfprivacy_api/restic_controller/__init__.py b/selfprivacy_api/restic_controller/__init__.py index cefef53..24158b0 100644 --- a/selfprivacy_api/restic_controller/__init__.py +++ b/selfprivacy_api/restic_controller/__init__.py @@ -40,7 +40,6 @@ class ResticController: _initialized = False def __new__(cls): - print("new is called!") if not cls._instance: with cls._lock: cls._instance = super(ResticController, cls).__new__(cls) @@ -58,7 +57,6 @@ class ResticController: self.snapshot_list = [] self.error_message = None self._initialized = True - print("init is called!") self.load_configuration() self.write_rclone_config() self.load_snapshots() @@ -112,7 +110,6 @@ class ResticController: or self.state == ResticStates.RESTORING ): return - print("preparing to read snapshots") with subprocess.Popen( backup_listing_command, shell=False, @@ -181,7 +178,7 @@ class ResticController: "backup", "/var", ] - with open("/tmp/backup.log", "w", encoding="utf-8") as log_file: + with open("/var/backup.log", "w", encoding="utf-8") as log_file: subprocess.Popen( backup_command, shell=False, @@ -196,7 +193,7 @@ class ResticController: """ Check progress of ongoing backup operation """ - backup_status_check_command = ["tail", "-1", "/tmp/backup.log"] + backup_status_check_command = ["tail", "-1", "/var/backup.log"] if ( self.state == ResticStates.NO_KEY @@ -205,7 +202,7 @@ class ResticController: return # If the log file does not exists - if os.path.exists("/tmp/backup.log") is False: + if os.path.exists("/var/backup.log") is False: self.state = ResticStates.INITIALIZED with subprocess.Popen( diff --git a/selfprivacy_api/utils.py b/selfprivacy_api/utils.py index 1b0c43c..80c8e6d 100644 --- a/selfprivacy_api/utils.py +++ b/selfprivacy_api/utils.py @@ -5,11 +5,12 @@ import portalocker USERDATA_FILE = "/etc/nixos/userdata/userdata.json" +DOMAIN_FILE = "/var/domain" def get_domain(): """Get domain from /var/domain without trailing new line""" - with open("/var/domain", "r", encoding="utf-8") as domain_file: + with open(DOMAIN_FILE, "r", encoding="utf-8") as domain_file: domain = domain_file.readline().rstrip() return domain @@ -56,3 +57,46 @@ def validate_ssh_public_key(key): if not key.startswith("ssh-rsa"): return False return True + + +def is_username_forbidden(username): + forbidden_prefixes = ["systemd", "nixbld"] + + forbidden_usernames = [ + "root", + "messagebus", + "postfix", + "polkituser", + "dovecot2", + "dovenull", + "nginx", + "postgres", + "prosody", + "opendkim", + "rspamd", + "sshd", + "selfprivacy-api", + "restic", + "redis", + "pleroma", + "ocserv", + "nextcloud", + "memcached", + "knot-resolver", + "gitea", + "bitwarden_rs", + "vaultwarden", + "acme", + "virtualMail", + "nobody", + ] + + for prefix in forbidden_prefixes: + if username.startswith(prefix): + return True + + for forbidden_username in forbidden_usernames: + if username == forbidden_username: + return True + + return False diff --git a/tests/conftest.py b/tests/conftest.py index e963224..aab30dd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -32,12 +32,30 @@ class AuthorizedClient(testing.FlaskClient): return super().open(*args, **kwargs) +class WrongAuthClient(testing.FlaskClient): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.token = "WRONG_TOKEN" + + def open(self, *args, **kwargs): + if "headers" not in kwargs: + kwargs["headers"] = {} + kwargs["headers"]["Authorization"] = f"Bearer {self.token}" + return super().open(*args, **kwargs) + + @pytest.fixture def authorized_client(app): app.test_client_class = AuthorizedClient return app.test_client() +@pytest.fixture +def wrong_auth_client(app): + app.test_client_class = WrongAuthClient + return app.test_client() + + @pytest.fixture def runner(app): return app.test_cli_runner() diff --git a/tests/services/test_mailserver.py b/tests/services/test_mailserver.py index f45ad1e..a9e5f12 100644 --- a/tests/services/test_mailserver.py +++ b/tests/services/test_mailserver.py @@ -2,12 +2,6 @@ import base64 import json import pytest - -def read_json(file_path): - with open(file_path, "r", encoding="utf-8") as f: - return json.load(f) - - ############################################################################### diff --git a/tests/services/test_restic.py b/tests/services/test_restic.py new file mode 100644 index 0000000..913362f --- /dev/null +++ b/tests/services/test_restic.py @@ -0,0 +1,503 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +import json +import pytest +from selfprivacy_api.restic_controller import ResticStates + + +def read_json(file_path): + with open(file_path, "r") as f: + return json.load(f) + + +MOCKED_SNAPSHOTS = [ + { + "time": "2021-12-06T09:05:04.224685677+03:00", + "tree": "b76152d1e716d86d420407ead05d9911f2b6d971fe1589c12b63e4de65b14d4e", + "paths": ["/var"], + "hostname": "test-host", + "username": "root", + "id": "f96b428f1ca1252089ea3e25cd8ee33e63fb24615f1cc07559ba907d990d81c5", + "short_id": "f96b428f", + }, + { + "time": "2021-12-08T07:42:06.998894055+03:00", + "parent": "f96b428f1ca1252089ea3e25cd8ee33e63fb24615f1cc07559ba907d990d81c5", + "tree": "8379b4fdc9ee3e9bb7c322f632a7bed9fc334b0258abbf4e7134f8fe5b3d61b0", + "paths": ["/var"], + "hostname": "test-host", + "username": "root", + "id": "db96b36efec97e5ba385099b43f9062d214c7312c20138aee7b8bd2c6cd8995a", + "short_id": "db96b36e", + }, +] + + +class ResticControllerMock: + snapshot_list = MOCKED_SNAPSHOTS + state = ResticStates.INITIALIZED + progress = 0 + error_message = None + + +@pytest.fixture +def mock_restic_controller(mocker): + mock = mocker.patch( + "selfprivacy_api.resources.services.restic.ResticController", + autospec=True, + return_value=ResticControllerMock, + ) + return mock + + +class ResticControllerMockNoKey: + snapshot_list = [] + state = ResticStates.NO_KEY + progress = 0 + error_message = None + + +@pytest.fixture +def mock_restic_controller_no_key(mocker): + mock = mocker.patch( + "selfprivacy_api.resources.services.restic.ResticController", + autospec=True, + return_value=ResticControllerMockNoKey, + ) + return mock + + +class ResticControllerNotInitialized: + snapshot_list = [] + state = ResticStates.NOT_INITIALIZED + progress = 0 + error_message = None + + +@pytest.fixture +def mock_restic_controller_not_initialized(mocker): + mock = mocker.patch( + "selfprivacy_api.resources.services.restic.ResticController", + autospec=True, + return_value=ResticControllerNotInitialized, + ) + return mock + + +class ResticControllerInitializing: + snapshot_list = [] + state = ResticStates.INITIALIZING + progress = 0 + error_message = None + + +@pytest.fixture +def mock_restic_controller_initializing(mocker): + mock = mocker.patch( + "selfprivacy_api.resources.services.restic.ResticController", + autospec=True, + return_value=ResticControllerInitializing, + ) + return mock + + +class ResticControllerBackingUp: + snapshot_list = MOCKED_SNAPSHOTS + state = ResticStates.BACKING_UP + progress = 0.42 + error_message = None + + +@pytest.fixture +def mock_restic_controller_backing_up(mocker): + mock = mocker.patch( + "selfprivacy_api.resources.services.restic.ResticController", + autospec=True, + return_value=ResticControllerBackingUp, + ) + return mock + + +class ResticControllerError: + snapshot_list = MOCKED_SNAPSHOTS + state = ResticStates.ERROR + progress = 0 + error_message = "Error message" + + +@pytest.fixture +def mock_restic_controller_error(mocker): + mock = mocker.patch( + "selfprivacy_api.resources.services.restic.ResticController", + autospec=True, + return_value=ResticControllerError, + ) + return mock + + +class ResticControllerRestoring: + snapshot_list = MOCKED_SNAPSHOTS + state = ResticStates.RESTORING + progress = 0 + error_message = None + + +@pytest.fixture +def mock_restic_controller_restoring(mocker): + mock = mocker.patch( + "selfprivacy_api.resources.services.restic.ResticController", + autospec=True, + return_value=ResticControllerRestoring, + ) + return mock + + +@pytest.fixture +def mock_restic_tasks(mocker): + mock = mocker.patch( + "selfprivacy_api.resources.services.restic.restic_tasks", autospec=True + ) + return mock + + +@pytest.fixture +def undefined_settings(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") + assert "backblaze" not in read_json(datadir / "undefined.json") + return datadir + + +@pytest.fixture +def some_settings(mocker, datadir): + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "some_values.json" + ) + assert "backblaze" in read_json(datadir / "some_values.json") + assert read_json(datadir / "some_values.json")["backblaze"]["accountId"] == "ID" + assert read_json(datadir / "some_values.json")["backblaze"]["accountKey"] == "KEY" + assert read_json(datadir / "some_values.json")["backblaze"]["bucket"] == "BUCKET" + return datadir + + +@pytest.fixture +def no_values(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "no_values.json") + assert "backblaze" in read_json(datadir / "no_values.json") + assert "accountId" not in read_json(datadir / "no_values.json")["backblaze"] + assert "accountKey" not in read_json(datadir / "no_values.json")["backblaze"] + assert "bucket" not in read_json(datadir / "no_values.json")["backblaze"] + return datadir + + +def test_get_snapshots_unauthorized(client, mock_restic_controller, mock_restic_tasks): + response = client.get("/services/restic/backup/list") + assert response.status_code == 401 + + +def test_get_snapshots(authorized_client, mock_restic_controller, mock_restic_tasks): + response = authorized_client.get("/services/restic/backup/list") + assert response.status_code == 200 + assert response.get_json() == MOCKED_SNAPSHOTS + + +def test_create_backup_unauthorized(client, mock_restic_controller, mock_restic_tasks): + response = client.put("/services/restic/backup/create") + assert response.status_code == 401 + + +def test_create_backup(authorized_client, mock_restic_controller, mock_restic_tasks): + response = authorized_client.put("/services/restic/backup/create") + assert response.status_code == 200 + assert mock_restic_tasks.start_backup.call_count == 1 + + +def test_create_backup_without_key( + authorized_client, mock_restic_controller_no_key, mock_restic_tasks +): + response = authorized_client.put("/services/restic/backup/create") + assert response.status_code == 400 + assert mock_restic_tasks.start_backup.call_count == 0 + + +def test_create_backup_initializing( + authorized_client, mock_restic_controller_initializing, mock_restic_tasks +): + response = authorized_client.put("/services/restic/backup/create") + assert response.status_code == 400 + assert mock_restic_tasks.start_backup.call_count == 0 + + +def test_create_backup_backing_up( + authorized_client, mock_restic_controller_backing_up, mock_restic_tasks +): + response = authorized_client.put("/services/restic/backup/create") + assert response.status_code == 409 + assert mock_restic_tasks.start_backup.call_count == 0 + + +def test_check_backup_status_unauthorized( + client, mock_restic_controller, mock_restic_tasks +): + response = client.get("/services/restic/backup/status") + assert response.status_code == 401 + + +def test_check_backup_status( + authorized_client, mock_restic_controller, mock_restic_tasks +): + response = authorized_client.get("/services/restic/backup/status") + assert response.status_code == 200 + assert response.get_json() == { + "status": "INITIALIZED", + "progress": 0, + "error_message": None, + } + + +def test_check_backup_status_no_key( + authorized_client, mock_restic_controller_no_key, mock_restic_tasks +): + response = authorized_client.get("/services/restic/backup/status") + assert response.status_code == 200 + assert response.get_json() == { + "status": "NO_KEY", + "progress": 0, + "error_message": None, + } + + +def test_check_backup_status_not_initialized( + authorized_client, mock_restic_controller_not_initialized, mock_restic_tasks +): + response = authorized_client.get("/services/restic/backup/status") + assert response.status_code == 200 + assert response.get_json() == { + "status": "NOT_INITIALIZED", + "progress": 0, + "error_message": None, + } + + +def test_check_backup_status_initializing( + authorized_client, mock_restic_controller_initializing, mock_restic_tasks +): + response = authorized_client.get("/services/restic/backup/status") + assert response.status_code == 200 + assert response.get_json() == { + "status": "INITIALIZING", + "progress": 0, + "error_message": None, + } + + +def test_check_backup_status_backing_up( + authorized_client, mock_restic_controller_backing_up +): + response = authorized_client.get("/services/restic/backup/status") + assert response.status_code == 200 + assert response.get_json() == { + "status": "BACKING_UP", + "progress": 0.42, + "error_message": None, + } + + +def test_check_backup_status_error( + authorized_client, mock_restic_controller_error, mock_restic_tasks +): + response = authorized_client.get("/services/restic/backup/status") + assert response.status_code == 200 + assert response.get_json() == { + "status": "ERROR", + "progress": 0, + "error_message": "Error message", + } + + +def test_check_backup_status_restoring( + authorized_client, mock_restic_controller_restoring, mock_restic_tasks +): + response = authorized_client.get("/services/restic/backup/status") + assert response.status_code == 200 + assert response.get_json() == { + "status": "RESTORING", + "progress": 0, + "error_message": None, + } + + +def test_reload_unauthenticated(client, mock_restic_controller, mock_restic_tasks): + response = client.get("/services/restic/backup/reload") + assert response.status_code == 401 + + +def test_backup_reload(authorized_client, mock_restic_controller, mock_restic_tasks): + response = authorized_client.get("/services/restic/backup/reload") + assert response.status_code == 200 + assert mock_restic_tasks.load_snapshots.call_count == 1 + + +def test_backup_restore_unauthorized(client, mock_restic_controller, mock_restic_tasks): + response = client.put("/services/restic/backup/restore") + assert response.status_code == 401 + + +def test_backup_restore_without_backup_id( + authorized_client, mock_restic_controller, mock_restic_tasks +): + response = authorized_client.put("/services/restic/backup/restore", json={}) + assert response.status_code == 400 + assert mock_restic_tasks.restore_from_backup.call_count == 0 + + +def test_backup_restore_with_nonexistent_backup_id( + authorized_client, mock_restic_controller, mock_restic_tasks +): + response = authorized_client.put( + "/services/restic/backup/restore", json={"backupId": "nonexistent"} + ) + assert response.status_code == 404 + assert mock_restic_tasks.restore_from_backup.call_count == 0 + + +def test_backup_restore_when_no_key( + authorized_client, mock_restic_controller_no_key, mock_restic_tasks +): + response = authorized_client.put( + "/services/restic/backup/restore", json={"backupId": "f96b428f"} + ) + assert response.status_code == 400 + assert mock_restic_tasks.restore_from_backup.call_count == 0 + + +def test_backup_restore_when_not_initialized( + authorized_client, mock_restic_controller_not_initialized, mock_restic_tasks +): + response = authorized_client.put( + "/services/restic/backup/restore", json={"backupId": "f96b428f"} + ) + assert response.status_code == 400 + assert mock_restic_tasks.restore_from_backup.call_count == 0 + + +def test_backup_restore_when_initializing( + authorized_client, mock_restic_controller_initializing, mock_restic_tasks +): + response = authorized_client.put( + "/services/restic/backup/restore", json={"backupId": "f96b428f"} + ) + assert response.status_code == 400 + assert mock_restic_tasks.restore_from_backup.call_count == 0 + + +def test_backup_restore_when_backing_up( + authorized_client, mock_restic_controller_backing_up, mock_restic_tasks +): + response = authorized_client.put( + "/services/restic/backup/restore", json={"backupId": "f96b428f"} + ) + assert response.status_code == 409 + assert mock_restic_tasks.restore_from_backup.call_count == 0 + + +def test_backup_restore_when_restoring( + authorized_client, mock_restic_controller_restoring, mock_restic_tasks +): + response = authorized_client.put( + "/services/restic/backup/restore", json={"backupId": "f96b428f"} + ) + assert response.status_code == 409 + assert mock_restic_tasks.restore_from_backup.call_count == 0 + + +def test_backup_restore_when_error( + authorized_client, mock_restic_controller_error, mock_restic_tasks +): + response = authorized_client.put( + "/services/restic/backup/restore", json={"backupId": "f96b428f"} + ) + assert response.status_code == 200 + assert mock_restic_tasks.restore_from_backup.call_count == 1 + + +def test_backup_restore(authorized_client, mock_restic_controller, mock_restic_tasks): + response = authorized_client.put( + "/services/restic/backup/restore", json={"backupId": "f96b428f"} + ) + assert response.status_code == 200 + assert mock_restic_tasks.restore_from_backup.call_count == 1 + + +def test_set_backblaze_config_unauthorized( + client, mock_restic_controller, mock_restic_tasks, some_settings +): + response = client.put("/services/restic/backblaze/config") + assert response.status_code == 401 + assert mock_restic_tasks.update_keys_from_userdata.call_count == 0 + + +def test_set_backblaze_config_without_arguments( + authorized_client, mock_restic_controller, mock_restic_tasks, some_settings +): + response = authorized_client.put("/services/restic/backblaze/config") + assert response.status_code == 400 + assert mock_restic_tasks.update_keys_from_userdata.call_count == 0 + + +def test_set_backblaze_config_without_all_values( + authorized_client, mock_restic_controller, mock_restic_tasks, some_settings +): + response = authorized_client.put( + "/services/restic/backblaze/config", + json={"accountId": "123", "applicationKey": "456"}, + ) + assert response.status_code == 400 + assert mock_restic_tasks.update_keys_from_userdata.call_count == 0 + + +def test_set_backblaze_config( + authorized_client, mock_restic_controller, mock_restic_tasks, some_settings +): + response = authorized_client.put( + "/services/restic/backblaze/config", + json={"accountId": "123", "accountKey": "456", "bucket": "789"}, + ) + assert response.status_code == 200 + assert mock_restic_tasks.update_keys_from_userdata.call_count == 1 + assert read_json(some_settings / "some_values.json")["backblaze"] == { + "accountId": "123", + "accountKey": "456", + "bucket": "789", + } + + +def test_set_backblaze_config_on_undefined( + authorized_client, mock_restic_controller, mock_restic_tasks, undefined_settings +): + response = authorized_client.put( + "/services/restic/backblaze/config", + json={"accountId": "123", "accountKey": "456", "bucket": "789"}, + ) + assert response.status_code == 200 + assert mock_restic_tasks.update_keys_from_userdata.call_count == 1 + assert read_json(undefined_settings / "undefined.json")["backblaze"] == { + "accountId": "123", + "accountKey": "456", + "bucket": "789", + } + + +def test_set_backblaze_config_on_no_values( + authorized_client, mock_restic_controller, mock_restic_tasks, no_values +): + response = authorized_client.put( + "/services/restic/backblaze/config", + json={"accountId": "123", "accountKey": "456", "bucket": "789"}, + ) + assert response.status_code == 200 + assert mock_restic_tasks.update_keys_from_userdata.call_count == 1 + assert read_json(no_values / "no_values.json")["backblaze"] == { + "accountId": "123", + "accountKey": "456", + "bucket": "789", + } diff --git a/tests/services/test_restic/no_values.json b/tests/services/test_restic/no_values.json new file mode 100644 index 0000000..c1ef7a0 --- /dev/null +++ b/tests/services/test_restic/no_values.json @@ -0,0 +1,68 @@ +{ + "backblaze": { + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ], + "users": [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": [ + "ssh-rsa KEY user1@pc" + ] + }, + { + "username": "user2", + "hashedPassword": "HASHED_PASSWORD_2", + "sshKeys": [ + ] + }, + { + "username": "user3", + "hashedPassword": "HASHED_PASSWORD_3" + } + ] +} \ No newline at end of file diff --git a/tests/services/test_restic/some_values.json b/tests/services/test_restic/some_values.json new file mode 100644 index 0000000..a7dbf39 --- /dev/null +++ b/tests/services/test_restic/some_values.json @@ -0,0 +1,71 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "BUCKET" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ], + "users": [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": [ + "ssh-rsa KEY user1@pc" + ] + }, + { + "username": "user2", + "hashedPassword": "HASHED_PASSWORD_2", + "sshKeys": [ + ] + }, + { + "username": "user3", + "hashedPassword": "HASHED_PASSWORD_3" + } + ] +} \ No newline at end of file diff --git a/tests/services/test_restic/undefined.json b/tests/services/test_restic/undefined.json new file mode 100644 index 0000000..59e42a0 --- /dev/null +++ b/tests/services/test_restic/undefined.json @@ -0,0 +1,66 @@ +{ + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ], + "users": [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": [ + "ssh-rsa KEY user1@pc" + ] + }, + { + "username": "user2", + "hashedPassword": "HASHED_PASSWORD_2", + "sshKeys": [ + ] + }, + { + "username": "user3", + "hashedPassword": "HASHED_PASSWORD_3" + } + ] +} \ No newline at end of file diff --git a/tests/services/test_services.py b/tests/services/test_services.py index 859da5a..b059f90 100644 --- a/tests/services/test_services.py +++ b/tests/services/test_services.py @@ -4,8 +4,8 @@ import pytest def read_json(file_path): - with open(file_path, "r", encoding="utf-8") as f: - return json.load(f) + with open(file_path, "r", encoding="utf-8") as file: + return json.load(file) def call_args_asserts(mocked_object): diff --git a/tests/services/test_ssh.py b/tests/services/test_ssh.py index 7233a53..8bbe261 100644 --- a/tests/services/test_ssh.py +++ b/tests/services/test_ssh.py @@ -1,41 +1,38 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument import json -from os import read import pytest def read_json(file_path): - with open(file_path, "r") as f: - return json.load(f) + with open(file_path, "r", encoding="utf-8") as file: + return json.load(file) -############################################################################### +## FIXTURES ################################################### @pytest.fixture def ssh_off(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") - assert read_json(datadir / "turned_off.json")["ssh"]["enable"] == False - assert ( - read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True - ) + assert not read_json(datadir / "turned_off.json")["ssh"]["enable"] + assert read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] return datadir @pytest.fixture def ssh_on(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") - assert ( - read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True - ) - assert read_json(datadir / "turned_on.json")["ssh"]["enable"] == True + assert read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] + assert read_json(datadir / "turned_on.json")["ssh"]["enable"] return datadir @pytest.fixture def all_off(mocker, datadir): mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "all_off.json") - assert read_json(datadir / "all_off.json")["ssh"]["passwordAuthentication"] == False - assert read_json(datadir / "all_off.json")["ssh"]["enable"] == False + assert not read_json(datadir / "all_off.json")["ssh"]["passwordAuthentication"] + assert not read_json(datadir / "all_off.json")["ssh"]["enable"] return datadir @@ -46,19 +43,30 @@ def undefined_settings(mocker, datadir): return datadir +@pytest.fixture +def undefined_values(mocker, datadir): + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined_values.json" + ) + assert "ssh" in read_json(datadir / "undefined_values.json") + assert "enable" not in read_json(datadir / "undefined_values.json")["ssh"] + assert ( + "passwordAuthentication" + not in read_json(datadir / "undefined_values.json")["ssh"] + ) + return datadir + + @pytest.fixture def root_and_admin_have_keys(mocker, datadir): mocker.patch( "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "root_and_admin_have_keys.json", ) - assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["enable"] == True - assert ( - read_json(datadir / "root_and_admin_have_keys.json")["ssh"][ - "passwordAuthentication" - ] - == True - ) + assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["enable"] + assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"][ + "passwordAuthentication" + ] assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["rootKeys"] == [ "ssh-ed25519 KEY test@pc" ] @@ -68,7 +76,23 @@ def root_and_admin_have_keys(mocker, datadir): return datadir -############################################################################### +@pytest.fixture +def some_users(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "some_users.json") + assert "users" in read_json(datadir / "some_users.json") + assert read_json(datadir / "some_users.json")["users"] == [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": ["ssh-rsa KEY user1@pc"], + }, + {"username": "user2", "hashedPassword": "HASHED_PASSWORD_2", "sshKeys": []}, + {"username": "user3", "hashedPassword": "HASHED_PASSWORD_3"}, + ] + return datadir + + +## TEST 401 ###################################################### @pytest.mark.parametrize( @@ -79,20 +103,33 @@ def test_unauthorized(client, ssh_off, endpoint): assert response.status_code == 401 +## TEST ENABLE ###################################################### + + def test_legacy_enable(authorized_client, ssh_off): - response = authorized_client.post(f"/services/ssh/enable") + response = authorized_client.post("/services/ssh/enable") assert response.status_code == 200 assert read_json(ssh_off / "turned_off.json") == read_json( ssh_off / "turned_on.json" ) +def test_legacy_on_undefined(authorized_client, undefined_settings): + response = authorized_client.post("/services/ssh/enable") + assert response.status_code == 200 + data = read_json(undefined_settings / "undefined.json") + assert data["ssh"]["enable"] == True + + def test_legacy_enable_when_enabled(authorized_client, ssh_on): - response = authorized_client.post(f"/services/ssh/enable") + response = authorized_client.post("/services/ssh/enable") assert response.status_code == 200 assert read_json(ssh_on / "turned_on.json") == read_json(ssh_on / "turned_on.json") +## GET ON /ssh ###################################################### + + def test_get_current_settings_ssh_off(authorized_client, ssh_off): response = authorized_client.get("/services/ssh") assert response.status_code == 200 @@ -117,6 +154,14 @@ def test_get_current_settings_undefined(authorized_client, undefined_settings): assert response.json == {"enable": True, "passwordAuthentication": True} +def test_get_current_settings_mostly_undefined(authorized_client, undefined_values): + response = authorized_client.get("/services/ssh") + assert response.status_code == 200 + assert response.json == {"enable": True, "passwordAuthentication": True} + + +## PUT ON /ssh ###################################################### + available_settings = [ {"enable": True, "passwordAuthentication": True}, {"enable": True, "passwordAuthentication": False}, @@ -131,7 +176,7 @@ available_settings = [ @pytest.mark.parametrize("settings", available_settings) def test_set_settings_ssh_off(authorized_client, ssh_off, settings): - response = authorized_client.put(f"/services/ssh", json=settings) + response = authorized_client.put("/services/ssh", json=settings) assert response.status_code == 200 data = read_json(ssh_off / "turned_off.json")["ssh"] if "enable" in settings: @@ -142,7 +187,7 @@ def test_set_settings_ssh_off(authorized_client, ssh_off, settings): @pytest.mark.parametrize("settings", available_settings) def test_set_settings_ssh_on(authorized_client, ssh_on, settings): - response = authorized_client.put(f"/services/ssh", json=settings) + response = authorized_client.put("/services/ssh", json=settings) assert response.status_code == 200 data = read_json(ssh_on / "turned_on.json")["ssh"] if "enable" in settings: @@ -153,7 +198,7 @@ def test_set_settings_ssh_on(authorized_client, ssh_on, settings): @pytest.mark.parametrize("settings", available_settings) def test_set_settings_all_off(authorized_client, all_off, settings): - response = authorized_client.put(f"/services/ssh", json=settings) + response = authorized_client.put("/services/ssh", json=settings) assert response.status_code == 200 data = read_json(all_off / "all_off.json")["ssh"] if "enable" in settings: @@ -164,7 +209,7 @@ def test_set_settings_all_off(authorized_client, all_off, settings): @pytest.mark.parametrize("settings", available_settings) def test_set_settings_undefined(authorized_client, undefined_settings, settings): - response = authorized_client.put(f"/services/ssh", json=settings) + response = authorized_client.put("/services/ssh", json=settings) assert response.status_code == 200 data = read_json(undefined_settings / "undefined.json")["ssh"] if "enable" in settings: @@ -173,9 +218,12 @@ def test_set_settings_undefined(authorized_client, undefined_settings, settings) assert data["passwordAuthentication"] == settings["passwordAuthentication"] +## PUT ON /ssh/key/send ###################################################### + + def test_add_root_key(authorized_client, ssh_on): response = authorized_client.put( - f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 201 assert read_json(ssh_on / "turned_on.json")["ssh"]["rootKeys"] == [ @@ -183,9 +231,18 @@ def test_add_root_key(authorized_client, ssh_on): ] +def test_add_root_key_on_undefined(authorized_client, undefined_settings): + response = authorized_client.put( + "/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} + ) + assert response.status_code == 201 + data = read_json(undefined_settings / "undefined.json") + assert data["ssh"]["rootKeys"] == ["ssh-rsa KEY test@pc"] + + def test_add_root_key_one_more(authorized_client, root_and_admin_have_keys): response = authorized_client.put( - f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 201 assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ @@ -198,7 +255,7 @@ def test_add_root_key_one_more(authorized_client, root_and_admin_have_keys): def test_add_existing_root_key(authorized_client, root_and_admin_have_keys): response = authorized_client.put( - f"/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"} + "/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"} ) assert response.status_code == 409 assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ @@ -210,33 +267,42 @@ def test_add_existing_root_key(authorized_client, root_and_admin_have_keys): def test_add_invalid_root_key(authorized_client, ssh_on): response = authorized_client.put( - f"/services/ssh/key/send", json={"public_key": "INVALID KEY test@pc"} + "/services/ssh/key/send", json={"public_key": "INVALID KEY test@pc"} ) assert response.status_code == 400 +## /ssh/keys/{user} ###################################################### + + def test_add_root_key_via_wrong_endpoint(authorized_client, ssh_on): response = authorized_client.post( - f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 400 def test_get_root_key(authorized_client, root_and_admin_have_keys): - response = authorized_client.get(f"/services/ssh/keys/root") + response = authorized_client.get("/services/ssh/keys/root") assert response.status_code == 200 assert response.json == ["ssh-ed25519 KEY test@pc"] def test_get_root_key_when_none(authorized_client, ssh_on): - response = authorized_client.get(f"/services/ssh/keys/root") + response = authorized_client.get("/services/ssh/keys/root") + assert response.status_code == 200 + assert response.json == [] + + +def test_get_root_key_on_undefined(authorized_client, undefined_settings): + response = authorized_client.get("/services/ssh/keys/root") assert response.status_code == 200 assert response.json == [] def test_delete_root_key(authorized_client, root_and_admin_have_keys): response = authorized_client.delete( - f"/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"} + "/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"} ) assert response.status_code == 200 assert ( @@ -249,7 +315,7 @@ def test_delete_root_key(authorized_client, root_and_admin_have_keys): def test_delete_root_nonexistent_key(authorized_client, root_and_admin_have_keys): response = authorized_client.delete( - f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 404 assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ @@ -259,21 +325,29 @@ def test_delete_root_nonexistent_key(authorized_client, root_and_admin_have_keys ] +def test_delete_root_key_on_undefined(authorized_client, undefined_settings): + response = authorized_client.delete( + "/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"} + ) + assert response.status_code == 404 + assert read_json(undefined_settings / "undefined.json")["ssh"]["rootKeys"] == [] + + def test_get_admin_key(authorized_client, root_and_admin_have_keys): - response = authorized_client.get(f"/services/ssh/keys/tester") + response = authorized_client.get("/services/ssh/keys/tester") assert response.status_code == 200 assert response.json == ["ssh-rsa KEY test@pc"] def test_get_admin_key_when_none(authorized_client, ssh_on): - response = authorized_client.get(f"/services/ssh/keys/tester") + response = authorized_client.get("/services/ssh/keys/tester") assert response.status_code == 200 assert response.json == [] def test_delete_admin_key(authorized_client, root_and_admin_have_keys): response = authorized_client.delete( - f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 200 assert ( @@ -282,9 +356,27 @@ def test_delete_admin_key(authorized_client, root_and_admin_have_keys): ) +def test_delete_nonexistent_admin_key(authorized_client, root_and_admin_have_keys): + response = authorized_client.delete( + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa NO KEY test@pc"} + ) + assert response.status_code == 404 + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[ + "sshKeys" + ] == ["ssh-rsa KEY test@pc"] + + +def test_delete_admin_key_on_undefined(authorized_client, undefined_settings): + response = authorized_client.delete( + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + ) + assert response.status_code == 404 + assert read_json(undefined_settings / "undefined.json")["sshKeys"] == [] + + def test_add_admin_key(authorized_client, ssh_on): response = authorized_client.post( - f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 201 assert read_json(ssh_on / "turned_on.json")["sshKeys"] == [ @@ -294,7 +386,7 @@ def test_add_admin_key(authorized_client, ssh_on): def test_add_admin_key_one_more(authorized_client, root_and_admin_have_keys): response = authorized_client.post( - f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY_2 test@pc"} + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY_2 test@pc"} ) assert response.status_code == 201 assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[ @@ -304,7 +396,7 @@ def test_add_admin_key_one_more(authorized_client, root_and_admin_have_keys): def test_add_existing_admin_key(authorized_client, root_and_admin_have_keys): response = authorized_client.post( - f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + "/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} ) assert response.status_code == 409 assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[ @@ -312,3 +404,111 @@ def test_add_existing_admin_key(authorized_client, root_and_admin_have_keys): ] == [ "ssh-rsa KEY test@pc", ] + + +def test_add_invalid_admin_key(authorized_client, ssh_on): + response = authorized_client.post( + "/services/ssh/keys/tester", json={"public_key": "INVALID KEY test@pc"} + ) + assert response.status_code == 400 + + +@pytest.mark.parametrize("user", [1, 2, 3]) +def test_get_user_key(authorized_client, some_users, user): + response = authorized_client.get(f"/services/ssh/keys/user{user}") + assert response.status_code == 200 + if user == 1: + assert response.json == ["ssh-rsa KEY user1@pc"] + else: + assert response.json == [] + + +def test_get_keys_of_nonexistent_user(authorized_client, some_users): + response = authorized_client.get("/services/ssh/keys/user4") + assert response.status_code == 404 + + +def test_get_keys_of_undefined_users(authorized_client, undefined_settings): + response = authorized_client.get("/services/ssh/keys/user1") + assert response.status_code == 404 + + +@pytest.mark.parametrize("user", [1, 2, 3]) +def test_add_user_key(authorized_client, some_users, user): + response = authorized_client.post( + f"/services/ssh/keys/user{user}", json={"public_key": "ssh-ed25519 KEY test@pc"} + ) + assert response.status_code == 201 + if user == 1: + assert read_json(some_users / "some_users.json")["users"][user - 1][ + "sshKeys" + ] == [ + "ssh-rsa KEY user1@pc", + "ssh-ed25519 KEY test@pc", + ] + else: + assert read_json(some_users / "some_users.json")["users"][user - 1][ + "sshKeys" + ] == ["ssh-ed25519 KEY test@pc"] + + +def test_add_existing_user_key(authorized_client, some_users): + response = authorized_client.post( + "/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user1@pc"} + ) + assert response.status_code == 409 + assert read_json(some_users / "some_users.json")["users"][0]["sshKeys"] == [ + "ssh-rsa KEY user1@pc", + ] + + +def test_add_invalid_user_key(authorized_client, some_users): + response = authorized_client.post( + "/services/ssh/keys/user1", json={"public_key": "INVALID KEY user1@pc"} + ) + assert response.status_code == 400 + + +def test_delete_user_key(authorized_client, some_users): + response = authorized_client.delete( + "/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user1@pc"} + ) + assert response.status_code == 200 + assert read_json(some_users / "some_users.json")["users"][0]["sshKeys"] == [] + + +@pytest.mark.parametrize("user", [2, 3]) +def test_delete_nonexistent_user_key(authorized_client, some_users, user): + response = authorized_client.delete( + f"/services/ssh/keys/user{user}", json={"public_key": "ssh-rsa KEY user1@pc"} + ) + assert response.status_code == 404 + assert read_json(some_users / "some_users.json")["users"][user - 1]["sshKeys"] == [] + + +def test_add_keys_of_nonexistent_user(authorized_client, some_users): + response = authorized_client.post( + "/services/ssh/keys/user4", json={"public_key": "ssh-rsa KEY user4@pc"} + ) + assert response.status_code == 404 + + +def test_add_key_on_undefined_users(authorized_client, undefined_settings): + response = authorized_client.post( + "/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user4@pc"} + ) + assert response.status_code == 404 + + +def test_delete_keys_of_nonexistent_user(authorized_client, some_users): + response = authorized_client.delete( + "/services/ssh/keys/user4", json={"public_key": "ssh-rsa KEY user4@pc"} + ) + assert response.status_code == 404 + + +def test_delete_key_when_undefined_users(authorized_client, undefined_settings): + response = authorized_client.delete( + "/services/ssh/keys/user1", json={"public_key": "ssh-rsa KEY user1@pc"} + ) + assert response.status_code == 404 diff --git a/tests/services/test_ssh/some_users.json b/tests/services/test_ssh/some_users.json new file mode 100644 index 0000000..569253a --- /dev/null +++ b/tests/services/test_ssh/some_users.json @@ -0,0 +1,71 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ], + "users": [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": [ + "ssh-rsa KEY user1@pc" + ] + }, + { + "username": "user2", + "hashedPassword": "HASHED_PASSWORD_2", + "sshKeys": [ + ] + }, + { + "username": "user3", + "hashedPassword": "HASHED_PASSWORD_3" + } + ] +} \ No newline at end of file diff --git a/tests/services/test_ssh/undefined.json b/tests/services/test_ssh/undefined.json index 3f5545f..a214cc3 100644 --- a/tests/services/test_ssh/undefined.json +++ b/tests/services/test_ssh/undefined.json @@ -38,8 +38,5 @@ "enable": true, "allowReboot": true }, - "timezone": "Europe/Moscow", - "sshKeys": [ - "ssh-rsa KEY test@pc" - ] + "timezone": "Europe/Moscow" } \ No newline at end of file diff --git a/tests/services/test_ssh/undefined_values.json b/tests/services/test_ssh/undefined_values.json new file mode 100644 index 0000000..235a220 --- /dev/null +++ b/tests/services/test_ssh/undefined_values.json @@ -0,0 +1,46 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": {}, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/test_common.py b/tests/test_common.py new file mode 100644 index 0000000..f8aa36b --- /dev/null +++ b/tests/test_common.py @@ -0,0 +1,16 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +import json +import pytest + + +def test_get_api_version(authorized_client): + response = authorized_client.get("/api/version") + assert response.status_code == 200 + assert "version" in response.get_json() + + +def test_get_api_version_unauthorized(client): + response = client.get("/api/version") + assert response.status_code == 200 + assert "version" in response.get_json() diff --git a/tests/test_system.py b/tests/test_system.py new file mode 100644 index 0000000..a4e24ad --- /dev/null +++ b/tests/test_system.py @@ -0,0 +1,397 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +import json +import pytest +from selfprivacy_api.utils import get_domain + + +def read_json(file_path): + with open(file_path, "r", encoding="utf-8") as file: + return json.load(file) + + +@pytest.fixture +def domain_file(mocker, datadir): + mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", datadir / "domain") + return datadir + + +@pytest.fixture +def turned_on(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") + assert read_json(datadir / "turned_on.json")["autoUpgrade"]["enable"] == True + assert read_json(datadir / "turned_on.json")["autoUpgrade"]["allowReboot"] == True + assert read_json(datadir / "turned_on.json")["timezone"] == "Europe/Moscow" + return datadir + + +@pytest.fixture +def turned_off(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") + assert read_json(datadir / "turned_off.json")["autoUpgrade"]["enable"] == False + assert read_json(datadir / "turned_off.json")["autoUpgrade"]["allowReboot"] == False + assert read_json(datadir / "turned_off.json")["timezone"] == "Europe/Moscow" + return datadir + + +@pytest.fixture +def undefined_config(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") + assert "autoUpgrade" not in read_json(datadir / "undefined.json") + assert "timezone" not in read_json(datadir / "undefined.json") + return datadir + + +@pytest.fixture +def no_values(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "no_values.json") + assert "enable" not in read_json(datadir / "no_values.json")["autoUpgrade"] + assert "allowReboot" not in read_json(datadir / "no_values.json")["autoUpgrade"] + return datadir + + +class ProcessMock: + """Mock subprocess.Popen""" + + def __init__(self, args, **kwargs): + self.args = args + self.kwargs = kwargs + + def communicate(): + return (b"", None) + + returncode = 0 + + +class BrokenServiceMock(ProcessMock): + def communicate(): + return (b"Testing error", None) + + returncode = 3 + + +@pytest.fixture +def mock_subprocess_popen(mocker): + mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock) + return mock + + +@pytest.fixture +def mock_broken_service(mocker): + mock = mocker.patch( + "subprocess.Popen", autospec=True, return_value=BrokenServiceMock + ) + return mock + + +@pytest.fixture +def mock_subprocess_check_output(mocker): + mock = mocker.patch( + "subprocess.check_output", autospec=True, return_value=b"Testing Linux" + ) + return mock + + +def test_wrong_auth(wrong_auth_client): + response = wrong_auth_client.get("/system/pythonVersion") + assert response.status_code == 401 + + +def test_get_domain(authorized_client, domain_file): + assert get_domain() == "test-domain.tld" + + +## Timezones + + +def test_get_timezone_unauthorized(client, turned_on): + response = client.get("/system/configuration/timezone") + assert response.status_code == 401 + + +def test_get_timezone(authorized_client, turned_on): + response = authorized_client.get("/system/configuration/timezone") + assert response.status_code == 200 + assert response.get_json() == "Europe/Moscow" + + +def test_get_timezone_on_undefined(authorized_client, undefined_config): + response = authorized_client.get("/system/configuration/timezone") + assert response.status_code == 200 + assert response.get_json() == "Europe/Uzhgorod" + + +def test_put_timezone_unauthorized(client, turned_on): + response = client.put( + "/system/configuration/timezone", json={"timezone": "Europe/Moscow"} + ) + assert response.status_code == 401 + + +def test_put_timezone(authorized_client, turned_on): + response = authorized_client.put( + "/system/configuration/timezone", json={"timezone": "Europe/Helsinki"} + ) + assert response.status_code == 200 + assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Helsinki" + + +def test_put_timezone_on_undefined(authorized_client, undefined_config): + response = authorized_client.put( + "/system/configuration/timezone", json={"timezone": "Europe/Helsinki"} + ) + assert response.status_code == 200 + assert ( + read_json(undefined_config / "undefined.json")["timezone"] == "Europe/Helsinki" + ) + + +def test_put_timezone_without_timezone(authorized_client, turned_on): + response = authorized_client.put("/system/configuration/timezone", json={}) + assert response.status_code == 400 + assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Moscow" + + +def test_put_invalid_timezone(authorized_client, turned_on): + response = authorized_client.put( + "/system/configuration/timezone", json={"timezone": "Invalid/Timezone"} + ) + assert response.status_code == 400 + assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Moscow" + + +## AutoUpgrade + + +def test_get_auto_upgrade_unauthorized(client, turned_on): + response = client.get("/system/configuration/autoUpgrade") + assert response.status_code == 401 + + +def test_get_auto_upgrade(authorized_client, turned_on): + response = authorized_client.get("/system/configuration/autoUpgrade") + assert response.status_code == 200 + assert response.get_json() == { + "enable": True, + "allowReboot": True, + } + + +def test_get_auto_upgrade_on_undefined(authorized_client, undefined_config): + response = authorized_client.get("/system/configuration/autoUpgrade") + assert response.status_code == 200 + assert response.get_json() == { + "enable": True, + "allowReboot": False, + } + + +def test_get_auto_upgrade_without_values(authorized_client, no_values): + response = authorized_client.get("/system/configuration/autoUpgrade") + assert response.status_code == 200 + assert response.get_json() == { + "enable": True, + "allowReboot": False, + } + + +def test_get_auto_upgrade_turned_off(authorized_client, turned_off): + response = authorized_client.get("/system/configuration/autoUpgrade") + assert response.status_code == 200 + assert response.get_json() == { + "enable": False, + "allowReboot": False, + } + + +def test_put_auto_upgrade_unauthorized(client, turned_on): + response = client.put( + "/system/configuration/autoUpgrade", json={"enable": True, "allowReboot": True} + ) + assert response.status_code == 401 + + +def test_put_auto_upgrade(authorized_client, turned_on): + response = authorized_client.put( + "/system/configuration/autoUpgrade", json={"enable": False, "allowReboot": True} + ) + assert response.status_code == 200 + assert read_json(turned_on / "turned_on.json")["autoUpgrade"] == { + "enable": False, + "allowReboot": True, + } + + +def test_put_auto_upgrade_on_undefined(authorized_client, undefined_config): + response = authorized_client.put( + "/system/configuration/autoUpgrade", json={"enable": False, "allowReboot": True} + ) + assert response.status_code == 200 + assert read_json(undefined_config / "undefined.json")["autoUpgrade"] == { + "enable": False, + "allowReboot": True, + } + + +def test_put_auto_upgrade_without_values(authorized_client, no_values): + response = authorized_client.put( + "/system/configuration/autoUpgrade", json={"enable": True, "allowReboot": True} + ) + assert response.status_code == 200 + assert read_json(no_values / "no_values.json")["autoUpgrade"] == { + "enable": True, + "allowReboot": True, + } + + +def test_put_auto_upgrade_turned_off(authorized_client, turned_off): + response = authorized_client.put( + "/system/configuration/autoUpgrade", json={"enable": True, "allowReboot": True} + ) + assert response.status_code == 200 + assert read_json(turned_off / "turned_off.json")["autoUpgrade"] == { + "enable": True, + "allowReboot": True, + } + + +def test_put_auto_upgrade_without_enable(authorized_client, turned_off): + response = authorized_client.put( + "/system/configuration/autoUpgrade", json={"allowReboot": True} + ) + assert response.status_code == 200 + assert read_json(turned_off / "turned_off.json")["autoUpgrade"] == { + "enable": False, + "allowReboot": True, + } + + +def test_put_auto_upgrade_without_allow_reboot(authorized_client, turned_off): + response = authorized_client.put( + "/system/configuration/autoUpgrade", json={"enable": True} + ) + assert response.status_code == 200 + assert read_json(turned_off / "turned_off.json")["autoUpgrade"] == { + "enable": True, + "allowReboot": False, + } + + +def test_put_auto_upgrade_with_empty_json(authorized_client, turned_off): + response = authorized_client.put("/system/configuration/autoUpgrade", json={}) + assert response.status_code == 200 + assert read_json(turned_off / "turned_off.json")["autoUpgrade"] == { + "enable": False, + "allowReboot": False, + } + + +def test_system_rebuild_unauthorized(client, mock_subprocess_popen): + response = client.get("/system/configuration/apply") + assert response.status_code == 401 + assert mock_subprocess_popen.call_count == 0 + + +def test_system_rebuild(authorized_client, mock_subprocess_popen): + response = authorized_client.get("/system/configuration/apply") + assert response.status_code == 200 + assert mock_subprocess_popen.call_count == 1 + assert mock_subprocess_popen.call_args[0][0] == [ + "systemctl", + "start", + "sp-nixos-rebuild.service", + ] + + +def test_system_upgrade_unauthorized(client, mock_subprocess_popen): + response = client.get("/system/configuration/upgrade") + assert response.status_code == 401 + assert mock_subprocess_popen.call_count == 0 + + +def test_system_upgrade(authorized_client, mock_subprocess_popen): + response = authorized_client.get("/system/configuration/upgrade") + assert response.status_code == 200 + assert mock_subprocess_popen.call_count == 1 + assert mock_subprocess_popen.call_args[0][0] == [ + "systemctl", + "start", + "sp-nixos-upgrade.service", + ] + + +def test_system_rollback_unauthorized(client, mock_subprocess_popen): + response = client.get("/system/configuration/rollback") + assert response.status_code == 401 + assert mock_subprocess_popen.call_count == 0 + + +def test_system_rollback(authorized_client, mock_subprocess_popen): + response = authorized_client.get("/system/configuration/rollback") + assert response.status_code == 200 + assert mock_subprocess_popen.call_count == 1 + assert mock_subprocess_popen.call_args[0][0] == [ + "systemctl", + "start", + "sp-nixos-rollback.service", + ] + + +def test_get_system_version_unauthorized(client, mock_subprocess_check_output): + response = client.get("/system/version") + assert response.status_code == 401 + assert mock_subprocess_check_output.call_count == 0 + + +def test_get_system_version(authorized_client, mock_subprocess_check_output): + response = authorized_client.get("/system/version") + assert response.status_code == 200 + assert response.get_json() == {"system_version": "Testing Linux"} + assert mock_subprocess_check_output.call_count == 1 + assert mock_subprocess_check_output.call_args[0][0] == ["uname", "-a"] + + +def test_reboot_system_unauthorized(client, mock_subprocess_popen): + response = client.get("/system/reboot") + assert response.status_code == 401 + assert mock_subprocess_popen.call_count == 0 + + +def test_reboot_system(authorized_client, mock_subprocess_popen): + response = authorized_client.get("/system/reboot") + assert response.status_code == 200 + assert mock_subprocess_popen.call_count == 1 + assert mock_subprocess_popen.call_args[0][0] == ["reboot"] + + +def test_get_python_version_unauthorized(client, mock_subprocess_check_output): + response = client.get("/system/pythonVersion") + assert response.status_code == 401 + assert mock_subprocess_check_output.call_count == 0 + + +def test_get_python_version(authorized_client, mock_subprocess_check_output): + response = authorized_client.get("/system/pythonVersion") + assert response.status_code == 200 + assert response.get_json() == "Testing Linux" + assert mock_subprocess_check_output.call_count == 1 + assert mock_subprocess_check_output.call_args[0][0] == ["python", "-V"] + + +def test_pull_system_unauthorized(client, mock_subprocess_popen): + response = client.get("/system/configuration/pull") + assert response.status_code == 401 + assert mock_subprocess_popen.call_count == 0 + + +def test_pull_system(authorized_client, mock_subprocess_popen): + response = authorized_client.get("/system/configuration/pull") + assert response.status_code == 200 + assert mock_subprocess_popen.call_count == 1 + assert mock_subprocess_popen.call_args[0][0] == ["git", "pull"] + + +def test_pull_system_broken_repo(authorized_client, mock_broken_service): + response = authorized_client.get("/system/configuration/pull") + assert response.status_code == 500 + assert mock_broken_service.call_count == 1 diff --git a/tests/test_system/domain b/tests/test_system/domain new file mode 100644 index 0000000..3679d0d --- /dev/null +++ b/tests/test_system/domain @@ -0,0 +1 @@ +test-domain.tld \ No newline at end of file diff --git a/tests/test_system/no_values.json b/tests/test_system/no_values.json new file mode 100644 index 0000000..59e5e71 --- /dev/null +++ b/tests/test_system/no_values.json @@ -0,0 +1,50 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": true + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/test_system/turned_off.json b/tests/test_system/turned_off.json new file mode 100644 index 0000000..f451683 --- /dev/null +++ b/tests/test_system/turned_off.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": true + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": false, + "allowReboot": false + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/test_system/turned_on.json b/tests/test_system/turned_on.json new file mode 100644 index 0000000..337e47f --- /dev/null +++ b/tests/test_system/turned_on.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": true + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/test_system/undefined.json b/tests/test_system/undefined.json new file mode 100644 index 0000000..b67b296 --- /dev/null +++ b/tests/test_system/undefined.json @@ -0,0 +1,47 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": true + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/test_users.py b/tests/test_users.py new file mode 100644 index 0000000..bc07951 --- /dev/null +++ b/tests/test_users.py @@ -0,0 +1,270 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +import json +import pytest + + +def read_json(file_path): + with open(file_path, "r", encoding="utf-8") as file: + return json.load(file) + + +invalid_usernames = [ + "root", + "messagebus", + "postfix", + "polkituser", + "dovecot2", + "dovenull", + "nginx", + "postgres", + "systemd-journal-gateway", + "prosody", + "systemd-network", + "systemd-resolve", + "systemd-timesync", + "opendkim", + "rspamd", + "sshd", + "selfprivacy-api", + "restic", + "redis", + "pleroma", + "ocserv", + "nextcloud", + "memcached", + "knot-resolver", + "gitea", + "bitwarden_rs", + "vaultwarden", + "acme", + "virtualMail", + "nixbld1", + "nixbld2", + "nixbld29", + "nobody", +] + + +## FIXTURES ################################################### + + +@pytest.fixture +def no_users(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "no_users.json") + assert read_json(datadir / "no_users.json")["users"] == [] + return datadir + + +@pytest.fixture +def one_user(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "one_user.json") + assert read_json(datadir / "one_user.json")["users"] == [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": ["ssh-rsa KEY user1@pc"], + } + ] + return datadir + + +@pytest.fixture +def some_users(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "some_users.json") + assert read_json(datadir / "some_users.json")["users"] == [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": ["ssh-rsa KEY user1@pc"], + }, + {"username": "user2", "hashedPassword": "HASHED_PASSWORD_2", "sshKeys": []}, + {"username": "user3", "hashedPassword": "HASHED_PASSWORD_3"}, + ] + return datadir + + +@pytest.fixture +def undefined_settings(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") + assert "users" not in read_json(datadir / "undefined.json") + return datadir + + +class ProcessMock: + """Mock subprocess.Popen""" + + def __init__(self, args, **kwargs): + self.args = args + self.kwargs = kwargs + + def communicate(): + return (b"NEW_HASHED", None) + + returncode = 0 + + +@pytest.fixture +def mock_subprocess_popen(mocker): + mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock) + return mock + + +## TESTS ###################################################### + + +def test_get_users_unauthorized(client, some_users, mock_subprocess_popen): + response = client.get("/users") + assert response.status_code == 401 + + +def test_get_some_users(authorized_client, some_users, mock_subprocess_popen): + response = authorized_client.get("/users") + assert response.status_code == 200 + assert response.json == ["user1", "user2", "user3"] + + +def test_get_one_user(authorized_client, one_user, mock_subprocess_popen): + response = authorized_client.get("/users") + assert response.status_code == 200 + assert response.json == ["user1"] + + +def test_get_no_users(authorized_client, no_users, mock_subprocess_popen): + response = authorized_client.get("/users") + assert response.status_code == 200 + assert response.json == [] + + +def test_get_undefined_users( + authorized_client, undefined_settings, mock_subprocess_popen +): + response = authorized_client.get("/users") + assert response.status_code == 200 + assert response.json == [] + + +def test_post_users_unauthorized(client, some_users, mock_subprocess_popen): + response = client.post("/users") + assert response.status_code == 401 + + +def test_post_one_user(authorized_client, one_user, mock_subprocess_popen): + response = authorized_client.post( + "/users", json={"username": "user4", "password": "password"} + ) + assert response.status_code == 201 + assert read_json(one_user / "one_user.json")["users"] == [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": ["ssh-rsa KEY user1@pc"], + }, + { + "username": "user4", + "hashedPassword": "NEW_HASHED", + }, + ] + + +def test_post_without_username(authorized_client, one_user, mock_subprocess_popen): + response = authorized_client.post("/users", json={"password": "password"}) + assert response.status_code == 400 + + +def test_post_without_password(authorized_client, one_user, mock_subprocess_popen): + response = authorized_client.post("/users", json={"username": "user4"}) + assert response.status_code == 400 + + +def test_post_without_username_and_password( + authorized_client, one_user, mock_subprocess_popen +): + response = authorized_client.post("/users", json={}) + assert response.status_code == 400 + + +@pytest.mark.parametrize("username", invalid_usernames) +def test_post_system_user(authorized_client, one_user, mock_subprocess_popen, username): + response = authorized_client.post( + "/users", json={"username": username, "password": "password"} + ) + assert response.status_code == 409 + + +def test_post_existing_user(authorized_client, one_user, mock_subprocess_popen): + response = authorized_client.post( + "/users", json={"username": "user1", "password": "password"} + ) + assert response.status_code == 409 + + +def test_post_user_to_undefined_users( + authorized_client, undefined_settings, mock_subprocess_popen +): + response = authorized_client.post( + "/users", json={"username": "user4", "password": "password"} + ) + assert response.status_code == 201 + assert read_json(undefined_settings / "undefined.json")["users"] == [ + {"username": "user4", "hashedPassword": "NEW_HASHED"} + ] + + +def test_post_very_long_username(authorized_client, one_user, mock_subprocess_popen): + response = authorized_client.post( + "/users", json={"username": "a" * 100, "password": "password"} + ) + assert response.status_code == 400 + + +@pytest.mark.parametrize("username", ["", "1", "фыр", "user1@", "№:%##$^&@$&^()_"]) +def test_post_invalid_username( + authorized_client, one_user, mock_subprocess_popen, username +): + response = authorized_client.post( + "/users", json={"username": username, "password": "password"} + ) + assert response.status_code == 400 + + +def test_delete_user_unauthorized(client, some_users, mock_subprocess_popen): + response = client.delete("/users/user1") + assert response.status_code == 401 + + +def test_delete_user_not_found(authorized_client, some_users, mock_subprocess_popen): + response = authorized_client.delete("/users/user4") + assert response.status_code == 404 + + +def test_delete_user(authorized_client, some_users, mock_subprocess_popen): + response = authorized_client.delete("/users/user1") + assert response.status_code == 200 + assert read_json(some_users / "some_users.json")["users"] == [ + {"username": "user2", "hashedPassword": "HASHED_PASSWORD_2", "sshKeys": []}, + {"username": "user3", "hashedPassword": "HASHED_PASSWORD_3"}, + ] + + +@pytest.mark.parametrize("username", invalid_usernames) +def test_delete_system_user( + authorized_client, some_users, mock_subprocess_popen, username +): + response = authorized_client.delete("/users/" + username) + assert response.status_code == 400 or response.status_code == 404 + + +def test_delete_main_user(authorized_client, some_users, mock_subprocess_popen): + response = authorized_client.delete("/users/tester") + assert response.status_code == 400 + + +def test_delete_without_argument(authorized_client, some_users, mock_subprocess_popen): + response = authorized_client.delete("/users/") + assert response.status_code == 404 + + +def test_delete_just_delete(authorized_client, some_users, mock_subprocess_popen): + response = authorized_client.delete("/users") + assert response.status_code == 405 diff --git a/tests/test_users/no_users.json b/tests/test_users/no_users.json new file mode 100644 index 0000000..e5efe86 --- /dev/null +++ b/tests/test_users/no_users.json @@ -0,0 +1,54 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ], + "users": [ + ] +} \ No newline at end of file diff --git a/tests/test_users/one_user.json b/tests/test_users/one_user.json new file mode 100644 index 0000000..5df2108 --- /dev/null +++ b/tests/test_users/one_user.json @@ -0,0 +1,61 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ], + "users": [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": [ + "ssh-rsa KEY user1@pc" + ] + } + ] +} \ No newline at end of file diff --git a/tests/test_users/some_users.json b/tests/test_users/some_users.json new file mode 100644 index 0000000..569253a --- /dev/null +++ b/tests/test_users/some_users.json @@ -0,0 +1,71 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ], + "users": [ + { + "username": "user1", + "hashedPassword": "HASHED_PASSWORD_1", + "sshKeys": [ + "ssh-rsa KEY user1@pc" + ] + }, + { + "username": "user2", + "hashedPassword": "HASHED_PASSWORD_2", + "sshKeys": [ + ] + }, + { + "username": "user3", + "hashedPassword": "HASHED_PASSWORD_3" + } + ] +} \ No newline at end of file diff --git a/tests/test_users/undefined.json b/tests/test_users/undefined.json new file mode 100644 index 0000000..7b2cf8b --- /dev/null +++ b/tests/test_users/undefined.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file