diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..6ab4b9f --- /dev/null +++ b/.drone.yml @@ -0,0 +1,12 @@ +kind: pipeline +type: exec +name: default + +platform: + os: linux + arch: amd64 + +steps: +- name: test + commands: + - pytest \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index b451222..62e65ac 100755 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,11 @@ setuptools portalocker flask-swagger flask-swagger-ui +pytz +huey +gevent + +pytest +coverage +pytest-mock +pytest-datadir diff --git a/selfprivacy_api/app.py b/selfprivacy_api/app.py index e7c8f92..f74f650 100644 --- a/selfprivacy_api/app.py +++ b/selfprivacy_api/app.py @@ -1,6 +1,9 @@ #!/usr/bin/env python3 """SelfPrivacy server management API""" import os +from gevent import monkey + + from flask import Flask, request, jsonify from flask_restful import Api from flask_swagger import swagger @@ -11,18 +14,26 @@ from selfprivacy_api.resources.common import ApiVersion, DecryptDisk from selfprivacy_api.resources.system import api_system from selfprivacy_api.resources.services import services as api_services +from selfprivacy_api.restic_controller.tasks import huey, init_restic + swagger_blueprint = get_swaggerui_blueprint( "/api/docs", "/api/swagger.json", config={"app_name": "SelfPrivacy API"} ) -def create_app(): +def create_app(test_config=None): """Initiate Flask app and bind routes""" app = Flask(__name__) api = Api(app) - app.config["AUTH_TOKEN"] = os.environ.get("AUTH_TOKEN") - app.config["ENABLE_SWAGGER"] = os.environ.get("ENABLE_SWAGGER", "0") + if test_config is None: + app.config["AUTH_TOKEN"] = os.environ.get("AUTH_TOKEN") + if app.config["AUTH_TOKEN"] is None: + raise ValueError("AUTH_TOKEN is not set") + app.config["ENABLE_SWAGGER"] = os.environ.get("ENABLE_SWAGGER", "0") + app.config["B2_BUCKET"] = os.environ.get("B2_BUCKET") + else: + app.config.update(test_config) # Check bearer token @app.before_request @@ -49,7 +60,7 @@ def create_app(): def spec(): if app.config["ENABLE_SWAGGER"] == "1": swag = swagger(app) - swag["info"]["version"] = "1.0.0" + swag["info"]["version"] = "1.1.0" swag["info"]["title"] = "SelfPrivacy API" swag["info"]["description"] = "SelfPrivacy API" swag["securityDefinitions"] = { @@ -71,5 +82,8 @@ def create_app(): if __name__ == "__main__": + monkey.patch_all() created_app = create_app() + huey.start() + init_restic() created_app.run(port=5050, debug=False) diff --git a/selfprivacy_api/resources/common.py b/selfprivacy_api/resources/common.py index fa96a39..a9663aa 100644 --- a/selfprivacy_api/resources/common.py +++ b/selfprivacy_api/resources/common.py @@ -24,7 +24,7 @@ class ApiVersion(Resource): 401: description: Unauthorized """ - return {"version": "1.0.0"} + return {"version": "1.1.0"} class DecryptDisk(Resource): diff --git a/selfprivacy_api/resources/services/bitwarden.py b/selfprivacy_api/resources/services/bitwarden.py index 5c037c9..412ba8a 100644 --- a/selfprivacy_api/resources/services/bitwarden.py +++ b/selfprivacy_api/resources/services/bitwarden.py @@ -1,10 +1,9 @@ #!/usr/bin/env python3 """Bitwarden management module""" -import json -import portalocker from flask_restful import Resource from selfprivacy_api.resources.services import api +from selfprivacy_api.utils import WriteUserData class EnableBitwarden(Resource): @@ -24,20 +23,10 @@ class EnableBitwarden(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "bitwarden" not in data: - data["bitwarden"] = {} - data["bitwarden"]["enable"] = True - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + if "bitwarden" not in data: + data["bitwarden"] = {} + data["bitwarden"]["enable"] = True return { "status": 0, @@ -62,20 +51,10 @@ class DisableBitwarden(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "bitwarden" not in data: - data["bitwarden"] = {} - data["bitwarden"]["enable"] = False - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + if "bitwarden" not in data: + data["bitwarden"] = {} + data["bitwarden"]["enable"] = False return { "status": 0, diff --git a/selfprivacy_api/resources/services/gitea.py b/selfprivacy_api/resources/services/gitea.py index 4ae0b6a..bd4b8de 100644 --- a/selfprivacy_api/resources/services/gitea.py +++ b/selfprivacy_api/resources/services/gitea.py @@ -1,10 +1,9 @@ #!/usr/bin/env python3 """Gitea management module""" -import json -import portalocker from flask_restful import Resource from selfprivacy_api.resources.services import api +from selfprivacy_api.utils import WriteUserData class EnableGitea(Resource): @@ -24,20 +23,10 @@ class EnableGitea(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "gitea" not in data: - data["gitea"] = {} - data["gitea"]["enable"] = True - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + if "gitea" not in data: + data["gitea"] = {} + data["gitea"]["enable"] = True return { "status": 0, @@ -62,20 +51,10 @@ class DisableGitea(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "gitea" not in data: - data["gitea"] = {} - data["gitea"]["enable"] = False - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + if "gitea" not in data: + data["gitea"] = {} + data["gitea"]["enable"] = False return { "status": 0, diff --git a/selfprivacy_api/resources/services/mailserver.py b/selfprivacy_api/resources/services/mailserver.py index 4015f9a..1185d20 100644 --- a/selfprivacy_api/resources/services/mailserver.py +++ b/selfprivacy_api/resources/services/mailserver.py @@ -2,6 +2,7 @@ """Mail server management module""" import base64 import subprocess +import os from flask_restful import Resource from selfprivacy_api.resources.services import api @@ -25,15 +26,20 @@ class DKIMKey(Resource): description: DKIM key encoded in base64 401: description: Unauthorized + 404: + description: DKIM key not found """ domain = get_domain() - cat_process = subprocess.Popen( - ["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE - ) - dkim = cat_process.communicate()[0] - dkim = base64.b64encode(dkim) - dkim = str(dkim, "utf-8") - return dkim + + if os.path.exists("/var/dkim/" + domain + ".selector.txt"): + cat_process = subprocess.Popen( + ["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE + ) + dkim = cat_process.communicate()[0] + dkim = base64.b64encode(dkim) + dkim = str(dkim, "utf-8") + return dkim + return "DKIM file not found", 404 api.add_resource(DKIMKey, "/mailserver/dkim") diff --git a/selfprivacy_api/resources/services/nextcloud.py b/selfprivacy_api/resources/services/nextcloud.py index fc0bdbe..3aa9d06 100644 --- a/selfprivacy_api/resources/services/nextcloud.py +++ b/selfprivacy_api/resources/services/nextcloud.py @@ -1,10 +1,9 @@ #!/usr/bin/env python3 """Nextcloud management module""" -import json -import portalocker from flask_restful import Resource from selfprivacy_api.resources.services import api +from selfprivacy_api.utils import WriteUserData class EnableNextcloud(Resource): @@ -24,20 +23,10 @@ class EnableNextcloud(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "nextcloud" not in data: - data["nextcloud"] = {} - data["nextcloud"]["enable"] = True - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + if "nextcloud" not in data: + data["nextcloud"] = {} + data["nextcloud"]["enable"] = True return { "status": 0, @@ -62,20 +51,10 @@ class DisableNextcloud(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "nextcloud" not in data: - data["nextcloud"] = {} - data["nextcloud"]["enable"] = False - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + if "nextcloud" not in data: + data["nextcloud"] = {} + data["nextcloud"]["enable"] = False return { "status": 0, diff --git a/selfprivacy_api/resources/services/ocserv.py b/selfprivacy_api/resources/services/ocserv.py index 6ef5667..4dc83da 100644 --- a/selfprivacy_api/resources/services/ocserv.py +++ b/selfprivacy_api/resources/services/ocserv.py @@ -1,10 +1,9 @@ #!/usr/bin/env python3 """OpenConnect VPN server management module""" -import json -import portalocker from flask_restful import Resource from selfprivacy_api.resources.services import api +from selfprivacy_api.utils import WriteUserData class EnableOcserv(Resource): @@ -24,20 +23,10 @@ class EnableOcserv(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "ocserv" not in data: - data["ocserv"] = {} - data["ocserv"]["enable"] = True - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + if "ocserv" not in data: + data["ocserv"] = {} + data["ocserv"]["enable"] = True return { "status": 0, @@ -62,20 +51,10 @@ class DisableOcserv(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "ocserv" not in data: - data["ocserv"] = {} - data["ocserv"]["enable"] = False - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + if "ocserv" not in data: + data["ocserv"] = {} + data["ocserv"]["enable"] = False return { "status": 0, diff --git a/selfprivacy_api/resources/services/pleroma.py b/selfprivacy_api/resources/services/pleroma.py index 201a5a6..aaf08f0 100644 --- a/selfprivacy_api/resources/services/pleroma.py +++ b/selfprivacy_api/resources/services/pleroma.py @@ -1,10 +1,9 @@ #!/usr/bin/env python3 """Pleroma management module""" -import json -import portalocker from flask_restful import Resource from selfprivacy_api.resources.services import api +from selfprivacy_api.utils import WriteUserData class EnablePleroma(Resource): @@ -24,20 +23,10 @@ class EnablePleroma(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "pleroma" not in data: - data["pleroma"] = {} - data["pleroma"]["enable"] = True - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + if "pleroma" not in data: + data["pleroma"] = {} + data["pleroma"]["enable"] = True return { "status": 0, @@ -62,20 +51,10 @@ class DisablePleroma(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "pleroma" not in data: - data["pleroma"] = {} - data["pleroma"]["enable"] = False - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + if "pleroma" not in data: + data["pleroma"] = {} + data["pleroma"]["enable"] = False return { "status": 0, diff --git a/selfprivacy_api/resources/services/restic.py b/selfprivacy_api/resources/services/restic.py index 863d090..64ce2a8 100644 --- a/selfprivacy_api/resources/services/restic.py +++ b/selfprivacy_api/resources/services/restic.py @@ -1,11 +1,11 @@ #!/usr/bin/env python3 """Backups management module""" -import json -import subprocess -from flask import request -from flask_restful import Resource +from flask_restful import Resource, reqparse from selfprivacy_api.resources.services import api +from selfprivacy_api.utils import WriteUserData +from selfprivacy_api.restic_controller import tasks as restic_tasks +from selfprivacy_api.restic_controller import ResticController, ResticStates class ListAllBackups(Resource): @@ -27,25 +27,9 @@ class ListAllBackups(Resource): 401: description: Unauthorized """ - repository_name = request.headers.get("X-Repository-Name") - backup_listing_command = [ - "restic", - "-r", - f"rclone:backblaze:{repository_name}:/sfbackup", - "snapshots", - "--json", - ] - - with subprocess.Popen( - backup_listing_command, - shell=False, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) as backup_listing_process_descriptor: - snapshots_list = backup_listing_process_descriptor.communicate()[0] - - return snapshots_list.decode("utf-8") + restic = ResticController() + return restic.snapshot_list class AsyncCreateBackup(Resource): @@ -66,24 +50,17 @@ class AsyncCreateBackup(Resource): description: Bad request 401: description: Unauthorized + 409: + description: Backup already in progress """ - repository_name = request.headers.get("X-Repository-Name") - - backup_command = [ - "restic", - "-r", - f"rclone:backblaze:{repository_name}:/sfbackup", - "--verbose", - "--json", - "backup", - "/var", - ] - - with open("/tmp/backup.log", "w", encoding="utf-8") as log_file: - subprocess.Popen( - backup_command, shell=False, stdout=log_file, stderr=subprocess.STDOUT - ) - + restic = ResticController() + if restic.state is ResticStates.NO_KEY: + return {"error": "No key provided"}, 400 + if restic.state is ResticStates.INITIALIZING: + return {"error": "Backup is initializing"}, 400 + if restic.state is ResticStates.BACKING_UP: + return {"error": "Backup is already running"}, 409 + restic_tasks.start_backup() return { "status": 0, "message": "Backup creation has started", @@ -109,23 +86,39 @@ class CheckBackupStatus(Resource): 401: description: Unauthorized """ - backup_status_check_command = ["tail", "-1", "/tmp/backup.log"] + restic = ResticController() - with subprocess.Popen( - backup_status_check_command, - shell=False, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) as backup_status_check_process_descriptor: - backup_process_status = ( - backup_status_check_process_descriptor.communicate()[0].decode("utf-8") - ) + return { + "status": restic.state.name, + "progress": restic.progress, + "error_message": restic.error_message, + } - try: - json.loads(backup_process_status) - except ValueError: - return {"message": backup_process_status} - return backup_process_status + +class ForceReloadSnapshots(Resource): + """Force reload snapshots""" + + def get(self): + """ + Force reload snapshots + --- + tags: + - Backups + security: + - bearerAuth: [] + responses: + 200: + description: Snapshots reloaded + 400: + description: Bad request + 401: + description: Unauthorized + """ + restic_tasks.load_snapshots() + return { + "status": 0, + "message": "Snapshots reload started", + } class AsyncRestoreBackup(Resource): @@ -139,6 +132,18 @@ class AsyncRestoreBackup(Resource): - Backups security: - bearerAuth: [] + parameters: + - in: body + required: true + name: backup + description: Backup to restore + schema: + type: object + required: + - backupId + properties: + backupId: + type: string responses: 200: description: Backup restoration process started @@ -147,26 +152,88 @@ class AsyncRestoreBackup(Resource): 401: description: Unauthorized """ - backup_restoration_command = ["restic", "-r", "rclone:backblaze:sfbackup", "var", "--json"] + parser = reqparse.RequestParser() + parser.add_argument("backupId", type=str, required=True) + args = parser.parse_args() - with open("/tmp/backup.log", "w", encoding="utf-8") as backup_log_file_descriptor: - with subprocess.Popen( - backup_restoration_command, - shell=False, - stdout=subprocess.PIPE, - stderr=backup_log_file_descriptor, - ) as backup_restoration_process_descriptor: - backup_restoration_status = ( - "Backup restoration procedure started" - ) - - return { - "status": 0, - "message": backup_restoration_status - } + restic = ResticController() + if restic.state is ResticStates.NO_KEY: + return {"error": "No key provided"}, 400 + if restic.state is ResticStates.NOT_INITIALIZED: + return {"error": "Repository is not initialized"}, 400 + if restic.state is ResticStates.BACKING_UP: + return {"error": "Backup is already running"}, 409 + if restic.state is ResticStates.INITIALIZING: + return {"error": "Repository is initializing"}, 400 + if restic.state is ResticStates.RESTORING: + return {"error": "Restore is already running"}, 409 + for backup in restic.snapshot_list: + if backup["short_id"] == args["backupId"]: + restic_tasks.restore_from_backup(args["backupId"]) + return { + "status": 0, + "message": "Backup restoration procedure started", + } + + return {"error": "Backup not found"}, 404 + + +class BackblazeConfig(Resource): + """Backblaze config""" + + def put(self): + """ + Set the new key for backblaze + --- + tags: + - Backups + security: + - bearerAuth: [] + parameters: + - in: body + required: true + name: backblazeSettings + description: New Backblaze settings + schema: + type: object + required: + - accountId + - accountKey + - bucket + properties: + accountId: + type: string + accountKey: + type: string + bucket: + type: string + responses: + 200: + description: New Backblaze settings + 400: + description: Bad request + 401: + description: Unauthorized + """ + parser = reqparse.RequestParser() + parser.add_argument("accountId", type=str, required=True) + parser.add_argument("accountKey", type=str, required=True) + parser.add_argument("bucket", type=str, required=True) + args = parser.parse_args() + + with WriteUserData() as data: + data["backblaze"]["accountId"] = args["accountId"] + data["backblaze"]["accountKey"] = args["accountKey"] + data["backblaze"]["bucket"] = args["bucket"] + + restic_tasks.update_keys_from_userdata() + + return "New Backblaze settings saved" api.add_resource(ListAllBackups, "/restic/backup/list") api.add_resource(AsyncCreateBackup, "/restic/backup/create") api.add_resource(CheckBackupStatus, "/restic/backup/status") api.add_resource(AsyncRestoreBackup, "/restic/backup/restore") +api.add_resource(BackblazeConfig, "/restic/backblaze/config") +api.add_resource(ForceReloadSnapshots, "/restic/backup/reload") diff --git a/selfprivacy_api/resources/services/ssh.py b/selfprivacy_api/resources/services/ssh.py index 86ecc90..2b90087 100644 --- a/selfprivacy_api/resources/services/ssh.py +++ b/selfprivacy_api/resources/services/ssh.py @@ -1,10 +1,9 @@ #!/usr/bin/env python3 """SSH management module""" -import json -import portalocker from flask_restful import Resource, reqparse from selfprivacy_api.resources.services import api +from selfprivacy_api.utils import WriteUserData, ReadUserData, validate_ssh_public_key class EnableSSH(Resource): @@ -24,20 +23,10 @@ class EnableSSH(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "ssh" not in data: - data["ssh"] = {} - data["ssh"]["enable"] = True - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + if "ssh" not in data: + data["ssh"] = {} + data["ssh"]["enable"] = True return { "status": 0, @@ -45,6 +34,82 @@ class EnableSSH(Resource): } +class SSHSettings(Resource): + """Enable/disable SSH""" + + def get(self): + """ + Get current SSH settings + --- + tags: + - SSH + security: + - bearerAuth: [] + responses: + 200: + description: SSH settings + 400: + description: Bad request + """ + with ReadUserData() as data: + if "ssh" not in data: + return {"enable": True, "passwordAuthentication": True} + if "enable" not in data["ssh"]: + data["ssh"]["enable"] = True + if "passwordAuthentication" not in data["ssh"]: + data["ssh"]["passwordAuthentication"] = True + return { + "enable": data["ssh"]["enable"], + "passwordAuthentication": data["ssh"]["passwordAuthentication"], + } + + def put(self): + """ + Change SSH settings + --- + tags: + - SSH + security: + - bearerAuth: [] + parameters: + - name: sshSettings + in: body + required: true + description: SSH settings + schema: + type: object + required: + - enable + - passwordAuthentication + properties: + enable: + type: boolean + passwordAuthentication: + type: boolean + responses: + 200: + description: New settings saved + 400: + description: Bad request + """ + parser = reqparse.RequestParser() + parser.add_argument("enable", type=bool, required=False) + parser.add_argument("passwordAuthentication", type=bool, required=False) + args = parser.parse_args() + enable = args["enable"] + password_authentication = args["passwordAuthentication"] + + with WriteUserData() as data: + if "ssh" not in data: + data["ssh"] = {} + if enable is not None: + data["ssh"]["enable"] = enable + if password_authentication is not None: + data["ssh"]["passwordAuthentication"] = password_authentication + + return "SSH settings changed" + + class WriteSSHKey(Resource): """Write new SSH key""" @@ -89,28 +154,23 @@ class WriteSSHKey(Resource): public_key = args["public_key"] - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - if "ssh" not in data: - data["ssh"] = {} - if "rootKeys" not in data["ssh"]: - data["ssh"]["rootKeys"] = [] - # Return 409 if key already in array - for key in data["ssh"]["rootKeys"]: - if key == public_key: - return { - "error": "Key already exists", - }, 409 - data["ssh"]["rootKeys"].append(public_key) - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + if not validate_ssh_public_key(public_key): + return { + "error": "Invalid key type. Only ssh-ed25519 and ssh-rsa are supported.", + }, 400 + + with WriteUserData() as data: + if "ssh" not in data: + data["ssh"] = {} + if "rootKeys" not in data["ssh"]: + data["ssh"]["rootKeys"] = [] + # Return 409 if key already in array + for key in data["ssh"]["rootKeys"]: + if key == public_key: + return { + "error": "Key already exists", + }, 409 + data["ssh"]["rootKeys"].append(public_key) return { "status": 0, @@ -118,5 +178,228 @@ class WriteSSHKey(Resource): }, 201 +class SSHKeys(Resource): + """List SSH keys""" + + def get(self, username): + """ + List SSH keys + --- + tags: + - SSH + security: + - bearerAuth: [] + parameters: + - in: path + name: username + type: string + required: true + description: User to list keys for + responses: + 200: + description: SSH keys + 401: + description: Unauthorized + """ + with ReadUserData() as data: + if username == "root": + if "ssh" not in data: + data["ssh"] = {} + if "rootKeys" not in data["ssh"]: + data["ssh"]["rootKeys"] = [] + return data["ssh"]["rootKeys"] + if username == data["username"]: + if "sshKeys" not in data: + data["sshKeys"] = [] + return data["sshKeys"] + else: + if "users" not in data: + data["users"] = [] + for user in data["users"]: + if user["name"] == username: + if "sshKeys" not in user: + user["sshKeys"] = [] + return user["ssh"]["sshKeys"] + return { + "error": "User not found", + }, 404 + + def post(self, username): + """ + Add SSH key to the user + --- + tags: + - SSH + security: + - bearerAuth: [] + parameters: + - in: body + required: true + name: public_key + schema: + type: object + required: + - public_key + properties: + public_key: + type: string + - in: path + name: username + type: string + required: true + description: User to add keys for + responses: + 201: + description: SSH key added + 401: + description: Unauthorized + 404: + description: User not found + 409: + description: Key already exists + """ + parser = reqparse.RequestParser() + parser.add_argument( + "public_key", type=str, required=True, help="Key cannot be blank!" + ) + args = parser.parse_args() + + if username == "root": + return { + "error": "Use /ssh/key/send to add root keys", + }, 400 + + if not validate_ssh_public_key(args["public_key"]): + return { + "error": "Invalid key type. Only ssh-ed25519 and ssh-rsa are supported.", + }, 400 + + with WriteUserData() as data: + if username == data["username"]: + if "sshKeys" not in data: + data["sshKeys"] = [] + # Return 409 if key already in array + for key in data["sshKeys"]: + if key == args["public_key"]: + return { + "error": "Key already exists", + }, 409 + data["sshKeys"].append(args["public_key"]) + return { + "message": "New SSH key successfully written", + }, 201 + + if "users" not in data: + data["users"] = [] + for user in data["users"]: + if user["username"] == username: + if "sshKeys" not in user: + user["sshKeys"] = [] + # Return 409 if key already in array + for key in user["sshKeys"]: + if key == args["public_key"]: + return { + "error": "Key already exists", + }, 409 + user["sshKeys"].append(args["public_key"]) + return { + "message": "New SSH key successfully written", + }, 201 + return { + "error": "User not found", + }, 404 + + def delete(self, username): + """ + Delete SSH key + --- + tags: + - SSH + security: + - bearerAuth: [] + parameters: + - in: body + name: public_key + required: true + description: Key to delete + schema: + type: object + required: + - public_key + properties: + public_key: + type: string + - in: path + name: username + type: string + required: true + description: User to delete keys for + responses: + 200: + description: SSH key deleted + 401: + description: Unauthorized + 404: + description: Key not found + """ + parser = reqparse.RequestParser() + parser.add_argument( + "public_key", type=str, required=True, help="Key cannot be blank!" + ) + args = parser.parse_args() + + with WriteUserData() as data: + if username == "root": + if "ssh" not in data: + data["ssh"] = {} + if "rootKeys" not in data["ssh"]: + data["ssh"]["rootKeys"] = [] + # Return 404 if key not in array + for key in data["ssh"]["rootKeys"]: + if key == args["public_key"]: + data["ssh"]["rootKeys"].remove(key) + return { + "message": "SSH key deleted", + }, 200 + return { + "error": "Key not found", + }, 404 + if username == data["username"]: + if "sshKeys" not in data: + data["sshKeys"] = [] + # Return 404 if key not in array + for key in data["sshKeys"]: + if key == args["public_key"]: + data["sshKeys"].remove(key) + return { + "message": "SSH key deleted", + }, 200 + return { + "error": "Key not found", + }, 404 + if "users" not in data: + data["users"] = [] + for user in data["users"]: + if user["username"] == username: + if "sshKeys" not in user: + user["sshKeys"] = [] + # Return 404 if key not in array + for key in user["sshKeys"]: + if key == args["public_key"]: + user["sshKeys"].remove(key) + return { + "message": "SSH key successfully deleted", + }, 200 + return { + "error": "Key not found", + }, 404 + return { + "error": "User not found", + }, 404 + + api.add_resource(EnableSSH, "/ssh/enable") +api.add_resource(SSHSettings, "/ssh") + api.add_resource(WriteSSHKey, "/ssh/key/send") +api.add_resource(SSHKeys, "/ssh/keys/") diff --git a/selfprivacy_api/resources/services/update.py b/selfprivacy_api/resources/services/update.py deleted file mode 100644 index 1d15fbe..0000000 --- a/selfprivacy_api/resources/services/update.py +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/env/python3 -"""Update dispatch module""" -import os -import subprocess -from flask_restful import Resource, reqparse - -from selfprivacy_api.resources.services import api - - -class PullRepositoryChanges(Resource): - def get(self): - """ - Pull Repository Changes - --- - tags: - - Update - security: - - bearerAuth: [] - responses: - 200: - description: Got update - 201: - description: Nothing to update - 401: - description: Unauthorized - 500: - description: Something went wrong - """ - - git_pull_command = ["git", "pull"] - - current_working_directory = os.getcwd() - os.chdir("/etc/nixos") - - - git_pull_process_descriptor = subprocess.Popen( - git_pull_command, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - shell=False - ) - - - git_pull_process_descriptor.communicate()[0] - - os.chdir(current_working_directory) - - if git_pull_process_descriptor.returncode == 0: - return { - "status": 0, - "message": "Update completed successfully" - } - elif git_pull_process_descriptor.returncode > 0: - return { - "status": git_pull_process_descriptor.returncode, - "message": "Something went wrong" - }, 500 - -api.add_resource(PullRepositoryChanges, "/update") diff --git a/selfprivacy_api/resources/system.py b/selfprivacy_api/resources/system.py index 7c3ad77..5dbc858 100644 --- a/selfprivacy_api/resources/system.py +++ b/selfprivacy_api/resources/system.py @@ -1,13 +1,150 @@ #!/usr/bin/env python3 """System management module""" +import os import subprocess +import pytz from flask import Blueprint -from flask_restful import Resource, Api +from flask_restful import Resource, Api, reqparse + +from selfprivacy_api.utils import WriteUserData, ReadUserData api_system = Blueprint("system", __name__, url_prefix="/system") api = Api(api_system) +class Timezone(Resource): + """Change timezone of NixOS""" + + def get(self): + """ + Get current system timezone + --- + tags: + - System + security: + - bearerAuth: [] + responses: + 200: + description: Timezone + 400: + description: Bad request + """ + with ReadUserData() as data: + if "timezone" not in data: + return "Europe/Uzhgorod" + return data["timezone"] + + def put(self): + """ + Change system timezone + --- + tags: + - System + security: + - bearerAuth: [] + parameters: + - name: timezone + in: body + required: true + description: Timezone to set + schema: + type: object + required: + - timezone + properties: + timezone: + type: string + responses: + 200: + description: Timezone changed + 400: + description: Bad request + """ + parser = reqparse.RequestParser() + parser.add_argument("timezone", type=str, required=True) + timezone = parser.parse_args()["timezone"] + + # Check if timezone is a valid tzdata string + if timezone not in pytz.all_timezones: + return {"error": "Invalid timezone"}, 400 + + with WriteUserData() as data: + data["timezone"] = timezone + return "Timezone changed" + + +class AutoUpgrade(Resource): + """Enable/disable automatic upgrades and reboots""" + + def get(self): + """ + Get current system autoupgrade settings + --- + tags: + - System + security: + - bearerAuth: [] + responses: + 200: + description: Auto-upgrade settings + 400: + description: Bad request + """ + with ReadUserData() as data: + if "autoUpgrade" not in data: + return {"enable": True, "allowReboot": False} + if "enable" not in data["autoUpgrade"]: + data["autoUpgrade"]["enable"] = True + if "allowReboot" not in data["autoUpgrade"]: + data["autoUpgrade"]["allowReboot"] = False + return data["autoUpgrade"] + + def put(self): + """ + Change system auto upgrade settings + --- + tags: + - System + security: + - bearerAuth: [] + parameters: + - name: autoUpgrade + in: body + required: true + description: Auto upgrade settings + schema: + type: object + required: + - enable + - allowReboot + properties: + enable: + type: boolean + allowReboot: + type: boolean + responses: + 200: + description: New settings saved + 400: + description: Bad request + """ + parser = reqparse.RequestParser() + parser.add_argument("enable", type=bool, required=False) + parser.add_argument("allowReboot", type=bool, required=False) + args = parser.parse_args() + enable = args["enable"] + allow_reboot = args["allowReboot"] + + with WriteUserData() as data: + if "autoUpgrade" not in data: + data["autoUpgrade"] = {} + if enable is not None: + data["autoUpgrade"]["enable"] = enable + if allow_reboot is not None: + data["autoUpgrade"]["allowReboot"] = allow_reboot + return "Auto-upgrade settings changed" + + class RebuildSystem(Resource): """Rebuild NixOS""" @@ -145,9 +282,62 @@ class PythonVersion(Resource): return subprocess.check_output(["python", "-V"]).decode("utf-8").strip() +class PullRepositoryChanges(Resource): + def get(self): + """ + Pull Repository Changes + --- + tags: + - System + security: + - bearerAuth: [] + responses: + 200: + description: Got update + 201: + description: Nothing to update + 401: + description: Unauthorized + 500: + description: Something went wrong + """ + + git_pull_command = ["git", "pull"] + + current_working_directory = os.getcwd() + os.chdir("/etc/nixos") + + git_pull_process_descriptor = subprocess.Popen( + git_pull_command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + shell=False, + ) + + data = git_pull_process_descriptor.communicate()[0].decode("utf-8") + + os.chdir(current_working_directory) + + if git_pull_process_descriptor.returncode == 0: + return { + "status": 0, + "message": "Update completed successfully", + "data": data, + } + elif git_pull_process_descriptor.returncode > 0: + return { + "status": git_pull_process_descriptor.returncode, + "message": "Something went wrong", + "data": data, + }, 500 + + +api.add_resource(Timezone, "/configuration/timezone") +api.add_resource(AutoUpgrade, "/configuration/autoUpgrade") api.add_resource(RebuildSystem, "/configuration/apply") api.add_resource(RollbackSystem, "/configuration/rollback") api.add_resource(UpgradeSystem, "/configuration/upgrade") api.add_resource(RebootSystem, "/reboot") api.add_resource(SystemVersion, "/version") api.add_resource(PythonVersion, "/pythonVersion") +api.add_resource(PullRepositoryChanges, "/configuration/pull") diff --git a/selfprivacy_api/resources/users.py b/selfprivacy_api/resources/users.py index 2f373bc..057a5e3 100644 --- a/selfprivacy_api/resources/users.py +++ b/selfprivacy_api/resources/users.py @@ -1,11 +1,11 @@ #!/usr/bin/env python3 """Users management module""" import subprocess -import json import re -import portalocker from flask_restful import Resource, reqparse +from selfprivacy_api.utils import WriteUserData, ReadUserData + class Users(Resource): """Users management""" @@ -24,17 +24,10 @@ class Users(Resource): 401: description: Unauthorized """ - with open( - "/etc/nixos/userdata/userdata.json", "r", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_SH) - try: - data = json.load(userdata_file) - users = [] - for user in data["users"]: - users.append(user["username"]) - finally: - portalocker.unlock(userdata_file) + with ReadUserData() as data: + users = [] + for user in data["users"]: + users.append(user["username"]) return users def post(self): @@ -97,32 +90,21 @@ class Users(Resource): if len(args["username"]) > 32: return {"error": "username must be less than 32 characters"}, 400 - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) + with WriteUserData() as data: + if "users" not in data: + data["users"] = [] - if "users" not in data: - data["users"] = [] + # Return 400 if user already exists + for user in data["users"]: + if user["username"] == args["username"]: + return {"error": "User already exists"}, 409 - # Return 400 if user already exists - for user in data["users"]: - if user["username"] == args["username"]: - return {"error": "User already exists"}, 409 - - data["users"].append( - { - "username": args["username"], - "hashedPassword": hashed_password, - } - ) - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + data["users"].append( + { + "username": args["username"], + "hashedPassword": hashed_password, + } + ) return {"result": 0, "username": args["username"]}, 201 @@ -154,29 +136,18 @@ class User(Resource): 404: description: User not found """ - with open( - "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" - ) as userdata_file: - portalocker.lock(userdata_file, portalocker.LOCK_EX) - try: - data = json.load(userdata_file) - # Return 400 if username is not provided - if username is None: - return {"error": "username is required"}, 400 - if username == data["username"]: - return {"error": "Cannot delete root user"}, 400 - # Return 400 if user does not exist - for user in data["users"]: - if user["username"] == username: - data["users"].remove(user) - break - else: - return {"error": "User does not exist"}, 404 - - userdata_file.seek(0) - json.dump(data, userdata_file, indent=4) - userdata_file.truncate() - finally: - portalocker.unlock(userdata_file) + with WriteUserData() as data: + # Return 400 if username is not provided + if username is None: + return {"error": "username is required"}, 400 + if username == data["username"]: + return {"error": "Cannot delete root user"}, 400 + # Return 400 if user does not exist + for user in data["users"]: + if user["username"] == username: + data["users"].remove(user) + break + else: + return {"error": "User does not exist"}, 404 return {"result": 0, "username": username} diff --git a/selfprivacy_api/restic_controller/__init__.py b/selfprivacy_api/restic_controller/__init__.py new file mode 100644 index 0000000..cefef53 --- /dev/null +++ b/selfprivacy_api/restic_controller/__init__.py @@ -0,0 +1,261 @@ +"""Restic singleton controller.""" +from datetime import datetime +import json +import subprocess +import os +from threading import Lock +from enum import Enum +import portalocker +from selfprivacy_api.utils import ReadUserData + + +class ResticStates(Enum): + """Restic states enum.""" + + NO_KEY = 0 + NOT_INITIALIZED = 1 + INITIALIZED = 2 + BACKING_UP = 3 + RESTORING = 4 + ERROR = 5 + INITIALIZING = 6 + + +class ResticController: + """ + States in wich the restic_controller may be + - no backblaze key + - backblaze key is provided, but repository is not initialized + - backblaze key is provided, repository is initialized + - fetching list of snapshots + - creating snapshot, current progress can be retrieved + - recovering from snapshot + + Any ongoing operation acquires the lock + Current state can be fetched with get_state() + """ + + _instance = None + _lock = Lock() + _initialized = False + + def __new__(cls): + print("new is called!") + if not cls._instance: + with cls._lock: + cls._instance = super(ResticController, cls).__new__(cls) + return cls._instance + + def __init__(self): + if self._initialized: + return + self.state = ResticStates.NO_KEY + self.lock = False + self.progress = 0 + self._backblaze_account = None + self._backblaze_key = None + self._repository_name = None + self.snapshot_list = [] + self.error_message = None + self._initialized = True + print("init is called!") + self.load_configuration() + self.write_rclone_config() + self.load_snapshots() + + def load_configuration(self): + """Load current configuration from user data to singleton.""" + with ReadUserData() as user_data: + self._backblaze_account = user_data["backblaze"]["accountId"] + self._backblaze_key = user_data["backblaze"]["accountKey"] + self._repository_name = user_data["backblaze"]["bucket"] + if self._backblaze_account and self._backblaze_key and self._repository_name: + self.state = ResticStates.INITIALIZING + else: + self.state = ResticStates.NO_KEY + + def write_rclone_config(self): + """ + Open /root/.config/rclone/rclone.conf with portalocker + and write configuration in the following format: + [backblaze] + type = b2 + account = {self.backblaze_account} + key = {self.backblaze_key} + """ + with portalocker.Lock( + "/root/.config/rclone/rclone.conf", "w", timeout=None + ) as rclone_config: + rclone_config.write( + f"[backblaze]\n" + f"type = b2\n" + f"account = {self._backblaze_account}\n" + f"key = {self._backblaze_key}\n" + ) + + def load_snapshots(self): + """ + Load list of snapshots from repository + """ + backup_listing_command = [ + "restic", + "-o", + "rclone.args=serve restic --stdio", + "-r", + f"rclone:backblaze:{self._repository_name}/sfbackup", + "snapshots", + "--json", + ] + + if ( + self.state == ResticStates.BACKING_UP + or self.state == ResticStates.RESTORING + ): + return + print("preparing to read snapshots") + with subprocess.Popen( + backup_listing_command, + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) as backup_listing_process_descriptor: + snapshots_list = backup_listing_process_descriptor.communicate()[0].decode( + "utf-8" + ) + try: + starting_index = snapshots_list.find("[") + json.loads(snapshots_list[starting_index:]) + self.snapshot_list = json.loads(snapshots_list[starting_index:]) + self.state = ResticStates.INITIALIZED + print(snapshots_list) + except ValueError: + if "Is there a repository at the following location?" in snapshots_list: + self.state = ResticStates.NOT_INITIALIZED + return + else: + self.state = ResticStates.ERROR + self.error_message = snapshots_list + return + + def initialize_repository(self): + """ + Initialize repository with restic + """ + initialize_repository_command = [ + "restic", + "-o", + "rclone.args=serve restic --stdio", + "-r", + f"rclone:backblaze:{self._repository_name}/sfbackup", + "init", + ] + with subprocess.Popen( + initialize_repository_command, + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) as initialize_repository_process_descriptor: + msg = initialize_repository_process_descriptor.communicate()[0].decode( + "utf-8" + ) + if initialize_repository_process_descriptor.returncode == 0: + self.state = ResticStates.INITIALIZED + else: + self.state = ResticStates.ERROR + self.error_message = msg + + self.state = ResticStates.INITIALIZED + + def start_backup(self): + """ + Start backup with restic + """ + backup_command = [ + "restic", + "-o", + "rclone.args=serve restic --stdio", + "-r", + f"rclone:backblaze:{self._repository_name}/sfbackup", + "--verbose", + "--json", + "backup", + "/var", + ] + with open("/tmp/backup.log", "w", encoding="utf-8") as log_file: + subprocess.Popen( + backup_command, + shell=False, + stdout=log_file, + stderr=subprocess.STDOUT, + ) + + self.state = ResticStates.BACKING_UP + self.progress = 0 + + def check_progress(self): + """ + Check progress of ongoing backup operation + """ + backup_status_check_command = ["tail", "-1", "/tmp/backup.log"] + + if ( + self.state == ResticStates.NO_KEY + or self.state == ResticStates.NOT_INITIALIZED + ): + return + + # If the log file does not exists + if os.path.exists("/tmp/backup.log") is False: + self.state = ResticStates.INITIALIZED + + with subprocess.Popen( + backup_status_check_command, + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) as backup_status_check_process_descriptor: + backup_process_status = ( + backup_status_check_process_descriptor.communicate()[0].decode("utf-8") + ) + + try: + status = json.loads(backup_process_status) + except ValueError: + print(backup_process_status) + self.error_message = backup_process_status + return + if status["message_type"] == "status": + self.progress = status["percent_done"] + self.state = ResticStates.BACKING_UP + elif status["message_type"] == "summary": + self.state = ResticStates.INITIALIZED + self.progress = 0 + self.snapshot_list.append( + { + "short_id": status["snapshot_id"], + # Current time in format 2021-12-02T00:02:51.086452543+03:00 + "time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f%z"), + } + ) + + def restore_from_backup(self, snapshot_id): + """ + Restore from backup with restic + """ + backup_restoration_command = [ + "restic", + "-o", + "rclone.args=serve restic --stdio", + "-r", + f"rclone:backblaze:{self._repository_name}/sfbackup", + "restore", + snapshot_id, + "--target", + "/", + ] + + self.state = ResticStates.RESTORING + + subprocess.run(backup_restoration_command, shell=False) + + self.state = ResticStates.INITIALIZED diff --git a/selfprivacy_api/restic_controller/tasks.py b/selfprivacy_api/restic_controller/tasks.py new file mode 100644 index 0000000..4c610c4 --- /dev/null +++ b/selfprivacy_api/restic_controller/tasks.py @@ -0,0 +1,72 @@ +"""Tasks for the restic controller.""" +from huey import crontab +from huey.contrib.mini import MiniHuey +from . import ResticController, ResticStates + +huey = MiniHuey() + + +@huey.task() +def init_restic(): + controller = ResticController() + if controller.state == ResticStates.NOT_INITIALIZED: + initialize_repository() + + +@huey.task() +def update_keys_from_userdata(): + controller = ResticController() + controller.load_configuration() + controller.write_rclone_config() + initialize_repository() + + +# Check every morning at 5:00 AM +@huey.task(crontab(hour=5, minute=0)) +def cron_load_snapshots(): + controller = ResticController() + controller.load_snapshots() + + +# Check every morning at 5:00 AM +@huey.task() +def load_snapshots(): + controller = ResticController() + controller.load_snapshots() + if controller.state == ResticStates.NOT_INITIALIZED: + load_snapshots.schedule(delay=120) + + +@huey.task() +def initialize_repository(): + controller = ResticController() + if controller.state is not ResticStates.NO_KEY: + controller.initialize_repository() + load_snapshots() + + +@huey.task() +def fetch_backup_status(): + controller = ResticController() + if controller.state is ResticStates.BACKING_UP: + controller.check_progress() + if controller.state is ResticStates.BACKING_UP: + fetch_backup_status.schedule(delay=2) + else: + load_snapshots.schedule(delay=240) + + +@huey.task() +def start_backup(): + controller = ResticController() + if controller.state is ResticStates.NOT_INITIALIZED: + resp = initialize_repository() + resp.get() + controller.start_backup() + fetch_backup_status.schedule(delay=3) + + +@huey.task() +def restore_from_backup(snapshot): + controller = ResticController() + controller.restore_from_backup(snapshot) diff --git a/selfprivacy_api/utils.py b/selfprivacy_api/utils.py index b7ef2a8..1b0c43c 100644 --- a/selfprivacy_api/utils.py +++ b/selfprivacy_api/utils.py @@ -1,5 +1,10 @@ #!/usr/bin/env python3 """Various utility functions""" +import json +import portalocker + + +USERDATA_FILE = "/etc/nixos/userdata/userdata.json" def get_domain(): @@ -7,3 +12,47 @@ def get_domain(): with open("/var/domain", "r", encoding="utf-8") as domain_file: domain = domain_file.readline().rstrip() return domain + + +class WriteUserData(object): + """Write userdata.json with lock""" + + def __init__(self): + self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8") + portalocker.lock(self.userdata_file, portalocker.LOCK_EX) + self.data = json.load(self.userdata_file) + + def __enter__(self): + return self.data + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is None: + self.userdata_file.seek(0) + json.dump(self.data, self.userdata_file, indent=4) + self.userdata_file.truncate() + portalocker.unlock(self.userdata_file) + self.userdata_file.close() + + +class ReadUserData(object): + """Read userdata.json with lock""" + + def __init__(self): + self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8") + portalocker.lock(self.userdata_file, portalocker.LOCK_SH) + self.data = json.load(self.userdata_file) + + def __enter__(self): + return self.data + + def __exit__(self, *args): + portalocker.unlock(self.userdata_file) + self.userdata_file.close() + + +def validate_ssh_public_key(key): + """Validate SSH public key. It may be ssh-ed25519 or ssh-rsa.""" + if not key.startswith("ssh-ed25519"): + if not key.startswith("ssh-rsa"): + return False + return True diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..e963224 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,43 @@ +import pytest +from flask import testing +from selfprivacy_api.app import create_app + + +@pytest.fixture +def app(): + app = create_app( + { + "AUTH_TOKEN": "TEST_TOKEN", + "ENABLE_SWAGGER": "0", + } + ) + + yield app + + +@pytest.fixture +def client(app): + return app.test_client() + + +class AuthorizedClient(testing.FlaskClient): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.token = "TEST_TOKEN" + + def open(self, *args, **kwargs): + if "headers" not in kwargs: + kwargs["headers"] = {} + kwargs["headers"]["Authorization"] = f"Bearer {self.token}" + return super().open(*args, **kwargs) + + +@pytest.fixture +def authorized_client(app): + app.test_client_class = AuthorizedClient + return app.test_client() + + +@pytest.fixture +def runner(app): + return app.test_cli_runner() diff --git a/tests/services/test_bitwarden.py b/tests/services/test_bitwarden.py new file mode 100644 index 0000000..3977253 --- /dev/null +++ b/tests/services/test_bitwarden.py @@ -0,0 +1,125 @@ +import json +import pytest + + +def read_json(file_path): + with open(file_path, "r") as f: + return json.load(f) + + +############################################################################### + + +@pytest.fixture +def bitwarden_off(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") + assert read_json(datadir / "turned_off.json")["bitwarden"]["enable"] == False + return datadir + + +@pytest.fixture +def bitwarden_on(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") + assert read_json(datadir / "turned_on.json")["bitwarden"]["enable"] == True + return datadir + + +@pytest.fixture +def bitwarden_enable_undefined(mocker, datadir): + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json" + ) + assert "enable" not in read_json(datadir / "enable_undefined.json")["bitwarden"] + return datadir + + +@pytest.fixture +def bitwarden_undefined(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") + assert "bitwarden" not in read_json(datadir / "undefined.json") + return datadir + + +############################################################################### + + +@pytest.mark.parametrize("endpoint", ["enable", "disable"]) +def test_unauthorized(client, bitwarden_off, endpoint): + response = client.post(f"/services/bitwarden/{endpoint}") + assert response.status_code == 401 + + +@pytest.mark.parametrize("endpoint", ["enable", "disable"]) +def test_illegal_methods(authorized_client, bitwarden_off, endpoint): + response = authorized_client.get(f"/services/bitwarden/{endpoint}") + assert response.status_code == 405 + response = authorized_client.put(f"/services/bitwarden/{endpoint}") + assert response.status_code == 405 + response = authorized_client.delete(f"/services/bitwarden/{endpoint}") + assert response.status_code == 405 + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_from_off(authorized_client, bitwarden_off, endpoint, target_file): + response = authorized_client.post(f"/services/bitwarden/{endpoint}") + assert response.status_code == 200 + assert read_json(bitwarden_off / "turned_off.json") == read_json( + bitwarden_off / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_from_on(authorized_client, bitwarden_on, endpoint, target_file): + response = authorized_client.post(f"/services/bitwarden/{endpoint}") + assert response.status_code == 200 + assert read_json(bitwarden_on / "turned_on.json") == read_json( + bitwarden_on / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_twice(authorized_client, bitwarden_off, endpoint, target_file): + response = authorized_client.post(f"/services/bitwarden/{endpoint}") + assert response.status_code == 200 + response = authorized_client.post(f"/services/bitwarden/{endpoint}") + assert response.status_code == 200 + assert read_json(bitwarden_off / "turned_off.json") == read_json( + bitwarden_off / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_attribute_deleted( + authorized_client, bitwarden_enable_undefined, endpoint, target_file +): + response = authorized_client.post(f"/services/bitwarden/{endpoint}") + assert response.status_code == 200 + assert read_json(bitwarden_enable_undefined / "enable_undefined.json") == read_json( + bitwarden_enable_undefined / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_bitwarden_undefined( + authorized_client, bitwarden_undefined, endpoint, target_file +): + response = authorized_client.post(f"/services/bitwarden/{endpoint}") + assert response.status_code == 200 + assert read_json(bitwarden_undefined / "undefined.json") == read_json( + bitwarden_undefined / target_file + ) diff --git a/tests/services/test_bitwarden/enable_undefined.json b/tests/services/test_bitwarden/enable_undefined.json new file mode 100644 index 0000000..05e04c1 --- /dev/null +++ b/tests/services/test_bitwarden/enable_undefined.json @@ -0,0 +1,51 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_bitwarden/turned_off.json b/tests/services/test_bitwarden/turned_off.json new file mode 100644 index 0000000..7b2cf8b --- /dev/null +++ b/tests/services/test_bitwarden/turned_off.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_bitwarden/turned_on.json b/tests/services/test_bitwarden/turned_on.json new file mode 100644 index 0000000..337e47f --- /dev/null +++ b/tests/services/test_bitwarden/turned_on.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": true + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_bitwarden/undefined.json b/tests/services/test_bitwarden/undefined.json new file mode 100644 index 0000000..625422b --- /dev/null +++ b/tests/services/test_bitwarden/undefined.json @@ -0,0 +1,49 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_gitea.py b/tests/services/test_gitea.py new file mode 100644 index 0000000..0a50c19 --- /dev/null +++ b/tests/services/test_gitea.py @@ -0,0 +1,121 @@ +import json +import pytest + + +def read_json(file_path): + with open(file_path, "r") as f: + return json.load(f) + + +############################################################################### + + +@pytest.fixture +def gitea_off(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") + assert read_json(datadir / "turned_off.json")["gitea"]["enable"] == False + return datadir + + +@pytest.fixture +def gitea_on(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") + assert read_json(datadir / "turned_on.json")["gitea"]["enable"] == True + return datadir + + +@pytest.fixture +def gitea_enable_undefined(mocker, datadir): + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json" + ) + assert "enable" not in read_json(datadir / "enable_undefined.json")["gitea"] + return datadir + + +@pytest.fixture +def gitea_undefined(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") + assert "gitea" not in read_json(datadir / "undefined.json") + return datadir + + +############################################################################### + + +@pytest.mark.parametrize("endpoint", ["enable", "disable"]) +def test_unauthorized(client, gitea_off, endpoint): + response = client.post(f"/services/gitea/{endpoint}") + assert response.status_code == 401 + + +@pytest.mark.parametrize("endpoint", ["enable", "disable"]) +def test_illegal_methods(authorized_client, gitea_off, endpoint): + response = authorized_client.get(f"/services/gitea/{endpoint}") + assert response.status_code == 405 + response = authorized_client.put(f"/services/gitea/{endpoint}") + assert response.status_code == 405 + response = authorized_client.delete(f"/services/gitea/{endpoint}") + assert response.status_code == 405 + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_from_off(authorized_client, gitea_off, endpoint, target_file): + response = authorized_client.post(f"/services/gitea/{endpoint}") + assert response.status_code == 200 + assert read_json(gitea_off / "turned_off.json") == read_json( + gitea_off / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_from_on(authorized_client, gitea_on, endpoint, target_file): + response = authorized_client.post(f"/services/gitea/{endpoint}") + assert response.status_code == 200 + assert read_json(gitea_on / "turned_on.json") == read_json(gitea_on / target_file) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_twice(authorized_client, gitea_off, endpoint, target_file): + response = authorized_client.post(f"/services/gitea/{endpoint}") + assert response.status_code == 200 + response = authorized_client.post(f"/services/gitea/{endpoint}") + assert response.status_code == 200 + assert read_json(gitea_off / "turned_off.json") == read_json( + gitea_off / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_attribute_deleted( + authorized_client, gitea_enable_undefined, endpoint, target_file +): + response = authorized_client.post(f"/services/gitea/{endpoint}") + assert response.status_code == 200 + assert read_json(gitea_enable_undefined / "enable_undefined.json") == read_json( + gitea_enable_undefined / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_gitea_undefined(authorized_client, gitea_undefined, endpoint, target_file): + response = authorized_client.post(f"/services/gitea/{endpoint}") + assert response.status_code == 200 + assert read_json(gitea_undefined / "undefined.json") == read_json( + gitea_undefined / target_file + ) diff --git a/tests/services/test_gitea/enable_undefined.json b/tests/services/test_gitea/enable_undefined.json new file mode 100644 index 0000000..07b0e78 --- /dev/null +++ b/tests/services/test_gitea/enable_undefined.json @@ -0,0 +1,51 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_gitea/turned_off.json b/tests/services/test_gitea/turned_off.json new file mode 100644 index 0000000..7b2cf8b --- /dev/null +++ b/tests/services/test_gitea/turned_off.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_gitea/turned_on.json b/tests/services/test_gitea/turned_on.json new file mode 100644 index 0000000..acb98ce --- /dev/null +++ b/tests/services/test_gitea/turned_on.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": true + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_gitea/undefined.json b/tests/services/test_gitea/undefined.json new file mode 100644 index 0000000..f689b2e --- /dev/null +++ b/tests/services/test_gitea/undefined.json @@ -0,0 +1,49 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_mailserver.py b/tests/services/test_mailserver.py new file mode 100644 index 0000000..f45ad1e --- /dev/null +++ b/tests/services/test_mailserver.py @@ -0,0 +1,87 @@ +import base64 +import json +import pytest + + +def read_json(file_path): + with open(file_path, "r", encoding="utf-8") as f: + return json.load(f) + + +############################################################################### + + +class ProcessMock: + """Mock subprocess.Popen""" + + def __init__(self, args, **kwargs): + self.args = args + self.kwargs = kwargs + + def communicate(): + return (b"I am a DKIM key", None) + + +class NoFileMock(ProcessMock): + def communicate(): + return (b"", None) + + +@pytest.fixture +def mock_subproccess_popen(mocker): + mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock) + mocker.patch( + "selfprivacy_api.resources.services.mailserver.get_domain", + autospec=True, + return_value="example.com", + ) + mocker.patch("os.path.exists", autospec=True, return_value=True) + return mock + + +@pytest.fixture +def mock_no_file(mocker): + mock = mocker.patch("subprocess.Popen", autospec=True, return_value=NoFileMock) + mocker.patch( + "selfprivacy_api.resources.services.mailserver.get_domain", + autospec=True, + return_value="example.com", + ) + mocker.patch("os.path.exists", autospec=True, return_value=False) + return mock + + +############################################################################### + + +def test_unauthorized(client, mock_subproccess_popen): + """Test unauthorized""" + response = client.get("/services/mailserver/dkim") + assert response.status_code == 401 + + +def test_illegal_methods(authorized_client, mock_subproccess_popen): + response = authorized_client.post("/services/mailserver/dkim") + assert response.status_code == 405 + response = authorized_client.put("/services/mailserver/dkim") + assert response.status_code == 405 + response = authorized_client.delete("/services/mailserver/dkim") + assert response.status_code == 405 + + +def test_dkim_key(authorized_client, mock_subproccess_popen): + """Test DKIM key""" + response = authorized_client.get("/services/mailserver/dkim") + assert response.status_code == 200 + assert base64.b64decode(response.data) == b"I am a DKIM key" + assert mock_subproccess_popen.call_args[0][0] == [ + "cat", + "/var/dkim/example.com.selector.txt", + ] + + +def test_no_dkim_key(authorized_client, mock_no_file): + """Test no DKIM key""" + response = authorized_client.get("/services/mailserver/dkim") + assert response.status_code == 404 + assert mock_no_file.called == False diff --git a/tests/services/test_nextcloud.py b/tests/services/test_nextcloud.py new file mode 100644 index 0000000..b05c363 --- /dev/null +++ b/tests/services/test_nextcloud.py @@ -0,0 +1,123 @@ +import json +import pytest + + +def read_json(file_path): + with open(file_path, "r") as f: + return json.load(f) + + +############################################################################### + + +@pytest.fixture +def nextcloud_off(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") + assert read_json(datadir / "turned_off.json")["nextcloud"]["enable"] == False + return datadir + + +@pytest.fixture +def nextcloud_on(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") + assert read_json(datadir / "turned_on.json")["nextcloud"]["enable"] == True + return datadir + + +@pytest.fixture +def nextcloud_enable_undefined(mocker, datadir): + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json" + ) + assert "enable" not in read_json(datadir / "enable_undefined.json")["nextcloud"] + return datadir + + +@pytest.fixture +def nextcloud_undefined(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") + assert "nextcloud" not in read_json(datadir / "undefined.json") + return datadir + + +############################################################################### + + +@pytest.mark.parametrize("endpoint", ["enable", "disable"]) +def test_unauthorized(client, nextcloud_off, endpoint): + response = client.post(f"/services/nextcloud/{endpoint}") + assert response.status_code == 401 + + +@pytest.mark.parametrize("endpoint", ["enable", "disable"]) +def test_illegal_methods(authorized_client, nextcloud_off, endpoint): + response = authorized_client.get(f"/services/nextcloud/{endpoint}") + assert response.status_code == 405 + response = authorized_client.put(f"/services/nextcloud/{endpoint}") + assert response.status_code == 405 + response = authorized_client.delete(f"/services/nextcloud/{endpoint}") + assert response.status_code == 405 + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_from_off(authorized_client, nextcloud_off, endpoint, target_file): + response = authorized_client.post(f"/services/nextcloud/{endpoint}") + assert response.status_code == 200 + assert read_json(nextcloud_off / "turned_off.json") == read_json( + nextcloud_off / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_from_on(authorized_client, nextcloud_on, endpoint, target_file): + response = authorized_client.post(f"/services/nextcloud/{endpoint}") + assert response.status_code == 200 + assert read_json(nextcloud_on / "turned_on.json") == read_json( + nextcloud_on / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_twice(authorized_client, nextcloud_off, endpoint, target_file): + response = authorized_client.post(f"/services/nextcloud/{endpoint}") + assert response.status_code == 200 + response = authorized_client.post(f"/services/nextcloud/{endpoint}") + assert response.status_code == 200 + assert read_json(nextcloud_off / "turned_off.json") == read_json( + nextcloud_off / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_attribute_deleted( + authorized_client, nextcloud_enable_undefined, endpoint, target_file +): + response = authorized_client.post(f"/services/nextcloud/{endpoint}") + assert response.status_code == 200 + assert read_json(nextcloud_enable_undefined / "enable_undefined.json") == read_json( + nextcloud_enable_undefined / target_file + ) + + +@pytest.mark.parametrize("endpoint,target", [("enable", True), ("disable", False)]) +def test_on_nextcloud_undefined( + authorized_client, nextcloud_undefined, endpoint, target +): + response = authorized_client.post(f"/services/nextcloud/{endpoint}") + assert response.status_code == 200 + assert ( + read_json(nextcloud_undefined / "undefined.json")["nextcloud"]["enable"] + == target + ) diff --git a/tests/services/test_nextcloud/enable_undefined.json b/tests/services/test_nextcloud/enable_undefined.json new file mode 100644 index 0000000..68127f0 --- /dev/null +++ b/tests/services/test_nextcloud/enable_undefined.json @@ -0,0 +1,51 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN" + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_nextcloud/turned_off.json b/tests/services/test_nextcloud/turned_off.json new file mode 100644 index 0000000..375e70f --- /dev/null +++ b/tests/services/test_nextcloud/turned_off.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": false + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_nextcloud/turned_on.json b/tests/services/test_nextcloud/turned_on.json new file mode 100644 index 0000000..7b2cf8b --- /dev/null +++ b/tests/services/test_nextcloud/turned_on.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_nextcloud/undefined.json b/tests/services/test_nextcloud/undefined.json new file mode 100644 index 0000000..fb02c69 --- /dev/null +++ b/tests/services/test_nextcloud/undefined.json @@ -0,0 +1,44 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_ocserv.py b/tests/services/test_ocserv.py new file mode 100644 index 0000000..8f43e70 --- /dev/null +++ b/tests/services/test_ocserv.py @@ -0,0 +1,123 @@ +import json +import pytest + + +def read_json(file_path): + with open(file_path, "r") as f: + return json.load(f) + + +############################################################################### + + +@pytest.fixture +def ocserv_off(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") + assert read_json(datadir / "turned_off.json")["ocserv"]["enable"] == False + return datadir + + +@pytest.fixture +def ocserv_on(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") + assert read_json(datadir / "turned_on.json")["ocserv"]["enable"] == True + return datadir + + +@pytest.fixture +def ocserv_enable_undefined(mocker, datadir): + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json" + ) + assert "enable" not in read_json(datadir / "enable_undefined.json")["ocserv"] + return datadir + + +@pytest.fixture +def ocserv_undefined(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") + assert "ocserv" not in read_json(datadir / "undefined.json") + return datadir + + +############################################################################### + + +@pytest.mark.parametrize("endpoint", ["enable", "disable"]) +def test_unauthorized(client, ocserv_off, endpoint): + response = client.post(f"/services/ocserv/{endpoint}") + assert response.status_code == 401 + + +@pytest.mark.parametrize("endpoint", ["enable", "disable"]) +def test_illegal_methods(authorized_client, ocserv_off, endpoint): + response = authorized_client.get(f"/services/ocserv/{endpoint}") + assert response.status_code == 405 + response = authorized_client.put(f"/services/ocserv/{endpoint}") + assert response.status_code == 405 + response = authorized_client.delete(f"/services/ocserv/{endpoint}") + assert response.status_code == 405 + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_from_off(authorized_client, ocserv_off, endpoint, target_file): + response = authorized_client.post(f"/services/ocserv/{endpoint}") + assert response.status_code == 200 + assert read_json(ocserv_off / "turned_off.json") == read_json( + ocserv_off / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_from_on(authorized_client, ocserv_on, endpoint, target_file): + response = authorized_client.post(f"/services/ocserv/{endpoint}") + assert response.status_code == 200 + assert read_json(ocserv_on / "turned_on.json") == read_json(ocserv_on / target_file) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_twice(authorized_client, ocserv_off, endpoint, target_file): + response = authorized_client.post(f"/services/ocserv/{endpoint}") + assert response.status_code == 200 + response = authorized_client.post(f"/services/ocserv/{endpoint}") + assert response.status_code == 200 + assert read_json(ocserv_off / "turned_off.json") == read_json( + ocserv_off / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_attribute_deleted( + authorized_client, ocserv_enable_undefined, endpoint, target_file +): + response = authorized_client.post(f"/services/ocserv/{endpoint}") + assert response.status_code == 200 + assert read_json(ocserv_enable_undefined / "enable_undefined.json") == read_json( + ocserv_enable_undefined / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_ocserv_undefined( + authorized_client, ocserv_undefined, endpoint, target_file +): + response = authorized_client.post(f"/services/ocserv/{endpoint}") + assert response.status_code == 200 + assert read_json(ocserv_undefined / "undefined.json") == read_json( + ocserv_undefined / target_file + ) diff --git a/tests/services/test_ocserv/enable_undefined.json b/tests/services/test_ocserv/enable_undefined.json new file mode 100644 index 0000000..88d804d --- /dev/null +++ b/tests/services/test_ocserv/enable_undefined.json @@ -0,0 +1,51 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": false + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_ocserv/turned_off.json b/tests/services/test_ocserv/turned_off.json new file mode 100644 index 0000000..6220561 --- /dev/null +++ b/tests/services/test_ocserv/turned_off.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": false + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": false + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_ocserv/turned_on.json b/tests/services/test_ocserv/turned_on.json new file mode 100644 index 0000000..375e70f --- /dev/null +++ b/tests/services/test_ocserv/turned_on.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": false + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_ocserv/undefined.json b/tests/services/test_ocserv/undefined.json new file mode 100644 index 0000000..f7e21bf --- /dev/null +++ b/tests/services/test_ocserv/undefined.json @@ -0,0 +1,49 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": false + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_pleroma.py b/tests/services/test_pleroma.py new file mode 100644 index 0000000..0d7f149 --- /dev/null +++ b/tests/services/test_pleroma.py @@ -0,0 +1,125 @@ +import json +import pytest + + +def read_json(file_path): + with open(file_path, "r") as f: + return json.load(f) + + +############################################################################### + + +@pytest.fixture +def pleroma_off(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") + assert read_json(datadir / "turned_off.json")["pleroma"]["enable"] == False + return datadir + + +@pytest.fixture +def pleroma_on(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") + assert read_json(datadir / "turned_on.json")["pleroma"]["enable"] == True + return datadir + + +@pytest.fixture +def pleroma_enable_undefined(mocker, datadir): + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json" + ) + assert "enable" not in read_json(datadir / "enable_undefined.json")["pleroma"] + return datadir + + +@pytest.fixture +def pleroma_undefined(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") + assert "pleroma" not in read_json(datadir / "undefined.json") + return datadir + + +############################################################################### + + +@pytest.mark.parametrize("endpoint", ["enable", "disable"]) +def test_unauthorized(client, pleroma_off, endpoint): + response = client.post(f"/services/pleroma/{endpoint}") + assert response.status_code == 401 + + +@pytest.mark.parametrize("endpoint", ["enable", "disable"]) +def test_illegal_methods(authorized_client, pleroma_off, endpoint): + response = authorized_client.get(f"/services/pleroma/{endpoint}") + assert response.status_code == 405 + response = authorized_client.put(f"/services/pleroma/{endpoint}") + assert response.status_code == 405 + response = authorized_client.delete(f"/services/pleroma/{endpoint}") + assert response.status_code == 405 + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_from_off(authorized_client, pleroma_off, endpoint, target_file): + response = authorized_client.post(f"/services/pleroma/{endpoint}") + assert response.status_code == 200 + assert read_json(pleroma_off / "turned_off.json") == read_json( + pleroma_off / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_from_on(authorized_client, pleroma_on, endpoint, target_file): + response = authorized_client.post(f"/services/pleroma/{endpoint}") + assert response.status_code == 200 + assert read_json(pleroma_on / "turned_on.json") == read_json( + pleroma_on / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_switch_twice(authorized_client, pleroma_off, endpoint, target_file): + response = authorized_client.post(f"/services/pleroma/{endpoint}") + assert response.status_code == 200 + response = authorized_client.post(f"/services/pleroma/{endpoint}") + assert response.status_code == 200 + assert read_json(pleroma_off / "turned_off.json") == read_json( + pleroma_off / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_attribute_deleted( + authorized_client, pleroma_enable_undefined, endpoint, target_file +): + response = authorized_client.post(f"/services/pleroma/{endpoint}") + assert response.status_code == 200 + assert read_json(pleroma_enable_undefined / "enable_undefined.json") == read_json( + pleroma_enable_undefined / target_file + ) + + +@pytest.mark.parametrize( + "endpoint,target_file", + [("enable", "turned_on.json"), ("disable", "turned_off.json")], +) +def test_on_pleroma_undefined( + authorized_client, pleroma_undefined, endpoint, target_file +): + response = authorized_client.post(f"/services/pleroma/{endpoint}") + assert response.status_code == 200 + assert read_json(pleroma_undefined / "undefined.json") == read_json( + pleroma_undefined / target_file + ) diff --git a/tests/services/test_pleroma/enable_undefined.json b/tests/services/test_pleroma/enable_undefined.json new file mode 100644 index 0000000..20ab960 --- /dev/null +++ b/tests/services/test_pleroma/enable_undefined.json @@ -0,0 +1,51 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": false + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": false + }, + "pleroma": { + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_pleroma/turned_off.json b/tests/services/test_pleroma/turned_off.json new file mode 100644 index 0000000..b6d5fd6 --- /dev/null +++ b/tests/services/test_pleroma/turned_off.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": false + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": false + }, + "pleroma": { + "enable": false + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_pleroma/turned_on.json b/tests/services/test_pleroma/turned_on.json new file mode 100644 index 0000000..6220561 --- /dev/null +++ b/tests/services/test_pleroma/turned_on.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": false + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": false + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_pleroma/undefined.json b/tests/services/test_pleroma/undefined.json new file mode 100644 index 0000000..b909a95 --- /dev/null +++ b/tests/services/test_pleroma/undefined.json @@ -0,0 +1,49 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": false + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": false + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_services.py b/tests/services/test_services.py new file mode 100644 index 0000000..859da5a --- /dev/null +++ b/tests/services/test_services.py @@ -0,0 +1,133 @@ +import base64 +import json +import pytest + + +def read_json(file_path): + with open(file_path, "r", encoding="utf-8") as f: + return json.load(f) + + +def call_args_asserts(mocked_object): + assert mocked_object.call_count == 8 + assert mocked_object.call_args_list[0][0][0] == [ + "systemctl", + "status", + "dovecot2.service", + ] + assert mocked_object.call_args_list[1][0][0] == [ + "systemctl", + "status", + "postfix.service", + ] + assert mocked_object.call_args_list[2][0][0] == [ + "systemctl", + "status", + "nginx.service", + ] + assert mocked_object.call_args_list[3][0][0] == [ + "systemctl", + "status", + "bitwarden_rs.service", + ] + assert mocked_object.call_args_list[4][0][0] == [ + "systemctl", + "status", + "gitea.service", + ] + assert mocked_object.call_args_list[5][0][0] == [ + "systemctl", + "status", + "phpfpm-nextcloud.service", + ] + assert mocked_object.call_args_list[6][0][0] == [ + "systemctl", + "status", + "ocserv.service", + ] + assert mocked_object.call_args_list[7][0][0] == [ + "systemctl", + "status", + "pleroma.service", + ] + + +class ProcessMock: + """Mock subprocess.Popen""" + + def __init__(self, args, **kwargs): + self.args = args + self.kwargs = kwargs + + def communicate(): + return (b"", None) + + returncode = 0 + + +class BrokenServiceMock(ProcessMock): + returncode = 3 + + +@pytest.fixture +def mock_subproccess_popen(mocker): + mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock) + return mock + + +@pytest.fixture +def mock_broken_service(mocker): + mock = mocker.patch( + "subprocess.Popen", autospec=True, return_value=BrokenServiceMock + ) + return mock + + +############################################################################### + + +def test_unauthorized(client, mock_subproccess_popen): + """Test unauthorized""" + response = client.get("/services/status") + assert response.status_code == 401 + + +def test_illegal_methods(authorized_client, mock_subproccess_popen): + response = authorized_client.post("/services/status") + assert response.status_code == 405 + response = authorized_client.put("/services/status") + assert response.status_code == 405 + response = authorized_client.delete("/services/status") + assert response.status_code == 405 + + +def test_dkim_key(authorized_client, mock_subproccess_popen): + response = authorized_client.get("/services/status") + assert response.status_code == 200 + assert response.get_json() == { + "imap": 0, + "smtp": 0, + "http": 0, + "bitwarden": 0, + "gitea": 0, + "nextcloud": 0, + "ocserv": 0, + "pleroma": 0, + } + call_args_asserts(mock_subproccess_popen) + + +def test_no_dkim_key(authorized_client, mock_broken_service): + response = authorized_client.get("/services/status") + assert response.status_code == 200 + assert response.get_json() == { + "imap": 3, + "smtp": 3, + "http": 3, + "bitwarden": 3, + "gitea": 3, + "nextcloud": 3, + "ocserv": 3, + "pleroma": 3, + } + call_args_asserts(mock_broken_service) diff --git a/tests/services/test_ssh.py b/tests/services/test_ssh.py new file mode 100644 index 0000000..7233a53 --- /dev/null +++ b/tests/services/test_ssh.py @@ -0,0 +1,314 @@ +import json +from os import read +import pytest + + +def read_json(file_path): + with open(file_path, "r") as f: + return json.load(f) + + +############################################################################### + + +@pytest.fixture +def ssh_off(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") + assert read_json(datadir / "turned_off.json")["ssh"]["enable"] == False + assert ( + read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True + ) + return datadir + + +@pytest.fixture +def ssh_on(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") + assert ( + read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True + ) + assert read_json(datadir / "turned_on.json")["ssh"]["enable"] == True + return datadir + + +@pytest.fixture +def all_off(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "all_off.json") + assert read_json(datadir / "all_off.json")["ssh"]["passwordAuthentication"] == False + assert read_json(datadir / "all_off.json")["ssh"]["enable"] == False + return datadir + + +@pytest.fixture +def undefined_settings(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") + assert "ssh" not in read_json(datadir / "undefined.json") + return datadir + + +@pytest.fixture +def root_and_admin_have_keys(mocker, datadir): + mocker.patch( + "selfprivacy_api.utils.USERDATA_FILE", + new=datadir / "root_and_admin_have_keys.json", + ) + assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["enable"] == True + assert ( + read_json(datadir / "root_and_admin_have_keys.json")["ssh"][ + "passwordAuthentication" + ] + == True + ) + assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["rootKeys"] == [ + "ssh-ed25519 KEY test@pc" + ] + assert read_json(datadir / "root_and_admin_have_keys.json")["sshKeys"] == [ + "ssh-rsa KEY test@pc" + ] + return datadir + + +############################################################################### + + +@pytest.mark.parametrize( + "endpoint", ["ssh", "ssh/enable", "ssh/key/send", "ssh/keys/user"] +) +def test_unauthorized(client, ssh_off, endpoint): + response = client.post(f"/services/{endpoint}") + assert response.status_code == 401 + + +def test_legacy_enable(authorized_client, ssh_off): + response = authorized_client.post(f"/services/ssh/enable") + assert response.status_code == 200 + assert read_json(ssh_off / "turned_off.json") == read_json( + ssh_off / "turned_on.json" + ) + + +def test_legacy_enable_when_enabled(authorized_client, ssh_on): + response = authorized_client.post(f"/services/ssh/enable") + assert response.status_code == 200 + assert read_json(ssh_on / "turned_on.json") == read_json(ssh_on / "turned_on.json") + + +def test_get_current_settings_ssh_off(authorized_client, ssh_off): + response = authorized_client.get("/services/ssh") + assert response.status_code == 200 + assert response.json == {"enable": False, "passwordAuthentication": True} + + +def test_get_current_settings_ssh_on(authorized_client, ssh_on): + response = authorized_client.get("/services/ssh") + assert response.status_code == 200 + assert response.json == {"enable": True, "passwordAuthentication": True} + + +def test_get_current_settings_all_off(authorized_client, all_off): + response = authorized_client.get("/services/ssh") + assert response.status_code == 200 + assert response.json == {"enable": False, "passwordAuthentication": False} + + +def test_get_current_settings_undefined(authorized_client, undefined_settings): + response = authorized_client.get("/services/ssh") + assert response.status_code == 200 + assert response.json == {"enable": True, "passwordAuthentication": True} + + +available_settings = [ + {"enable": True, "passwordAuthentication": True}, + {"enable": True, "passwordAuthentication": False}, + {"enable": False, "passwordAuthentication": True}, + {"enable": False, "passwordAuthentication": False}, + {"enable": True}, + {"enable": False}, + {"passwordAuthentication": True}, + {"passwordAuthentication": False}, +] + + +@pytest.mark.parametrize("settings", available_settings) +def test_set_settings_ssh_off(authorized_client, ssh_off, settings): + response = authorized_client.put(f"/services/ssh", json=settings) + assert response.status_code == 200 + data = read_json(ssh_off / "turned_off.json")["ssh"] + if "enable" in settings: + assert data["enable"] == settings["enable"] + if "passwordAuthentication" in settings: + assert data["passwordAuthentication"] == settings["passwordAuthentication"] + + +@pytest.mark.parametrize("settings", available_settings) +def test_set_settings_ssh_on(authorized_client, ssh_on, settings): + response = authorized_client.put(f"/services/ssh", json=settings) + assert response.status_code == 200 + data = read_json(ssh_on / "turned_on.json")["ssh"] + if "enable" in settings: + assert data["enable"] == settings["enable"] + if "passwordAuthentication" in settings: + assert data["passwordAuthentication"] == settings["passwordAuthentication"] + + +@pytest.mark.parametrize("settings", available_settings) +def test_set_settings_all_off(authorized_client, all_off, settings): + response = authorized_client.put(f"/services/ssh", json=settings) + assert response.status_code == 200 + data = read_json(all_off / "all_off.json")["ssh"] + if "enable" in settings: + assert data["enable"] == settings["enable"] + if "passwordAuthentication" in settings: + assert data["passwordAuthentication"] == settings["passwordAuthentication"] + + +@pytest.mark.parametrize("settings", available_settings) +def test_set_settings_undefined(authorized_client, undefined_settings, settings): + response = authorized_client.put(f"/services/ssh", json=settings) + assert response.status_code == 200 + data = read_json(undefined_settings / "undefined.json")["ssh"] + if "enable" in settings: + assert data["enable"] == settings["enable"] + if "passwordAuthentication" in settings: + assert data["passwordAuthentication"] == settings["passwordAuthentication"] + + +def test_add_root_key(authorized_client, ssh_on): + response = authorized_client.put( + f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} + ) + assert response.status_code == 201 + assert read_json(ssh_on / "turned_on.json")["ssh"]["rootKeys"] == [ + "ssh-rsa KEY test@pc", + ] + + +def test_add_root_key_one_more(authorized_client, root_and_admin_have_keys): + response = authorized_client.put( + f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"} + ) + assert response.status_code == 201 + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ + "rootKeys" + ] == [ + "ssh-ed25519 KEY test@pc", + "ssh-rsa KEY test@pc", + ] + + +def test_add_existing_root_key(authorized_client, root_and_admin_have_keys): + response = authorized_client.put( + f"/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"} + ) + assert response.status_code == 409 + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ + "rootKeys" + ] == [ + "ssh-ed25519 KEY test@pc", + ] + + +def test_add_invalid_root_key(authorized_client, ssh_on): + response = authorized_client.put( + f"/services/ssh/key/send", json={"public_key": "INVALID KEY test@pc"} + ) + assert response.status_code == 400 + + +def test_add_root_key_via_wrong_endpoint(authorized_client, ssh_on): + response = authorized_client.post( + f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} + ) + assert response.status_code == 400 + + +def test_get_root_key(authorized_client, root_and_admin_have_keys): + response = authorized_client.get(f"/services/ssh/keys/root") + assert response.status_code == 200 + assert response.json == ["ssh-ed25519 KEY test@pc"] + + +def test_get_root_key_when_none(authorized_client, ssh_on): + response = authorized_client.get(f"/services/ssh/keys/root") + assert response.status_code == 200 + assert response.json == [] + + +def test_delete_root_key(authorized_client, root_and_admin_have_keys): + response = authorized_client.delete( + f"/services/ssh/keys/root", json={"public_key": "ssh-ed25519 KEY test@pc"} + ) + assert response.status_code == 200 + assert ( + read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ + "rootKeys" + ] + == [] + ) + + +def test_delete_root_nonexistent_key(authorized_client, root_and_admin_have_keys): + response = authorized_client.delete( + f"/services/ssh/keys/root", json={"public_key": "ssh-rsa KEY test@pc"} + ) + assert response.status_code == 404 + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][ + "rootKeys" + ] == [ + "ssh-ed25519 KEY test@pc", + ] + + +def test_get_admin_key(authorized_client, root_and_admin_have_keys): + response = authorized_client.get(f"/services/ssh/keys/tester") + assert response.status_code == 200 + assert response.json == ["ssh-rsa KEY test@pc"] + + +def test_get_admin_key_when_none(authorized_client, ssh_on): + response = authorized_client.get(f"/services/ssh/keys/tester") + assert response.status_code == 200 + assert response.json == [] + + +def test_delete_admin_key(authorized_client, root_and_admin_have_keys): + response = authorized_client.delete( + f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + ) + assert response.status_code == 200 + assert ( + read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["sshKeys"] + == [] + ) + + +def test_add_admin_key(authorized_client, ssh_on): + response = authorized_client.post( + f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + ) + assert response.status_code == 201 + assert read_json(ssh_on / "turned_on.json")["sshKeys"] == [ + "ssh-rsa KEY test@pc", + ] + + +def test_add_admin_key_one_more(authorized_client, root_and_admin_have_keys): + response = authorized_client.post( + f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY_2 test@pc"} + ) + assert response.status_code == 201 + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[ + "sshKeys" + ] == ["ssh-rsa KEY test@pc", "ssh-rsa KEY_2 test@pc"] + + +def test_add_existing_admin_key(authorized_client, root_and_admin_have_keys): + response = authorized_client.post( + f"/services/ssh/keys/tester", json={"public_key": "ssh-rsa KEY test@pc"} + ) + assert response.status_code == 409 + assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")[ + "sshKeys" + ] == [ + "ssh-rsa KEY test@pc", + ] diff --git a/tests/services/test_ssh/all_off.json b/tests/services/test_ssh/all_off.json new file mode 100644 index 0000000..e1b8510 --- /dev/null +++ b/tests/services/test_ssh/all_off.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": false, + "passwordAuthentication": false, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_ssh/root_and_admin_have_keys.json b/tests/services/test_ssh/root_and_admin_have_keys.json new file mode 100644 index 0000000..7b2cf8b --- /dev/null +++ b/tests/services/test_ssh/root_and_admin_have_keys.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/services/test_ssh/turned_off.json b/tests/services/test_ssh/turned_off.json new file mode 100644 index 0000000..b09395b --- /dev/null +++ b/tests/services/test_ssh/turned_off.json @@ -0,0 +1,46 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": false, + "passwordAuthentication": true + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow" +} \ No newline at end of file diff --git a/tests/services/test_ssh/turned_on.json b/tests/services/test_ssh/turned_on.json new file mode 100644 index 0000000..44b28ce --- /dev/null +++ b/tests/services/test_ssh/turned_on.json @@ -0,0 +1,46 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow" +} \ No newline at end of file diff --git a/tests/services/test_ssh/undefined.json b/tests/services/test_ssh/undefined.json new file mode 100644 index 0000000..3f5545f --- /dev/null +++ b/tests/services/test_ssh/undefined.json @@ -0,0 +1,45 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": false + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file