Compare commits

...

5 Commits

5 changed files with 283 additions and 267 deletions

View File

@ -4,8 +4,34 @@
import os
import pytest
from fastapi.testclient import TestClient
from shutil import copy
import os.path as path
import datetime
# from selfprivacy_api.actions.api_tokens import TOKEN_REPO
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from tests.common import read_json
EMPTY_TOKENS_JSON = ' {"tokens": []}'
TOKENS_FILE_CONTENTS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
},
]
}
def pytest_generate_tests(metafunc):
@ -17,13 +43,40 @@ def global_data_dir():
@pytest.fixture
def tokens_file(mocker, tmpdir):
"""Mock tokens file."""
tmp_file = tmpdir / "tokens.json"
source_file = path.join(global_data_dir(), "tokens.json")
copy(source_file, tmp_file)
mock = mocker.patch("selfprivacy_api.utils.TOKENS_FILE", tmp_file)
return mock
def empty_tokens(mocker, tmpdir):
tokenfile = tmpdir / "empty_tokens.json"
with open(tokenfile, "w") as file:
file.write(EMPTY_TOKENS_JSON)
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokenfile)
assert read_json(tokenfile)["tokens"] == []
return tmpdir
@pytest.fixture
def empty_json_repo(empty_tokens):
repo = JsonTokensRepository()
for token in repo.get_tokens():
repo.delete_token(token)
assert repo.get_tokens() == []
return repo
@pytest.fixture
def tokens_file(empty_json_repo, tmpdir):
"""A state with tokens"""
for token in TOKENS_FILE_CONTENTS["tokens"]:
empty_json_repo._store_token(
Token(
token=token["token"],
device_name=token["name"],
created_at=token["date"],
)
)
# temporary return for compatibility with older tests
tokenfile = tmpdir / "empty_tokens.json"
assert path.exists(tokenfile)
return tokenfile
@pytest.fixture

View File

@ -17,12 +17,12 @@ TOKENS_FILE_CONTETS = {
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
"date": "2022-01-14T08:31:10.789314",
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
"date": "2022-01-14T08:31:10.789314",
},
]
}
@ -118,7 +118,7 @@ def test_graphql_delete_token(authorized_client, tokens_file):
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
"date": "2022-01-14T08:31:10.789314",
}
]
}

View File

@ -16,13 +16,9 @@ from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound,
NewDeviceKeyNotFound,
)
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from tests.common import read_json
ORIGINAL_DEVICE_NAMES = [
@ -33,23 +29,10 @@ ORIGINAL_DEVICE_NAMES = [
]
EMPTY_TOKENS_JSON = ' {"tokens": []}'
def mnemonic_from_hex(hexkey):
return Mnemonic(language="english").to_mnemonic(bytes.fromhex(hexkey))
@pytest.fixture
def empty_tokens(mocker, tmpdir):
tokens_file = tmpdir / "empty_tokens.json"
with open(tokens_file, "w") as file:
file.write(EMPTY_TOKENS_JSON)
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokens_file)
assert read_json(tokens_file)["tokens"] == []
return tmpdir
@pytest.fixture
def mock_new_device_key_generate(mocker):
mock = mocker.patch(
@ -137,15 +120,6 @@ def mock_recovery_key_generate(mocker):
return mock
@pytest.fixture
def empty_json_repo(empty_tokens):
repo = JsonTokensRepository()
for token in repo.get_tokens():
repo.delete_token(token)
assert repo.get_tokens() == []
return repo
@pytest.fixture
def empty_redis_repo():
repo = RedisTokensRepository()

View File

@ -3,10 +3,6 @@
# pylint: disable=missing-function-docstring
import datetime
import pytest
from mnemonic import Mnemonic
from tests.common import read_json, write_json
TOKENS_FILE_CONTENTS = {
@ -14,12 +10,12 @@ TOKENS_FILE_CONTENTS = {
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
"date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
"date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
},
]
}
@ -42,8 +38,17 @@ class NearFuture(datetime.datetime):
return datetime.datetime.now() + datetime.timedelta(minutes=13)
def assert_original(filename):
assert read_json(filename) == TOKENS_FILE_CONTENTS
def assert_original(client):
new_tokens = rest_get_tokens_info(client)
for token in TOKENS_FILE_CONTENTS["tokens"]:
assert_token_valid(client, token["token"])
for new_token in new_tokens:
if new_token["name"] == token["name"]:
assert (
datetime.datetime.fromisoformat(new_token["date"]) == token["date"]
)
assert_no_recovery(client)
def assert_token_valid(client, token):
@ -57,198 +62,15 @@ def rest_get_tokens_info(client):
return response.json()
def test_get_tokens_info(authorized_client, tokens_file):
assert rest_get_tokens_info(authorized_client) == [
{"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True},
{
"name": "test_token2",
"date": "2022-01-14T08:31:10.789314",
"is_caller": False,
},
]
def test_get_tokens_unauthorized(client, tokens_file):
response = client.get("/auth/tokens")
assert response.status_code == 401
def test_delete_token_unauthorized(client, tokens_file):
response = client.delete("/auth/tokens")
assert response.status_code == 401
assert_original(tokens_file)
def test_delete_token(authorized_client, tokens_file):
response = authorized_client.delete(
"/auth/tokens", json={"token_name": "test_token2"}
)
assert response.status_code == 200
assert rest_get_tokens_info(authorized_client) == [
{"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True}
]
def test_delete_self_token(authorized_client, tokens_file):
response = authorized_client.delete(
"/auth/tokens", json={"token_name": "test_token"}
)
assert response.status_code == 400
assert_original(tokens_file)
def test_delete_nonexistent_token(authorized_client, tokens_file):
response = authorized_client.delete(
"/auth/tokens", json={"token_name": "test_token3"}
)
assert response.status_code == 404
assert_original(tokens_file)
def test_refresh_token_unauthorized(client, tokens_file):
response = client.post("/auth/tokens")
assert response.status_code == 401
assert_original(tokens_file)
def test_refresh_token(authorized_client, tokens_file):
response = authorized_client.post("/auth/tokens")
assert response.status_code == 200
new_token = response.json()["token"]
assert_token_valid(authorized_client, new_token)
# new device
def test_get_new_device_auth_token_unauthorized(client, tokens_file):
response = client.post("/auth/new_device")
assert response.status_code == 401
assert_original(tokens_file)
def test_get_and_delete_new_device_token(authorized_client, tokens_file):
token = rest_get_new_device_token(authorized_client)
response = authorized_client.delete("/auth/new_device", json={"token": token})
assert response.status_code == 200
assert_original(tokens_file)
def test_delete_token_unauthenticated(client, tokens_file):
response = client.delete("/auth/new_device")
assert response.status_code == 401
assert_original(tokens_file)
def rest_get_new_device_token(client):
response = client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json()
return response.json()["token"]
def test_get_and_authorize_new_device(client, authorized_client, tokens_file):
token = rest_get_new_device_token(authorized_client)
def rest_try_authorize_new_device(client, token, device_name):
response = client.post(
"/auth/new_device/authorize",
json={
"token": token,
"device": "new_device",
"device": device_name,
},
)
assert response.status_code == 200
assert_token_valid(authorized_client, response.json()["token"])
def test_authorize_new_device_with_invalid_token(client, tokens_file):
response = client.post(
"/auth/new_device/authorize",
json={"token": "invalid_token", "device": "new_device"},
)
assert response.status_code == 404
assert_original(tokens_file)
def test_get_and_authorize_used_token(client, authorized_client, tokens_file):
token_to_be_used_2_times = rest_get_new_device_token(authorized_client)
response = client.post(
"/auth/new_device/authorize",
json={"token": token_to_be_used_2_times, "device": "new_device"},
)
assert response.status_code == 200
assert_token_valid(authorized_client, response.json()["token"])
response = client.post(
"/auth/new_device/authorize",
json={"token": token_to_be_used_2_times, "device": "new_device"},
)
assert response.status_code == 404
def test_get_and_authorize_token_after_12_minutes(
client, authorized_client, tokens_file, mocker
):
token = rest_get_new_device_token(authorized_client)
# TARDIS sounds
mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture)
response = client.post(
"/auth/new_device/authorize",
json={"token": token, "device": "new_device"},
)
assert response.status_code == 404
def test_authorize_without_token(client, tokens_file):
response = client.post(
"/auth/new_device/authorize",
json={"device": "new_device"},
)
assert response.status_code == 422
assert_original(tokens_file)
# Recovery tokens
# GET /auth/recovery_token returns token status
# - if token is valid, returns 200 and token status
# - token status:
# - exists (boolean)
# - valid (boolean)
# - date (string)
# - expiration (string)
# - uses_left (int)
# - if token is invalid, returns 400 and empty body
# POST /auth/recovery_token generates a new token
# has two optional parameters:
# - expiration (string in datetime format)
# - uses_left (int)
# POST /auth/recovery_token/use uses the token
# required arguments:
# - token (string)
# - device (string)
# - if token is valid, returns 200 and token
# - if token is invalid, returns 404
# - if request is invalid, returns 400
def test_get_recovery_token_status_unauthorized(client, tokens_file):
response = client.get("/auth/recovery_token")
assert response.status_code == 401
assert_original(tokens_file)
def test_get_recovery_token_when_none_exists(authorized_client, tokens_file):
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": False,
"valid": False,
"date": None,
"expiration": None,
"uses_left": None,
}
assert_original(tokens_file)
return response
def rest_make_recovery_token(client, expires_at=None, timeformat=None, uses=None):
@ -295,6 +117,10 @@ def assert_recovery_recent(time_generated):
)
def assert_no_recovery(client):
assert not rest_get_recovery_status(client)["exists"]
def rest_recover_with_mnemonic(client, mnemonic_token, device_name):
recovery_response = client.post(
"/auth/recovery_token/use",
@ -306,6 +132,195 @@ def rest_recover_with_mnemonic(client, mnemonic_token, device_name):
return new_token
# Tokens
def test_get_tokens_info(authorized_client, tokens_file):
assert rest_get_tokens_info(authorized_client) == [
{"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True},
{
"name": "test_token2",
"date": "2022-01-14T08:31:10.789314",
"is_caller": False,
},
]
def test_get_tokens_unauthorized(client, tokens_file):
response = client.get("/auth/tokens")
assert response.status_code == 401
def test_delete_token_unauthorized(client, authorized_client, tokens_file):
response = client.delete("/auth/tokens")
assert response.status_code == 401
assert_original(authorized_client)
def test_delete_token(authorized_client, tokens_file):
response = authorized_client.delete(
"/auth/tokens", json={"token_name": "test_token2"}
)
assert response.status_code == 200
assert rest_get_tokens_info(authorized_client) == [
{"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True}
]
def test_delete_self_token(authorized_client, tokens_file):
response = authorized_client.delete(
"/auth/tokens", json={"token_name": "test_token"}
)
assert response.status_code == 400
assert_original(authorized_client)
def test_delete_nonexistent_token(authorized_client, tokens_file):
response = authorized_client.delete(
"/auth/tokens", json={"token_name": "test_token3"}
)
assert response.status_code == 404
assert_original(authorized_client)
def test_refresh_token_unauthorized(client, authorized_client, tokens_file):
response = client.post("/auth/tokens")
assert response.status_code == 401
assert_original(authorized_client)
def test_refresh_token(authorized_client, tokens_file):
response = authorized_client.post("/auth/tokens")
assert response.status_code == 200
new_token = response.json()["token"]
assert_token_valid(authorized_client, new_token)
# New device
def test_get_new_device_auth_token_unauthorized(client, authorized_client, tokens_file):
response = client.post("/auth/new_device")
assert response.status_code == 401
assert "token" not in response.json()
assert "detail" in response.json()
# We only can check existence of a token we know.
def test_get_and_delete_new_device_token(client, authorized_client, tokens_file):
token = rest_get_new_device_token(authorized_client)
response = authorized_client.delete("/auth/new_device", json={"token": token})
assert response.status_code == 200
assert rest_try_authorize_new_device(client, token, "new_device").status_code == 404
def test_delete_token_unauthenticated(client, authorized_client, tokens_file):
token = rest_get_new_device_token(authorized_client)
response = client.delete("/auth/new_device", json={"token": token})
assert response.status_code == 401
assert rest_try_authorize_new_device(client, token, "new_device").status_code == 200
def rest_get_new_device_token(client):
response = client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json()
return response.json()["token"]
def test_get_and_authorize_new_device(client, authorized_client, tokens_file):
token = rest_get_new_device_token(authorized_client)
response = rest_try_authorize_new_device(client, token, "new_device")
assert response.status_code == 200
assert_token_valid(authorized_client, response.json()["token"])
def test_authorize_new_device_with_invalid_token(
client, authorized_client, tokens_file
):
response = rest_try_authorize_new_device(client, "invalid_token", "new_device")
assert response.status_code == 404
assert_original(authorized_client)
def test_get_and_authorize_used_token(client, authorized_client, tokens_file):
token_to_be_used_2_times = rest_get_new_device_token(authorized_client)
response = rest_try_authorize_new_device(
client, token_to_be_used_2_times, "new_device"
)
assert response.status_code == 200
assert_token_valid(authorized_client, response.json()["token"])
response = rest_try_authorize_new_device(
client, token_to_be_used_2_times, "new_device"
)
assert response.status_code == 404
def test_get_and_authorize_token_after_12_minutes(
client, authorized_client, tokens_file, mocker
):
token = rest_get_new_device_token(authorized_client)
# TARDIS sounds
mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture)
response = rest_try_authorize_new_device(client, token, "new_device")
assert response.status_code == 404
assert_original(authorized_client)
def test_authorize_without_token(client, authorized_client, tokens_file):
response = client.post(
"/auth/new_device/authorize",
json={"device": "new_device"},
)
assert response.status_code == 422
assert_original(authorized_client)
# Recovery tokens
# GET /auth/recovery_token returns token status
# - if token is valid, returns 200 and token status
# - token status:
# - exists (boolean)
# - valid (boolean)
# - date (string)
# - expiration (string)
# - uses_left (int)
# - if token is invalid, returns 400 and empty body
# POST /auth/recovery_token generates a new token
# has two optional parameters:
# - expiration (string in datetime format)
# - uses_left (int)
# POST /auth/recovery_token/use uses the token
# required arguments:
# - token (string)
# - device (string)
# - if token is valid, returns 200 and token
# - if token is invalid, returns 404
# - if request is invalid, returns 400
def test_get_recovery_token_status_unauthorized(client, authorized_client, tokens_file):
response = client.get("/auth/recovery_token")
assert response.status_code == 401
assert_original(authorized_client)
def test_get_recovery_token_when_none_exists(authorized_client, tokens_file):
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": False,
"valid": False,
"date": None,
"expiration": None,
"uses_left": None,
}
assert_original(authorized_client)
def test_generate_recovery_token(authorized_client, client, tokens_file):
# Generate token without expiration and uses_left
mnemonic_token = rest_make_recovery_token(authorized_client)
@ -378,7 +393,7 @@ def test_generate_recovery_token_with_expiration_in_the_past(
json={"expiration": expiration_date_str},
)
assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file)
assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_invalid_time_format(
@ -391,37 +406,19 @@ def test_generate_recovery_token_with_invalid_time_format(
json={"expiration": expiration_date},
)
assert response.status_code == 422
assert "recovery_token" not in read_json(tokens_file)
assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_limited_uses(
authorized_client, client, tokens_file
):
# Generate token with limited uses
response = authorized_client.post(
"/auth/recovery_token",
json={"uses": 2},
)
assert response.status_code == 200
assert "token" in response.json()
mnemonic_token = response.json()["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2
mnemonic_token = rest_make_recovery_token(authorized_client, uses=2)
# Get the date of the token
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
time_generated = rest_get_recovery_date(authorized_client)
assert_recovery_recent(time_generated)
# Try to get token status
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
assert rest_get_recovery_status(authorized_client) == {
"exists": True,
"valid": True,
"date": time_generated,
@ -432,10 +429,7 @@ def test_generate_recovery_token_with_limited_uses(
# Try to use the token
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
# Get the status of the token
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
assert rest_get_recovery_status(authorized_client) == {
"exists": True,
"valid": True,
"date": time_generated,
@ -446,10 +440,7 @@ def test_generate_recovery_token_with_limited_uses(
# Try to use token again
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
# Get the status of the token
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
assert rest_get_recovery_status(authorized_client) == {
"exists": True,
"valid": False,
"date": time_generated,
@ -464,8 +455,6 @@ def test_generate_recovery_token_with_limited_uses(
)
assert recovery_response.status_code == 404
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 0
def test_generate_recovery_token_with_negative_uses(
authorized_client, client, tokens_file
@ -476,7 +465,7 @@ def test_generate_recovery_token_with_negative_uses(
json={"uses": -2},
)
assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file)
assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_zero_uses(authorized_client, client, tokens_file):
@ -486,4 +475,4 @@ def test_generate_recovery_token_with_zero_uses(authorized_client, client, token
json={"uses": 0},
)
assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file)
assert_no_recovery(authorized_client)