From 40501401b4e7059ec289af3740605a53b4833359 Mon Sep 17 00:00:00 2001 From: Inex Code Date: Mon, 24 Jan 2022 22:01:37 +0200 Subject: [PATCH] More auth tests --- .vscode/settings.json | 7 +- .../resources/api_auth/app_tokens.py | 2 - .../resources/api_auth/new_device.py | 3 +- selfprivacy_api/utils/auth.py | 9 +- tests/conftest.py | 6 +- tests/test_auth.py | 157 ++++++++++++++++++ tests/test_common.py | 14 ++ 7 files changed, 190 insertions(+), 8 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 6f8c118..ccb092d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,5 +1,10 @@ { "python.formatting.provider": "black", "python.linting.pylintEnabled": true, - "python.linting.enabled": true + "python.linting.enabled": true, + "python.testing.pytestArgs": [ + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true } \ No newline at end of file diff --git a/selfprivacy_api/resources/api_auth/app_tokens.py b/selfprivacy_api/resources/api_auth/app_tokens.py index 05ebf91..09bc55c 100644 --- a/selfprivacy_api/resources/api_auth/app_tokens.py +++ b/selfprivacy_api/resources/api_auth/app_tokens.py @@ -98,8 +98,6 @@ class Tokens(Resource): """ # Get token from header token = request.headers.get("Authorization").split(" ")[1] - if not is_token_valid(token): - return {"message": "Token not found"}, 404 new_token = refresh_token(token) if new_token is None: return {"message": "Token not found"}, 404 diff --git a/selfprivacy_api/resources/api_auth/new_device.py b/selfprivacy_api/resources/api_auth/new_device.py index 61195c2..6961e90 100644 --- a/selfprivacy_api/resources/api_auth/new_device.py +++ b/selfprivacy_api/resources/api_auth/new_device.py @@ -29,7 +29,8 @@ class NewDevice(Resource): 400: description: Bad request """ - return get_new_device_auth_token() + token = get_new_device_auth_token() + return {"token": token} class AuthorizeDevice(Resource): diff --git a/selfprivacy_api/utils/auth.py b/selfprivacy_api/utils/auth.py index fb802ea..cbfd643 100644 --- a/selfprivacy_api/utils/auth.py +++ b/selfprivacy_api/utils/auth.py @@ -146,10 +146,10 @@ def is_recovery_token_valid(): if "recovery_token" not in tokens: return False recovery_token = tokens["recovery_token"] - if "uses_left" in recovery_token: + if "uses_left" in recovery_token and recovery_token["uses_left"] is not None: if recovery_token["uses_left"] <= 0: return False - if "expiration" not in recovery_token: + if "expiration" not in recovery_token or recovery_token["expiration"] is None: return True return datetime.now() < datetime.strptime( recovery_token["expiration"], "%Y-%m-%d %H:%M:%S.%f" @@ -238,7 +238,10 @@ def use_mnemonic_recoverery_token(mnemonic_phrase, name): } ) if "recovery_token" in tokens: - if "uses_left" in tokens["recovery_token"]: + if ( + "uses_left" in tokens["recovery_token"] + and tokens["recovery_token"]["uses_left"] is not None + ): tokens["recovery_token"]["uses_left"] -= 1 return token diff --git a/tests/conftest.py b/tests/conftest.py index 674e017..7a6fdea 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,11 +2,15 @@ import pytest from flask import testing from selfprivacy_api.app import create_app + @pytest.fixture def tokens_file(mocker, shared_datadir): - mock = mocker.patch("selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json") + mock = mocker.patch( + "selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json" + ) return mock + @pytest.fixture def app(): app = create_app( diff --git a/tests/test_auth.py b/tests/test_auth.py index 9fc617a..6ec61fd 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,7 +1,10 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument +import datetime import json import pytest +from mnemonic import Mnemonic + TOKENS_FILE_CONTETS = { "tokens": [ @@ -64,6 +67,7 @@ def test_delete_token(authorized_client, tokens_file): ] } + def test_delete_self_token(authorized_client, tokens_file): response = authorized_client.delete( "/auth/tokens", json={"token_name": "test_token"} @@ -71,6 +75,7 @@ def test_delete_self_token(authorized_client, tokens_file): assert response.status_code == 400 assert read_json(tokens_file) == TOKENS_FILE_CONTETS + def test_delete_nonexistent_token(authorized_client, tokens_file): response = authorized_client.delete( "/auth/tokens", json={"token_name": "test_token3"} @@ -78,13 +83,165 @@ def test_delete_nonexistent_token(authorized_client, tokens_file): assert response.status_code == 404 assert read_json(tokens_file) == TOKENS_FILE_CONTETS + def test_refresh_token_unauthorized(client, tokens_file): response = client.post("/auth/tokens") assert response.status_code == 401 assert read_json(tokens_file) == TOKENS_FILE_CONTETS + def test_refresh_token(authorized_client, tokens_file): response = authorized_client.post("/auth/tokens") assert response.status_code == 200 new_token = response.json["token"] assert read_json(tokens_file)["tokens"][0]["token"] == new_token + + +# new device + + +def test_get_new_device_auth_token_unauthorized(client, tokens_file): + response = client.get("/auth/new_device") + assert response.status_code == 401 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_get_new_device_auth_token(authorized_client, tokens_file): + response = authorized_client.post("/auth/new_device") + assert response.status_code == 200 + assert "token" in response.json + token = Mnemonic(language="english").to_entropy(response.json["token"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == token + + +def test_get_and_authorize_new_device(client, authorized_client, tokens_file): + response = authorized_client.post("/auth/new_device") + assert response.status_code == 200 + assert "token" in response.json + token = Mnemonic(language="english").to_entropy(response.json["token"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == token + response = client.post( + "/auth/new_device/authorize", + json={"token": response.json["token"], "device": "new_device"}, + ) + assert response.status_code == 200 + assert read_json(tokens_file)["tokens"][2]["token"] == response.json["token"] + assert read_json(tokens_file)["tokens"][2]["name"] == "new_device" + + +def test_authorize_new_device_with_invalid_token(client, tokens_file): + response = client.post( + "/auth/new_device/authorize", + json={"token": "invalid_token", "device": "new_device"}, + ) + assert response.status_code == 404 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_get_and_authorize_used_token(client, authorized_client, tokens_file): + response = authorized_client.post("/auth/new_device") + assert response.status_code == 200 + assert "token" in response.json + token = Mnemonic(language="english").to_entropy(response.json["token"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == token + response = client.post( + "/auth/new_device/authorize", + json={"token": response.json["token"], "device": "new_device"}, + ) + assert response.status_code == 200 + assert read_json(tokens_file)["tokens"][2]["token"] == response.json["token"] + assert read_json(tokens_file)["tokens"][2]["name"] == "new_device" + response = client.post( + "/auth/new_device/authorize", + json={"token": response.json["token"], "device": "new_device"}, + ) + assert response.status_code == 404 + + +def test_get_and_authorize_token_after_12_minutes( + client, authorized_client, tokens_file +): + response = authorized_client.post("/auth/new_device") + assert response.status_code == 200 + assert "token" in response.json + token = Mnemonic(language="english").to_entropy(response.json["token"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == token + + file_data = read_json(tokens_file) + file_data["new_device"]["expiration"] = str( + datetime.datetime.now() - datetime.timedelta(minutes=13) + ) + write_json(tokens_file, file_data) + + response = client.post( + "/auth/new_device/authorize", + json={"token": response.json["token"], "device": "new_device"}, + ) + assert response.status_code == 404 + + +def test_authorize_without_token(client, tokens_file): + response = client.post( + "/auth/new_device/authorize", + json={"device": "new_device"}, + ) + assert response.status_code == 400 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +# Recovery tokens +# GET /auth/recovery_token returns token status +# - if token is valid, returns 200 and token status +# - token status: +# - exists (boolean) +# - valid (boolean) +# - date (string) +# - expiration (string) +# - uses_left (int) +# - if token is invalid, returns 400 and empty body +# POST /auth/recovery_token generates a new token +# has two optional parameters: +# - expiration (string in datetime format) +# - uses_left (int) +# POST /auth/recovery_token/use uses the token +# required arguments: +# - token (string) +# - device (string) +# - if token is valid, returns 200 and token +# - if token is invalid, returns 404 +# - if request is invalid, returns 400 + + +def test_get_recovery_token_status_unauthorized(client, tokens_file): + response = client.get("/auth/recovery_token") + assert response.status_code == 401 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + + +def test_generate_recovery_token(authorized_client, client, tokens_file): + # Generate token without expiration and uses_left + response = authorized_client.post("/auth/recovery_token") + assert response.status_code == 200 + assert "token" in response.json + token = Mnemonic(language="english").to_entropy(response.json["token"]).hex() + assert read_json(tokens_file)["recovery_token"]["token"] == token + + # Try to use the token + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": response.json["token"], "device": "recovery_device"}, + ) + assert recovery_response.status_code == 200 + new_token = recovery_response.json["token"] + assert read_json(tokens_file)["tokens"][2]["token"] == new_token + assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device" + + # Try to use token again + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": response.json["token"], "device": "recovery_device2"}, + ) + assert recovery_response.status_code == 200 + new_token = recovery_response.json["token"] + assert read_json(tokens_file)["tokens"][3]["token"] == new_token + assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2" diff --git a/tests/test_common.py b/tests/test_common.py index ecd729c..db60d84 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -3,6 +3,8 @@ import json import pytest +from selfprivacy_api.utils import WriteUserData, ReadUserData + def test_get_api_version(authorized_client): response = authorized_client.get("/api/version") @@ -20,3 +22,15 @@ def test_get_swagger_json(authorized_client): response = authorized_client.get("/api/swagger.json") assert response.status_code == 200 assert "swagger" in response.get_json() + + +def test_read_invalid_user_data(): + with pytest.raises(ValueError): + with ReadUserData("invalid") as user_data: + pass + + +def test_write_invalid_user_data(): + with pytest.raises(ValueError): + with WriteUserData("invalid") as user_data: + pass