Merge pull request 'Token backend-agnostic rest API testing' (#31) from redis/rest-tests into redis/connection-pool

Reviewed-on: #31
pull/32/head
Inex Code 2023-01-06 11:43:11 +02:00
commit d84b4e9ad6
12 changed files with 299 additions and 351 deletions

View File

@ -4,19 +4,78 @@
import os import os
import pytest import pytest
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
import os.path as path
import datetime
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from tests.common import read_json
EMPTY_TOKENS_JSON = ' {"tokens": []}'
TOKENS_FILE_CONTENTS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
},
]
}
def pytest_generate_tests(metafunc): def pytest_generate_tests(metafunc):
os.environ["TEST_MODE"] = "true" os.environ["TEST_MODE"] = "true"
def global_data_dir():
return path.join(path.dirname(__file__), "data")
@pytest.fixture @pytest.fixture
def tokens_file(mocker, shared_datadir): def empty_tokens(mocker, tmpdir):
"""Mock tokens file.""" tokenfile = tmpdir / "empty_tokens.json"
mock = mocker.patch( with open(tokenfile, "w") as file:
"selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json" file.write(EMPTY_TOKENS_JSON)
) mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokenfile)
return mock assert read_json(tokenfile)["tokens"] == []
return tmpdir
@pytest.fixture
def empty_json_repo(empty_tokens):
repo = JsonTokensRepository()
for token in repo.get_tokens():
repo.delete_token(token)
assert repo.get_tokens() == []
return repo
@pytest.fixture
def tokens_file(empty_json_repo, tmpdir):
"""A state with tokens"""
for token in TOKENS_FILE_CONTENTS["tokens"]:
empty_json_repo._store_token(
Token(
token=token["token"],
device_name=token["name"],
created_at=token["date"],
)
)
# temporary return for compatibility with older tests
tokenfile = tmpdir / "empty_tokens.json"
assert path.exists(tokenfile)
return tokenfile
@pytest.fixture @pytest.fixture

View File

View File

@ -1,14 +0,0 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314"
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View File

@ -17,12 +17,12 @@ TOKENS_FILE_CONTETS = {
{ {
"token": "TEST_TOKEN", "token": "TEST_TOKEN",
"name": "test_token", "name": "test_token",
"date": "2022-01-14 08:31:10.789314", "date": "2022-01-14T08:31:10.789314",
}, },
{ {
"token": "TEST_TOKEN2", "token": "TEST_TOKEN2",
"name": "test_token2", "name": "test_token2",
"date": "2022-01-14 08:31:10.789314", "date": "2022-01-14T08:31:10.789314",
}, },
] ]
} }
@ -118,7 +118,7 @@ def test_graphql_delete_token(authorized_client, tokens_file):
{ {
"token": "TEST_TOKEN", "token": "TEST_TOKEN",
"name": "test_token", "name": "test_token",
"date": "2022-01-14 08:31:10.789314", "date": "2022-01-14T08:31:10.789314",
} }
] ]
} }

View File

@ -25,7 +25,6 @@ from test_tokens_repository import (
mock_recovery_key_generate, mock_recovery_key_generate,
mock_generate_token, mock_generate_token,
mock_new_device_key_generate, mock_new_device_key_generate,
empty_keys,
) )
ORIGINAL_TOKEN_CONTENT = [ ORIGINAL_TOKEN_CONTENT = [
@ -51,6 +50,18 @@ ORIGINAL_TOKEN_CONTENT = [
}, },
] ]
EMPTY_KEYS_JSON = """
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}
"""
@pytest.fixture @pytest.fixture
def tokens(mocker, datadir): def tokens(mocker, datadir):
@ -59,6 +70,22 @@ def tokens(mocker, datadir):
return datadir return datadir
@pytest.fixture
def empty_keys(mocker, tmpdir):
tokens_file = tmpdir / "empty_keys.json"
with open(tokens_file, "w") as file:
file.write(EMPTY_KEYS_JSON)
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokens_file)
assert read_json(tokens_file)["tokens"] == [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
}
]
return tmpdir
@pytest.fixture @pytest.fixture
def null_keys(mocker, datadir): def null_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json") mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json")

View File

@ -1,9 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}

View File

@ -16,13 +16,9 @@ from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound, TokenNotFound,
NewDeviceKeyNotFound, NewDeviceKeyNotFound,
) )
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository, RedisTokensRepository,
) )
from tests.common import read_json
ORIGINAL_DEVICE_NAMES = [ ORIGINAL_DEVICE_NAMES = [
@ -37,19 +33,6 @@ def mnemonic_from_hex(hexkey):
return Mnemonic(language="english").to_mnemonic(bytes.fromhex(hexkey)) return Mnemonic(language="english").to_mnemonic(bytes.fromhex(hexkey))
@pytest.fixture
def empty_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "empty_keys.json")
assert read_json(datadir / "empty_keys.json")["tokens"] == [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
}
]
return datadir
@pytest.fixture @pytest.fixture
def mock_new_device_key_generate(mocker): def mock_new_device_key_generate(mocker):
mock = mocker.patch( mock = mocker.patch(
@ -137,15 +120,6 @@ def mock_recovery_key_generate(mocker):
return mock return mock
@pytest.fixture
def empty_json_repo(empty_keys):
repo = JsonTokensRepository()
for token in repo.get_tokens():
repo.delete_token(token)
assert repo.get_tokens() == []
return repo
@pytest.fixture @pytest.fixture
def empty_redis_repo(): def empty_redis_repo():
repo = RedisTokensRepository() repo = RedisTokensRepository()

View File

@ -1,9 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}

View File

@ -1,14 +0,0 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314"
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View File

@ -1,9 +0,0 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "Test Token",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View File

@ -3,28 +3,19 @@
# pylint: disable=missing-function-docstring # pylint: disable=missing-function-docstring
import datetime import datetime
import pytest import pytest
from mnemonic import Mnemonic
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
TOKEN_REPO = JsonTokensRepository()
from tests.common import read_json, write_json
TOKENS_FILE_CONTETS = { TOKENS_FILE_CONTENTS = {
"tokens": [ "tokens": [
{ {
"token": "TEST_TOKEN", "token": "TEST_TOKEN",
"name": "test_token", "name": "test_token",
"date": "2022-01-14 08:31:10.789314", "date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
}, },
{ {
"token": "TEST_TOKEN2", "token": "TEST_TOKEN2",
"name": "test_token2", "name": "test_token2",
"date": "2022-01-14 08:31:10.789314", "date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
}, },
] ]
} }
@ -36,11 +27,116 @@ DATE_FORMATS = [
"%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S.%f",
] ]
# for expiration tests. If headache, consider freezegun
RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.recovery_key.datetime"
DEVICE_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.new_device_key.datetime"
class NearFuture(datetime.datetime):
@classmethod
def now(cls):
return datetime.datetime.now() + datetime.timedelta(minutes=13)
def assert_original(client):
new_tokens = rest_get_tokens_info(client)
for token in TOKENS_FILE_CONTENTS["tokens"]:
assert_token_valid(client, token["token"])
for new_token in new_tokens:
if new_token["name"] == token["name"]:
assert (
datetime.datetime.fromisoformat(new_token["date"]) == token["date"]
)
assert_no_recovery(client)
def assert_token_valid(client, token):
client.headers.update({"Authorization": "Bearer " + token})
assert rest_get_tokens_info(client) is not None
def rest_get_tokens_info(client):
response = client.get("/auth/tokens")
assert response.status_code == 200
return response.json()
def rest_try_authorize_new_device(client, token, device_name):
response = client.post(
"/auth/new_device/authorize",
json={
"token": token,
"device": device_name,
},
)
return response
def rest_make_recovery_token(client, expires_at=None, timeformat=None, uses=None):
json = {}
if expires_at is not None:
assert timeformat is not None
expires_at_str = expires_at.strftime(timeformat)
json["expiration"] = expires_at_str
if uses is not None:
json["uses"] = uses
if json == {}:
response = client.post("/auth/recovery_token")
else:
response = client.post(
"/auth/recovery_token",
json=json,
)
assert response.status_code == 200
assert "token" in response.json()
return response.json()["token"]
def rest_get_recovery_status(client):
response = client.get("/auth/recovery_token")
assert response.status_code == 200
return response.json()
def rest_get_recovery_date(client):
status = rest_get_recovery_status(client)
assert "date" in status
return status["date"]
def assert_recovery_recent(time_generated):
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
def assert_no_recovery(client):
assert not rest_get_recovery_status(client)["exists"]
def rest_recover_with_mnemonic(client, mnemonic_token, device_name):
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": device_name},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert_token_valid(client, new_token)
return new_token
# Tokens
def test_get_tokens_info(authorized_client, tokens_file): def test_get_tokens_info(authorized_client, tokens_file):
response = authorized_client.get("/auth/tokens") assert rest_get_tokens_info(authorized_client) == [
assert response.status_code == 200
assert response.json() == [
{"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True}, {"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True},
{ {
"name": "test_token2", "name": "test_token2",
@ -55,10 +151,10 @@ def test_get_tokens_unauthorized(client, tokens_file):
assert response.status_code == 401 assert response.status_code == 401
def test_delete_token_unauthorized(client, tokens_file): def test_delete_token_unauthorized(client, authorized_client, tokens_file):
response = client.delete("/auth/tokens") response = client.delete("/auth/tokens")
assert response.status_code == 401 assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_delete_token(authorized_client, tokens_file): def test_delete_token(authorized_client, tokens_file):
@ -66,15 +162,9 @@ def test_delete_token(authorized_client, tokens_file):
"/auth/tokens", json={"token_name": "test_token2"} "/auth/tokens", json={"token_name": "test_token2"}
) )
assert response.status_code == 200 assert response.status_code == 200
assert read_json(tokens_file) == { assert rest_get_tokens_info(authorized_client) == [
"tokens": [ {"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True}
{ ]
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
}
]
}
def test_delete_self_token(authorized_client, tokens_file): def test_delete_self_token(authorized_client, tokens_file):
@ -82,7 +172,7 @@ def test_delete_self_token(authorized_client, tokens_file):
"/auth/tokens", json={"token_name": "test_token"} "/auth/tokens", json={"token_name": "test_token"}
) )
assert response.status_code == 400 assert response.status_code == 400
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_delete_nonexistent_token(authorized_client, tokens_file): def test_delete_nonexistent_token(authorized_client, tokens_file):
@ -90,131 +180,103 @@ def test_delete_nonexistent_token(authorized_client, tokens_file):
"/auth/tokens", json={"token_name": "test_token3"} "/auth/tokens", json={"token_name": "test_token3"}
) )
assert response.status_code == 404 assert response.status_code == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_refresh_token_unauthorized(client, tokens_file): def test_refresh_token_unauthorized(client, authorized_client, tokens_file):
response = client.post("/auth/tokens") response = client.post("/auth/tokens")
assert response.status_code == 401 assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_refresh_token(authorized_client, tokens_file): def test_refresh_token(authorized_client, tokens_file):
response = authorized_client.post("/auth/tokens") response = authorized_client.post("/auth/tokens")
assert response.status_code == 200 assert response.status_code == 200
new_token = response.json()["token"] new_token = response.json()["token"]
assert TOKEN_REPO.get_token_by_token_string(new_token) is not None assert_token_valid(authorized_client, new_token)
# new device # New device
def test_get_new_device_auth_token_unauthorized(client, tokens_file): def test_get_new_device_auth_token_unauthorized(client, authorized_client, tokens_file):
response = client.post("/auth/new_device") response = client.post("/auth/new_device")
assert response.status_code == 401 assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert "token" not in response.json()
assert "detail" in response.json()
# We only can check existence of a token we know.
def test_get_new_device_auth_token(authorized_client, tokens_file): def test_get_and_delete_new_device_token(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device") token = rest_get_new_device_token(authorized_client)
response = authorized_client.delete("/auth/new_device", json={"token": token})
assert response.status_code == 200 assert response.status_code == 200
assert "token" in response.json() assert rest_try_authorize_new_device(client, token, "new_device").status_code == 404
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
def test_get_and_delete_new_device_token(authorized_client, tokens_file): def test_delete_token_unauthenticated(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device") token = rest_get_new_device_token(authorized_client)
assert response.status_code == 200 response = client.delete("/auth/new_device", json={"token": token})
assert "token" in response.json()
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = authorized_client.delete(
"/auth/new_device", json={"token": response.json()["token"]}
)
assert response.status_code == 200
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_delete_token_unauthenticated(client, tokens_file):
response = client.delete("/auth/new_device")
assert response.status_code == 401 assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert rest_try_authorize_new_device(client, token, "new_device").status_code == 200
def rest_get_new_device_token(client):
response = client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json()
return response.json()["token"]
def test_get_and_authorize_new_device(client, authorized_client, tokens_file): def test_get_and_authorize_new_device(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device") token = rest_get_new_device_token(authorized_client)
response = rest_try_authorize_new_device(client, token, "new_device")
assert response.status_code == 200 assert response.status_code == 200
assert "token" in response.json() assert_token_valid(authorized_client, response.json()["token"])
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json()["token"], "device": "new_device"},
)
assert response.status_code == 200
assert read_json(tokens_file)["tokens"][2]["token"] == response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
def test_authorize_new_device_with_invalid_token(client, tokens_file): def test_authorize_new_device_with_invalid_token(
response = client.post( client, authorized_client, tokens_file
"/auth/new_device/authorize", ):
json={"token": "invalid_token", "device": "new_device"}, response = rest_try_authorize_new_device(client, "invalid_token", "new_device")
)
assert response.status_code == 404 assert response.status_code == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_get_and_authorize_used_token(client, authorized_client, tokens_file): def test_get_and_authorize_used_token(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device") token_to_be_used_2_times = rest_get_new_device_token(authorized_client)
assert response.status_code == 200 response = rest_try_authorize_new_device(
assert "token" in response.json() client, token_to_be_used_2_times, "new_device"
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json()["token"], "device": "new_device"},
) )
assert response.status_code == 200 assert response.status_code == 200
assert read_json(tokens_file)["tokens"][2]["token"] == response.json()["token"] assert_token_valid(authorized_client, response.json()["token"])
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
response = client.post( response = rest_try_authorize_new_device(
"/auth/new_device/authorize", client, token_to_be_used_2_times, "new_device"
json={"token": response.json()["token"], "device": "new_device"},
) )
assert response.status_code == 404 assert response.status_code == 404
def test_get_and_authorize_token_after_12_minutes( def test_get_and_authorize_token_after_12_minutes(
client, authorized_client, tokens_file client, authorized_client, tokens_file, mocker
): ):
response = authorized_client.post("/auth/new_device") token = rest_get_new_device_token(authorized_client)
assert response.status_code == 200
assert "token" in response.json()
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
file_data = read_json(tokens_file) # TARDIS sounds
file_data["new_device"]["expiration"] = str( mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture)
datetime.datetime.now() - datetime.timedelta(minutes=13)
)
write_json(tokens_file, file_data)
response = client.post( response = rest_try_authorize_new_device(client, token, "new_device")
"/auth/new_device/authorize",
json={"token": response.json()["token"], "device": "new_device"},
)
assert response.status_code == 404 assert response.status_code == 404
assert_original(authorized_client)
def test_authorize_without_token(client, tokens_file): def test_authorize_without_token(client, authorized_client, tokens_file):
response = client.post( response = client.post(
"/auth/new_device/authorize", "/auth/new_device/authorize",
json={"device": "new_device"}, json={"device": "new_device"},
) )
assert response.status_code == 422 assert response.status_code == 422
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
# Recovery tokens # Recovery tokens
@ -240,10 +302,10 @@ def test_authorize_without_token(client, tokens_file):
# - if request is invalid, returns 400 # - if request is invalid, returns 400
def test_get_recovery_token_status_unauthorized(client, tokens_file): def test_get_recovery_token_status_unauthorized(client, authorized_client, tokens_file):
response = client.get("/auth/recovery_token") response = client.get("/auth/recovery_token")
assert response.status_code == 401 assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_get_recovery_token_when_none_exists(authorized_client, tokens_file): def test_get_recovery_token_when_none_exists(authorized_client, tokens_file):
@ -256,31 +318,17 @@ def test_get_recovery_token_when_none_exists(authorized_client, tokens_file):
"expiration": None, "expiration": None,
"uses_left": None, "uses_left": None,
} }
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_generate_recovery_token(authorized_client, client, tokens_file): def test_generate_recovery_token(authorized_client, client, tokens_file):
# Generate token without expiration and uses_left # Generate token without expiration and uses_left
response = authorized_client.post("/auth/recovery_token") mnemonic_token = rest_make_recovery_token(authorized_client)
assert response.status_code == 200
assert "token" in response.json()
mnemonic_token = response.json()["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
time_generated = read_json(tokens_file)["recovery_token"]["date"] time_generated = rest_get_recovery_date(authorized_client)
assert time_generated is not None assert_recovery_recent(time_generated)
# Assert that the token was generated near the current time
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
# Try to get token status assert rest_get_recovery_status(authorized_client) == {
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": True, "exists": True,
"valid": True, "valid": True,
"date": time_generated, "date": time_generated,
@ -288,61 +336,26 @@ def test_generate_recovery_token(authorized_client, client, tokens_file):
"uses_left": None, "uses_left": None,
} }
# Try to use the token rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
recovery_response = client.post( # And again
"/auth/recovery_token/use", rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
# Try to use token again
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
@pytest.mark.parametrize("timeformat", DATE_FORMATS) @pytest.mark.parametrize("timeformat", DATE_FORMATS)
def test_generate_recovery_token_with_expiration_date( def test_generate_recovery_token_with_expiration_date(
authorized_client, client, tokens_file, timeformat authorized_client, client, tokens_file, timeformat, mocker
): ):
# Generate token with expiration date # Generate token with expiration date
# Generate expiration date in the future # Generate expiration date in the future
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5) expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime(timeformat) mnemonic_token = rest_make_recovery_token(
response = authorized_client.post( authorized_client, expires_at=expiration_date, timeformat=timeformat
"/auth/recovery_token",
json={"expiration": expiration_date_str},
)
assert response.status_code == 200
assert "token" in response.json()
mnemonic_token = response.json()["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
assert datetime.datetime.strptime(
read_json(tokens_file)["recovery_token"]["expiration"], "%Y-%m-%dT%H:%M:%S.%f"
) == datetime.datetime.strptime(expiration_date_str, timeformat)
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
# Assert that the token was generated near the current time
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
) )
# Try to get token status time_generated = rest_get_recovery_date(authorized_client)
response = authorized_client.get("/auth/recovery_token") assert_recovery_recent(time_generated)
assert response.status_code == 200
assert response.json() == { assert rest_get_recovery_status(authorized_client) == {
"exists": True, "exists": True,
"valid": True, "valid": True,
"date": time_generated, "date": time_generated,
@ -350,50 +363,22 @@ def test_generate_recovery_token_with_expiration_date(
"uses_left": None, "uses_left": None,
} }
# Try to use the token rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
recovery_response = client.post( # And again
"/auth/recovery_token/use", rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
# Try to use token again
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
# Try to use token after expiration date # Try to use token after expiration date
new_data = read_json(tokens_file) mock = mocker.patch(RECOVERY_KEY_VALIDATION_DATETIME, NearFuture)
new_data["recovery_token"]["expiration"] = datetime.datetime.now().strftime( device_name = "recovery_device3"
"%Y-%m-%dT%H:%M:%S.%f"
)
write_json(tokens_file, new_data)
recovery_response = client.post( recovery_response = client.post(
"/auth/recovery_token/use", "/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device3"}, json={"token": mnemonic_token, "device": device_name},
) )
assert recovery_response.status_code == 404 assert recovery_response.status_code == 404
# Assert that the token was not created in JSON # Assert that the token was not created
assert read_json(tokens_file)["tokens"] == new_data["tokens"] assert device_name not in [
token["name"] for token in rest_get_tokens_info(authorized_client)
# Get the status of the token ]
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": True,
"valid": False,
"date": time_generated,
"expiration": new_data["recovery_token"]["expiration"],
"uses_left": None,
}
@pytest.mark.parametrize("timeformat", DATE_FORMATS) @pytest.mark.parametrize("timeformat", DATE_FORMATS)
@ -408,7 +393,7 @@ def test_generate_recovery_token_with_expiration_in_the_past(
json={"expiration": expiration_date_str}, json={"expiration": expiration_date_str},
) )
assert response.status_code == 400 assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file) assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_invalid_time_format( def test_generate_recovery_token_with_invalid_time_format(
@ -421,37 +406,19 @@ def test_generate_recovery_token_with_invalid_time_format(
json={"expiration": expiration_date}, json={"expiration": expiration_date},
) )
assert response.status_code == 422 assert response.status_code == 422
assert "recovery_token" not in read_json(tokens_file) assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_limited_uses( def test_generate_recovery_token_with_limited_uses(
authorized_client, client, tokens_file authorized_client, client, tokens_file
): ):
# Generate token with limited uses # Generate token with limited uses
response = authorized_client.post( mnemonic_token = rest_make_recovery_token(authorized_client, uses=2)
"/auth/recovery_token",
json={"uses": 2},
)
assert response.status_code == 200
assert "token" in response.json()
mnemonic_token = response.json()["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2
# Get the date of the token time_generated = rest_get_recovery_date(authorized_client)
time_generated = read_json(tokens_file)["recovery_token"]["date"] assert_recovery_recent(time_generated)
assert time_generated is not None
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
# Try to get token status assert rest_get_recovery_status(authorized_client) == {
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": True, "exists": True,
"valid": True, "valid": True,
"date": time_generated, "date": time_generated,
@ -460,21 +427,9 @@ def test_generate_recovery_token_with_limited_uses(
} }
# Try to use the token # Try to use the token
recovery_response = client.post( rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 1 assert rest_get_recovery_status(authorized_client) == {
# Get the status of the token
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": True, "exists": True,
"valid": True, "valid": True,
"date": time_generated, "date": time_generated,
@ -483,19 +438,9 @@ def test_generate_recovery_token_with_limited_uses(
} }
# Try to use token again # Try to use token again
recovery_response = client.post( rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
# Get the status of the token assert rest_get_recovery_status(authorized_client) == {
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": True, "exists": True,
"valid": False, "valid": False,
"date": time_generated, "date": time_generated,
@ -510,8 +455,6 @@ def test_generate_recovery_token_with_limited_uses(
) )
assert recovery_response.status_code == 404 assert recovery_response.status_code == 404
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 0
def test_generate_recovery_token_with_negative_uses( def test_generate_recovery_token_with_negative_uses(
authorized_client, client, tokens_file authorized_client, client, tokens_file
@ -522,7 +465,7 @@ def test_generate_recovery_token_with_negative_uses(
json={"uses": -2}, json={"uses": -2},
) )
assert response.status_code == 400 assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file) assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_zero_uses(authorized_client, client, tokens_file): def test_generate_recovery_token_with_zero_uses(authorized_client, client, tokens_file):
@ -532,4 +475,4 @@ def test_generate_recovery_token_with_zero_uses(authorized_client, client, token
json={"uses": 0}, json={"uses": 0},
) )
assert response.status_code == 400 assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file) assert_no_recovery(authorized_client)