diff --git a/tests/conftest.py b/tests/conftest.py index ea7a66a..891e4e9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,19 +4,78 @@ import os import pytest from fastapi.testclient import TestClient +import os.path as path +import datetime + +from selfprivacy_api.models.tokens.token import Token +from selfprivacy_api.repositories.tokens.json_tokens_repository import ( + JsonTokensRepository, +) + +from tests.common import read_json + +EMPTY_TOKENS_JSON = ' {"tokens": []}' + + +TOKENS_FILE_CONTENTS = { + "tokens": [ + { + "token": "TEST_TOKEN", + "name": "test_token", + "date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314), + }, + { + "token": "TEST_TOKEN2", + "name": "test_token2", + "date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314), + }, + ] +} def pytest_generate_tests(metafunc): os.environ["TEST_MODE"] = "true" +def global_data_dir(): + return path.join(path.dirname(__file__), "data") + + @pytest.fixture -def tokens_file(mocker, shared_datadir): - """Mock tokens file.""" - mock = mocker.patch( - "selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json" - ) - return mock +def empty_tokens(mocker, tmpdir): + tokenfile = tmpdir / "empty_tokens.json" + with open(tokenfile, "w") as file: + file.write(EMPTY_TOKENS_JSON) + mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokenfile) + assert read_json(tokenfile)["tokens"] == [] + return tmpdir + + +@pytest.fixture +def empty_json_repo(empty_tokens): + repo = JsonTokensRepository() + for token in repo.get_tokens(): + repo.delete_token(token) + assert repo.get_tokens() == [] + return repo + + +@pytest.fixture +def tokens_file(empty_json_repo, tmpdir): + """A state with tokens""" + for token in TOKENS_FILE_CONTENTS["tokens"]: + empty_json_repo._store_token( + Token( + token=token["token"], + device_name=token["name"], + created_at=token["date"], + ) + ) + # temporary return for compatibility with older tests + + tokenfile = tmpdir / "empty_tokens.json" + assert path.exists(tokenfile) + return tokenfile @pytest.fixture diff --git a/tests/test_graphql/data/gitkeep b/tests/test_graphql/data/gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_graphql/data/tokens.json b/tests/test_graphql/data/tokens.json deleted file mode 100644 index 9be9d02..0000000 --- a/tests/test_graphql/data/tokens.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token", - "date": "2022-01-14 08:31:10.789314" - }, - { - "token": "TEST_TOKEN2", - "name": "test_token2", - "date": "2022-01-14 08:31:10.789314" - } - ] -} \ No newline at end of file diff --git a/tests/test_graphql/test_api_devices.py b/tests/test_graphql/test_api_devices.py index 07cf42a..c546238 100644 --- a/tests/test_graphql/test_api_devices.py +++ b/tests/test_graphql/test_api_devices.py @@ -17,12 +17,12 @@ TOKENS_FILE_CONTETS = { { "token": "TEST_TOKEN", "name": "test_token", - "date": "2022-01-14 08:31:10.789314", + "date": "2022-01-14T08:31:10.789314", }, { "token": "TEST_TOKEN2", "name": "test_token2", - "date": "2022-01-14 08:31:10.789314", + "date": "2022-01-14T08:31:10.789314", }, ] } @@ -118,7 +118,7 @@ def test_graphql_delete_token(authorized_client, tokens_file): { "token": "TEST_TOKEN", "name": "test_token", - "date": "2022-01-14 08:31:10.789314", + "date": "2022-01-14T08:31:10.789314", } ] } diff --git a/tests/test_graphql/test_repository/test_json_tokens_repository.py b/tests/test_graphql/test_repository/test_json_tokens_repository.py index af8c844..23df9df 100644 --- a/tests/test_graphql/test_repository/test_json_tokens_repository.py +++ b/tests/test_graphql/test_repository/test_json_tokens_repository.py @@ -25,7 +25,6 @@ from test_tokens_repository import ( mock_recovery_key_generate, mock_generate_token, mock_new_device_key_generate, - empty_keys, ) ORIGINAL_TOKEN_CONTENT = [ @@ -51,6 +50,18 @@ ORIGINAL_TOKEN_CONTENT = [ }, ] +EMPTY_KEYS_JSON = """ +{ + "tokens": [ + { + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + "name": "primary_token", + "date": "2022-07-15 17:41:31.675698" + } + ] +} +""" + @pytest.fixture def tokens(mocker, datadir): @@ -59,6 +70,22 @@ def tokens(mocker, datadir): return datadir +@pytest.fixture +def empty_keys(mocker, tmpdir): + tokens_file = tmpdir / "empty_keys.json" + with open(tokens_file, "w") as file: + file.write(EMPTY_KEYS_JSON) + mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokens_file) + assert read_json(tokens_file)["tokens"] == [ + { + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + "name": "primary_token", + "date": "2022-07-15 17:41:31.675698", + } + ] + return tmpdir + + @pytest.fixture def null_keys(mocker, datadir): mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json") diff --git a/tests/test_graphql/test_repository/test_json_tokens_repository/empty_keys.json b/tests/test_graphql/test_repository/test_json_tokens_repository/empty_keys.json deleted file mode 100644 index 2131ddf..0000000 --- a/tests/test_graphql/test_repository/test_json_tokens_repository/empty_keys.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "tokens": [ - { - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - "name": "primary_token", - "date": "2022-07-15 17:41:31.675698" - } - ] -} diff --git a/tests/test_graphql/test_repository/test_tokens_repository.py b/tests/test_graphql/test_repository/test_tokens_repository.py index 020a868..a2dbb7a 100644 --- a/tests/test_graphql/test_repository/test_tokens_repository.py +++ b/tests/test_graphql/test_repository/test_tokens_repository.py @@ -16,13 +16,9 @@ from selfprivacy_api.repositories.tokens.exceptions import ( TokenNotFound, NewDeviceKeyNotFound, ) -from selfprivacy_api.repositories.tokens.json_tokens_repository import ( - JsonTokensRepository, -) from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( RedisTokensRepository, ) -from tests.common import read_json ORIGINAL_DEVICE_NAMES = [ @@ -37,19 +33,6 @@ def mnemonic_from_hex(hexkey): return Mnemonic(language="english").to_mnemonic(bytes.fromhex(hexkey)) -@pytest.fixture -def empty_keys(mocker, datadir): - mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "empty_keys.json") - assert read_json(datadir / "empty_keys.json")["tokens"] == [ - { - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - "name": "primary_token", - "date": "2022-07-15 17:41:31.675698", - } - ] - return datadir - - @pytest.fixture def mock_new_device_key_generate(mocker): mock = mocker.patch( @@ -137,15 +120,6 @@ def mock_recovery_key_generate(mocker): return mock -@pytest.fixture -def empty_json_repo(empty_keys): - repo = JsonTokensRepository() - for token in repo.get_tokens(): - repo.delete_token(token) - assert repo.get_tokens() == [] - return repo - - @pytest.fixture def empty_redis_repo(): repo = RedisTokensRepository() diff --git a/tests/test_graphql/test_repository/test_tokens_repository/empty_keys.json b/tests/test_graphql/test_repository/test_tokens_repository/empty_keys.json deleted file mode 100644 index 2131ddf..0000000 --- a/tests/test_graphql/test_repository/test_tokens_repository/empty_keys.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "tokens": [ - { - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - "name": "primary_token", - "date": "2022-07-15 17:41:31.675698" - } - ] -} diff --git a/tests/test_rest_endpoints/data/tokens.json b/tests/test_rest_endpoints/data/tokens.json deleted file mode 100644 index 9be9d02..0000000 --- a/tests/test_rest_endpoints/data/tokens.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token", - "date": "2022-01-14 08:31:10.789314" - }, - { - "token": "TEST_TOKEN2", - "name": "test_token2", - "date": "2022-01-14 08:31:10.789314" - } - ] -} \ No newline at end of file diff --git a/tests/test_rest_endpoints/services/data/gitkeep b/tests/test_rest_endpoints/services/data/gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_rest_endpoints/services/data/tokens.json b/tests/test_rest_endpoints/services/data/tokens.json deleted file mode 100644 index 9d35420..0000000 --- a/tests/test_rest_endpoints/services/data/tokens.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "Test Token", - "date": "2022-01-14 08:31:10.789314" - } - ] -} \ No newline at end of file diff --git a/tests/test_rest_endpoints/test_auth.py b/tests/test_rest_endpoints/test_auth.py index 12de0cf..40960f0 100644 --- a/tests/test_rest_endpoints/test_auth.py +++ b/tests/test_rest_endpoints/test_auth.py @@ -3,28 +3,19 @@ # pylint: disable=missing-function-docstring import datetime import pytest -from mnemonic import Mnemonic - -from selfprivacy_api.repositories.tokens.json_tokens_repository import ( - JsonTokensRepository, -) - -TOKEN_REPO = JsonTokensRepository() - -from tests.common import read_json, write_json -TOKENS_FILE_CONTETS = { +TOKENS_FILE_CONTENTS = { "tokens": [ { "token": "TEST_TOKEN", "name": "test_token", - "date": "2022-01-14 08:31:10.789314", + "date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314), }, { "token": "TEST_TOKEN2", "name": "test_token2", - "date": "2022-01-14 08:31:10.789314", + "date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314), }, ] } @@ -36,11 +27,116 @@ DATE_FORMATS = [ "%Y-%m-%d %H:%M:%S.%f", ] +# for expiration tests. If headache, consider freezegun +RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.recovery_key.datetime" +DEVICE_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.new_device_key.datetime" + + +class NearFuture(datetime.datetime): + @classmethod + def now(cls): + return datetime.datetime.now() + datetime.timedelta(minutes=13) + + +def assert_original(client): + new_tokens = rest_get_tokens_info(client) + + for token in TOKENS_FILE_CONTENTS["tokens"]: + assert_token_valid(client, token["token"]) + for new_token in new_tokens: + if new_token["name"] == token["name"]: + assert ( + datetime.datetime.fromisoformat(new_token["date"]) == token["date"] + ) + assert_no_recovery(client) + + +def assert_token_valid(client, token): + client.headers.update({"Authorization": "Bearer " + token}) + assert rest_get_tokens_info(client) is not None + + +def rest_get_tokens_info(client): + response = client.get("/auth/tokens") + assert response.status_code == 200 + return response.json() + + +def rest_try_authorize_new_device(client, token, device_name): + response = client.post( + "/auth/new_device/authorize", + json={ + "token": token, + "device": device_name, + }, + ) + return response + + +def rest_make_recovery_token(client, expires_at=None, timeformat=None, uses=None): + json = {} + + if expires_at is not None: + assert timeformat is not None + expires_at_str = expires_at.strftime(timeformat) + json["expiration"] = expires_at_str + + if uses is not None: + json["uses"] = uses + + if json == {}: + response = client.post("/auth/recovery_token") + else: + response = client.post( + "/auth/recovery_token", + json=json, + ) + + assert response.status_code == 200 + assert "token" in response.json() + return response.json()["token"] + + +def rest_get_recovery_status(client): + response = client.get("/auth/recovery_token") + assert response.status_code == 200 + return response.json() + + +def rest_get_recovery_date(client): + status = rest_get_recovery_status(client) + assert "date" in status + return status["date"] + + +def assert_recovery_recent(time_generated): + assert ( + datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") + - datetime.timedelta(seconds=5) + < datetime.datetime.now() + ) + + +def assert_no_recovery(client): + assert not rest_get_recovery_status(client)["exists"] + + +def rest_recover_with_mnemonic(client, mnemonic_token, device_name): + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": mnemonic_token, "device": device_name}, + ) + assert recovery_response.status_code == 200 + new_token = recovery_response.json()["token"] + assert_token_valid(client, new_token) + return new_token + + +# Tokens + def test_get_tokens_info(authorized_client, tokens_file): - response = authorized_client.get("/auth/tokens") - assert response.status_code == 200 - assert response.json() == [ + assert rest_get_tokens_info(authorized_client) == [ {"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True}, { "name": "test_token2", @@ -55,10 +151,10 @@ def test_get_tokens_unauthorized(client, tokens_file): assert response.status_code == 401 -def test_delete_token_unauthorized(client, tokens_file): +def test_delete_token_unauthorized(client, authorized_client, tokens_file): response = client.delete("/auth/tokens") assert response.status_code == 401 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_delete_token(authorized_client, tokens_file): @@ -66,15 +162,9 @@ def test_delete_token(authorized_client, tokens_file): "/auth/tokens", json={"token_name": "test_token2"} ) assert response.status_code == 200 - assert read_json(tokens_file) == { - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token", - "date": "2022-01-14 08:31:10.789314", - } - ] - } + assert rest_get_tokens_info(authorized_client) == [ + {"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True} + ] def test_delete_self_token(authorized_client, tokens_file): @@ -82,7 +172,7 @@ def test_delete_self_token(authorized_client, tokens_file): "/auth/tokens", json={"token_name": "test_token"} ) assert response.status_code == 400 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_delete_nonexistent_token(authorized_client, tokens_file): @@ -90,131 +180,103 @@ def test_delete_nonexistent_token(authorized_client, tokens_file): "/auth/tokens", json={"token_name": "test_token3"} ) assert response.status_code == 404 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) -def test_refresh_token_unauthorized(client, tokens_file): +def test_refresh_token_unauthorized(client, authorized_client, tokens_file): response = client.post("/auth/tokens") assert response.status_code == 401 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_refresh_token(authorized_client, tokens_file): response = authorized_client.post("/auth/tokens") assert response.status_code == 200 new_token = response.json()["token"] - assert TOKEN_REPO.get_token_by_token_string(new_token) is not None + assert_token_valid(authorized_client, new_token) -# new device +# New device -def test_get_new_device_auth_token_unauthorized(client, tokens_file): +def test_get_new_device_auth_token_unauthorized(client, authorized_client, tokens_file): response = client.post("/auth/new_device") assert response.status_code == 401 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert "token" not in response.json() + assert "detail" in response.json() + # We only can check existence of a token we know. -def test_get_new_device_auth_token(authorized_client, tokens_file): - response = authorized_client.post("/auth/new_device") +def test_get_and_delete_new_device_token(client, authorized_client, tokens_file): + token = rest_get_new_device_token(authorized_client) + response = authorized_client.delete("/auth/new_device", json={"token": token}) assert response.status_code == 200 - assert "token" in response.json() - token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex() - assert read_json(tokens_file)["new_device"]["token"] == token + assert rest_try_authorize_new_device(client, token, "new_device").status_code == 404 -def test_get_and_delete_new_device_token(authorized_client, tokens_file): - response = authorized_client.post("/auth/new_device") - assert response.status_code == 200 - assert "token" in response.json() - token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex() - assert read_json(tokens_file)["new_device"]["token"] == token - response = authorized_client.delete( - "/auth/new_device", json={"token": response.json()["token"]} - ) - assert response.status_code == 200 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS - - -def test_delete_token_unauthenticated(client, tokens_file): - response = client.delete("/auth/new_device") +def test_delete_token_unauthenticated(client, authorized_client, tokens_file): + token = rest_get_new_device_token(authorized_client) + response = client.delete("/auth/new_device", json={"token": token}) assert response.status_code == 401 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert rest_try_authorize_new_device(client, token, "new_device").status_code == 200 + + +def rest_get_new_device_token(client): + response = client.post("/auth/new_device") + assert response.status_code == 200 + assert "token" in response.json() + return response.json()["token"] def test_get_and_authorize_new_device(client, authorized_client, tokens_file): - response = authorized_client.post("/auth/new_device") + token = rest_get_new_device_token(authorized_client) + response = rest_try_authorize_new_device(client, token, "new_device") assert response.status_code == 200 - assert "token" in response.json() - token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex() - assert read_json(tokens_file)["new_device"]["token"] == token - response = client.post( - "/auth/new_device/authorize", - json={"token": response.json()["token"], "device": "new_device"}, - ) - assert response.status_code == 200 - assert read_json(tokens_file)["tokens"][2]["token"] == response.json()["token"] - assert read_json(tokens_file)["tokens"][2]["name"] == "new_device" + assert_token_valid(authorized_client, response.json()["token"]) -def test_authorize_new_device_with_invalid_token(client, tokens_file): - response = client.post( - "/auth/new_device/authorize", - json={"token": "invalid_token", "device": "new_device"}, - ) +def test_authorize_new_device_with_invalid_token( + client, authorized_client, tokens_file +): + response = rest_try_authorize_new_device(client, "invalid_token", "new_device") assert response.status_code == 404 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_get_and_authorize_used_token(client, authorized_client, tokens_file): - response = authorized_client.post("/auth/new_device") - assert response.status_code == 200 - assert "token" in response.json() - token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex() - assert read_json(tokens_file)["new_device"]["token"] == token - response = client.post( - "/auth/new_device/authorize", - json={"token": response.json()["token"], "device": "new_device"}, + token_to_be_used_2_times = rest_get_new_device_token(authorized_client) + response = rest_try_authorize_new_device( + client, token_to_be_used_2_times, "new_device" ) assert response.status_code == 200 - assert read_json(tokens_file)["tokens"][2]["token"] == response.json()["token"] - assert read_json(tokens_file)["tokens"][2]["name"] == "new_device" - response = client.post( - "/auth/new_device/authorize", - json={"token": response.json()["token"], "device": "new_device"}, + assert_token_valid(authorized_client, response.json()["token"]) + + response = rest_try_authorize_new_device( + client, token_to_be_used_2_times, "new_device" ) assert response.status_code == 404 def test_get_and_authorize_token_after_12_minutes( - client, authorized_client, tokens_file + client, authorized_client, tokens_file, mocker ): - response = authorized_client.post("/auth/new_device") - assert response.status_code == 200 - assert "token" in response.json() - token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex() - assert read_json(tokens_file)["new_device"]["token"] == token + token = rest_get_new_device_token(authorized_client) - file_data = read_json(tokens_file) - file_data["new_device"]["expiration"] = str( - datetime.datetime.now() - datetime.timedelta(minutes=13) - ) - write_json(tokens_file, file_data) + # TARDIS sounds + mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture) - response = client.post( - "/auth/new_device/authorize", - json={"token": response.json()["token"], "device": "new_device"}, - ) + response = rest_try_authorize_new_device(client, token, "new_device") assert response.status_code == 404 + assert_original(authorized_client) -def test_authorize_without_token(client, tokens_file): +def test_authorize_without_token(client, authorized_client, tokens_file): response = client.post( "/auth/new_device/authorize", json={"device": "new_device"}, ) assert response.status_code == 422 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) # Recovery tokens @@ -240,10 +302,10 @@ def test_authorize_without_token(client, tokens_file): # - if request is invalid, returns 400 -def test_get_recovery_token_status_unauthorized(client, tokens_file): +def test_get_recovery_token_status_unauthorized(client, authorized_client, tokens_file): response = client.get("/auth/recovery_token") assert response.status_code == 401 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_get_recovery_token_when_none_exists(authorized_client, tokens_file): @@ -256,31 +318,17 @@ def test_get_recovery_token_when_none_exists(authorized_client, tokens_file): "expiration": None, "uses_left": None, } - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_generate_recovery_token(authorized_client, client, tokens_file): # Generate token without expiration and uses_left - response = authorized_client.post("/auth/recovery_token") - assert response.status_code == 200 - assert "token" in response.json() - mnemonic_token = response.json()["token"] - token = Mnemonic(language="english").to_entropy(mnemonic_token).hex() - assert read_json(tokens_file)["recovery_token"]["token"] == token + mnemonic_token = rest_make_recovery_token(authorized_client) - time_generated = read_json(tokens_file)["recovery_token"]["date"] - assert time_generated is not None - # Assert that the token was generated near the current time - assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - - datetime.timedelta(seconds=5) - < datetime.datetime.now() - ) + time_generated = rest_get_recovery_date(authorized_client) + assert_recovery_recent(time_generated) - # Try to get token status - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { + assert rest_get_recovery_status(authorized_client) == { "exists": True, "valid": True, "date": time_generated, @@ -288,61 +336,26 @@ def test_generate_recovery_token(authorized_client, client, tokens_file): "uses_left": None, } - # Try to use the token - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][2]["token"] == new_token - assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device" - - # Try to use token again - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device2"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][3]["token"] == new_token - assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2" + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device") + # And again + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2") @pytest.mark.parametrize("timeformat", DATE_FORMATS) def test_generate_recovery_token_with_expiration_date( - authorized_client, client, tokens_file, timeformat + authorized_client, client, tokens_file, timeformat, mocker ): # Generate token with expiration date # Generate expiration date in the future expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5) - expiration_date_str = expiration_date.strftime(timeformat) - response = authorized_client.post( - "/auth/recovery_token", - json={"expiration": expiration_date_str}, - ) - assert response.status_code == 200 - assert "token" in response.json() - mnemonic_token = response.json()["token"] - token = Mnemonic(language="english").to_entropy(mnemonic_token).hex() - assert read_json(tokens_file)["recovery_token"]["token"] == token - assert datetime.datetime.strptime( - read_json(tokens_file)["recovery_token"]["expiration"], "%Y-%m-%dT%H:%M:%S.%f" - ) == datetime.datetime.strptime(expiration_date_str, timeformat) - - time_generated = read_json(tokens_file)["recovery_token"]["date"] - assert time_generated is not None - # Assert that the token was generated near the current time - assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - - datetime.timedelta(seconds=5) - < datetime.datetime.now() + mnemonic_token = rest_make_recovery_token( + authorized_client, expires_at=expiration_date, timeformat=timeformat ) - # Try to get token status - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { + time_generated = rest_get_recovery_date(authorized_client) + assert_recovery_recent(time_generated) + + assert rest_get_recovery_status(authorized_client) == { "exists": True, "valid": True, "date": time_generated, @@ -350,50 +363,22 @@ def test_generate_recovery_token_with_expiration_date( "uses_left": None, } - # Try to use the token - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][2]["token"] == new_token - assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device" - - # Try to use token again - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device2"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][3]["token"] == new_token - assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2" + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device") + # And again + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2") # Try to use token after expiration date - new_data = read_json(tokens_file) - new_data["recovery_token"]["expiration"] = datetime.datetime.now().strftime( - "%Y-%m-%dT%H:%M:%S.%f" - ) - write_json(tokens_file, new_data) + mock = mocker.patch(RECOVERY_KEY_VALIDATION_DATETIME, NearFuture) + device_name = "recovery_device3" recovery_response = client.post( "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device3"}, + json={"token": mnemonic_token, "device": device_name}, ) assert recovery_response.status_code == 404 - # Assert that the token was not created in JSON - assert read_json(tokens_file)["tokens"] == new_data["tokens"] - - # Get the status of the token - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { - "exists": True, - "valid": False, - "date": time_generated, - "expiration": new_data["recovery_token"]["expiration"], - "uses_left": None, - } + # Assert that the token was not created + assert device_name not in [ + token["name"] for token in rest_get_tokens_info(authorized_client) + ] @pytest.mark.parametrize("timeformat", DATE_FORMATS) @@ -408,7 +393,7 @@ def test_generate_recovery_token_with_expiration_in_the_past( json={"expiration": expiration_date_str}, ) assert response.status_code == 400 - assert "recovery_token" not in read_json(tokens_file) + assert_no_recovery(authorized_client) def test_generate_recovery_token_with_invalid_time_format( @@ -421,37 +406,19 @@ def test_generate_recovery_token_with_invalid_time_format( json={"expiration": expiration_date}, ) assert response.status_code == 422 - assert "recovery_token" not in read_json(tokens_file) + assert_no_recovery(authorized_client) def test_generate_recovery_token_with_limited_uses( authorized_client, client, tokens_file ): # Generate token with limited uses - response = authorized_client.post( - "/auth/recovery_token", - json={"uses": 2}, - ) - assert response.status_code == 200 - assert "token" in response.json() - mnemonic_token = response.json()["token"] - token = Mnemonic(language="english").to_entropy(mnemonic_token).hex() - assert read_json(tokens_file)["recovery_token"]["token"] == token - assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2 + mnemonic_token = rest_make_recovery_token(authorized_client, uses=2) - # Get the date of the token - time_generated = read_json(tokens_file)["recovery_token"]["date"] - assert time_generated is not None - assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - - datetime.timedelta(seconds=5) - < datetime.datetime.now() - ) + time_generated = rest_get_recovery_date(authorized_client) + assert_recovery_recent(time_generated) - # Try to get token status - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { + assert rest_get_recovery_status(authorized_client) == { "exists": True, "valid": True, "date": time_generated, @@ -460,21 +427,9 @@ def test_generate_recovery_token_with_limited_uses( } # Try to use the token - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][2]["token"] == new_token - assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device" + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device") - assert read_json(tokens_file)["recovery_token"]["uses_left"] == 1 - - # Get the status of the token - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { + assert rest_get_recovery_status(authorized_client) == { "exists": True, "valid": True, "date": time_generated, @@ -483,19 +438,9 @@ def test_generate_recovery_token_with_limited_uses( } # Try to use token again - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device2"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][3]["token"] == new_token - assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2" + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2") - # Get the status of the token - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { + assert rest_get_recovery_status(authorized_client) == { "exists": True, "valid": False, "date": time_generated, @@ -510,8 +455,6 @@ def test_generate_recovery_token_with_limited_uses( ) assert recovery_response.status_code == 404 - assert read_json(tokens_file)["recovery_token"]["uses_left"] == 0 - def test_generate_recovery_token_with_negative_uses( authorized_client, client, tokens_file @@ -522,7 +465,7 @@ def test_generate_recovery_token_with_negative_uses( json={"uses": -2}, ) assert response.status_code == 400 - assert "recovery_token" not in read_json(tokens_file) + assert_no_recovery(authorized_client) def test_generate_recovery_token_with_zero_uses(authorized_client, client, tokens_file): @@ -532,4 +475,4 @@ def test_generate_recovery_token_with_zero_uses(authorized_client, client, token json={"uses": 0}, ) assert response.status_code == 400 - assert "recovery_token" not in read_json(tokens_file) + assert_no_recovery(authorized_client)