Merge pull request 'feat: Use redis as a data storage for auth data' (#68) from redis/connection-pool-merged into master
continuous-integration/drone/push Build is failing Details

Reviewed-on: #68
pull/73/head
Inex Code 2023-11-10 10:57:59 +02:00
commit bc98e41be8
29 changed files with 1062 additions and 1297 deletions

View File

@ -1,11 +1,14 @@
"""App tokens actions"""
from datetime import datetime
"""
App tokens actions.
The only actions on tokens that are accessible from APIs
"""
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel
from mnemonic import Mnemonic
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound,
@ -14,7 +17,7 @@ from selfprivacy_api.repositories.tokens.exceptions import (
NewDeviceKeyNotFound,
)
TOKEN_REPO = JsonTokensRepository()
TOKEN_REPO = RedisTokensRepository()
class TokenInfoWithIsCaller(BaseModel):
@ -25,6 +28,14 @@ class TokenInfoWithIsCaller(BaseModel):
is_caller: bool
def _naive(date_time: datetime) -> datetime:
if date_time is None:
return None
if date_time.tzinfo is not None:
date_time.astimezone(timezone.utc)
return date_time.replace(tzinfo=None)
def get_api_tokens_with_caller_flag(caller_token: str) -> list[TokenInfoWithIsCaller]:
"""Get the tokens info"""
caller_name = TOKEN_REPO.get_token_by_token_string(caller_token).device_name
@ -91,8 +102,8 @@ def get_api_recovery_token_status() -> RecoveryTokenStatus:
return RecoveryTokenStatus(
exists=True,
valid=is_valid,
date=token.created_at,
expiration=token.expires_at,
date=_naive(token.created_at),
expiration=_naive(token.expires_at),
uses_left=token.uses_left,
)

View File

@ -25,6 +25,7 @@ from selfprivacy_api.migrations.prepare_for_nixos_2211 import (
from selfprivacy_api.migrations.prepare_for_nixos_2305 import (
MigrateToSelfprivacyChannelFrom2211,
)
from selfprivacy_api.migrations.redis_tokens import LoadTokensToRedis
migrations = [
FixNixosConfigBranch(),
@ -35,6 +36,7 @@ migrations = [
CreateProviderFields(),
MigrateToSelfprivacyChannelFrom2205(),
MigrateToSelfprivacyChannelFrom2211(),
LoadTokensToRedis(),
]

View File

@ -0,0 +1,48 @@
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
class LoadTokensToRedis(Migration):
"""Load Json tokens into Redis"""
def get_migration_name(self):
return "load_tokens_to_redis"
def get_migration_description(self):
return "Loads access tokens and recovery keys from legacy json file into redis token storage"
def is_repo_empty(self, repo: AbstractTokensRepository) -> bool:
if repo.get_tokens() != []:
return False
if repo.get_recovery_key() is not None:
return False
return True
def is_migration_needed(self):
try:
if not self.is_repo_empty(JsonTokensRepository()) and self.is_repo_empty(
RedisTokensRepository()
):
return True
except Exception as e:
print(e)
return False
def migrate(self):
# Write info about providers to userdata.json
try:
RedisTokensRepository().clone(JsonTokensRepository())
print("Done")
except Exception as e:
print(e)
print("Error migrating access tokens from json to redis")

View File

@ -1,11 +1,13 @@
"""
New device key used to obtain access token.
"""
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import secrets
from pydantic import BaseModel
from mnemonic import Mnemonic
from selfprivacy_api.models.tokens.time import is_past
class NewDeviceKey(BaseModel):
"""
@ -22,7 +24,7 @@ class NewDeviceKey(BaseModel):
"""
Check if the recovery key is valid.
"""
if self.expires_at < datetime.now():
if is_past(self.expires_at):
return False
return True
@ -37,10 +39,10 @@ class NewDeviceKey(BaseModel):
"""
Factory to generate a random token.
"""
creation_date = datetime.now()
creation_date = datetime.now(timezone.utc)
key = secrets.token_bytes(16).hex()
return NewDeviceKey(
key=key,
created_at=creation_date,
expires_at=datetime.now() + timedelta(minutes=10),
expires_at=creation_date + timedelta(minutes=10),
)

View File

@ -3,12 +3,14 @@ Recovery key used to obtain access token.
Recovery key has a token string, date of creation, optional date of expiration and optional count of uses left.
"""
from datetime import datetime
from datetime import datetime, timezone
import secrets
from typing import Optional
from pydantic import BaseModel
from mnemonic import Mnemonic
from selfprivacy_api.models.tokens.time import is_past, ensure_timezone
class RecoveryKey(BaseModel):
"""
@ -26,7 +28,7 @@ class RecoveryKey(BaseModel):
"""
Check if the recovery key is valid.
"""
if self.expires_at is not None and self.expires_at < datetime.now():
if self.expires_at is not None and is_past(self.expires_at):
return False
if self.uses_left is not None and self.uses_left <= 0:
return False
@ -46,7 +48,9 @@ class RecoveryKey(BaseModel):
"""
Factory to generate a random token.
"""
creation_date = datetime.now()
creation_date = datetime.now(timezone.utc)
if expiration is not None:
expiration = ensure_timezone(expiration)
key = secrets.token_bytes(24).hex()
return RecoveryKey(
key=key,

View File

@ -0,0 +1,14 @@
from datetime import datetime, timezone
def is_past(dt: datetime) -> bool:
# we cannot compare a naive now()
# to dt which might be tz-aware or unaware
dt = ensure_timezone(dt)
return dt < datetime.now(timezone.utc)
def ensure_timezone(dt: datetime) -> datetime:
if dt.tzinfo is None or dt.tzinfo.utcoffset(None) is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt

View File

@ -1,3 +1,5 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional
@ -86,13 +88,15 @@ class AbstractTokensRepository(ABC):
def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key"""
@abstractmethod
def create_recovery_key(
self,
expiration: Optional[datetime],
uses_left: Optional[int],
) -> RecoveryKey:
"""Create the recovery key"""
recovery_key = RecoveryKey.generate(expiration, uses_left)
self._store_recovery_key(recovery_key)
return recovery_key
def use_mnemonic_recovery_key(
self, mnemonic_phrase: str, device_name: str
@ -123,6 +127,14 @@ class AbstractTokensRepository(ABC):
return False
return recovery_key.is_valid()
@abstractmethod
def _store_recovery_key(self, recovery_key: RecoveryKey) -> None:
"""Store recovery key directly"""
@abstractmethod
def _delete_recovery_key(self) -> None:
"""Delete the recovery key"""
def get_new_device_key(self) -> NewDeviceKey:
"""Creates and returns the new device key"""
new_device_key = NewDeviceKey.generate()
@ -156,6 +168,26 @@ class AbstractTokensRepository(ABC):
return new_token
def reset(self):
for token in self.get_tokens():
self.delete_token(token)
self.delete_new_device_key()
self._delete_recovery_key()
def clone(self, source: AbstractTokensRepository) -> None:
"""Clone the state of another repository to this one"""
self.reset()
for token in source.get_tokens():
self._store_token(token)
recovery_key = source.get_recovery_key()
if recovery_key is not None:
self._store_recovery_key(recovery_key)
new_device_key = source._get_stored_new_device_key()
if new_device_key is not None:
self._store_new_device_key(new_device_key)
@abstractmethod
def _store_token(self, new_token: Token):
"""Store a token directly"""

View File

@ -2,7 +2,7 @@
temporary legacy
"""
from typing import Optional
from datetime import datetime
from datetime import datetime, timezone
from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData
from selfprivacy_api.models.tokens.token import Token
@ -15,6 +15,7 @@ from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
@ -56,6 +57,20 @@ class JsonTokensRepository(AbstractTokensRepository):
raise TokenNotFound("Token not found!")
def __key_date_from_str(self, date_string: str) -> datetime:
if date_string is None or date_string == "":
return None
# we assume that we store dates in json as naive utc
utc_no_tz = datetime.fromisoformat(date_string)
utc_with_tz = utc_no_tz.replace(tzinfo=timezone.utc)
return utc_with_tz
def __date_from_tokens_file(
self, tokens_file: object, tokenfield: str, datefield: str
):
date_string = tokens_file[tokenfield].get(datefield)
return self.__key_date_from_str(date_string)
def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key"""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
@ -68,22 +83,18 @@ class JsonTokensRepository(AbstractTokensRepository):
recovery_key = RecoveryKey(
key=tokens_file["recovery_token"].get("token"),
created_at=tokens_file["recovery_token"].get("date"),
expires_at=tokens_file["recovery_token"].get("expiration"),
created_at=self.__date_from_tokens_file(
tokens_file, "recovery_token", "date"
),
expires_at=self.__date_from_tokens_file(
tokens_file, "recovery_token", "expiration"
),
uses_left=tokens_file["recovery_token"].get("uses_left"),
)
return recovery_key
def create_recovery_key(
self,
expiration: Optional[datetime],
uses_left: Optional[int],
) -> RecoveryKey:
"""Create the recovery key"""
recovery_key = RecoveryKey.generate(expiration, uses_left)
def _store_recovery_key(self, recovery_key: RecoveryKey) -> None:
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
key_expiration: Optional[str] = None
if recovery_key.expires_at is not None:
@ -95,8 +106,6 @@ class JsonTokensRepository(AbstractTokensRepository):
"uses_left": recovery_key.uses_left,
}
return recovery_key
def _decrement_recovery_token(self):
"""Decrement recovery key use count by one"""
if self.is_recovery_key_valid():
@ -104,6 +113,13 @@ class JsonTokensRepository(AbstractTokensRepository):
if tokens["recovery_token"]["uses_left"] is not None:
tokens["recovery_token"]["uses_left"] -= 1
def _delete_recovery_key(self) -> None:
"""Delete the recovery key"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
if "recovery_token" in tokens_file:
del tokens_file["recovery_token"]
return
def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None:
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["new_device"] = {
@ -127,7 +143,11 @@ class JsonTokensRepository(AbstractTokensRepository):
new_device_key = NewDeviceKey(
key=tokens_file["new_device"]["token"],
created_at=tokens_file["new_device"]["date"],
expires_at=tokens_file["new_device"]["expiration"],
created_at=self.__date_from_tokens_file(
tokens_file, "new_device", "date"
),
expires_at=self.__date_from_tokens_file(
tokens_file, "new_device", "expiration"
),
)
return new_device_key

View File

@ -4,6 +4,7 @@ Token repository using Redis as backend.
from typing import Any, Optional
from datetime import datetime
from hashlib import md5
from datetime import timezone
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
@ -53,6 +54,7 @@ class RedisTokensRepository(AbstractTokensRepository):
token = self._token_from_hash(key)
if token == input_token:
return key
return None
def delete_token(self, input_token: Token) -> None:
"""Delete the token"""
@ -62,13 +64,6 @@ class RedisTokensRepository(AbstractTokensRepository):
raise TokenNotFound
redis.delete(key)
def reset(self):
for token in self.get_tokens():
self.delete_token(token)
self.delete_new_device_key()
redis = self.connection
redis.delete(RECOVERY_KEY_REDIS_KEY)
def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key"""
redis = self.connection
@ -76,15 +71,13 @@ class RedisTokensRepository(AbstractTokensRepository):
return self._recovery_key_from_hash(RECOVERY_KEY_REDIS_KEY)
return None
def create_recovery_key(
self,
expiration: Optional[datetime],
uses_left: Optional[int],
) -> RecoveryKey:
"""Create the recovery key"""
recovery_key = RecoveryKey.generate(expiration=expiration, uses_left=uses_left)
def _store_recovery_key(self, recovery_key: RecoveryKey) -> None:
self._store_model_as_hash(RECOVERY_KEY_REDIS_KEY, recovery_key)
return recovery_key
def _delete_recovery_key(self) -> None:
"""Delete the recovery key"""
redis = self.connection
redis.delete(RECOVERY_KEY_REDIS_KEY)
def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None:
"""Store new device key directly"""
@ -157,6 +150,7 @@ class RedisTokensRepository(AbstractTokensRepository):
if token is not None:
token.created_at = token.created_at.replace(tzinfo=None)
return token
return None
def _recovery_key_from_hash(self, redis_key: str) -> Optional[RecoveryKey]:
return self._hash_as_model(redis_key, RecoveryKey)
@ -168,5 +162,7 @@ class RedisTokensRepository(AbstractTokensRepository):
redis = self.connection
for key, value in model.dict().items():
if isinstance(value, datetime):
if value.tzinfo is None:
value = value.replace(tzinfo=timezone.utc)
value = value.isoformat()
redis.hset(redis_key, key, str(value))

View File

@ -1,6 +1,33 @@
import json
from datetime import datetime, timezone, timedelta
from mnemonic import Mnemonic
# for expiration tests. If headache, consider freezegun
RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.time.datetime"
DEVICE_KEY_VALIDATION_DATETIME = RECOVERY_KEY_VALIDATION_DATETIME
def five_minutes_into_future_naive():
return datetime.now() + timedelta(minutes=5)
def five_minutes_into_future():
return datetime.now(timezone.utc) + timedelta(minutes=5)
def five_minutes_into_past_naive():
return datetime.now() - timedelta(minutes=5)
def five_minutes_into_past():
return datetime.now(timezone.utc) - timedelta(minutes=5)
class NearFuture(datetime):
@classmethod
def now(cls, tz=None):
return datetime.now(tz) + timedelta(minutes=13)
def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as file:
@ -30,3 +57,10 @@ def generate_backup_query(query_array):
def mnemonic_to_hex(mnemonic):
return Mnemonic(language="english").to_entropy(mnemonic).hex()
def assert_recovery_recent(time_generated):
assert (
datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - timedelta(seconds=5)
< datetime.now()
)

View File

@ -6,6 +6,38 @@ import pytest
from os import path
from fastapi.testclient import TestClient
import os.path as path
import datetime
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from tests.common import read_json
EMPTY_TOKENS_JSON = ' {"tokens": []}'
TOKENS_FILE_CONTENTS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
},
]
}
DEVICE_WE_AUTH_TESTS_WITH = TOKENS_FILE_CONTENTS["tokens"][0]
def pytest_generate_tests(metafunc):
@ -17,12 +49,45 @@ def global_data_dir():
@pytest.fixture
def tokens_file(mocker, shared_datadir):
"""Mock tokens file."""
mock = mocker.patch(
"selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json"
)
return mock
def empty_tokens(mocker, tmpdir):
tokenfile = tmpdir / "empty_tokens.json"
with open(tokenfile, "w") as file:
file.write(EMPTY_TOKENS_JSON)
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokenfile)
assert read_json(tokenfile)["tokens"] == []
return tmpdir
@pytest.fixture
def empty_json_repo(empty_tokens):
repo = JsonTokensRepository()
for token in repo.get_tokens():
repo.delete_token(token)
assert repo.get_tokens() == []
return repo
@pytest.fixture
def empty_redis_repo():
repo = RedisTokensRepository()
repo.reset()
assert repo.get_tokens() == []
return repo
@pytest.fixture
def tokens_file(empty_redis_repo, tmpdir):
"""A state with tokens"""
repo = empty_redis_repo
for token in TOKENS_FILE_CONTENTS["tokens"]:
repo._store_token(
Token(
token=token["token"],
device_name=token["name"],
created_at=token["date"],
)
)
return repo
@pytest.fixture
@ -68,7 +133,9 @@ def authorized_client(tokens_file, huey_database, jobs_file):
from selfprivacy_api.app import app
client = TestClient(app)
client.headers.update({"Authorization": "Bearer TEST_TOKEN"})
client.headers.update(
{"Authorization": "Bearer " + DEVICE_WE_AUTH_TESTS_WITH["token"]}
)
return client

View File

@ -0,0 +1,89 @@
from tests.common import generate_api_query
from tests.conftest import TOKENS_FILE_CONTENTS, DEVICE_WE_AUTH_TESTS_WITH
ORIGINAL_DEVICES = TOKENS_FILE_CONTENTS["tokens"]
def assert_ok(response, request):
data = assert_data(response)
data[request]["success"] is True
data[request]["message"] is not None
data[request]["code"] == 200
def assert_errorcode(response, request, code):
data = assert_data(response)
data[request]["success"] is False
data[request]["message"] is not None
data[request]["code"] == code
def assert_empty(response):
assert response.status_code == 200
assert response.json().get("data") is None
def assert_data(response):
assert response.status_code == 200
data = response.json().get("data")
assert data is not None
assert "api" in data.keys()
return data["api"]
API_DEVICES_QUERY = """
devices {
creationDate
isCaller
name
}
"""
def request_devices(client):
return client.post(
"/graphql",
json={"query": generate_api_query([API_DEVICES_QUERY])},
)
def graphql_get_devices(client):
response = request_devices(client)
data = assert_data(response)
devices = data["devices"]
assert devices is not None
return devices
def set_client_token(client, token):
client.headers.update({"Authorization": "Bearer " + token})
def assert_token_valid(client, token):
set_client_token(client, token)
assert graphql_get_devices(client) is not None
def assert_same(graphql_devices, abstract_devices):
"""Orderless comparison"""
assert len(graphql_devices) == len(abstract_devices)
for original_device in abstract_devices:
assert original_device["name"] in [device["name"] for device in graphql_devices]
for device in graphql_devices:
if device["name"] == original_device["name"]:
assert device["creationDate"] == original_device["date"].isoformat()
def assert_original(client):
devices = graphql_get_devices(client)
assert_original_devices(devices)
def assert_original_devices(devices):
assert_same(devices, ORIGINAL_DEVICES)
for device in devices:
if device["name"] == DEVICE_WE_AUTH_TESTS_WITH["name"]:
assert device["isCaller"] is True
else:
assert device["isCaller"] is False

View File

@ -0,0 +1,88 @@
from tests.common import generate_api_query
from tests.conftest import TOKENS_FILE_CONTENTS, DEVICE_WE_AUTH_TESTS_WITH
ORIGINAL_DEVICES = TOKENS_FILE_CONTENTS["tokens"]
def assert_ok(response, request):
data = assert_data(response)
data[request]["success"] is True
data[request]["message"] is not None
data[request]["code"] == 200
def assert_errorcode(response, request, code):
data = assert_data(response)
data[request]["success"] is False
data[request]["message"] is not None
data[request]["code"] == code
def assert_empty(response):
assert response.status_code == 200
assert response.json().get("data") is None
def assert_data(response):
assert response.status_code == 200
data = response.json().get("data")
assert data is not None
return data
API_DEVICES_QUERY = """
devices {
creationDate
isCaller
name
}
"""
def request_devices(client):
return client.post(
"/graphql",
json={"query": generate_api_query([API_DEVICES_QUERY])},
)
def graphql_get_devices(client):
response = request_devices(client)
data = assert_data(response)
devices = data["api"]["devices"]
assert devices is not None
return devices
def set_client_token(client, token):
client.headers.update({"Authorization": "Bearer " + token})
def assert_token_valid(client, token):
set_client_token(client, token)
assert graphql_get_devices(client) is not None
def assert_same(graphql_devices, abstract_devices):
"""Orderless comparison"""
assert len(graphql_devices) == len(abstract_devices)
for original_device in abstract_devices:
assert original_device["name"] in [device["name"] for device in graphql_devices]
for device in graphql_devices:
if device["name"] == original_device["name"]:
assert device["creationDate"] == original_device["date"].isoformat()
def assert_original(client):
devices = graphql_get_devices(client)
assert_original_devices(devices)
def assert_original_devices(devices):
assert_same(devices, ORIGINAL_DEVICES)
for device in devices:
if device["name"] == DEVICE_WE_AUTH_TESTS_WITH["name"]:
assert device["isCaller"] is True
else:
assert device["isCaller"] is False

View File

View File

@ -1,14 +0,0 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314"
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View File

@ -3,25 +3,11 @@
# pylint: disable=missing-function-docstring
from tests.common import generate_api_query
from tests.test_graphql.common import assert_original_devices
from tests.test_graphql.test_api_devices import API_DEVICES_QUERY
from tests.test_graphql.test_api_recovery import API_RECOVERY_QUERY
from tests.test_graphql.test_api_version import API_VERSION_QUERY
TOKENS_FILE_CONTETS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
},
]
}
def test_graphql_get_entire_api_data(authorized_client, tokens_file):
response = authorized_client.post(
@ -35,20 +21,11 @@ def test_graphql_get_entire_api_data(authorized_client, tokens_file):
assert response.status_code == 200
assert response.json().get("data") is not None
assert "version" in response.json()["data"]["api"]
assert response.json()["data"]["api"]["devices"] is not None
assert len(response.json()["data"]["api"]["devices"]) == 2
assert (
response.json()["data"]["api"]["devices"][0]["creationDate"]
== "2022-01-14T08:31:10.789314"
)
assert response.json()["data"]["api"]["devices"][0]["isCaller"] is True
assert response.json()["data"]["api"]["devices"][0]["name"] == "test_token"
assert (
response.json()["data"]["api"]["devices"][1]["creationDate"]
== "2022-01-14T08:31:10.789314"
)
assert response.json()["data"]["api"]["devices"][1]["isCaller"] is False
assert response.json()["data"]["api"]["devices"][1]["name"] == "test_token2"
devices = response.json()["data"]["api"]["devices"]
assert devices is not None
assert_original_devices(devices)
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is False
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False

View File

@ -1,76 +1,77 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import datetime
import pytest
from mnemonic import Mnemonic
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
from tests.common import (
RECOVERY_KEY_VALIDATION_DATETIME,
DEVICE_KEY_VALIDATION_DATETIME,
NearFuture,
generate_api_query,
)
from tests.conftest import DEVICE_WE_AUTH_TESTS_WITH, TOKENS_FILE_CONTENTS
from tests.test_graphql.api_common import (
assert_data,
assert_empty,
assert_ok,
assert_errorcode,
assert_token_valid,
assert_original,
assert_same,
graphql_get_devices,
request_devices,
set_client_token,
API_DEVICES_QUERY,
ORIGINAL_DEVICES,
)
from selfprivacy_api.models.tokens.token import Token
from tests.common import generate_api_query, read_json, write_json
TOKENS_FILE_CONTETS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
def graphql_get_caller_token_info(client):
devices = graphql_get_devices(client)
for device in devices:
if device["isCaller"] is True:
return device
def graphql_get_new_device_key(authorized_client) -> str:
response = authorized_client.post(
"/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert_ok(response, "getNewDeviceApiKey")
key = response.json()["data"]["api"]["getNewDeviceApiKey"]["key"]
assert key.split(" ").__len__() == 12
return key
def graphql_try_auth_new_device(client, mnemonic_key, device_name):
return client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": device_name,
}
},
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
},
]
}
API_DEVICES_QUERY = """
devices {
creationDate
isCaller
name
}
"""
)
@pytest.fixture
def token_repo():
return JsonTokensRepository()
def graphql_authorize_new_device(client, mnemonic_key, device_name) -> str:
response = graphql_try_auth_new_device(client, mnemonic_key, "new_device")
assert_ok(response, "authorizeWithNewDeviceApiKey")
token = response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["token"]
assert_token_valid(client, token)
def test_graphql_tokens_info(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={"query": generate_api_query([API_DEVICES_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["devices"] is not None
assert len(response.json()["data"]["api"]["devices"]) == 2
assert (
response.json()["data"]["api"]["devices"][0]["creationDate"]
== "2022-01-14T08:31:10.789314"
)
assert response.json()["data"]["api"]["devices"][0]["isCaller"] is True
assert response.json()["data"]["api"]["devices"][0]["name"] == "test_token"
assert (
response.json()["data"]["api"]["devices"][1]["creationDate"]
== "2022-01-14T08:31:10.789314"
)
assert response.json()["data"]["api"]["devices"][1]["isCaller"] is False
assert response.json()["data"]["api"]["devices"][1]["name"] == "test_token2"
assert_original(authorized_client)
def test_graphql_tokens_info_unauthorized(client, tokens_file):
response = client.post(
"/graphql",
json={"query": generate_api_query([API_DEVICES_QUERY])},
)
assert response.status_code == 200
assert response.json()["data"] is None
response = request_devices(client)
assert_empty(response)
DELETE_TOKEN_MUTATION = """
@ -96,34 +97,27 @@ def test_graphql_delete_token_unauthorized(client, tokens_file):
},
},
)
assert response.status_code == 200
assert response.json()["data"] is None
assert_empty(response)
def test_graphql_delete_token(authorized_client, tokens_file):
test_devices = ORIGINAL_DEVICES.copy()
device_to_delete = test_devices.pop(1)
assert device_to_delete != DEVICE_WE_AUTH_TESTS_WITH
response = authorized_client.post(
"/graphql",
json={
"query": DELETE_TOKEN_MUTATION,
"variables": {
"device": "test_token2",
"device": device_to_delete["name"],
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["success"] is True
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["message"] is not None
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["code"] == 200
assert read_json(tokens_file) == {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
}
]
}
assert_ok(response, "deleteDeviceApiToken")
devices = graphql_get_devices(authorized_client)
assert_same(devices, test_devices)
def test_graphql_delete_self_token(authorized_client, tokens_file):
@ -132,16 +126,12 @@ def test_graphql_delete_self_token(authorized_client, tokens_file):
json={
"query": DELETE_TOKEN_MUTATION,
"variables": {
"device": "test_token",
"device": DEVICE_WE_AUTH_TESTS_WITH["name"],
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["success"] is False
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["message"] is not None
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["code"] == 400
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert_errorcode(response, "deleteDeviceApiToken", 400)
assert_original(authorized_client)
def test_graphql_delete_nonexistent_token(
@ -157,12 +147,9 @@ def test_graphql_delete_nonexistent_token(
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["success"] is False
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["message"] is not None
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["code"] == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert_errorcode(response, "deleteDeviceApiToken", 404)
assert_original(authorized_client)
REFRESH_TOKEN_MUTATION = """
@ -184,32 +171,22 @@ def test_graphql_refresh_token_unauthorized(client, tokens_file):
"/graphql",
json={"query": REFRESH_TOKEN_MUTATION},
)
assert response.status_code == 200
assert response.json()["data"] is None
assert_empty(response)
def test_graphql_refresh_token(
authorized_client,
tokens_file,
token_repo,
):
def test_graphql_refresh_token(authorized_client, client, tokens_file):
caller_name_and_date = graphql_get_caller_token_info(authorized_client)
response = authorized_client.post(
"/graphql",
json={"query": REFRESH_TOKEN_MUTATION},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["refreshDeviceApiToken"]["success"] is True
assert (
response.json()["data"]["api"]["refreshDeviceApiToken"]["message"] is not None
)
assert response.json()["data"]["api"]["refreshDeviceApiToken"]["code"] == 200
token = token_repo.get_token_by_name("test_token")
assert token == Token(
token=response.json()["data"]["api"]["refreshDeviceApiToken"]["token"],
device_name="test_token",
created_at=datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
)
assert_ok(response, "refreshDeviceApiToken")
new_token = response.json()["data"]["api"]["refreshDeviceApiToken"]["token"]
assert_token_valid(client, new_token)
set_client_token(client, new_token)
assert graphql_get_caller_token_info(client) == caller_name_and_date
NEW_DEVICE_KEY_MUTATION = """
@ -234,33 +211,7 @@ def test_graphql_get_new_device_auth_key_unauthorized(
"/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json()["data"] is None
def test_graphql_get_new_device_auth_key(
authorized_client,
tokens_file,
):
response = authorized_client.post(
"/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200
assert (
response.json()["data"]["api"]["getNewDeviceApiKey"]["key"].split(" ").__len__()
== 12
)
token = (
Mnemonic(language="english")
.to_entropy(response.json()["data"]["api"]["getNewDeviceApiKey"]["key"])
.hex()
)
assert read_json(tokens_file)["new_device"]["token"] == token
assert_empty(response)
INVALIDATE_NEW_DEVICE_KEY_MUTATION = """
@ -289,48 +240,20 @@ def test_graphql_invalidate_new_device_token_unauthorized(
},
},
)
assert response.status_code == 200
assert response.json()["data"] is None
assert_empty(response)
def test_graphql_get_and_delete_new_device_key(
authorized_client,
tokens_file,
):
response = authorized_client.post(
"/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200
assert (
response.json()["data"]["api"]["getNewDeviceApiKey"]["key"].split(" ").__len__()
== 12
)
token = (
Mnemonic(language="english")
.to_entropy(response.json()["data"]["api"]["getNewDeviceApiKey"]["key"])
.hex()
)
assert read_json(tokens_file)["new_device"]["token"] == token
def test_graphql_get_and_delete_new_device_key(client, authorized_client, tokens_file):
mnemonic_key = graphql_get_new_device_key(authorized_client)
response = authorized_client.post(
"/graphql",
json={"query": INVALIDATE_NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["api"]["invalidateNewDeviceApiKey"]["success"] is True
)
assert (
response.json()["data"]["api"]["invalidateNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["invalidateNewDeviceApiKey"]["code"] == 200
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert_ok(response, "invalidateNewDeviceApiKey")
response = graphql_try_auth_new_device(client, mnemonic_key, "new_device")
assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404)
AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION = """
@ -347,209 +270,46 @@ mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) {
"""
def test_graphql_get_and_authorize_new_device(
client,
authorized_client,
tokens_file,
):
response = authorized_client.post(
"/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200
mnemonic_key = response.json()["data"]["api"]["getNewDeviceApiKey"]["key"]
assert mnemonic_key.split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(mnemonic_key).hex()
assert read_json(tokens_file)["new_device"]["token"] == key
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": "new_device",
}
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"]
is True
)
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 200
token = response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == token
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_file):
mnemonic_key = graphql_get_new_device_key(authorized_client)
old_devices = graphql_get_devices(authorized_client)
graphql_authorize_new_device(client, mnemonic_key, "new_device")
new_devices = graphql_get_devices(authorized_client)
assert len(new_devices) == len(old_devices) + 1
assert "new_device" in [device["name"] for device in new_devices]
def test_graphql_authorize_new_device_with_invalid_key(
client,
tokens_file,
client, authorized_client, tokens_file
):
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"input": {
"key": "invalid_token",
"deviceName": "test_token",
}
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"]
is False
)
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
response = graphql_try_auth_new_device(client, "invalid_token", "new_device")
assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404)
assert_original(authorized_client)
def test_graphql_get_and_authorize_used_key(
client,
authorized_client,
tokens_file,
):
response = authorized_client.post(
"/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200
mnemonic_key = response.json()["data"]["api"]["getNewDeviceApiKey"]["key"]
assert mnemonic_key.split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(mnemonic_key).hex()
assert read_json(tokens_file)["new_device"]["token"] == key
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": "new_token",
}
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"]
is True
)
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 200
assert (
read_json(tokens_file)["tokens"][2]["token"]
== response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["token"]
)
assert read_json(tokens_file)["tokens"][2]["name"] == "new_token"
def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_file):
mnemonic_key = graphql_get_new_device_key(authorized_client)
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"input": {
"key": NEW_DEVICE_KEY_MUTATION,
"deviceName": "test_token2",
}
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"]
is False
)
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 404
assert read_json(tokens_file)["tokens"].__len__() == 3
graphql_authorize_new_device(client, mnemonic_key, "new_device")
devices = graphql_get_devices(authorized_client)
response = graphql_try_auth_new_device(client, mnemonic_key, "new_device2")
assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404)
assert graphql_get_devices(authorized_client) == devices
def test_graphql_get_and_authorize_key_after_12_minutes(
client,
authorized_client,
tokens_file,
client, authorized_client, tokens_file, mocker
):
response = authorized_client.post(
"/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200
assert (
response.json()["data"]["api"]["getNewDeviceApiKey"]["key"].split(" ").__len__()
== 12
)
key = (
Mnemonic(language="english")
.to_entropy(response.json()["data"]["api"]["getNewDeviceApiKey"]["key"])
.hex()
)
assert read_json(tokens_file)["new_device"]["token"] == key
mnemonic_key = graphql_get_new_device_key(authorized_client)
mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture)
file_data = read_json(tokens_file)
file_data["new_device"]["expiration"] = str(
datetime.datetime.now() - datetime.timedelta(minutes=13)
)
write_json(tokens_file, file_data)
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": "test_token",
}
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"]
is False
)
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 404
response = graphql_try_auth_new_device(client, mnemonic_key, "new_device")
assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404)
def test_graphql_authorize_without_token(
@ -567,5 +327,4 @@ def test_graphql_authorize_without_token(
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)

View File

@ -1,24 +1,28 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import datetime
from tests.common import generate_api_query, mnemonic_to_hex, read_json, write_json
from tests.common import (
generate_api_query,
assert_recovery_recent,
NearFuture,
RECOVERY_KEY_VALIDATION_DATETIME,
)
TOKENS_FILE_CONTETS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
},
]
}
# Graphql API's output should be timezone-naive
from tests.common import five_minutes_into_future_naive as five_minutes_into_future
from tests.common import five_minutes_into_past_naive as five_minutes_into_past
from tests.test_graphql.api_common import (
assert_empty,
assert_data,
assert_ok,
assert_errorcode,
assert_token_valid,
assert_original,
graphql_get_devices,
set_client_token,
)
API_RECOVERY_QUERY = """
recoveryKey {
@ -31,28 +35,85 @@ recoveryKey {
"""
def test_graphql_recovery_key_status_unauthorized(client, tokens_file):
response = client.post(
def request_recovery_status(client):
return client.post(
"/graphql",
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is None
def graphql_recovery_status(client):
response = request_recovery_status(client)
data = assert_data(response)
status = data["recoveryKey"]
assert status is not None
return status
def request_make_new_recovery_key(client, expires_at=None, uses=None):
json = {"query": API_RECOVERY_KEY_GENERATE_MUTATION}
limits = {}
if expires_at is not None:
limits["expirationDate"] = expires_at.isoformat()
if uses is not None:
limits["uses"] = uses
if limits != {}:
json["variables"] = {"limits": limits}
response = client.post("/graphql", json=json)
return response
def graphql_make_new_recovery_key(client, expires_at=None, uses=None):
response = request_make_new_recovery_key(client, expires_at, uses)
assert_ok(response, "getNewRecoveryApiKey")
key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"]
assert key is not None
assert key.split(" ").__len__() == 18
return key
def request_recovery_auth(client, key, device_name):
return client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": device_name,
},
},
},
)
def graphql_use_recovery_key(client, key, device_name):
response = request_recovery_auth(client, key, device_name)
assert_ok(response, "useRecoveryApiKey")
token = response.json()["data"]["api"]["useRecoveryApiKey"]["token"]
assert token is not None
assert_token_valid(client, token)
set_client_token(client, token)
assert device_name in [device["name"] for device in graphql_get_devices(client)]
return token
def test_graphql_recovery_key_status_unauthorized(client, tokens_file):
response = request_recovery_status(client)
assert_empty(response)
def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is False
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False
assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None
status = graphql_recovery_status(authorized_client)
assert status["exists"] is False
assert status["valid"] is False
assert status["creationDate"] is None
assert status["expirationDate"] is None
assert status["usesLeft"] is None
API_RECOVERY_KEY_GENERATE_MUTATION = """
@ -83,281 +144,71 @@ mutation TestUseRecoveryKey($input: UseRecoveryKeyInput!) {
def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is not None
assert (
response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"]
.split(" ")
.__len__()
== 18
)
assert read_json(tokens_file)["recovery_token"] is not None
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"]
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
key = graphql_make_new_recovery_key(authorized_client)
# Try to get token status
response = authorized_client.post(
"/graphql",
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json()["data"]["api"]["recoveryKey"][
"creationDate"
] == time_generated.replace("Z", "")
assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None
status = graphql_recovery_status(authorized_client)
assert status["exists"] is True
assert status["valid"] is True
assert_recovery_recent(status["creationDate"])
assert status["expirationDate"] is None
assert status["usesLeft"] is None
# Try to use token
response = client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": "new_test_token",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json()["data"]["api"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][2]["token"]
)
assert read_json(tokens_file)["tokens"][2]["name"] == "new_test_token"
# Try to use token again
response = client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": "new_test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json()["data"]["api"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][3]["token"]
)
assert read_json(tokens_file)["tokens"][3]["name"] == "new_test_token2"
graphql_use_recovery_key(client, key, "new_test_token")
# And again
graphql_use_recovery_key(client, key, "new_test_token2")
def test_graphql_generate_recovery_key_with_expiration_date(
client, authorized_client, tokens_file
):
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f")
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"expirationDate": expiration_date_str,
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is not None
assert (
response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"]
.split(" ")
.__len__()
== 18
)
assert read_json(tokens_file)["recovery_token"] is not None
expiration_date = five_minutes_into_future()
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"]
assert read_json(tokens_file)["recovery_token"]["expiration"] == expiration_date_str
assert read_json(tokens_file)["recovery_token"]["token"] == mnemonic_to_hex(key)
status = graphql_recovery_status(authorized_client)
assert status["exists"] is True
assert status["valid"] is True
assert_recovery_recent(status["creationDate"])
assert status["expirationDate"] == expiration_date.isoformat()
assert status["usesLeft"] is None
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
graphql_use_recovery_key(client, key, "new_test_token")
# And again
graphql_use_recovery_key(client, key, "new_test_token2")
# Try to get token status
response = authorized_client.post(
"/graphql",
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json()["data"]["api"]["recoveryKey"][
"creationDate"
] == time_generated.replace("Z", "")
assert (
response.json()["data"]["api"]["recoveryKey"]["expirationDate"]
== expiration_date_str
)
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None
# Try to use token
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": "new_test_token",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json()["data"]["api"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][2]["token"]
)
def test_graphql_use_recovery_key_after_expiration(
client, authorized_client, tokens_file, mocker
):
expiration_date = five_minutes_into_future()
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
# Try to use token again
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": "new_test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json()["data"]["api"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][3]["token"]
)
# Timewarp to after it expires
mock = mocker.patch(RECOVERY_KEY_VALIDATION_DATETIME, NearFuture)
# Try to use token after expiration date
new_data = read_json(tokens_file)
new_data["recovery_token"]["expiration"] = (
datetime.datetime.now() - datetime.timedelta(minutes=5)
).strftime("%Y-%m-%dT%H:%M:%S.%f")
write_json(tokens_file, new_data)
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": "new_test_token3",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is False
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 404
response = request_recovery_auth(client, key, "new_test_token3")
assert_errorcode(response, "useRecoveryApiKey", 404)
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is None
assert_original(authorized_client)
assert read_json(tokens_file)["tokens"] == new_data["tokens"]
# Try to get token status
response = authorized_client.post(
"/graphql",
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False
assert (
response.json()["data"]["api"]["recoveryKey"]["creationDate"] == time_generated
)
assert (
response.json()["data"]["api"]["recoveryKey"]["expirationDate"]
== new_data["recovery_token"]["expiration"]
)
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None
status = graphql_recovery_status(authorized_client)
assert status["exists"] is True
assert status["valid"] is False
assert_recovery_recent(status["creationDate"])
assert status["expirationDate"] == expiration_date.isoformat()
assert status["usesLeft"] is None
def test_graphql_generate_recovery_key_with_expiration_in_the_past(
authorized_client, tokens_file
):
expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f")
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"expirationDate": expiration_date_str,
},
},
},
expiration_date = five_minutes_into_past()
response = request_make_new_recovery_key(
authorized_client, expires_at=expiration_date
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is False
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 400
assert_errorcode(response, "getNewRecoveryApiKey", 400)
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None
assert "recovery_token" not in read_json(tokens_file)
assert graphql_recovery_status(authorized_client)["exists"] is False
def test_graphql_generate_recovery_key_with_invalid_time_format(
@ -377,183 +228,57 @@ def test_graphql_generate_recovery_key_with_invalid_time_format(
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert "recovery_token" not in read_json(tokens_file)
assert_empty(response)
assert graphql_recovery_status(authorized_client)["exists"] is False
def test_graphql_generate_recovery_key_with_limited_uses(
authorized_client, tokens_file
authorized_client, client, tokens_file
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"expirationDate": None,
"uses": 2,
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is not None
mnemonic_key = graphql_make_new_recovery_key(authorized_client, uses=2)
mnemonic_key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"]
key = mnemonic_to_hex(mnemonic_key)
status = graphql_recovery_status(authorized_client)
assert status["exists"] is True
assert status["valid"] is True
assert status["creationDate"] is not None
assert status["expirationDate"] is None
assert status["usesLeft"] == 2
assert read_json(tokens_file)["recovery_token"]["token"] == key
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2
graphql_use_recovery_key(client, mnemonic_key, "new_test_token1")
# Try to get token status
response = authorized_client.post(
"/graphql",
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] == 2
status = graphql_recovery_status(authorized_client)
assert status["exists"] is True
assert status["valid"] is True
assert status["creationDate"] is not None
assert status["expirationDate"] is None
assert status["usesLeft"] == 1
# Try to use token
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": "test_token1",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
graphql_use_recovery_key(client, mnemonic_key, "new_test_token2")
# Try to get token status
response = authorized_client.post(
"/graphql",
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] == 1
status = graphql_recovery_status(authorized_client)
assert status["exists"] is True
assert status["valid"] is False
assert status["creationDate"] is not None
assert status["expirationDate"] is None
assert status["usesLeft"] == 0
# Try to use token
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": "test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
# Try to get token status
response = authorized_client.post(
"/graphql",
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False
assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] == 0
# Try to use token
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": "test_token3",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is False
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 404
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is None
response = request_recovery_auth(client, mnemonic_key, "new_test_token3")
assert_errorcode(response, "useRecoveryApiKey", 404)
def test_graphql_generate_recovery_key_with_negative_uses(
authorized_client, tokens_file
):
# Try to get token status
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": -1,
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is False
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 400
response = request_make_new_recovery_key(authorized_client, uses=-1)
assert_errorcode(response, "getNewRecoveryApiKey", 400)
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None
def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_file):
# Try to get token status
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": 0,
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is False
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 400
response = request_make_new_recovery_key(authorized_client, uses=0)
assert_errorcode(response, "getNewRecoveryApiKey", 400)
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None
assert graphql_recovery_status(authorized_client)["exists"] is False

View File

@ -25,7 +25,6 @@ from test_tokens_repository import (
mock_recovery_key_generate,
mock_generate_token,
mock_new_device_key_generate,
empty_keys,
)
ORIGINAL_TOKEN_CONTENT = [
@ -51,6 +50,18 @@ ORIGINAL_TOKEN_CONTENT = [
},
]
EMPTY_KEYS_JSON = """
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}
"""
@pytest.fixture
def tokens(mocker, datadir):
@ -59,6 +70,22 @@ def tokens(mocker, datadir):
return datadir
@pytest.fixture
def empty_keys(mocker, tmpdir):
tokens_file = tmpdir / "empty_keys.json"
with open(tokens_file, "w") as file:
file.write(EMPTY_KEYS_JSON)
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokens_file)
assert read_json(tokens_file)["tokens"] == [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
}
]
return tmpdir
@pytest.fixture
def null_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json")

View File

@ -1,9 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}

View File

@ -2,7 +2,7 @@
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
from datetime import datetime, timedelta
from datetime import datetime, timezone
from mnemonic import Mnemonic
import pytest
@ -16,13 +16,18 @@ from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound,
NewDeviceKeyNotFound,
)
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from tests.common import read_json
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from tests.common import five_minutes_into_past, five_minutes_into_future
ORIGINAL_DEVICE_NAMES = [
@ -32,24 +37,15 @@ ORIGINAL_DEVICE_NAMES = [
"forth_token",
]
TEST_DATE = datetime(2022, 7, 15, 17, 41, 31, 675698, timezone.utc)
# tokens are not tz-aware
TOKEN_TEST_DATE = datetime(2022, 7, 15, 17, 41, 31, 675698)
def mnemonic_from_hex(hexkey):
return Mnemonic(language="english").to_mnemonic(bytes.fromhex(hexkey))
@pytest.fixture
def empty_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "empty_keys.json")
assert read_json(datadir / "empty_keys.json")["tokens"] == [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
}
]
return datadir
@pytest.fixture
def mock_new_device_key_generate(mocker):
mock = mocker.patch(
@ -57,8 +53,8 @@ def mock_new_device_key_generate(mocker):
autospec=True,
return_value=NewDeviceKey(
key="43478d05b35e4781598acd76e33832bb",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
created_at=TEST_DATE,
expires_at=TEST_DATE,
),
)
return mock
@ -72,8 +68,8 @@ def mock_new_device_key_generate_for_mnemonic(mocker):
autospec=True,
return_value=NewDeviceKey(
key="2237238de23dc71ab558e317bdb8ff8e",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
created_at=TEST_DATE,
expires_at=TEST_DATE,
),
)
return mock
@ -100,7 +96,7 @@ def mock_recovery_key_generate_invalid(mocker):
autospec=True,
return_value=RecoveryKey(
key="889bf49c1d3199d71a2e704718772bd53a422020334db051",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
created_at=TEST_DATE,
expires_at=None,
uses_left=0,
),
@ -116,7 +112,7 @@ def mock_token_generate(mocker):
return_value=Token(
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
device_name="IamNewDevice",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
created_at=TOKEN_TEST_DATE,
),
)
return mock
@ -129,7 +125,7 @@ def mock_recovery_key_generate(mocker):
autospec=True,
return_value=RecoveryKey(
key="889bf49c1d3199d71a2e704718772bd53a422020334db051",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
created_at=TEST_DATE,
expires_at=None,
uses_left=1,
),
@ -137,23 +133,6 @@ def mock_recovery_key_generate(mocker):
return mock
@pytest.fixture
def empty_json_repo(empty_keys):
repo = JsonTokensRepository()
for token in repo.get_tokens():
repo.delete_token(token)
assert repo.get_tokens() == []
return repo
@pytest.fixture
def empty_redis_repo():
repo = RedisTokensRepository()
repo.reset()
assert repo.get_tokens() == []
return repo
@pytest.fixture(params=["json", "redis"])
def empty_repo(request, empty_json_repo, empty_redis_repo):
if request.param == "json":
@ -250,13 +229,13 @@ def test_create_token(empty_repo, mock_token_generate):
assert repo.create_token(device_name="IamNewDevice") == Token(
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
device_name="IamNewDevice",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
created_at=TOKEN_TEST_DATE,
)
assert repo.get_tokens() == [
Token(
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
device_name="IamNewDevice",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
created_at=TOKEN_TEST_DATE,
)
]
@ -292,7 +271,7 @@ def test_delete_not_found_token(some_tokens_repo):
input_token = Token(
token="imbadtoken",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
created_at=TEST_DATE,
)
with pytest.raises(TokenNotFound):
assert repo.delete_token(input_token) is None
@ -321,7 +300,7 @@ def test_refresh_not_found_token(some_tokens_repo, mock_token_generate):
input_token = Token(
token="idontknowwhoiam",
device_name="tellmewhoiam?",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
created_at=TEST_DATE,
)
with pytest.raises(TokenNotFound):
@ -345,7 +324,7 @@ def test_create_get_recovery_key(some_tokens_repo, mock_recovery_key_generate):
assert repo.create_recovery_key(uses_left=1, expiration=None) is not None
assert repo.get_recovery_key() == RecoveryKey(
key="889bf49c1d3199d71a2e704718772bd53a422020334db051",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
created_at=TEST_DATE,
expires_at=None,
uses_left=1,
)
@ -384,10 +363,13 @@ def test_use_mnemonic_expired_recovery_key(
some_tokens_repo,
):
repo = some_tokens_repo
expiration = datetime.now() - timedelta(minutes=5)
expiration = five_minutes_into_past()
assert repo.create_recovery_key(uses_left=2, expiration=expiration) is not None
recovery_key = repo.get_recovery_key()
assert recovery_key.expires_at == expiration
# TODO: do not ignore timezone once json backend is deleted
assert recovery_key.expires_at.replace(tzinfo=None) == expiration.replace(
tzinfo=None
)
assert not repo.is_recovery_key_valid()
with pytest.raises(RecoveryKeyNotFound):
@ -484,8 +466,8 @@ def test_get_new_device_key(some_tokens_repo, mock_new_device_key_generate):
assert repo.get_new_device_key() == NewDeviceKey(
key="43478d05b35e4781598acd76e33832bb",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
created_at=TEST_DATE,
expires_at=TEST_DATE,
)
@ -561,7 +543,7 @@ def test_use_mnemonic_expired_new_device_key(
some_tokens_repo,
):
repo = some_tokens_repo
expiration = datetime.now() - timedelta(minutes=5)
expiration = five_minutes_into_past()
key = repo.get_new_device_key()
assert key is not None
@ -588,3 +570,36 @@ def test_use_mnemonic_new_device_key_when_empty(empty_repo):
)
is None
)
def assert_identical(
repo_a: AbstractTokensRepository, repo_b: AbstractTokensRepository
):
tokens_a = repo_a.get_tokens()
tokens_b = repo_b.get_tokens()
assert len(tokens_a) == len(tokens_b)
for token in tokens_a:
assert token in tokens_b
assert repo_a.get_recovery_key() == repo_b.get_recovery_key()
assert repo_a._get_stored_new_device_key() == repo_b._get_stored_new_device_key()
def clone_to_redis(repo: JsonTokensRepository):
other_repo = RedisTokensRepository()
other_repo.clone(repo)
assert_identical(repo, other_repo)
# we cannot easily parametrize this unfortunately, since some_tokens and empty_repo cannot coexist
def test_clone_json_to_redis_empty(empty_repo):
repo = empty_repo
if isinstance(repo, JsonTokensRepository):
clone_to_redis(repo)
def test_clone_json_to_redis_full(some_tokens_repo):
repo = some_tokens_repo
if isinstance(repo, JsonTokensRepository):
repo.get_new_device_key()
repo.create_recovery_key(five_minutes_into_future(), 2)
clone_to_redis(repo)

View File

@ -1,9 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}

View File

@ -3,6 +3,7 @@
import pytest
from tests.common import read_json
from tests.test_graphql.common import assert_empty
class ProcessMock:
@ -72,8 +73,7 @@ def test_graphql_add_ssh_key_unauthorized(client, some_users, mock_subprocess_po
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_add_ssh_key(authorized_client, some_users, mock_subprocess_popen):
@ -231,8 +231,7 @@ def test_graphql_remove_ssh_key_unauthorized(client, some_users, mock_subprocess
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_remove_ssh_key(authorized_client, some_users, mock_subprocess_popen):

View File

@ -5,6 +5,7 @@ import os
import pytest
from tests.common import generate_system_query, read_json
from tests.test_graphql.common import assert_empty
@pytest.fixture
@ -144,8 +145,7 @@ def test_graphql_get_python_version_wrong_auth(
"query": generate_system_query([API_PYTHON_VERSION_INFO]),
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_get_python_version(authorized_client, mock_subprocess_check_output):
@ -181,8 +181,7 @@ def test_graphql_get_system_version_unauthorized(
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
assert mock_subprocess_check_output.call_count == 0
@ -348,8 +347,7 @@ def test_graphql_get_timezone_unauthorized(client, turned_on):
"query": generate_system_query([API_GET_TIMEZONE]),
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_get_timezone(authorized_client, turned_on):
@ -405,8 +403,7 @@ def test_graphql_change_timezone_unauthorized(client, turned_on):
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_change_timezone(authorized_client, turned_on):
@ -515,8 +512,7 @@ def test_graphql_get_auto_upgrade_unauthorized(client, turned_on):
"query": generate_system_query([API_GET_AUTO_UPGRADE_SETTINGS_QUERY]),
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_get_auto_upgrade(authorized_client, turned_on):
@ -624,8 +620,7 @@ def test_graphql_change_auto_upgrade_unauthorized(client, turned_on):
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_change_auto_upgrade(authorized_client, turned_on):
@ -932,8 +927,7 @@ def test_graphql_pull_system_configuration_unauthorized(client, mock_subprocess_
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
assert mock_subprocess_popen.call_count == 0

View File

@ -6,6 +6,7 @@ from tests.common import (
generate_users_query,
read_json,
)
from tests.test_graphql.common import assert_empty
invalid_usernames = [
"messagebus",
@ -125,8 +126,7 @@ def test_graphql_get_users_unauthorized(client, some_users, mock_subprocess_pope
"query": generate_users_query([API_USERS_INFO]),
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_get_some_users(authorized_client, some_users, mock_subprocess_popen):
@ -192,8 +192,7 @@ def test_graphql_get_one_user_unauthorized(client, one_user, mock_subprocess_pop
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen):
@ -323,8 +322,7 @@ def test_graphql_add_user_unauthorize(client, one_user, mock_subprocess_popen):
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_add_user(authorized_client, one_user, mock_subprocess_popen):
@ -576,8 +574,7 @@ def test_graphql_delete_user_unauthorized(client, some_users, mock_subprocess_po
"variables": {"username": "user1"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_delete_user(authorized_client, some_users, mock_subprocess_popen):
@ -683,8 +680,7 @@ def test_graphql_update_user_unauthorized(client, some_users, mock_subprocess_po
},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
assert_empty(response)
def test_graphql_update_user(authorized_client, some_users, mock_subprocess_popen):

View File

@ -1,14 +0,0 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314"
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View File

@ -1,9 +0,0 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "Test Token",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View File

@ -3,31 +3,16 @@
# pylint: disable=missing-function-docstring
import datetime
import pytest
from mnemonic import Mnemonic
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
from tests.conftest import TOKENS_FILE_CONTENTS
from tests.common import (
RECOVERY_KEY_VALIDATION_DATETIME,
DEVICE_KEY_VALIDATION_DATETIME,
NearFuture,
assert_recovery_recent,
)
TOKEN_REPO = JsonTokensRepository()
from tests.common import read_json, write_json
TOKENS_FILE_CONTETS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
},
]
}
from tests.common import five_minutes_into_future_naive as five_minutes_into_future
from tests.common import five_minutes_into_past_naive as five_minutes_into_past
DATE_FORMATS = [
"%Y-%m-%dT%H:%M:%S.%fZ",
@ -37,10 +22,99 @@ DATE_FORMATS = [
]
def test_get_tokens_info(authorized_client, tokens_file):
response = authorized_client.get("/auth/tokens")
def assert_original(client):
new_tokens = rest_get_tokens_info(client)
for token in TOKENS_FILE_CONTENTS["tokens"]:
assert_token_valid(client, token["token"])
for new_token in new_tokens:
if new_token["name"] == token["name"]:
assert (
datetime.datetime.fromisoformat(new_token["date"]) == token["date"]
)
assert_no_recovery(client)
def assert_token_valid(client, token):
client.headers.update({"Authorization": "Bearer " + token})
assert rest_get_tokens_info(client) is not None
def rest_get_tokens_info(client):
response = client.get("/auth/tokens")
assert response.status_code == 200
assert response.json() == [
return response.json()
def rest_try_authorize_new_device(client, token, device_name):
response = client.post(
"/auth/new_device/authorize",
json={
"token": token,
"device": device_name,
},
)
return response
def rest_make_recovery_token(client, expires_at=None, timeformat=None, uses=None):
json = {}
if expires_at is not None:
assert timeformat is not None
expires_at_str = expires_at.strftime(timeformat)
json["expiration"] = expires_at_str
if uses is not None:
json["uses"] = uses
if json == {}:
response = client.post("/auth/recovery_token")
else:
response = client.post(
"/auth/recovery_token",
json=json,
)
if not response.status_code == 200:
raise ValueError(response.reason, response.text, response.json()["detail"])
assert response.status_code == 200
assert "token" in response.json()
return response.json()["token"]
def rest_get_recovery_status(client):
response = client.get("/auth/recovery_token")
assert response.status_code == 200
return response.json()
def rest_get_recovery_date(client):
status = rest_get_recovery_status(client)
assert "date" in status
return status["date"]
def assert_no_recovery(client):
assert not rest_get_recovery_status(client)["exists"]
def rest_recover_with_mnemonic(client, mnemonic_token, device_name):
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": device_name},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert_token_valid(client, new_token)
return new_token
# Tokens
def test_get_tokens_info(authorized_client, tokens_file):
assert sorted(rest_get_tokens_info(authorized_client), key=lambda x: x["name"]) == [
{"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True},
{
"name": "test_token2",
@ -55,10 +129,10 @@ def test_get_tokens_unauthorized(client, tokens_file):
assert response.status_code == 401
def test_delete_token_unauthorized(client, tokens_file):
def test_delete_token_unauthorized(client, authorized_client, tokens_file):
response = client.delete("/auth/tokens")
assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert_original(authorized_client)
def test_delete_token(authorized_client, tokens_file):
@ -66,15 +140,9 @@ def test_delete_token(authorized_client, tokens_file):
"/auth/tokens", json={"token_name": "test_token2"}
)
assert response.status_code == 200
assert read_json(tokens_file) == {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
}
]
}
assert rest_get_tokens_info(authorized_client) == [
{"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True}
]
def test_delete_self_token(authorized_client, tokens_file):
@ -82,7 +150,7 @@ def test_delete_self_token(authorized_client, tokens_file):
"/auth/tokens", json={"token_name": "test_token"}
)
assert response.status_code == 400
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert_original(authorized_client)
def test_delete_nonexistent_token(authorized_client, tokens_file):
@ -90,131 +158,103 @@ def test_delete_nonexistent_token(authorized_client, tokens_file):
"/auth/tokens", json={"token_name": "test_token3"}
)
assert response.status_code == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert_original(authorized_client)
def test_refresh_token_unauthorized(client, tokens_file):
def test_refresh_token_unauthorized(client, authorized_client, tokens_file):
response = client.post("/auth/tokens")
assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert_original(authorized_client)
def test_refresh_token(authorized_client, tokens_file):
response = authorized_client.post("/auth/tokens")
assert response.status_code == 200
new_token = response.json()["token"]
assert TOKEN_REPO.get_token_by_token_string(new_token) is not None
assert_token_valid(authorized_client, new_token)
# new device
# New device
def test_get_new_device_auth_token_unauthorized(client, tokens_file):
def test_get_new_device_auth_token_unauthorized(client, authorized_client, tokens_file):
response = client.post("/auth/new_device")
assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert "token" not in response.json()
assert "detail" in response.json()
# We only can check existence of a token we know.
def test_get_new_device_auth_token(authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device")
def test_get_and_delete_new_device_token(client, authorized_client, tokens_file):
token = rest_get_new_device_token(authorized_client)
response = authorized_client.delete("/auth/new_device", json={"token": token})
assert response.status_code == 200
assert "token" in response.json()
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
assert rest_try_authorize_new_device(client, token, "new_device").status_code == 404
def test_get_and_delete_new_device_token(authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json()
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = authorized_client.delete(
"/auth/new_device", json={"token": response.json()["token"]}
)
assert response.status_code == 200
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_delete_token_unauthenticated(client, tokens_file):
response = client.delete("/auth/new_device")
def test_delete_token_unauthenticated(client, authorized_client, tokens_file):
token = rest_get_new_device_token(authorized_client)
response = client.delete("/auth/new_device", json={"token": token})
assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert rest_try_authorize_new_device(client, token, "new_device").status_code == 200
def rest_get_new_device_token(client):
response = client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json()
return response.json()["token"]
def test_get_and_authorize_new_device(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device")
token = rest_get_new_device_token(authorized_client)
response = rest_try_authorize_new_device(client, token, "new_device")
assert response.status_code == 200
assert "token" in response.json()
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json()["token"], "device": "new_device"},
)
assert response.status_code == 200
assert read_json(tokens_file)["tokens"][2]["token"] == response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
assert_token_valid(authorized_client, response.json()["token"])
def test_authorize_new_device_with_invalid_token(client, tokens_file):
response = client.post(
"/auth/new_device/authorize",
json={"token": "invalid_token", "device": "new_device"},
)
def test_authorize_new_device_with_invalid_token(
client, authorized_client, tokens_file
):
response = rest_try_authorize_new_device(client, "invalid_token", "new_device")
assert response.status_code == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert_original(authorized_client)
def test_get_and_authorize_used_token(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json()
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json()["token"], "device": "new_device"},
token_to_be_used_2_times = rest_get_new_device_token(authorized_client)
response = rest_try_authorize_new_device(
client, token_to_be_used_2_times, "new_device"
)
assert response.status_code == 200
assert read_json(tokens_file)["tokens"][2]["token"] == response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json()["token"], "device": "new_device"},
assert_token_valid(authorized_client, response.json()["token"])
response = rest_try_authorize_new_device(
client, token_to_be_used_2_times, "new_device"
)
assert response.status_code == 404
def test_get_and_authorize_token_after_12_minutes(
client, authorized_client, tokens_file
client, authorized_client, tokens_file, mocker
):
response = authorized_client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json()
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
token = rest_get_new_device_token(authorized_client)
file_data = read_json(tokens_file)
file_data["new_device"]["expiration"] = str(
datetime.datetime.now() - datetime.timedelta(minutes=13)
)
write_json(tokens_file, file_data)
# TARDIS sounds
mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture)
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json()["token"], "device": "new_device"},
)
response = rest_try_authorize_new_device(client, token, "new_device")
assert response.status_code == 404
assert_original(authorized_client)
def test_authorize_without_token(client, tokens_file):
def test_authorize_without_token(client, authorized_client, tokens_file):
response = client.post(
"/auth/new_device/authorize",
json={"device": "new_device"},
)
assert response.status_code == 422
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert_original(authorized_client)
# Recovery tokens
@ -240,10 +280,10 @@ def test_authorize_without_token(client, tokens_file):
# - if request is invalid, returns 400
def test_get_recovery_token_status_unauthorized(client, tokens_file):
def test_get_recovery_token_status_unauthorized(client, authorized_client, tokens_file):
response = client.get("/auth/recovery_token")
assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert_original(authorized_client)
def test_get_recovery_token_when_none_exists(authorized_client, tokens_file):
@ -256,31 +296,17 @@ def test_get_recovery_token_when_none_exists(authorized_client, tokens_file):
"expiration": None,
"uses_left": None,
}
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert_original(authorized_client)
def test_generate_recovery_token(authorized_client, client, tokens_file):
# Generate token without expiration and uses_left
response = authorized_client.post("/auth/recovery_token")
assert response.status_code == 200
assert "token" in response.json()
mnemonic_token = response.json()["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
mnemonic_token = rest_make_recovery_token(authorized_client)
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
# Assert that the token was generated near the current time
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
time_generated = rest_get_recovery_date(authorized_client)
assert_recovery_recent(time_generated)
# Try to get token status
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
assert rest_get_recovery_status(authorized_client) == {
"exists": True,
"valid": True,
"date": time_generated,
@ -288,112 +314,49 @@ def test_generate_recovery_token(authorized_client, client, tokens_file):
"uses_left": None,
}
# Try to use the token
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
# Try to use token again
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
# And again
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
@pytest.mark.parametrize("timeformat", DATE_FORMATS)
def test_generate_recovery_token_with_expiration_date(
authorized_client, client, tokens_file, timeformat
authorized_client, client, tokens_file, timeformat, mocker
):
# Generate token with expiration date
# Generate expiration date in the future
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime(timeformat)
response = authorized_client.post(
"/auth/recovery_token",
json={"expiration": expiration_date_str},
)
assert response.status_code == 200
assert "token" in response.json()
mnemonic_token = response.json()["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
assert datetime.datetime.strptime(
read_json(tokens_file)["recovery_token"]["expiration"], "%Y-%m-%dT%H:%M:%S.%f"
) == datetime.datetime.strptime(expiration_date_str, timeformat)
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
# Assert that the token was generated near the current time
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
expiration_date = five_minutes_into_future()
mnemonic_token = rest_make_recovery_token(
authorized_client, expires_at=expiration_date, timeformat=timeformat
)
# Try to get token status
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
time_generated = rest_get_recovery_date(authorized_client)
assert_recovery_recent(time_generated)
assert rest_get_recovery_status(authorized_client) == {
"exists": True,
"valid": True,
"date": time_generated,
"expiration": expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f"),
"expiration": expiration_date.isoformat(),
"uses_left": None,
}
# Try to use the token
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
# Try to use token again
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
# And again
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
# Try to use token after expiration date
new_data = read_json(tokens_file)
new_data["recovery_token"]["expiration"] = datetime.datetime.now().strftime(
"%Y-%m-%dT%H:%M:%S.%f"
)
write_json(tokens_file, new_data)
mock = mocker.patch(RECOVERY_KEY_VALIDATION_DATETIME, NearFuture)
device_name = "recovery_device3"
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device3"},
json={"token": mnemonic_token, "device": device_name},
)
assert recovery_response.status_code == 404
# Assert that the token was not created in JSON
assert read_json(tokens_file)["tokens"] == new_data["tokens"]
# Get the status of the token
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": True,
"valid": False,
"date": time_generated,
"expiration": new_data["recovery_token"]["expiration"],
"uses_left": None,
}
# Assert that the token was not created
assert device_name not in [
token["name"] for token in rest_get_tokens_info(authorized_client)
]
@pytest.mark.parametrize("timeformat", DATE_FORMATS)
@ -401,14 +364,14 @@ def test_generate_recovery_token_with_expiration_in_the_past(
authorized_client, tokens_file, timeformat
):
# Server must return 400 if expiration date is in the past
expiration_date = datetime.datetime.utcnow() - datetime.timedelta(minutes=5)
expiration_date = five_minutes_into_past()
expiration_date_str = expiration_date.strftime(timeformat)
response = authorized_client.post(
"/auth/recovery_token",
json={"expiration": expiration_date_str},
)
assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file)
assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_invalid_time_format(
@ -421,37 +384,19 @@ def test_generate_recovery_token_with_invalid_time_format(
json={"expiration": expiration_date},
)
assert response.status_code == 422
assert "recovery_token" not in read_json(tokens_file)
assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_limited_uses(
authorized_client, client, tokens_file
):
# Generate token with limited uses
response = authorized_client.post(
"/auth/recovery_token",
json={"uses": 2},
)
assert response.status_code == 200
assert "token" in response.json()
mnemonic_token = response.json()["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2
mnemonic_token = rest_make_recovery_token(authorized_client, uses=2)
# Get the date of the token
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
time_generated = rest_get_recovery_date(authorized_client)
assert_recovery_recent(time_generated)
# Try to get token status
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
assert rest_get_recovery_status(authorized_client) == {
"exists": True,
"valid": True,
"date": time_generated,
@ -460,21 +405,9 @@ def test_generate_recovery_token_with_limited_uses(
}
# Try to use the token
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 1
# Get the status of the token
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
assert rest_get_recovery_status(authorized_client) == {
"exists": True,
"valid": True,
"date": time_generated,
@ -483,19 +416,9 @@ def test_generate_recovery_token_with_limited_uses(
}
# Try to use token again
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
# Get the status of the token
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
assert rest_get_recovery_status(authorized_client) == {
"exists": True,
"valid": False,
"date": time_generated,
@ -510,8 +433,6 @@ def test_generate_recovery_token_with_limited_uses(
)
assert recovery_response.status_code == 404
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 0
def test_generate_recovery_token_with_negative_uses(
authorized_client, client, tokens_file
@ -522,7 +443,7 @@ def test_generate_recovery_token_with_negative_uses(
json={"uses": -2},
)
assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file)
assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_zero_uses(authorized_client, client, tokens_file):
@ -532,4 +453,4 @@ def test_generate_recovery_token_with_zero_uses(authorized_client, client, token
json={"uses": 0},
)
assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file)
assert_no_recovery(authorized_client)