diff --git a/.vscode/settings.json b/.vscode/settings.json index 6f8c118..ccb092d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,10 @@ { "python.formatting.provider": "black", "python.linting.pylintEnabled": true, - "python.linting.enabled": true + "python.linting.enabled": true, + "python.testing.pytestArgs": [ + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true } \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 62e65ac..4e0e02e 100755 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,7 @@ flask-swagger-ui pytz huey gevent +mnemonic pytest coverage diff --git a/selfprivacy_api/app.py b/selfprivacy_api/app.py index 3b575db..7a8a25e 100644 --- a/selfprivacy_api/app.py +++ b/selfprivacy_api/app.py @@ -13,11 +13,14 @@ from selfprivacy_api.resources.users import User, Users from selfprivacy_api.resources.common import ApiVersion from selfprivacy_api.resources.system import api_system from selfprivacy_api.resources.services import services as api_services +from selfprivacy_api.resources.api_auth import auth as api_auth from selfprivacy_api.restic_controller.tasks import huey, init_restic from selfprivacy_api.migrations import run_migrations +from selfprivacy_api.utils.auth import is_token_valid + swagger_blueprint = get_swaggerui_blueprint( "/api/docs", "/api/swagger.json", config={"app_name": "SelfPrivacy API"} ) @@ -29,9 +32,6 @@ def create_app(test_config=None): api = Api(app) if test_config is None: - app.config["AUTH_TOKEN"] = os.environ.get("AUTH_TOKEN") - if app.config["AUTH_TOKEN"] is None: - raise ValueError("AUTH_TOKEN is not set") app.config["ENABLE_SWAGGER"] = os.environ.get("ENABLE_SWAGGER", "0") app.config["B2_BUCKET"] = os.environ.get("B2_BUCKET") else: @@ -40,14 +40,20 @@ def create_app(test_config=None): # Check bearer token @app.before_request def check_auth(): - # Exclude swagger-ui - if not request.path.startswith("/api"): + # Exclude swagger-ui, /auth/new_device/authorize, /auth/recovery_token/use + if request.path.startswith("/api"): + pass + elif request.path.startswith("/auth/new_device/authorize"): + pass + elif request.path.startswith("/auth/recovery_token/use"): + pass + else: auth = request.headers.get("Authorization") if auth is None: return jsonify({"error": "Missing Authorization header"}), 401 - - # Check if token is valid - if auth != "Bearer " + app.config["AUTH_TOKEN"]: + # Strip Bearer from auth header + auth = auth.replace("Bearer ", "") + if not is_token_valid(auth): return jsonify({"error": "Invalid token"}), 401 api.add_resource(ApiVersion, "/api/version") @@ -56,12 +62,13 @@ def create_app(test_config=None): app.register_blueprint(api_system) app.register_blueprint(api_services) + app.register_blueprint(api_auth) @app.route("/api/swagger.json") def spec(): if app.config["ENABLE_SWAGGER"] == "1": swag = swagger(app) - swag["info"]["version"] = "1.1.1" + swag["info"]["version"] = "1.2.0" swag["info"]["title"] = "SelfPrivacy API" swag["info"]["description"] = "SelfPrivacy API" swag["securityDefinitions"] = { diff --git a/selfprivacy_api/migrations/__init__.py b/selfprivacy_api/migrations/__init__.py index 32c467f..86ac342 100644 --- a/selfprivacy_api/migrations/__init__.py +++ b/selfprivacy_api/migrations/__init__.py @@ -1,7 +1,18 @@ +"""Migrations module. +Migrations module is introduced in v1.1.1 and provides one-shot +migrations which cannot be performed from the NixOS configuration file changes. +These migrations are checked and ran before every start of the API. + +You can disable certain migrations if needed by creating an array +at api.skippedMigrations in userdata.json and populating it +with IDs of the migrations to skip. +Adding DISABLE_ALL to that array disables the migrations module entirely. +""" from selfprivacy_api.utils import ReadUserData from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch +from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson -migrations = [FixNixosConfigBranch()] +migrations = [FixNixosConfigBranch(), CreateTokensJson()] def run_migrations(): @@ -25,7 +36,7 @@ def run_migrations(): try: if migration.is_migration_needed(): migration.migrate() - except Exception as e: + except Exception as err: print(f"Error while migrating {migration.get_migration_name()}") - print(e) + print(err) print("Skipping this migration") diff --git a/selfprivacy_api/migrations/create_tokens_json.py b/selfprivacy_api/migrations/create_tokens_json.py new file mode 100644 index 0000000..38702f8 --- /dev/null +++ b/selfprivacy_api/migrations/create_tokens_json.py @@ -0,0 +1,58 @@ +from datetime import datetime +import os +import json +from pathlib import Path + +from selfprivacy_api.migrations.migration import Migration +from selfprivacy_api.utils import TOKENS_FILE, ReadUserData + + +class CreateTokensJson(Migration): + def get_migration_name(self): + return "create_tokens_json" + + def get_migration_description(self): + return """Selfprivacy API used a single token in userdata.json for authentication. + This migration creates a new tokens.json file with the old token in it. + This migration runs if the tokens.json file does not exist. + Old token is located at ["api"]["token"] in userdata.json. + tokens.json path is declared in TOKENS_FILE imported from utils.py + tokens.json must have the following format: + { + "tokens": [ + { + "token": "token_string", + "name": "Master Token", + "date": "current date from str(datetime.now())", + } + ] + } + tokens.json must have 0600 permissions. + """ + + def is_migration_needed(self): + return not os.path.exists(TOKENS_FILE) + + def migrate(self): + try: + print(f"Creating tokens.json file at {TOKENS_FILE}") + with ReadUserData() as userdata: + token = userdata["api"]["token"] + # Touch tokens.json with 0600 permissions + Path(TOKENS_FILE).touch(mode=0o600) + # Write token to tokens.json + structure = { + "tokens": [ + { + "token": token, + "name": "primary_token", + "date": str(datetime.now()), + } + ] + } + with open(TOKENS_FILE, "w", encoding="utf-8") as tokens: + json.dump(structure, tokens, indent=4) + print("Done") + except Exception as e: + print(e) + print("Error creating tokens.json") diff --git a/selfprivacy_api/migrations/fix_nixos_config_branch.py b/selfprivacy_api/migrations/fix_nixos_config_branch.py index cb1907d..fbb994c 100644 --- a/selfprivacy_api/migrations/fix_nixos_config_branch.py +++ b/selfprivacy_api/migrations/fix_nixos_config_branch.py @@ -24,10 +24,7 @@ class FixNixosConfigBranch(Migration): ["git", "rev-parse", "--abbrev-ref", "HEAD"], start_new_session=True ) os.chdir(current_working_directory) - if nixos_config_branch.decode("utf-8").strip() == "rolling-testing": - return True - else: - return False + return nixos_config_branch.decode("utf-8").strip() == "rolling-testing" except subprocess.CalledProcessError: os.chdir(current_working_directory) return False diff --git a/selfprivacy_api/migrations/migration.py b/selfprivacy_api/migrations/migration.py index b8ae261..1116672 100644 --- a/selfprivacy_api/migrations/migration.py +++ b/selfprivacy_api/migrations/migration.py @@ -1,16 +1,16 @@ from abc import ABC, abstractmethod -""" -Abstract Migration class -This class is used to define the structure of a migration -Migration has a function is_migration_needed() that returns True or False -Migration has a function migrate() that does the migration -Migration has a function get_migration_name() that returns the migration name -Migration has a function get_migration_description() that returns the migration description -""" - class Migration(ABC): + """ + Abstract Migration class + This class is used to define the structure of a migration + Migration has a function is_migration_needed() that returns True or False + Migration has a function migrate() that does the migration + Migration has a function get_migration_name() that returns the migration name + Migration has a function get_migration_description() that returns the migration description + """ + @abstractmethod def get_migration_name(self): pass diff --git a/selfprivacy_api/resources/api_auth/__init__.py b/selfprivacy_api/resources/api_auth/__init__.py new file mode 100644 index 0000000..9bd1703 --- /dev/null +++ b/selfprivacy_api/resources/api_auth/__init__.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +"""API authentication module""" + +from flask import Blueprint +from flask_restful import Api + +auth = Blueprint("auth", __name__, url_prefix="/auth") +api = Api(auth) + +from . import ( + new_device, + recovery_token, + app_tokens, +) diff --git a/selfprivacy_api/resources/api_auth/app_tokens.py b/selfprivacy_api/resources/api_auth/app_tokens.py new file mode 100644 index 0000000..3a1c0d0 --- /dev/null +++ b/selfprivacy_api/resources/api_auth/app_tokens.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +"""App tokens management module""" +from flask import request +from flask_restful import Resource, reqparse + +from selfprivacy_api.resources.api_auth import api +from selfprivacy_api.utils.auth import ( + delete_token, + get_tokens_info, + is_token_name_exists, + is_token_name_pair_valid, + refresh_token, + get_token_name, +) + + +class Tokens(Resource): + """Token management class + GET returns the list of active devices. + DELETE invalidates token unless it is the last one or the caller uses this token. + POST refreshes the token of the caller. + """ + + def get(self): + """ + Get current device tokens + --- + tags: + - Tokens + security: + - bearerAuth: [] + responses: + 200: + description: List of tokens + 400: + description: Bad request + """ + caller_name = get_token_name(request.headers.get("Authorization").split(" ")[1]) + tokens = get_tokens_info() + # Retrun a list of tokens and if it is the caller's token + # it will be marked with a flag + return [ + { + "name": token["name"], + "date": token["date"], + "is_caller": token["name"] == caller_name, + } + for token in tokens + ] + + def delete(self): + """ + Delete token + --- + tags: + - Tokens + security: + - bearerAuth: [] + parameters: + - in: body + name: token + required: true + description: Token's name to delete + schema: + type: object + properties: + token: + type: string + description: Token name to delete + required: true + responses: + 200: + description: Token deleted + 400: + description: Bad request + 404: + description: Token not found + """ + parser = reqparse.RequestParser() + parser.add_argument( + "token_name", type=str, required=True, help="Token to delete" + ) + args = parser.parse_args() + token_name = args["token_name"] + if is_token_name_pair_valid( + token_name, request.headers.get("Authorization").split(" ")[1] + ): + return {"message": "Cannot delete caller's token"}, 400 + if not is_token_name_exists(token_name): + return {"message": "Token not found"}, 404 + delete_token(token_name) + return {"message": "Token deleted"}, 200 + + def post(self): + """ + Refresh token + --- + tags: + - Tokens + security: + - bearerAuth: [] + responses: + 200: + description: Token refreshed + 400: + description: Bad request + 404: + description: Token not found + """ + # Get token from header + token = request.headers.get("Authorization").split(" ")[1] + new_token = refresh_token(token) + if new_token is None: + return {"message": "Token not found"}, 404 + return {"token": new_token}, 200 + + +api.add_resource(Tokens, "/tokens") diff --git a/selfprivacy_api/resources/api_auth/new_device.py b/selfprivacy_api/resources/api_auth/new_device.py new file mode 100644 index 0000000..2c0bde1 --- /dev/null +++ b/selfprivacy_api/resources/api_auth/new_device.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +"""New device auth module""" +from flask_restful import Resource, reqparse + +from selfprivacy_api.resources.api_auth import api +from selfprivacy_api.utils.auth import ( + get_new_device_auth_token, + use_new_device_auth_token, + delete_new_device_auth_token, +) + + +class NewDevice(Resource): + """New device auth class + POST returns a new token for the caller. + """ + + def post(self): + """ + Get new device token + --- + tags: + - Tokens + security: + - bearerAuth: [] + responses: + 200: + description: New device token + 400: + description: Bad request + """ + token = get_new_device_auth_token() + return {"token": token} + + def delete(self): + """ + Delete new device token + --- + tags: + - Tokens + security: + - bearerAuth: [] + responses: + 200: + description: New device token deleted + 400: + description: Bad request + """ + delete_new_device_auth_token() + return {"token": None} + + +class AuthorizeDevice(Resource): + """Authorize device class + POST authorizes the caller. + """ + + def post(self): + """ + Authorize device + --- + tags: + - Tokens + parameters: + - in: body + name: data + required: true + description: Who is authorizing + schema: + type: object + properties: + token: + type: string + description: Mnemonic token to authorize + device: + type: string + description: Device to authorize + responses: + 200: + description: Device authorized + 400: + description: Bad request + 404: + description: Token not found + """ + parser = reqparse.RequestParser() + parser.add_argument( + "token", type=str, required=True, help="Mnemonic token to authorize" + ) + parser.add_argument( + "device", type=str, required=True, help="Device to authorize" + ) + args = parser.parse_args() + auth_token = args["token"] + device = args["device"] + token = use_new_device_auth_token(auth_token, device) + if token is None: + return {"message": "Token not found"}, 404 + return {"message": "Device authorized", "token": token}, 200 + + +api.add_resource(NewDevice, "/new_device") +api.add_resource(AuthorizeDevice, "/new_device/authorize") diff --git a/selfprivacy_api/resources/api_auth/recovery_token.py b/selfprivacy_api/resources/api_auth/recovery_token.py new file mode 100644 index 0000000..fbd80d9 --- /dev/null +++ b/selfprivacy_api/resources/api_auth/recovery_token.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +"""Recovery token module""" +from datetime import datetime +from flask_restful import Resource, reqparse + +from selfprivacy_api.resources.api_auth import api +from selfprivacy_api.utils.auth import ( + is_recovery_token_exists, + is_recovery_token_valid, + get_recovery_token_status, + generate_recovery_token, + use_mnemonic_recoverery_token, +) + + +class RecoveryToken(Resource): + """Recovery token class + GET returns the status of the recovery token. + POST generates a new recovery token. + """ + + def get(self): + """ + Get recovery token status + --- + tags: + - Tokens + security: + - bearerAuth: [] + responses: + 200: + description: Recovery token status + schema: + type: object + properties: + exists: + type: boolean + description: Recovery token exists + valid: + type: boolean + description: Recovery token is valid + date: + type: string + description: Recovery token date + expiration: + type: string + description: Recovery token expiration date + uses_left: + type: integer + description: Recovery token uses left + 400: + description: Bad request + """ + if not is_recovery_token_exists(): + return { + "exists": False, + "valid": False, + "date": None, + "expiration": None, + "uses_left": None, + } + status = get_recovery_token_status() + if not is_recovery_token_valid(): + return { + "exists": True, + "valid": False, + "date": status["date"], + "expiration": status["expiration"], + "uses_left": status["uses_left"], + } + return { + "exists": True, + "valid": True, + "date": status["date"], + "expiration": status["expiration"], + "uses_left": status["uses_left"], + } + + def post(self): + """ + Generate recovery token + --- + tags: + - Tokens + security: + - bearerAuth: [] + parameters: + - in: body + name: data + required: true + description: Token data + schema: + type: object + properties: + expiration: + type: string + description: Token expiration date + uses: + type: integer + description: Token uses + responses: + 200: + description: Recovery token generated + schema: + type: object + properties: + token: + type: string + description: Mnemonic recovery token + 400: + description: Bad request + """ + parser = reqparse.RequestParser() + parser.add_argument( + "expiration", type=str, required=False, help="Token expiration date" + ) + parser.add_argument("uses", type=int, required=False, help="Token uses") + args = parser.parse_args() + # Convert expiration date to datetime and return 400 if it is not valid + if args["expiration"]: + try: + expiration = datetime.strptime( + args["expiration"], "%Y-%m-%dT%H:%M:%S.%fZ" + ) + # Retrun 400 if expiration date is in the past + if expiration < datetime.now(): + return {"message": "Expiration date cannot be in the past"}, 400 + except ValueError: + return { + "error": "Invalid expiration date. Use YYYY-MM-DDTHH:MM:SS.SSSZ" + }, 400 + else: + expiration = None + if args["uses"] != None and args["uses"] < 1: + return {"message": "Uses must be greater than 0"}, 400 + # Generate recovery token + token = generate_recovery_token(expiration, args["uses"]) + return {"token": token} + + +class UseRecoveryToken(Resource): + """Use recovery token class + POST uses the recovery token. + """ + + def post(self): + """ + Use recovery token + --- + tags: + - Tokens + parameters: + - in: body + name: data + required: true + description: Token data + schema: + type: object + properties: + token: + type: string + description: Mnemonic recovery token + device: + type: string + description: Device to authorize + responses: + 200: + description: Recovery token used + schema: + type: object + properties: + token: + type: string + description: Device authorization token + 400: + description: Bad request + 404: + description: Token not found + """ + parser = reqparse.RequestParser() + parser.add_argument( + "token", type=str, required=True, help="Mnemonic recovery token" + ) + parser.add_argument( + "device", type=str, required=True, help="Device to authorize" + ) + args = parser.parse_args() + # Use recovery token + token = use_mnemonic_recoverery_token(args["token"], args["device"]) + if token is None: + return {"error": "Token not found"}, 404 + return {"token": token} + + +api.add_resource(RecoveryToken, "/recovery_token") +api.add_resource(UseRecoveryToken, "/recovery_token/use") diff --git a/selfprivacy_api/resources/common.py b/selfprivacy_api/resources/common.py index d771372..d54d954 100644 --- a/selfprivacy_api/resources/common.py +++ b/selfprivacy_api/resources/common.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 """Unassigned views""" -import subprocess -from flask_restful import Resource, reqparse +from flask_restful import Resource class ApiVersion(Resource): @@ -24,4 +23,4 @@ class ApiVersion(Resource): 401: description: Unauthorized """ - return {"version": "1.1.1"} + return {"version": "1.2.0"} diff --git a/selfprivacy_api/resources/services/ssh.py b/selfprivacy_api/resources/services/ssh.py index 1665751..43159d0 100644 --- a/selfprivacy_api/resources/services/ssh.py +++ b/selfprivacy_api/resources/services/ssh.py @@ -212,17 +212,16 @@ class SSHKeys(Resource): if "sshKeys" not in data: data["sshKeys"] = [] return data["sshKeys"] - else: - if "users" not in data: - data["users"] = [] - for user in data["users"]: - if user["username"] == username: - if "sshKeys" not in user: - user["sshKeys"] = [] - return user["sshKeys"] - return { - "error": "User not found", - }, 404 + if "users" not in data: + data["users"] = [] + for user in data["users"]: + if user["username"] == username: + if "sshKeys" not in user: + user["sshKeys"] = [] + return user["sshKeys"] + return { + "error": "User not found", + }, 404 def post(self, username): """ diff --git a/selfprivacy_api/resources/system.py b/selfprivacy_api/resources/system.py index 5dbc858..db988da 100644 --- a/selfprivacy_api/resources/system.py +++ b/selfprivacy_api/resources/system.py @@ -283,6 +283,8 @@ class PythonVersion(Resource): class PullRepositoryChanges(Resource): + """Pull NixOS config repository changes""" + def get(self): """ Pull Repository Changes @@ -324,12 +326,11 @@ class PullRepositoryChanges(Resource): "message": "Update completed successfully", "data": data, } - elif git_pull_process_descriptor.returncode > 0: - return { - "status": git_pull_process_descriptor.returncode, - "message": "Something went wrong", - "data": data, - }, 500 + return { + "status": git_pull_process_descriptor.returncode, + "message": "Something went wrong", + "data": data, + }, 500 api.add_resource(Timezone, "/configuration/timezone") diff --git a/selfprivacy_api/restic_controller/__init__.py b/selfprivacy_api/restic_controller/__init__.py index 24158b0..abb5dc8 100644 --- a/selfprivacy_api/restic_controller/__init__.py +++ b/selfprivacy_api/restic_controller/__init__.py @@ -105,10 +105,7 @@ class ResticController: "--json", ] - if ( - self.state == ResticStates.BACKING_UP - or self.state == ResticStates.RESTORING - ): + if self.state in (ResticStates.BACKING_UP, ResticStates.RESTORING): return with subprocess.Popen( backup_listing_command, @@ -129,10 +126,9 @@ class ResticController: if "Is there a repository at the following location?" in snapshots_list: self.state = ResticStates.NOT_INITIALIZED return - else: - self.state = ResticStates.ERROR - self.error_message = snapshots_list - return + self.state = ResticStates.ERROR + self.error_message = snapshots_list + return def initialize_repository(self): """ @@ -195,10 +191,7 @@ class ResticController: """ backup_status_check_command = ["tail", "-1", "/var/backup.log"] - if ( - self.state == ResticStates.NO_KEY - or self.state == ResticStates.NOT_INITIALIZED - ): + if self.state in (ResticStates.NO_KEY, ResticStates.NOT_INITIALIZED): return # If the log file does not exists diff --git a/selfprivacy_api/utils.py b/selfprivacy_api/utils/__init__.py similarity index 72% rename from selfprivacy_api/utils.py rename to selfprivacy_api/utils/__init__.py index 80c8e6d..5322fae 100644 --- a/selfprivacy_api/utils.py +++ b/selfprivacy_api/utils/__init__.py @@ -1,13 +1,22 @@ #!/usr/bin/env python3 """Various utility functions""" +from enum import Enum import json import portalocker USERDATA_FILE = "/etc/nixos/userdata/userdata.json" +TOKENS_FILE = "/etc/nixos/userdata/tokens.json" DOMAIN_FILE = "/var/domain" +class UserDataFiles(Enum): + """Enum for userdata files""" + + USERDATA = 0 + TOKENS = 1 + + def get_domain(): """Get domain from /var/domain without trailing new line""" with open(DOMAIN_FILE, "r", encoding="utf-8") as domain_file: @@ -18,8 +27,13 @@ def get_domain(): class WriteUserData(object): """Write userdata.json with lock""" - def __init__(self): - self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8") + def __init__(self, file_type=UserDataFiles.USERDATA): + if file_type == UserDataFiles.USERDATA: + self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8") + elif file_type == UserDataFiles.TOKENS: + self.userdata_file = open(TOKENS_FILE, "r+", encoding="utf-8") + else: + raise ValueError("Unknown file type") portalocker.lock(self.userdata_file, portalocker.LOCK_EX) self.data = json.load(self.userdata_file) @@ -38,8 +52,13 @@ class WriteUserData(object): class ReadUserData(object): """Read userdata.json with lock""" - def __init__(self): - self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8") + def __init__(self, file_type=UserDataFiles.USERDATA): + if file_type == UserDataFiles.USERDATA: + self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8") + elif file_type == UserDataFiles.TOKENS: + self.userdata_file = open(TOKENS_FILE, "r", encoding="utf-8") + else: + raise ValueError("Unknown file type") portalocker.lock(self.userdata_file, portalocker.LOCK_SH) self.data = json.load(self.userdata_file) diff --git a/selfprivacy_api/utils/auth.py b/selfprivacy_api/utils/auth.py new file mode 100644 index 0000000..1aa7e22 --- /dev/null +++ b/selfprivacy_api/utils/auth.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +"""Token management utils""" +import secrets +from datetime import datetime, timedelta +import re + +from mnemonic import Mnemonic + +from . import ReadUserData, UserDataFiles, WriteUserData + +""" +Token are stored in the tokens.json file. +File contains device tokens, recovery token and new device auth token. +File structure: +{ + "tokens": [ + { + "token": "device token", + "name": "device name", + "date": "date of creation", + } + ], + "recovery_token": { + "token": "recovery token", + "date": "date of creation", + "expiration": "date of expiration", + "uses_left": "number of uses left" + }, + "new_device": { + "token": "new device auth token", + "date": "date of creation", + "expiration": "date of expiration", + } +} +Recovery token may or may not have expiration date and uses_left. +There may be no recovery token at all. +Device tokens must be unique. +""" + + +def _get_tokens(): + """Get all tokens as list of tokens of every device""" + with ReadUserData(UserDataFiles.TOKENS) as tokens: + return [token["token"] for token in tokens["tokens"]] + + +def _get_token_names(): + """Get all token names""" + with ReadUserData(UserDataFiles.TOKENS) as tokens: + return [t["name"] for t in tokens["tokens"]] + + +def _validate_token_name(name): + """Token name must be an alphanumeric string and not empty. + Replace invalid characters with '_' + If token name exists, add a random number to the end of the name until it is unique. + """ + if not re.match("^[a-zA-Z0-9]*$", name): + name = re.sub("[^a-zA-Z0-9]", "_", name) + if name == "": + name = "Unknown device" + while name in _get_token_names(): + name += str(secrets.randbelow(10)) + return name + + +def is_token_valid(token): + """Check if token is valid""" + if token in _get_tokens(): + return True + return False + + +def is_token_name_exists(token_name): + """Check if token name exists""" + with ReadUserData(UserDataFiles.TOKENS) as tokens: + return token_name in [t["name"] for t in tokens["tokens"]] + + +def is_token_name_pair_valid(token_name, token): + """Check if token name and token pair exists""" + with ReadUserData(UserDataFiles.TOKENS) as tokens: + for t in tokens["tokens"]: + if t["name"] == token_name and t["token"] == token: + return True + return False + + +def get_token_name(token): + """Return the name of the token provided""" + with ReadUserData(UserDataFiles.TOKENS) as tokens: + for t in tokens["tokens"]: + if t["token"] == token: + return t["name"] + return None + + +def get_tokens_info(): + """Get all tokens info without tokens themselves""" + with ReadUserData(UserDataFiles.TOKENS) as tokens: + return [ + {"name": token["name"], "date": token["date"]} for token in tokens["tokens"] + ] + + +def _generate_token(): + """Generates new token and makes sure it is unique""" + token = secrets.token_urlsafe(32) + while token in _get_tokens(): + token = secrets.token_urlsafe(32) + return token + + +def create_token(name): + """Create new token""" + token = _generate_token() + name = _validate_token_name(name) + with WriteUserData(UserDataFiles.TOKENS) as tokens: + tokens["tokens"].append( + { + "token": token, + "name": name, + "date": str(datetime.now()), + } + ) + return token + + +def delete_token(token_name): + """Delete token""" + with WriteUserData(UserDataFiles.TOKENS) as tokens: + tokens["tokens"] = [t for t in tokens["tokens"] if t["name"] != token_name] + + +def refresh_token(token): + """Change the token field of the existing token""" + new_token = _generate_token() + with WriteUserData(UserDataFiles.TOKENS) as tokens: + for t in tokens["tokens"]: + if t["token"] == token: + t["token"] = new_token + return new_token + return None + + +def is_recovery_token_exists(): + """Check if recovery token exists""" + with ReadUserData(UserDataFiles.TOKENS) as tokens: + return "recovery_token" in tokens + + +def is_recovery_token_valid(): + """Check if recovery token is valid""" + with ReadUserData(UserDataFiles.TOKENS) as tokens: + if "recovery_token" not in tokens: + return False + recovery_token = tokens["recovery_token"] + if "uses_left" in recovery_token and recovery_token["uses_left"] is not None: + if recovery_token["uses_left"] <= 0: + return False + if "expiration" not in recovery_token or recovery_token["expiration"] is None: + return True + return datetime.now() < datetime.strptime( + recovery_token["expiration"], "%Y-%m-%dT%H:%M:%S.%fZ" + ) + + +def get_recovery_token_status(): + """Get recovery token date of creation, expiration and uses left""" + with ReadUserData(UserDataFiles.TOKENS) as tokens: + if "recovery_token" not in tokens: + return None + recovery_token = tokens["recovery_token"] + return { + "date": recovery_token["date"], + "expiration": recovery_token["expiration"] + if "expiration" in recovery_token + else None, + "uses_left": recovery_token["uses_left"] + if "uses_left" in recovery_token + else None, + } + + +def _get_recovery_token(): + """Get recovery token""" + with ReadUserData(UserDataFiles.TOKENS) as tokens: + if "recovery_token" not in tokens: + return None + return tokens["recovery_token"]["token"] + + +def generate_recovery_token(expiration=None, uses_left=None): + """Generate a 24 bytes recovery token and return a mneomnic word list. + Write a string representation of the recovery token to the tokens.json file. + """ + # expires must be a date or None + # uses_left must be an integer or None + if expiration is not None: + if not isinstance(expiration, datetime): + raise TypeError("expires must be a datetime object") + if uses_left is not None: + if not isinstance(uses_left, int): + raise TypeError("uses_left must be an integer") + if uses_left <= 0: + raise ValueError("uses_left must be greater than 0") + + recovery_token = secrets.token_bytes(24) + recovery_token_str = recovery_token.hex() + with WriteUserData(UserDataFiles.TOKENS) as tokens: + tokens["recovery_token"] = { + "token": recovery_token_str, + "date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")), + "expiration": expiration.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + if expiration is not None + else None, + "uses_left": uses_left if uses_left is not None else None, + } + return Mnemonic(language="english").to_mnemonic(recovery_token) + + +def use_mnemonic_recoverery_token(mnemonic_phrase, name): + """Use the recovery token by converting the mnemonic word list to a byte array. + If the recovery token if invalid itself, return None + If the binary representation of phrase not matches + the byte array of the recovery token, return None. + If the mnemonic phrase is valid then generate a device token and return it. + Substract 1 from uses_left if it exists. + mnemonic_phrase is a string representation of the mnemonic word list. + """ + if not is_recovery_token_valid(): + return None + recovery_token_str = _get_recovery_token() + if recovery_token_str is None: + return None + recovery_token = bytes.fromhex(recovery_token_str) + if not Mnemonic(language="english").check(mnemonic_phrase): + return None + phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase) + if phrase_bytes != recovery_token: + return None + token = _generate_token() + name = _validate_token_name(name) + with WriteUserData(UserDataFiles.TOKENS) as tokens: + tokens["tokens"].append( + { + "token": token, + "name": name, + "date": str(datetime.now()), + } + ) + if "recovery_token" in tokens: + if ( + "uses_left" in tokens["recovery_token"] + and tokens["recovery_token"]["uses_left"] is not None + ): + tokens["recovery_token"]["uses_left"] -= 1 + return token + + +def get_new_device_auth_token(): + """Generate a new device auth token which is valid for 10 minutes + and return a mnemonic phrase representation + Write token to the new_device of the tokens.json file. + """ + token = secrets.token_bytes(16) + token_str = token.hex() + with WriteUserData(UserDataFiles.TOKENS) as tokens: + tokens["new_device"] = { + "token": token_str, + "date": str(datetime.now()), + "expiration": str(datetime.now() + timedelta(minutes=10)), + } + return Mnemonic(language="english").to_mnemonic(token) + + +def _get_new_device_auth_token(): + """Get new device auth token. If it is expired, return None""" + with ReadUserData(UserDataFiles.TOKENS) as tokens: + if "new_device" not in tokens: + return None + new_device = tokens["new_device"] + if "expiration" not in new_device: + return None + if datetime.now() > datetime.strptime( + new_device["expiration"], "%Y-%m-%d %H:%M:%S.%f" + ): + return None + return new_device["token"] + + +def delete_new_device_auth_token(): + """Delete new device auth token""" + with WriteUserData(UserDataFiles.TOKENS) as tokens: + if "new_device" in tokens: + del tokens["new_device"] + + +def use_new_device_auth_token(mnemonic_phrase, name): + """Use the new device auth token by converting the mnemonic string to a byte array. + If the mnemonic phrase is valid then generate a device token and return it. + New device auth token must be deleted. + """ + token_str = _get_new_device_auth_token() + if token_str is None: + return None + token = bytes.fromhex(token_str) + if not Mnemonic(language="english").check(mnemonic_phrase): + return None + phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase) + if phrase_bytes != token: + return None + token = create_token(name) + with WriteUserData(UserDataFiles.TOKENS) as tokens: + if "new_device" in tokens: + del tokens["new_device"] + return token diff --git a/setup.py b/setup.py index 27d325e..e5a98b0 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name="selfprivacy_api", - version="1.1.0", + version="1.2.0", packages=find_packages(), scripts=[ "selfprivacy_api/app.py", diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..79f3623 --- /dev/null +++ b/shell.nix @@ -0,0 +1,30 @@ +{ pkgs ? import {} }: +let + sp-python = pkgs.python39.withPackages (p: with p; [ + flask + flask-restful + setuptools + portalocker + flask-swagger + flask-swagger-ui + pytz + pytest + pytest-mock + pytest-datadir + huey + gevent + mnemonic + coverage + pylint + ]); +in +pkgs.mkShell { + buildInputs = [ + sp-python + pkgs.black + ]; + shellHook = '' + PYTHONPATH=${sp-python}/${sp-python.sitePackages} + # maybe set more env-vars + ''; +} \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index aab30dd..7a6fdea 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,12 +3,19 @@ from flask import testing from selfprivacy_api.app import create_app +@pytest.fixture +def tokens_file(mocker, shared_datadir): + mock = mocker.patch( + "selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json" + ) + return mock + + @pytest.fixture def app(): app = create_app( { - "AUTH_TOKEN": "TEST_TOKEN", - "ENABLE_SWAGGER": "0", + "ENABLE_SWAGGER": "1", } ) @@ -16,7 +23,7 @@ def app(): @pytest.fixture -def client(app): +def client(app, tokens_file): return app.test_client() @@ -45,17 +52,17 @@ class WrongAuthClient(testing.FlaskClient): @pytest.fixture -def authorized_client(app): +def authorized_client(app, tokens_file): app.test_client_class = AuthorizedClient return app.test_client() @pytest.fixture -def wrong_auth_client(app): +def wrong_auth_client(app, tokens_file): app.test_client_class = WrongAuthClient return app.test_client() @pytest.fixture -def runner(app): +def runner(app, tokens_file): return app.test_cli_runner() diff --git a/tests/data/tokens.json b/tests/data/tokens.json new file mode 100644 index 0000000..9be9d02 --- /dev/null +++ b/tests/data/tokens.json @@ -0,0 +1,14 @@ +{ + "tokens": [ + { + "token": "TEST_TOKEN", + "name": "test_token", + "date": "2022-01-14 08:31:10.789314" + }, + { + "token": "TEST_TOKEN2", + "name": "test_token2", + "date": "2022-01-14 08:31:10.789314" + } + ] +} \ No newline at end of file diff --git a/tests/services/data/tokens.json b/tests/services/data/tokens.json new file mode 100644 index 0000000..9d35420 --- /dev/null +++ b/tests/services/data/tokens.json @@ -0,0 +1,9 @@ +{ + "tokens": [ + { + "token": "TEST_TOKEN", + "name": "Test Token", + "date": "2022-01-14 08:31:10.789314" + } + ] +} \ No newline at end of file diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..819a385 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,528 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +import datetime +import json +import re +import pytest +from mnemonic import Mnemonic + + +TOKENS_FILE_CONTETS = { + "tokens": [ + { + "token": "TEST_TOKEN", + "name": "test_token", + "date": "2022-01-14 08:31:10.789314", + }, + { + "token": "TEST_TOKEN2", + "name": "test_token2", + "date": "2022-01-14 08:31:10.789314", + }, + ] +} + + +def read_json(file_path): + with open(file_path, "r", encoding="utf-8") as file: + return json.load(file) + + +def write_json(file_path, data): + with open(file_path, "w", encoding="utf-8") as file: + json.dump(data, file, indent=4) + + +def test_get_tokens_info(authorized_client, tokens_file): + response = authorized_client.get("/auth/tokens") + assert response.status_code == 200 + assert response.json == [ + {"name": "test_token", "date": "2022-01-14 08:31:10.789314", "is_caller": True}, + { + "name": "test_token2", + "date": "2022-01-14 08:31:10.789314", + "is_caller": False, + }, + ] + + +def test_get_tokens_unauthorized(client, tokens_file): + response = client.get("/auth/tokens") + assert response.status_code == 401 + + +def test_delete_token_unauthorized(client, tokens_file): + response = client.delete("/auth/tokens") + assert response.status_code == 401 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_delete_token(authorized_client, tokens_file): + response = authorized_client.delete( + "/auth/tokens", json={"token_name": "test_token2"} + ) + assert response.status_code == 200 + assert read_json(tokens_file) == { + "tokens": [ + { + "token": "TEST_TOKEN", + "name": "test_token", + "date": "2022-01-14 08:31:10.789314", + } + ] + } + + +def test_delete_self_token(authorized_client, tokens_file): + response = authorized_client.delete( + "/auth/tokens", json={"token_name": "test_token"} + ) + assert response.status_code == 400 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_delete_nonexistent_token(authorized_client, tokens_file): + response = authorized_client.delete( + "/auth/tokens", json={"token_name": "test_token3"} + ) + assert response.status_code == 404 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_refresh_token_unauthorized(client, tokens_file): + response = client.post("/auth/tokens") + assert response.status_code == 401 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_refresh_token(authorized_client, tokens_file): + response = authorized_client.post("/auth/tokens") + assert response.status_code == 200 + new_token = response.json["token"] + assert read_json(tokens_file)["tokens"][0]["token"] == new_token + + +# new device + + +def test_get_new_device_auth_token_unauthorized(client, tokens_file): + response = client.get("/auth/new_device") + assert response.status_code == 401 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_get_new_device_auth_token(authorized_client, tokens_file): + response = authorized_client.post("/auth/new_device") + assert response.status_code == 200 + assert "token" in response.json + token = Mnemonic(language="english").to_entropy(response.json["token"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == token + + +def test_get_and_delete_new_device_token(authorized_client, tokens_file): + response = authorized_client.post("/auth/new_device") + assert response.status_code == 200 + assert "token" in response.json + token = Mnemonic(language="english").to_entropy(response.json["token"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == token + response = authorized_client.delete( + "/auth/new_device", json={"token": response.json["token"]} + ) + assert response.status_code == 200 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_delete_token_unauthenticated(client, tokens_file): + response = client.delete("/auth/new_device") + assert response.status_code == 401 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_get_and_authorize_new_device(client, authorized_client, tokens_file): + response = authorized_client.post("/auth/new_device") + assert response.status_code == 200 + assert "token" in response.json + token = Mnemonic(language="english").to_entropy(response.json["token"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == token + response = client.post( + "/auth/new_device/authorize", + json={"token": response.json["token"], "device": "new_device"}, + ) + assert response.status_code == 200 + assert read_json(tokens_file)["tokens"][2]["token"] == response.json["token"] + assert read_json(tokens_file)["tokens"][2]["name"] == "new_device" + + +def test_authorize_new_device_with_invalid_token(client, tokens_file): + response = client.post( + "/auth/new_device/authorize", + json={"token": "invalid_token", "device": "new_device"}, + ) + assert response.status_code == 404 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_get_and_authorize_used_token(client, authorized_client, tokens_file): + response = authorized_client.post("/auth/new_device") + assert response.status_code == 200 + assert "token" in response.json + token = Mnemonic(language="english").to_entropy(response.json["token"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == token + response = client.post( + "/auth/new_device/authorize", + json={"token": response.json["token"], "device": "new_device"}, + ) + assert response.status_code == 200 + assert read_json(tokens_file)["tokens"][2]["token"] == response.json["token"] + assert read_json(tokens_file)["tokens"][2]["name"] == "new_device" + response = client.post( + "/auth/new_device/authorize", + json={"token": response.json["token"], "device": "new_device"}, + ) + assert response.status_code == 404 + + +def test_get_and_authorize_token_after_12_minutes( + client, authorized_client, tokens_file +): + response = authorized_client.post("/auth/new_device") + assert response.status_code == 200 + assert "token" in response.json + token = Mnemonic(language="english").to_entropy(response.json["token"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == token + + file_data = read_json(tokens_file) + file_data["new_device"]["expiration"] = str( + datetime.datetime.now() - datetime.timedelta(minutes=13) + ) + write_json(tokens_file, file_data) + + response = client.post( + "/auth/new_device/authorize", + json={"token": response.json["token"], "device": "new_device"}, + ) + assert response.status_code == 404 + + +def test_authorize_without_token(client, tokens_file): + response = client.post( + "/auth/new_device/authorize", + json={"device": "new_device"}, + ) + assert response.status_code == 400 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +# Recovery tokens +# GET /auth/recovery_token returns token status +# - if token is valid, returns 200 and token status +# - token status: +# - exists (boolean) +# - valid (boolean) +# - date (string) +# - expiration (string) +# - uses_left (int) +# - if token is invalid, returns 400 and empty body +# POST /auth/recovery_token generates a new token +# has two optional parameters: +# - expiration (string in datetime format) +# - uses_left (int) +# POST /auth/recovery_token/use uses the token +# required arguments: +# - token (string) +# - device (string) +# - if token is valid, returns 200 and token +# - if token is invalid, returns 404 +# - if request is invalid, returns 400 + + +def test_get_recovery_token_status_unauthorized(client, tokens_file): + response = client.get("/auth/recovery_token") + assert response.status_code == 401 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_get_recovery_token_when_none_exists(authorized_client, tokens_file): + response = authorized_client.get("/auth/recovery_token") + assert response.status_code == 200 + assert response.json == { + "exists": False, + "valid": False, + "date": None, + "expiration": None, + "uses_left": None, + } + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_generate_recovery_token(authorized_client, client, tokens_file): + # Generate token without expiration and uses_left + response = authorized_client.post("/auth/recovery_token") + assert response.status_code == 200 + assert "token" in response.json + mnemonic_token = response.json["token"] + token = Mnemonic(language="english").to_entropy(mnemonic_token).hex() + assert read_json(tokens_file)["recovery_token"]["token"] == token + + time_generated = read_json(tokens_file)["recovery_token"]["date"] + assert time_generated is not None + # Assert that the token was generated near the current time + assert ( + datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ") + - datetime.timedelta(seconds=5) + < datetime.datetime.now() + ) + + # Try to get token status + response = client.get("/auth/recovery_token") + assert response.status_code == 200 + assert response.json == { + "exists": True, + "valid": True, + "date": time_generated, + "expiration": None, + "uses_left": None, + } + + # Try to use the token + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": mnemonic_token, "device": "recovery_device"}, + ) + assert recovery_response.status_code == 200 + new_token = recovery_response.json["token"] + assert read_json(tokens_file)["tokens"][2]["token"] == new_token + assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device" + + # Try to use token again + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": mnemonic_token, "device": "recovery_device2"}, + ) + assert recovery_response.status_code == 200 + new_token = recovery_response.json["token"] + assert read_json(tokens_file)["tokens"][3]["token"] == new_token + assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2" + + +def test_generate_recovery_token_with_expiration_date( + authorized_client, client, tokens_file +): + # Generate token with expiration date + # Generate expiration date in the future + # Expiration date format is YYYY-MM-DDTHH:MM:SS.SSSZ + expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5) + expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + response = authorized_client.post( + "/auth/recovery_token", + json={"expiration": expiration_date_str}, + ) + assert response.status_code == 200 + assert "token" in response.json + mnemonic_token = response.json["token"] + token = Mnemonic(language="english").to_entropy(mnemonic_token).hex() + assert read_json(tokens_file)["recovery_token"]["token"] == token + assert read_json(tokens_file)["recovery_token"]["expiration"] == expiration_date_str + + time_generated = read_json(tokens_file)["recovery_token"]["date"] + assert time_generated is not None + # Assert that the token was generated near the current time + assert ( + datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ") + - datetime.timedelta(seconds=5) + < datetime.datetime.now() + ) + + # Try to get token status + response = client.get("/auth/recovery_token") + assert response.status_code == 200 + assert response.json == { + "exists": True, + "valid": True, + "date": time_generated, + "expiration": expiration_date_str, + "uses_left": None, + } + + # Try to use the token + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": mnemonic_token, "device": "recovery_device"}, + ) + assert recovery_response.status_code == 200 + new_token = recovery_response.json["token"] + assert read_json(tokens_file)["tokens"][2]["token"] == new_token + assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device" + + # Try to use token again + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": mnemonic_token, "device": "recovery_device2"}, + ) + assert recovery_response.status_code == 200 + new_token = recovery_response.json["token"] + assert read_json(tokens_file)["tokens"][3]["token"] == new_token + assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2" + + # Try to use token after expiration date + new_data = read_json(tokens_file) + new_data["recovery_token"]["expiration"] = datetime.datetime.now().strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + ) + write_json(tokens_file, new_data) + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": mnemonic_token, "device": "recovery_device3"}, + ) + assert recovery_response.status_code == 404 + # Assert that the token was not created in JSON + assert read_json(tokens_file)["tokens"] == new_data["tokens"] + + # Get the status of the token + response = client.get("/auth/recovery_token") + assert response.status_code == 200 + assert response.json == { + "exists": True, + "valid": False, + "date": time_generated, + "expiration": new_data["recovery_token"]["expiration"], + "uses_left": None, + } + + +def test_generate_recovery_token_with_expiration_in_the_past( + authorized_client, client, tokens_file +): + # Server must return 400 if expiration date is in the past + expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5) + expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + response = authorized_client.post( + "/auth/recovery_token", + json={"expiration": expiration_date_str}, + ) + assert response.status_code == 400 + assert "recovery_token" not in read_json(tokens_file) + + +def test_generate_recovery_token_with_invalid_time_format( + authorized_client, client, tokens_file +): + # Server must return 400 if expiration date is in the past + expiration_date = "invalid_time_format" + response = authorized_client.post( + "/auth/recovery_token", + json={"expiration": expiration_date}, + ) + assert response.status_code == 400 + assert "recovery_token" not in read_json(tokens_file) + + +def test_generate_recovery_token_with_limited_uses( + authorized_client, client, tokens_file +): + # Generate token with limited uses + response = authorized_client.post( + "/auth/recovery_token", + json={"uses": 2}, + ) + assert response.status_code == 200 + assert "token" in response.json + mnemonic_token = response.json["token"] + token = Mnemonic(language="english").to_entropy(mnemonic_token).hex() + assert read_json(tokens_file)["recovery_token"]["token"] == token + assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2 + + # Get the date of the token + time_generated = read_json(tokens_file)["recovery_token"]["date"] + assert time_generated is not None + assert ( + datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ") + - datetime.timedelta(seconds=5) + < datetime.datetime.now() + ) + + # Try to get token status + response = client.get("/auth/recovery_token") + assert response.status_code == 200 + assert response.json == { + "exists": True, + "valid": True, + "date": time_generated, + "expiration": None, + "uses_left": 2, + } + + # Try to use the token + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": mnemonic_token, "device": "recovery_device"}, + ) + assert recovery_response.status_code == 200 + new_token = recovery_response.json["token"] + assert read_json(tokens_file)["tokens"][2]["token"] == new_token + assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device" + + assert read_json(tokens_file)["recovery_token"]["uses_left"] == 1 + + # Get the status of the token + response = client.get("/auth/recovery_token") + assert response.status_code == 200 + assert response.json == { + "exists": True, + "valid": True, + "date": time_generated, + "expiration": None, + "uses_left": 1, + } + + # Try to use token again + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": mnemonic_token, "device": "recovery_device2"}, + ) + assert recovery_response.status_code == 200 + new_token = recovery_response.json["token"] + assert read_json(tokens_file)["tokens"][3]["token"] == new_token + assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2" + + # Get the status of the token + response = client.get("/auth/recovery_token") + assert response.status_code == 200 + assert response.json == { + "exists": True, + "valid": False, + "date": time_generated, + "expiration": None, + "uses_left": 0, + } + + # Try to use token after limited uses + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": mnemonic_token, "device": "recovery_device3"}, + ) + assert recovery_response.status_code == 404 + + assert read_json(tokens_file)["recovery_token"]["uses_left"] == 0 + + +def test_generate_recovery_token_with_negative_uses( + authorized_client, client, tokens_file +): + # Generate token with limited uses + response = authorized_client.post( + "/auth/recovery_token", + json={"uses": -2}, + ) + assert response.status_code == 400 + assert "recovery_token" not in read_json(tokens_file) + + +def test_generate_recovery_token_with_zero_uses(authorized_client, client, tokens_file): + # Generate token with limited uses + response = authorized_client.post( + "/auth/recovery_token", + json={"uses": 0}, + ) + assert response.status_code == 400 + assert "recovery_token" not in read_json(tokens_file) diff --git a/tests/test_common.py b/tests/test_common.py index f8aa36b..db60d84 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -3,6 +3,8 @@ import json import pytest +from selfprivacy_api.utils import WriteUserData, ReadUserData + def test_get_api_version(authorized_client): response = authorized_client.get("/api/version") @@ -14,3 +16,21 @@ def test_get_api_version_unauthorized(client): response = client.get("/api/version") assert response.status_code == 200 assert "version" in response.get_json() + + +def test_get_swagger_json(authorized_client): + response = authorized_client.get("/api/swagger.json") + assert response.status_code == 200 + assert "swagger" in response.get_json() + + +def test_read_invalid_user_data(): + with pytest.raises(ValueError): + with ReadUserData("invalid") as user_data: + pass + + +def test_write_invalid_user_data(): + with pytest.raises(ValueError): + with WriteUserData("invalid") as user_data: + pass