diff --git a/.vscode/settings.json b/.vscode/settings.json index de288e1..6f8c118 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,5 @@ { - "python.formatting.provider": "black" + "python.formatting.provider": "black", + "python.linting.pylintEnabled": true, + "python.linting.enabled": true } \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index c1feda0..1ffd18c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,3 @@ [build-system] -requires = ["setuptools", "wheel", "portalocker"] +requires = ["setuptools", "wheel", "portalocker", "flask-swagger", "flask-swagger-ui"] build-backend = "setuptools.build_meta" \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 028c332..b451222 100755 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,5 @@ flask_restful flask_socketio setuptools portalocker +flask-swagger +flask-swagger-ui diff --git a/selfprivacy_api/app.py b/selfprivacy_api/app.py index 28959f1..9f8dcb1 100644 --- a/selfprivacy_api/app.py +++ b/selfprivacy_api/app.py @@ -1,40 +1,74 @@ #!/usr/bin/env python3 +"""SelfPrivacy server management API""" +import os from flask import Flask, request, jsonify from flask_restful import Api -import os +from flask_swagger import swagger +from flask_swagger_ui import get_swaggerui_blueprint -from selfprivacy_api.resources.users import Users +from selfprivacy_api.resources.users import User, Users from selfprivacy_api.resources.common import DecryptDisk +from selfprivacy_api.resources.system import api_system +from selfprivacy_api.resources.services import services as api_services + +swagger_blueprint = get_swaggerui_blueprint( + "/api/docs", "/api/swagger.json", config={"app_name": "SelfPrivacy API"} +) def create_app(): + """Initiate Flask app and bind routes""" app = Flask(__name__) api = Api(app) - app.config['AUTH_TOKEN'] = os.environ.get('AUTH_TOKEN') + app.config["AUTH_TOKEN"] = os.environ.get("AUTH_TOKEN") + app.config["ENABLE_SWAGGER"] = os.environ.get("ENABLE_SWAGGER", "0") # Check bearer token @app.before_request def check_auth(): - auth = request.headers.get("Authorization") - if auth is None: - return jsonify({"error": "Missing Authorization header"}), 401 + # Exclude swagger-ui + if not request.path.startswith("/api"): + auth = request.headers.get("Authorization") + if auth is None: + return jsonify({"error": "Missing Authorization header"}), 401 + + # Check if token is valid + if auth != "Bearer " + app.config["AUTH_TOKEN"]: + return jsonify({"error": "Invalid token"}), 401 - # Check if token is valid - if auth != "Bearer " + app.config['AUTH_TOKEN']: - return jsonify({"error": "Invalid token"}), 401 - api.add_resource(Users, "/users") + api.add_resource(User, "/users/") api.add_resource(DecryptDisk, "/decryptDisk") - from selfprivacy_api.resources.system import api_system - from selfprivacy_api.resources.services import services as api_services app.register_blueprint(api_system) app.register_blueprint(api_services) + @app.route("/api/swagger.json") + def spec(): + if app.config["ENABLE_SWAGGER"] == "1": + swag = swagger(app) + swag["info"]["version"] = "1.0" + swag["info"]["title"] = "SelfPrivacy API" + swag["info"]["description"] = "SelfPrivacy API" + swag["securityDefinitions"] = { + "bearerAuth": { + "type": "apiKey", + "name": "Authorization", + "in": "header", + } + } + swag["security"] = [{"bearerAuth": []}] + + return jsonify(swag) + return jsonify({}), 404 + + if app.config["ENABLE_SWAGGER"] == "1": + app.register_blueprint(swagger_blueprint, url_prefix="/api/docs") + return app if __name__ == "__main__": - app = create_app() - app.run(port=5050, debug=False) + created_app = create_app() + created_app.run(port=5050, debug=False) diff --git a/selfprivacy_api/resources/common.py b/selfprivacy_api/resources/common.py index 001e686..c75f22a 100644 --- a/selfprivacy_api/resources/common.py +++ b/selfprivacy_api/resources/common.py @@ -1,20 +1,56 @@ #!/usr/bin/env python3 -from flask import Flask, jsonify, request, json -from flask_restful import Resource +"""Unassigned views""" import subprocess +from flask_restful import Resource, reqparse -from selfprivacy_api.utils import get_domain -# Decrypt disk class DecryptDisk(Resource): - def post(self): - decryptionCommand = """ - echo -n {0} | cryptsetup luksOpen /dev/sdb decryptedVar""".format( - request.headers.get("X-Decryption-Key") - ) + """Decrypt disk""" - decryptionService = subprocess.Popen( - decryptionCommand, shell=True, stdout=subprocess.PIPE + def post(self): + """ + Decrypt /dev/sdb using cryptsetup luksOpen + --- + consumes: + - application/json + tags: + - System + security: + - bearerAuth: [] + parameters: + - in: body + name: body + required: true + description: Provide a password for decryption + schema: + type: object + required: + - password + properties: + password: + type: string + description: Decryption password. + responses: + 201: + description: OK + 400: + description: Bad request + 401: + description: Unauthorized + """ + parser = reqparse.RequestParser(bundle_errors=True) + parser.add_argument("password", type=str, required=True) + args = parser.parse_args() + + decryption_command = ["cryptsetup", "luksOpen", "/dev/sdb", "decryptedVar"] + + # TODO: Check if this works at all + + decryption_service = subprocess.Popen( + decryption_command, + shell=False, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, ) - decryptionService.communicate() - return {"status": decryptionService.returncode} + decryption_service.communicate(input=args["password"]) + return {"status": decryption_service.returncode}, 201 diff --git a/selfprivacy_api/resources/services/__init__.py b/selfprivacy_api/resources/services/__init__.py index 9a16e72..7e8d41f 100644 --- a/selfprivacy_api/resources/services/__init__.py +++ b/selfprivacy_api/resources/services/__init__.py @@ -1,9 +1,8 @@ +#!/usr/bin/env python3 +"""Services management module""" from flask import Blueprint from flask_restful import Api -services = Blueprint("services", __name__, url_prefix="/services") -api = Api(services) - from . import ( bitwarden, gitea, @@ -15,3 +14,6 @@ from . import ( restic, ssh, ) + +services = Blueprint("services", __name__, url_prefix="/services") +api = Api(services) diff --git a/selfprivacy_api/resources/services/bitwarden.py b/selfprivacy_api/resources/services/bitwarden.py index d0d10f9..5c037c9 100644 --- a/selfprivacy_api/resources/services/bitwarden.py +++ b/selfprivacy_api/resources/services/bitwarden.py @@ -1,25 +1,43 @@ #!/usr/bin/env python3 -from flask_restful import Resource -import portalocker +"""Bitwarden management module""" import json +import portalocker +from flask_restful import Resource from selfprivacy_api.resources.services import api -# Enable Bitwarden + class EnableBitwarden(Resource): + """Enable Bitwarden""" + def post(self): - with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f: - portalocker.lock(f, portalocker.LOCK_EX) + """ + Enable Bitwarden + --- + tags: + - Bitwarden + security: + - bearerAuth: [] + responses: + 200: + description: Bitwarden enabled + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "bitwarden" not in data: data["bitwarden"] = {} data["bitwarden"]["enable"] = True - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, @@ -27,21 +45,37 @@ class EnableBitwarden(Resource): } -# Disable Bitwarden class DisableBitwarden(Resource): + """Disable Bitwarden""" + def post(self): - with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f: - portalocker.lock(f, portalocker.LOCK_EX) + """ + Disable Bitwarden + --- + tags: + - Bitwarden + security: + - bearerAuth: [] + responses: + 200: + description: Bitwarden disabled + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "bitwarden" not in data: data["bitwarden"] = {} data["bitwarden"]["enable"] = False - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, diff --git a/selfprivacy_api/resources/services/gitea.py b/selfprivacy_api/resources/services/gitea.py index 743071f..4ae0b6a 100644 --- a/selfprivacy_api/resources/services/gitea.py +++ b/selfprivacy_api/resources/services/gitea.py @@ -1,25 +1,43 @@ #!/usr/bin/env python3 -from flask_restful import Resource -import portalocker +"""Gitea management module""" import json +import portalocker +from flask_restful import Resource from selfprivacy_api.resources.services import api -# Enable Gitea + class EnableGitea(Resource): + """Enable Gitea""" + def post(self): - with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f: - portalocker.lock(f, portalocker.LOCK_EX) + """ + Enable Gitea + --- + tags: + - Gitea + security: + - bearerAuth: [] + responses: + 200: + description: Gitea enabled + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "gitea" not in data: data["gitea"] = {} data["gitea"]["enable"] = True - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, @@ -27,21 +45,37 @@ class EnableGitea(Resource): } -# Disable Gitea class DisableGitea(Resource): + """Disable Gitea""" + def post(self): - with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f: - portalocker.lock(f, portalocker.LOCK_EX) + """ + Disable Gitea + --- + tags: + - Gitea + security: + - bearerAuth: [] + responses: + 200: + description: Gitea disabled + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "gitea" not in data: data["gitea"] = {} data["gitea"]["enable"] = False - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, diff --git a/selfprivacy_api/resources/services/mailserver.py b/selfprivacy_api/resources/services/mailserver.py index f4026b9..4015f9a 100644 --- a/selfprivacy_api/resources/services/mailserver.py +++ b/selfprivacy_api/resources/services/mailserver.py @@ -1,20 +1,36 @@ #!/usr/bin/env python3 -from flask_restful import Resource +"""Mail server management module""" import base64 import subprocess +from flask_restful import Resource from selfprivacy_api.resources.services import api from selfprivacy_api.utils import get_domain -# Get DKIM key from file + class DKIMKey(Resource): + """Get DKIM key from file""" + def get(self): + """ + Get DKIM key from file + --- + tags: + - Email + security: + - bearerAuth: [] + responses: + 200: + description: DKIM key encoded in base64 + 401: + description: Unauthorized + """ domain = get_domain() - catProcess = subprocess.Popen( + cat_process = subprocess.Popen( ["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE ) - dkim = catProcess.communicate()[0] + dkim = cat_process.communicate()[0] dkim = base64.b64encode(dkim) dkim = str(dkim, "utf-8") return dkim diff --git a/selfprivacy_api/resources/services/main.py b/selfprivacy_api/resources/services/main.py index b6fe46e..f14e095 100644 --- a/selfprivacy_api/resources/services/main.py +++ b/selfprivacy_api/resources/services/main.py @@ -1,42 +1,83 @@ #!/usr/bin/env python3 -from flask_restful import Resource, Api +"""Services status module""" import subprocess +from flask_restful import Resource from . import api -# Get service status + class ServiceStatus(Resource): + """Get service status""" + def get(self): - imapService = subprocess.Popen(["systemctl", "status", "dovecot2.service"]) - imapService.communicate()[0] - smtpService = subprocess.Popen(["systemctl", "status", "postfix.service"]) - smtpService.communicate()[0] - httpService = subprocess.Popen(["systemctl", "status", "nginx.service"]) - httpService.communicate()[0] - bitwardenService = subprocess.Popen( + """ + Get service status + --- + tags: + - Services + responses: + 200: + description: Service status + schema: + type: object + properties: + imap: + type: integer + description: Dovecot service status + smtp: + type: integer + description: Postfix service status + http: + type: integer + description: Nginx service status + bitwarden: + type: integer + description: Bitwarden service status + gitea: + type: integer + description: Gitea service status + nextcloud: + type: integer + description: Nextcloud service status + ocserv: + type: integer + description: OpenConnect VPN service status + pleroma: + type: integer + description: Pleroma service status + 401: + description: Unauthorized + """ + imap_service = subprocess.Popen(["systemctl", "status", "dovecot2.service"]) + imap_service.communicate()[0] + smtp_service = subprocess.Popen(["systemctl", "status", "postfix.service"]) + smtp_service.communicate()[0] + http_service = subprocess.Popen(["systemctl", "status", "nginx.service"]) + http_service.communicate()[0] + bitwarden_service = subprocess.Popen( ["systemctl", "status", "bitwarden_rs.service"] ) - bitwardenService.communicate()[0] - giteaService = subprocess.Popen(["systemctl", "status", "gitea.service"]) - giteaService.communicate()[0] - nextcloudService = subprocess.Popen( + bitwarden_service.communicate()[0] + gitea_service = subprocess.Popen(["systemctl", "status", "gitea.service"]) + gitea_service.communicate()[0] + nextcloud_service = subprocess.Popen( ["systemctl", "status", "phpfpm-nextcloud.service"] ) - nextcloudService.communicate()[0] - ocservService = subprocess.Popen(["systemctl", "status", "ocserv.service"]) - ocservService.communicate()[0] - pleromaService = subprocess.Popen(["systemctl", "status", "pleroma.service"]) - pleromaService.communicate()[0] + nextcloud_service.communicate()[0] + ocserv_service = subprocess.Popen(["systemctl", "status", "ocserv.service"]) + ocserv_service.communicate()[0] + pleroma_service = subprocess.Popen(["systemctl", "status", "pleroma.service"]) + pleroma_service.communicate()[0] return { - "imap": imapService.returncode, - "smtp": smtpService.returncode, - "http": httpService.returncode, - "bitwarden": bitwardenService.returncode, - "gitea": giteaService.returncode, - "nextcloud": nextcloudService.returncode, - "ocserv": ocservService.returncode, - "pleroma": pleromaService.returncode, + "imap": imap_service.returncode, + "smtp": smtp_service.returncode, + "http": http_service.returncode, + "bitwarden": bitwarden_service.returncode, + "gitea": gitea_service.returncode, + "nextcloud": nextcloud_service.returncode, + "ocserv": ocserv_service.returncode, + "pleroma": pleroma_service.returncode, } diff --git a/selfprivacy_api/resources/services/nextcloud.py b/selfprivacy_api/resources/services/nextcloud.py index 899ab5f..fc0bdbe 100644 --- a/selfprivacy_api/resources/services/nextcloud.py +++ b/selfprivacy_api/resources/services/nextcloud.py @@ -1,25 +1,43 @@ #!/usr/bin/env python3 -from flask_restful import Resource -import portalocker +"""Nextcloud management module""" import json +import portalocker +from flask_restful import Resource from selfprivacy_api.resources.services import api -# Enable Nextcloud + class EnableNextcloud(Resource): + """Enable Nextcloud""" + def post(self): - with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f: - portalocker.lock(f, portalocker.LOCK_EX) + """ + Enable Nextcloud + --- + tags: + - Nextcloud + security: + - bearerAuth: [] + responses: + 200: + description: Nextcloud enabled + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "nextcloud" not in data: data["nextcloud"] = {} data["nextcloud"]["enable"] = True - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, @@ -27,21 +45,37 @@ class EnableNextcloud(Resource): } -# Disable Nextcloud class DisableNextcloud(Resource): + """Disable Nextcloud""" + def post(self): - with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f: - portalocker.lock(f, portalocker.LOCK_EX) + """ + Disable Nextcloud + --- + tags: + - Nextcloud + security: + - bearerAuth: [] + responses: + 200: + description: Nextcloud disabled + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "nextcloud" not in data: data["nextcloud"] = {} data["nextcloud"]["enable"] = False - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, diff --git a/selfprivacy_api/resources/services/ocserv.py b/selfprivacy_api/resources/services/ocserv.py index 00d9ee2..6ef5667 100644 --- a/selfprivacy_api/resources/services/ocserv.py +++ b/selfprivacy_api/resources/services/ocserv.py @@ -1,25 +1,43 @@ #!/usr/bin/env python3 -from flask_restful import Resource -import portalocker +"""OpenConnect VPN server management module""" import json +import portalocker +from flask_restful import Resource from selfprivacy_api.resources.services import api -# Enable OpenConnect VPN server + class EnableOcserv(Resource): + """Enable OpenConnect VPN server""" + def post(self): - with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f: - portalocker.lock(f, portalocker.LOCK_EX) + """ + Enable OCserv + --- + tags: + - OCserv + security: + - bearerAuth: [] + responses: + 200: + description: OCserv enabled + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "ocserv" not in data: data["ocserv"] = {} data["ocserv"]["enable"] = True - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, @@ -27,21 +45,37 @@ class EnableOcserv(Resource): } -# Disable OpenConnect VPN server class DisableOcserv(Resource): + """Disable OpenConnect VPN server""" + def post(self): - with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f: - portalocker.lock(f, portalocker.LOCK_EX) + """ + Disable OCserv + --- + tags: + - OCserv + security: + - bearerAuth: [] + responses: + 200: + description: OCserv disabled + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "ocserv" not in data: data["ocserv"] = {} data["ocserv"]["enable"] = False - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, diff --git a/selfprivacy_api/resources/services/pleroma.py b/selfprivacy_api/resources/services/pleroma.py index dbc393d..201a5a6 100644 --- a/selfprivacy_api/resources/services/pleroma.py +++ b/selfprivacy_api/resources/services/pleroma.py @@ -1,25 +1,43 @@ #!/usr/bin/env python3 -from flask_restful import Resource -import portalocker +"""Pleroma management module""" import json +import portalocker +from flask_restful import Resource from selfprivacy_api.resources.services import api -# Enable Pleroma + class EnablePleroma(Resource): + """Enable Pleroma""" + def post(self): - with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f: - portalocker.lock(f, portalocker.LOCK_EX) + """ + Enable Pleroma + --- + tags: + - Pleroma + security: + - bearerAuth: [] + responses: + 200: + description: Pleroma enabled + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "pleroma" not in data: data["pleroma"] = {} data["pleroma"]["enable"] = True - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, @@ -27,21 +45,37 @@ class EnablePleroma(Resource): } -# Disable Pleroma class DisablePleroma(Resource): + """Disable Pleroma""" + def post(self): - with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f: - portalocker.lock(f, portalocker.LOCK_EX) + """ + Disable Pleroma + --- + tags: + - Pleroma + security: + - bearerAuth: [] + responses: + 200: + description: Pleroma disabled + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "pleroma" not in data: data["pleroma"] = {} data["pleroma"]["enable"] = False - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, diff --git a/selfprivacy_api/resources/services/restic.py b/selfprivacy_api/resources/services/restic.py index 2b93299..be911ca 100644 --- a/selfprivacy_api/resources/services/restic.py +++ b/selfprivacy_api/resources/services/restic.py @@ -1,69 +1,131 @@ #!/usr/bin/env python3 +"""Backups management module""" +import json +import subprocess from flask import request from flask_restful import Resource -import subprocess -import json from selfprivacy_api.resources.services import api -# List all restic backups -class ListAllBackups(Resource): - def get(self): - backupListingCommand = """ - restic -r b2:{0}:/sfbackup snapshots --password-file /var/lib/restic/rpass --json - """.format( - request.headers.get("X-Repository-Name") - ) - backupListingProcessDescriptor = subprocess.Popen( - backupListingCommand, - shell=True, +class ListAllBackups(Resource): + """List all restic backups""" + + def get(self): + """ + Get all restic backups + --- + tags: + - Backups + security: + - bearerAuth: [] + responses: + 200: + description: A list of snapshots + 400: + description: Bad request + 401: + description: Unauthorized + """ + repository_name = request.headers.get("X-Repository-Name") + + backup_listing_command = [ + "restic", + "-r", + f"b2:{repository_name}:/sfbackup", + "snapshots", + "--json", + ] + + with subprocess.Popen( + backup_listing_command, + shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - ) + ) as backup_listing_process_descriptor: + snapshots_list = backup_listing_process_descriptor.communicate()[0] - snapshotsList = backupListingProcessDescriptor.communicate()[0] - - return snapshotsList.decode("utf-8") + return snapshots_list.decode("utf-8") -# Create a new restic backup class AsyncCreateBackup(Resource): - def put(self): - backupCommand = """ - restic -r b2:{0}:/sfbackup --verbose backup /var --password-file /var/lib/restic/rpass > /tmp/backup.log - """.format( - request.headers.get("X-Repository-Name") - ) + """Create a new restic backup""" - backupProcessDescriptor = subprocess.Popen( - backupCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT - ) + def put(self): + """ + Initiate a new restic backup + --- + tags: + - Backups + security: + - bearerAuth: [] + responses: + 200: + description: Backup creation has started + 400: + description: Bad request + 401: + description: Unauthorized + """ + repository_name = request.headers.get("X-Repository-Name") + + backup_command = [ + "restic", + "-r", + f"b2:{repository_name}:/sfbackup", + "--verbose", + "--json", + "backup", + "/var", + ] + + with open("/tmp/backup.log", "w", encoding="utf-8") as log_file: + subprocess.Popen( + backup_command, shell=False, stdout=log_file, stderr=subprocess.STDOUT + ) return { "status": 0, "message": "Backup creation has started", } + class CheckBackupStatus(Resource): + """Check current backup status""" + def get(self): - backupStatusCheckCommand = """ - tail -1 /tmp/backup.log """ + Get backup status + --- + tags: + - Backups + security: + - bearerAuth: [] + responses: + 200: + description: Backup status + 400: + description: Bad request + 401: + description: Unauthorized + """ + backup_status_check_command = ["tail", "-1", "/tmp/backup.log"] - backupStatusCheckProcessDescriptor = subprocess.Popen( - backupStatusCheckCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT - ) - - backupProcessStatus = backupStatusCheckProcessDescriptor.communicate()[0].decode("utf-8") + with subprocess.Popen( + backup_status_check_command, + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) as backup_status_check_process_descriptor: + backup_process_status = ( + backup_status_check_process_descriptor.communicate()[0].decode("utf-8") + ) try: - json.loads(backupProcessStatus) + json.loads(backup_process_status) except ValueError: - return { - "message": backupProcessStatus - } - return backupProcessStatus + return {"message": backup_process_status} + return backup_process_status api.add_resource(ListAllBackups, "/restic/backup/list") diff --git a/selfprivacy_api/resources/services/ssh.py b/selfprivacy_api/resources/services/ssh.py index 953511e..0ad6499 100644 --- a/selfprivacy_api/resources/services/ssh.py +++ b/selfprivacy_api/resources/services/ssh.py @@ -1,26 +1,43 @@ #!/usr/bin/env python3 -from flask import Blueprint, request -from flask_restful import Resource, reqparse -import portalocker +"""SSH management module""" import json +import portalocker +from flask_restful import Resource, reqparse from selfprivacy_api.resources.services import api -# Enable SSH + class EnableSSH(Resource): + """Enable SSH""" + def post(self): - with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f: - portalocker.lock(f, portalocker.LOCK_EX) + """ + Enable SSH + --- + tags: + - SSH + security: + - bearerAuth: [] + responses: + 200: + description: SSH enabled + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "ssh" not in data: data["ssh"] = {} data["ssh"]["enable"] = True - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, @@ -28,40 +45,75 @@ class EnableSSH(Resource): } -# Write new SSH key class WriteSSHKey(Resource): + """Write new SSH key""" + def put(self): + """ + Add a SSH root key + --- + consumes: + - application/json + tags: + - SSH + security: + - bearerAuth: [] + parameters: + - in: body + name: body + required: true + description: Public key to add + schema: + type: object + required: + - public_key + properties: + public_key: + type: string + description: ssh-ed25519 public key. + responses: + 201: + description: Key added + 400: + description: Bad request + 401: + description: Unauthorized + 409: + description: Key already exists + """ parser = reqparse.RequestParser() parser.add_argument( "public_key", type=str, required=True, help="Key cannot be blank!" ) args = parser.parse_args() - publicKey = args["public_key"] + public_key = args["public_key"] - with portalocker.Lock("/etc/nixos/userdata/userdata.json", "r+") as f: - portalocker.lock(f, portalocker.LOCK_EX) + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) if "ssh" not in data: data["ssh"] = {} - # Return 400 if key already in array + # Return 409 if key already in array for key in data["ssh"]["rootSshKeys"]: - if key == publicKey: + if key == public_key: return { "error": "Key already exists", - }, 400 - data["ssh"]["rootSshKeys"].append(publicKey) - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + }, 409 + data["ssh"]["rootSshKeys"].append(public_key) + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) return { "status": 0, "message": "New SSH key successfully written", - } + }, 201 api.add_resource(EnableSSH, "/ssh/enable") diff --git a/selfprivacy_api/resources/system.py b/selfprivacy_api/resources/system.py index 0fb8ac5..57063b8 100644 --- a/selfprivacy_api/resources/system.py +++ b/selfprivacy_api/resources/system.py @@ -1,38 +1,96 @@ #!/usr/bin/env python3 +"""System management module""" +import subprocess from flask import Blueprint from flask_restful import Resource, Api -import subprocess api_system = Blueprint("system", __name__, url_prefix="/system") api = Api(api_system) -# Rebuild NixOS + class RebuildSystem(Resource): + """Rebuild NixOS""" + def get(self): - rebuildResult = subprocess.Popen(["nixos-rebuild", "switch"]) - rebuildResult.communicate()[0] - return rebuildResult.returncode + """ + Rebuild NixOS with nixos-rebuild switch + --- + tags: + - System + security: + - bearerAuth: [] + responses: + 200: + description: System rebuild has started + 401: + description: Unauthorized + """ + rebuild_result = subprocess.Popen(["nixos-rebuild", "switch"]) + rebuild_result.communicate()[0] + return rebuild_result.returncode -# Rollback NixOS class RollbackSystem(Resource): + """Rollback NixOS""" + def get(self): - rollbackResult = subprocess.Popen(["nixos-rebuild", "switch", "--rollback"]) - rollbackResult.communicate()[0] - return rollbackResult.returncode + """ + Rollback NixOS with nixos-rebuild switch --rollback + --- + tags: + - System + security: + - bearerAuth: [] + responses: + 200: + description: System rollback has started + 401: + description: Unauthorized + """ + rollback_result = subprocess.Popen(["nixos-rebuild", "switch", "--rollback"]) + rollback_result.communicate()[0] + return rollback_result.returncode -# Upgrade NixOS class UpgradeSystem(Resource): + """Upgrade NixOS""" + def get(self): - upgradeResult = subprocess.Popen(["nixos-rebuild", "switch", "--upgrade"]) - upgradeResult.communicate()[0] - return upgradeResult.returncode + """ + Upgrade NixOS with nixos-rebuild switch --upgrade + --- + tags: + - System + security: + - bearerAuth: [] + responses: + 200: + description: System upgrade has started + 401: + description: Unauthorized + """ + upgrade_result = subprocess.Popen(["nixos-rebuild", "switch", "--upgrade"]) + upgrade_result.communicate()[0] + return upgrade_result.returncode -# Get system version from uname class SystemVersion(Resource): + """Get system version from uname""" + def get(self): + """ + Get system version from uname -a + --- + tags: + - System + security: + - bearerAuth: [] + responses: + 200: + description: OK + 401: + description: Unauthorized + """ return { "system_version": subprocess.check_output(["uname", "-a"]) .decode("utf-8") @@ -40,9 +98,23 @@ class SystemVersion(Resource): } -# Get python version class PythonVersion(Resource): + """Get python version""" + def get(self): + """ + Get python version used by this API + --- + tags: + - System + security: + - bearerAuth: [] + responses: + 200: + description: OK + 401: + description: Unauthorized + """ return subprocess.check_output(["python", "-V"]).decode("utf-8").strip() diff --git a/selfprivacy_api/resources/users.py b/selfprivacy_api/resources/users.py index 66e4360..ac7a58b 100644 --- a/selfprivacy_api/resources/users.py +++ b/selfprivacy_api/resources/users.py @@ -1,88 +1,181 @@ #!/usr/bin/env python3 -from flask import Blueprint, jsonify, request -from flask_restful import Resource, Api +"""Users management module""" import subprocess -import portalocker import json import re +import portalocker +from flask_restful import Resource, reqparse -from selfprivacy_api import resources -api_users = Blueprint("api_users", __name__) -api = Api(api_users) - -# Create a new user class Users(Resource): - def post(self): - rawPassword = request.headers.get("X-Password") - hashingCommand = """ - mkpasswd -m sha-512 {0} - """.format( - rawPassword - ) - passwordHashProcessDescriptor = subprocess.Popen( - hashingCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT - ) - hashedPassword = passwordHashProcessDescriptor.communicate()[0] - hashedPassword = hashedPassword.decode("ascii") - hashedPassword = hashedPassword.rstrip() + """Users management""" - with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f: - portalocker.lock(f, portalocker.LOCK_EX) + def get(self): + """ + Get a list of users + --- + tags: + - Users + security: + - bearerAuth: [] + responses: + 200: + description: A list of users + 401: + description: Unauthorized + """ + with open( + "/etc/nixos/userdata/userdata.json", "r", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_SH) try: - data = json.load(f) - # Return 400 if username is not provided - if request.headers.get("X-User") is None: - return {"error": "username is required"}, 400 - # Return 400 if password is not provided - if request.headers.get("X-Password") is None: - return {"error": "password is required"}, 400 - # Check is username passes regex - if not re.match(r"^[a-z_][a-z0-9_]+$", request.headers.get("X-User")): - return {"error": "username must be alphanumeric"}, 400 - # Check if username less than 32 characters - if len(request.headers.get("X-User")) > 32: - return {"error": "username must be less than 32 characters"}, 400 + data = json.load(userdata_file) + users = [] + for user in data["users"]: + users.append(user["username"]) + finally: + portalocker.unlock(userdata_file) + return users + + def post(self): + """ + Create a new user + --- + consumes: + - application/json + tags: + - Users + security: + - bearerAuth: [] + parameters: + - in: body + name: user + required: true + description: User to create + schema: + type: object + required: + - username + - password + properties: + username: + type: string + description: Unix username. Must be alphanumeric and less than 32 characters + password: + type: string + description: Unix password. + responses: + 201: + description: Created user + 400: + description: Bad request + 401: + description: Unauthorized + 409: + description: User already exists + """ + parser = reqparse.RequestParser(bundle_errors=True) + parser.add_argument("username", type=str, required=True) + parser.add_argument("password", type=str, required=True) + args = parser.parse_args() + + hashing_command = ["mkpasswd", "-m", "sha-512", args["password"]] + password_hash_process_descriptor = subprocess.Popen( + hashing_command, + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + hashed_password = password_hash_process_descriptor.communicate()[0] + hashed_password = hashed_password.decode("ascii") + hashed_password = hashed_password.rstrip() + + # Check is username passes regex + if not re.match(r"^[a-z_][a-z0-9_]+$", args["username"]): + return {"error": "username must be alphanumeric"}, 400 + # Check if username less than 32 characters + if len(args["username"]) > 32: + return {"error": "username must be less than 32 characters"}, 400 + + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) + try: + data = json.load(userdata_file) + # Return 400 if user already exists for user in data["users"]: - if user["username"] == request.headers.get("X-User"): - return {"error": "User already exists"}, 400 + if user["username"] == args["username"]: + return {"error": "User already exists"}, 409 + if "users" not in data: data["users"] = [] data["users"].append( { - "username": request.headers.get("X-User"), - "hashedPassword": hashedPassword, + "username": args["username"], + "hashedPassword": hashed_password, } ) - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) - return {"result": 0} + return {"result": 0, "username": args["username"]}, 201 - def delete(self): - with open("/etc/nixos/userdata/userdata.json", "r+", encoding="utf8") as f: - portalocker.lock(f, portalocker.LOCK_EX) + +class User(Resource): + """Single user managment""" + + def delete(self, username): + """ + Delete a user + --- + tags: + - Users + security: + - bearerAuth: [] + parameters: + - in: path + name: username + required: true + description: User to delete + type: string + responses: + 200: + description: Deleted user + 400: + description: Bad request + 401: + description: Unauthorized + 404: + description: User not found + """ + with open( + "/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8" + ) as userdata_file: + portalocker.lock(userdata_file, portalocker.LOCK_EX) try: - data = json.load(f) + data = json.load(userdata_file) # Return 400 if username is not provided - if request.headers.get("X-User") is None: + if username is None: return {"error": "username is required"}, 400 + if username == data["username"]: + return {"error": "Cannot delete root user"}, 400 # Return 400 if user does not exist for user in data["users"]: - if user["username"] == request.headers.get("X-User"): + if user["username"] == username: data["users"].remove(user) break else: - return {"error": "User does not exist"}, 400 + return {"error": "User does not exist"}, 404 - f.seek(0) - json.dump(data, f, indent=4) - f.truncate() + userdata_file.seek(0) + json.dump(data, userdata_file, indent=4) + userdata_file.truncate() finally: - portalocker.unlock(f) + portalocker.unlock(userdata_file) - return {"result": 0} + return {"result": 0, "username": username} diff --git a/selfprivacy_api/utils.py b/selfprivacy_api/utils.py index 094d08e..b7ef2a8 100644 --- a/selfprivacy_api/utils.py +++ b/selfprivacy_api/utils.py @@ -1,7 +1,9 @@ #!/usr/bin/env python3 +"""Various utility functions""" + -# Get domain from /var/domain without trailing new line def get_domain(): - with open("/var/domain", "r") as f: - domain = f.readline().rstrip() + """Get domain from /var/domain without trailing new line""" + with open("/var/domain", "r", encoding="utf-8") as domain_file: + domain = domain_file.readline().rstrip() return domain diff --git a/setup.py b/setup.py index 0aa97ab..27d325e 100755 --- a/setup.py +++ b/setup.py @@ -1,8 +1,10 @@ from setuptools import setup, find_packages setup( - name='selfprivacy_api', - version='1.1.0', + name="selfprivacy_api", + version="1.1.0", packages=find_packages(), - scripts=['selfprivacy_api/app.py',], + scripts=[ + "selfprivacy_api/app.py", + ], )