diff --git a/selfprivacy_api/actions/api_tokens.py b/selfprivacy_api/actions/api_tokens.py index 38133fd..37b7631 100644 --- a/selfprivacy_api/actions/api_tokens.py +++ b/selfprivacy_api/actions/api_tokens.py @@ -1,11 +1,14 @@ -"""App tokens actions""" -from datetime import datetime +""" +App tokens actions. +The only actions on tokens that are accessible from APIs +""" +from datetime import datetime, timezone from typing import Optional from pydantic import BaseModel from mnemonic import Mnemonic -from selfprivacy_api.repositories.tokens.json_tokens_repository import ( - JsonTokensRepository, +from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( + RedisTokensRepository, ) from selfprivacy_api.repositories.tokens.exceptions import ( TokenNotFound, @@ -14,7 +17,7 @@ from selfprivacy_api.repositories.tokens.exceptions import ( NewDeviceKeyNotFound, ) -TOKEN_REPO = JsonTokensRepository() +TOKEN_REPO = RedisTokensRepository() class TokenInfoWithIsCaller(BaseModel): @@ -25,6 +28,14 @@ class TokenInfoWithIsCaller(BaseModel): is_caller: bool +def _naive(date_time: datetime) -> datetime: + if date_time is None: + return None + if date_time.tzinfo is not None: + date_time.astimezone(timezone.utc) + return date_time.replace(tzinfo=None) + + def get_api_tokens_with_caller_flag(caller_token: str) -> list[TokenInfoWithIsCaller]: """Get the tokens info""" caller_name = TOKEN_REPO.get_token_by_token_string(caller_token).device_name @@ -91,8 +102,8 @@ def get_api_recovery_token_status() -> RecoveryTokenStatus: return RecoveryTokenStatus( exists=True, valid=is_valid, - date=token.created_at, - expiration=token.expires_at, + date=_naive(token.created_at), + expiration=_naive(token.expires_at), uses_left=token.uses_left, ) diff --git a/selfprivacy_api/migrations/__init__.py b/selfprivacy_api/migrations/__init__.py index 33472b9..4aa932c 100644 --- a/selfprivacy_api/migrations/__init__.py +++ b/selfprivacy_api/migrations/__init__.py @@ -25,6 +25,7 @@ from selfprivacy_api.migrations.prepare_for_nixos_2211 import ( from selfprivacy_api.migrations.prepare_for_nixos_2305 import ( MigrateToSelfprivacyChannelFrom2211, ) +from selfprivacy_api.migrations.redis_tokens import LoadTokensToRedis migrations = [ FixNixosConfigBranch(), @@ -35,6 +36,7 @@ migrations = [ CreateProviderFields(), MigrateToSelfprivacyChannelFrom2205(), MigrateToSelfprivacyChannelFrom2211(), + LoadTokensToRedis(), ] diff --git a/selfprivacy_api/migrations/redis_tokens.py b/selfprivacy_api/migrations/redis_tokens.py new file mode 100644 index 0000000..c5eea2f --- /dev/null +++ b/selfprivacy_api/migrations/redis_tokens.py @@ -0,0 +1,48 @@ +from selfprivacy_api.migrations.migration import Migration + +from selfprivacy_api.repositories.tokens.json_tokens_repository import ( + JsonTokensRepository, +) +from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( + RedisTokensRepository, +) +from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( + AbstractTokensRepository, +) + + +class LoadTokensToRedis(Migration): + """Load Json tokens into Redis""" + + def get_migration_name(self): + return "load_tokens_to_redis" + + def get_migration_description(self): + return "Loads access tokens and recovery keys from legacy json file into redis token storage" + + def is_repo_empty(self, repo: AbstractTokensRepository) -> bool: + if repo.get_tokens() != []: + return False + if repo.get_recovery_key() is not None: + return False + return True + + def is_migration_needed(self): + try: + if not self.is_repo_empty(JsonTokensRepository()) and self.is_repo_empty( + RedisTokensRepository() + ): + return True + except Exception as e: + print(e) + return False + + def migrate(self): + # Write info about providers to userdata.json + try: + RedisTokensRepository().clone(JsonTokensRepository()) + + print("Done") + except Exception as e: + print(e) + print("Error migrating access tokens from json to redis") diff --git a/selfprivacy_api/models/tokens/new_device_key.py b/selfprivacy_api/models/tokens/new_device_key.py index dda926c..9fbd23b 100644 --- a/selfprivacy_api/models/tokens/new_device_key.py +++ b/selfprivacy_api/models/tokens/new_device_key.py @@ -1,11 +1,13 @@ """ New device key used to obtain access token. """ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import secrets from pydantic import BaseModel from mnemonic import Mnemonic +from selfprivacy_api.models.tokens.time import is_past + class NewDeviceKey(BaseModel): """ @@ -22,7 +24,7 @@ class NewDeviceKey(BaseModel): """ Check if the recovery key is valid. """ - if self.expires_at < datetime.now(): + if is_past(self.expires_at): return False return True @@ -37,10 +39,10 @@ class NewDeviceKey(BaseModel): """ Factory to generate a random token. """ - creation_date = datetime.now() + creation_date = datetime.now(timezone.utc) key = secrets.token_bytes(16).hex() return NewDeviceKey( key=key, created_at=creation_date, - expires_at=datetime.now() + timedelta(minutes=10), + expires_at=creation_date + timedelta(minutes=10), ) diff --git a/selfprivacy_api/models/tokens/recovery_key.py b/selfprivacy_api/models/tokens/recovery_key.py index 098aceb..3b81398 100644 --- a/selfprivacy_api/models/tokens/recovery_key.py +++ b/selfprivacy_api/models/tokens/recovery_key.py @@ -3,12 +3,14 @@ Recovery key used to obtain access token. Recovery key has a token string, date of creation, optional date of expiration and optional count of uses left. """ -from datetime import datetime +from datetime import datetime, timezone import secrets from typing import Optional from pydantic import BaseModel from mnemonic import Mnemonic +from selfprivacy_api.models.tokens.time import is_past, ensure_timezone + class RecoveryKey(BaseModel): """ @@ -26,7 +28,7 @@ class RecoveryKey(BaseModel): """ Check if the recovery key is valid. """ - if self.expires_at is not None and self.expires_at < datetime.now(): + if self.expires_at is not None and is_past(self.expires_at): return False if self.uses_left is not None and self.uses_left <= 0: return False @@ -46,7 +48,9 @@ class RecoveryKey(BaseModel): """ Factory to generate a random token. """ - creation_date = datetime.now() + creation_date = datetime.now(timezone.utc) + if expiration is not None: + expiration = ensure_timezone(expiration) key = secrets.token_bytes(24).hex() return RecoveryKey( key=key, diff --git a/selfprivacy_api/models/tokens/time.py b/selfprivacy_api/models/tokens/time.py new file mode 100644 index 0000000..967fcfb --- /dev/null +++ b/selfprivacy_api/models/tokens/time.py @@ -0,0 +1,14 @@ +from datetime import datetime, timezone + + +def is_past(dt: datetime) -> bool: + # we cannot compare a naive now() + # to dt which might be tz-aware or unaware + dt = ensure_timezone(dt) + return dt < datetime.now(timezone.utc) + + +def ensure_timezone(dt: datetime) -> datetime: + if dt.tzinfo is None or dt.tzinfo.utcoffset(None) is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt diff --git a/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py b/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py index 3a20ede..d81bd65 100644 --- a/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py +++ b/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from abc import ABC, abstractmethod from datetime import datetime from typing import Optional @@ -86,13 +88,15 @@ class AbstractTokensRepository(ABC): def get_recovery_key(self) -> Optional[RecoveryKey]: """Get the recovery key""" - @abstractmethod def create_recovery_key( self, expiration: Optional[datetime], uses_left: Optional[int], ) -> RecoveryKey: """Create the recovery key""" + recovery_key = RecoveryKey.generate(expiration, uses_left) + self._store_recovery_key(recovery_key) + return recovery_key def use_mnemonic_recovery_key( self, mnemonic_phrase: str, device_name: str @@ -123,6 +127,14 @@ class AbstractTokensRepository(ABC): return False return recovery_key.is_valid() + @abstractmethod + def _store_recovery_key(self, recovery_key: RecoveryKey) -> None: + """Store recovery key directly""" + + @abstractmethod + def _delete_recovery_key(self) -> None: + """Delete the recovery key""" + def get_new_device_key(self) -> NewDeviceKey: """Creates and returns the new device key""" new_device_key = NewDeviceKey.generate() @@ -156,6 +168,26 @@ class AbstractTokensRepository(ABC): return new_token + def reset(self): + for token in self.get_tokens(): + self.delete_token(token) + self.delete_new_device_key() + self._delete_recovery_key() + + def clone(self, source: AbstractTokensRepository) -> None: + """Clone the state of another repository to this one""" + self.reset() + for token in source.get_tokens(): + self._store_token(token) + + recovery_key = source.get_recovery_key() + if recovery_key is not None: + self._store_recovery_key(recovery_key) + + new_device_key = source._get_stored_new_device_key() + if new_device_key is not None: + self._store_new_device_key(new_device_key) + @abstractmethod def _store_token(self, new_token: Token): """Store a token directly""" diff --git a/selfprivacy_api/repositories/tokens/json_tokens_repository.py b/selfprivacy_api/repositories/tokens/json_tokens_repository.py index 77e1311..be753ea 100644 --- a/selfprivacy_api/repositories/tokens/json_tokens_repository.py +++ b/selfprivacy_api/repositories/tokens/json_tokens_repository.py @@ -2,7 +2,7 @@ temporary legacy """ from typing import Optional -from datetime import datetime +from datetime import datetime, timezone from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData from selfprivacy_api.models.tokens.token import Token @@ -15,6 +15,7 @@ from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( AbstractTokensRepository, ) + DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" @@ -56,6 +57,20 @@ class JsonTokensRepository(AbstractTokensRepository): raise TokenNotFound("Token not found!") + def __key_date_from_str(self, date_string: str) -> datetime: + if date_string is None or date_string == "": + return None + # we assume that we store dates in json as naive utc + utc_no_tz = datetime.fromisoformat(date_string) + utc_with_tz = utc_no_tz.replace(tzinfo=timezone.utc) + return utc_with_tz + + def __date_from_tokens_file( + self, tokens_file: object, tokenfield: str, datefield: str + ): + date_string = tokens_file[tokenfield].get(datefield) + return self.__key_date_from_str(date_string) + def get_recovery_key(self) -> Optional[RecoveryKey]: """Get the recovery key""" with ReadUserData(UserDataFiles.TOKENS) as tokens_file: @@ -68,22 +83,18 @@ class JsonTokensRepository(AbstractTokensRepository): recovery_key = RecoveryKey( key=tokens_file["recovery_token"].get("token"), - created_at=tokens_file["recovery_token"].get("date"), - expires_at=tokens_file["recovery_token"].get("expiration"), + created_at=self.__date_from_tokens_file( + tokens_file, "recovery_token", "date" + ), + expires_at=self.__date_from_tokens_file( + tokens_file, "recovery_token", "expiration" + ), uses_left=tokens_file["recovery_token"].get("uses_left"), ) return recovery_key - def create_recovery_key( - self, - expiration: Optional[datetime], - uses_left: Optional[int], - ) -> RecoveryKey: - """Create the recovery key""" - - recovery_key = RecoveryKey.generate(expiration, uses_left) - + def _store_recovery_key(self, recovery_key: RecoveryKey) -> None: with WriteUserData(UserDataFiles.TOKENS) as tokens_file: key_expiration: Optional[str] = None if recovery_key.expires_at is not None: @@ -95,8 +106,6 @@ class JsonTokensRepository(AbstractTokensRepository): "uses_left": recovery_key.uses_left, } - return recovery_key - def _decrement_recovery_token(self): """Decrement recovery key use count by one""" if self.is_recovery_key_valid(): @@ -104,6 +113,13 @@ class JsonTokensRepository(AbstractTokensRepository): if tokens["recovery_token"]["uses_left"] is not None: tokens["recovery_token"]["uses_left"] -= 1 + def _delete_recovery_key(self) -> None: + """Delete the recovery key""" + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + if "recovery_token" in tokens_file: + del tokens_file["recovery_token"] + return + def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None: with WriteUserData(UserDataFiles.TOKENS) as tokens_file: tokens_file["new_device"] = { @@ -127,7 +143,11 @@ class JsonTokensRepository(AbstractTokensRepository): new_device_key = NewDeviceKey( key=tokens_file["new_device"]["token"], - created_at=tokens_file["new_device"]["date"], - expires_at=tokens_file["new_device"]["expiration"], + created_at=self.__date_from_tokens_file( + tokens_file, "new_device", "date" + ), + expires_at=self.__date_from_tokens_file( + tokens_file, "new_device", "expiration" + ), ) return new_device_key diff --git a/selfprivacy_api/repositories/tokens/redis_tokens_repository.py b/selfprivacy_api/repositories/tokens/redis_tokens_repository.py index 8e683d2..834794c 100644 --- a/selfprivacy_api/repositories/tokens/redis_tokens_repository.py +++ b/selfprivacy_api/repositories/tokens/redis_tokens_repository.py @@ -4,6 +4,7 @@ Token repository using Redis as backend. from typing import Any, Optional from datetime import datetime from hashlib import md5 +from datetime import timezone from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( AbstractTokensRepository, @@ -53,6 +54,7 @@ class RedisTokensRepository(AbstractTokensRepository): token = self._token_from_hash(key) if token == input_token: return key + return None def delete_token(self, input_token: Token) -> None: """Delete the token""" @@ -62,13 +64,6 @@ class RedisTokensRepository(AbstractTokensRepository): raise TokenNotFound redis.delete(key) - def reset(self): - for token in self.get_tokens(): - self.delete_token(token) - self.delete_new_device_key() - redis = self.connection - redis.delete(RECOVERY_KEY_REDIS_KEY) - def get_recovery_key(self) -> Optional[RecoveryKey]: """Get the recovery key""" redis = self.connection @@ -76,15 +71,13 @@ class RedisTokensRepository(AbstractTokensRepository): return self._recovery_key_from_hash(RECOVERY_KEY_REDIS_KEY) return None - def create_recovery_key( - self, - expiration: Optional[datetime], - uses_left: Optional[int], - ) -> RecoveryKey: - """Create the recovery key""" - recovery_key = RecoveryKey.generate(expiration=expiration, uses_left=uses_left) + def _store_recovery_key(self, recovery_key: RecoveryKey) -> None: self._store_model_as_hash(RECOVERY_KEY_REDIS_KEY, recovery_key) - return recovery_key + + def _delete_recovery_key(self) -> None: + """Delete the recovery key""" + redis = self.connection + redis.delete(RECOVERY_KEY_REDIS_KEY) def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None: """Store new device key directly""" @@ -157,6 +150,7 @@ class RedisTokensRepository(AbstractTokensRepository): if token is not None: token.created_at = token.created_at.replace(tzinfo=None) return token + return None def _recovery_key_from_hash(self, redis_key: str) -> Optional[RecoveryKey]: return self._hash_as_model(redis_key, RecoveryKey) @@ -168,5 +162,7 @@ class RedisTokensRepository(AbstractTokensRepository): redis = self.connection for key, value in model.dict().items(): if isinstance(value, datetime): + if value.tzinfo is None: + value = value.replace(tzinfo=timezone.utc) value = value.isoformat() redis.hset(redis_key, key, str(value)) diff --git a/tests/common.py b/tests/common.py index e4a283d..97d0d7a 100644 --- a/tests/common.py +++ b/tests/common.py @@ -1,6 +1,33 @@ import json +from datetime import datetime, timezone, timedelta from mnemonic import Mnemonic +# for expiration tests. If headache, consider freezegun +RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.time.datetime" +DEVICE_KEY_VALIDATION_DATETIME = RECOVERY_KEY_VALIDATION_DATETIME + + +def five_minutes_into_future_naive(): + return datetime.now() + timedelta(minutes=5) + + +def five_minutes_into_future(): + return datetime.now(timezone.utc) + timedelta(minutes=5) + + +def five_minutes_into_past_naive(): + return datetime.now() - timedelta(minutes=5) + + +def five_minutes_into_past(): + return datetime.now(timezone.utc) - timedelta(minutes=5) + + +class NearFuture(datetime): + @classmethod + def now(cls, tz=None): + return datetime.now(tz) + timedelta(minutes=13) + def read_json(file_path): with open(file_path, "r", encoding="utf-8") as file: @@ -30,3 +57,10 @@ def generate_backup_query(query_array): def mnemonic_to_hex(mnemonic): return Mnemonic(language="english").to_entropy(mnemonic).hex() + + +def assert_recovery_recent(time_generated): + assert ( + datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - timedelta(seconds=5) + < datetime.now() + ) diff --git a/tests/conftest.py b/tests/conftest.py index 7e8ae11..f058997 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,6 +6,38 @@ import pytest from os import path from fastapi.testclient import TestClient +import os.path as path +import datetime + +from selfprivacy_api.models.tokens.token import Token +from selfprivacy_api.repositories.tokens.json_tokens_repository import ( + JsonTokensRepository, +) +from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( + RedisTokensRepository, +) + +from tests.common import read_json + +EMPTY_TOKENS_JSON = ' {"tokens": []}' + + +TOKENS_FILE_CONTENTS = { + "tokens": [ + { + "token": "TEST_TOKEN", + "name": "test_token", + "date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314), + }, + { + "token": "TEST_TOKEN2", + "name": "test_token2", + "date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314), + }, + ] +} + +DEVICE_WE_AUTH_TESTS_WITH = TOKENS_FILE_CONTENTS["tokens"][0] def pytest_generate_tests(metafunc): @@ -17,12 +49,45 @@ def global_data_dir(): @pytest.fixture -def tokens_file(mocker, shared_datadir): - """Mock tokens file.""" - mock = mocker.patch( - "selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json" - ) - return mock +def empty_tokens(mocker, tmpdir): + tokenfile = tmpdir / "empty_tokens.json" + with open(tokenfile, "w") as file: + file.write(EMPTY_TOKENS_JSON) + mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokenfile) + assert read_json(tokenfile)["tokens"] == [] + return tmpdir + + +@pytest.fixture +def empty_json_repo(empty_tokens): + repo = JsonTokensRepository() + for token in repo.get_tokens(): + repo.delete_token(token) + assert repo.get_tokens() == [] + return repo + + +@pytest.fixture +def empty_redis_repo(): + repo = RedisTokensRepository() + repo.reset() + assert repo.get_tokens() == [] + return repo + + +@pytest.fixture +def tokens_file(empty_redis_repo, tmpdir): + """A state with tokens""" + repo = empty_redis_repo + for token in TOKENS_FILE_CONTENTS["tokens"]: + repo._store_token( + Token( + token=token["token"], + device_name=token["name"], + created_at=token["date"], + ) + ) + return repo @pytest.fixture @@ -68,7 +133,9 @@ def authorized_client(tokens_file, huey_database, jobs_file): from selfprivacy_api.app import app client = TestClient(app) - client.headers.update({"Authorization": "Bearer TEST_TOKEN"}) + client.headers.update( + {"Authorization": "Bearer " + DEVICE_WE_AUTH_TESTS_WITH["token"]} + ) return client diff --git a/tests/test_graphql/api_common.py b/tests/test_graphql/api_common.py new file mode 100644 index 0000000..bfac767 --- /dev/null +++ b/tests/test_graphql/api_common.py @@ -0,0 +1,89 @@ +from tests.common import generate_api_query +from tests.conftest import TOKENS_FILE_CONTENTS, DEVICE_WE_AUTH_TESTS_WITH + +ORIGINAL_DEVICES = TOKENS_FILE_CONTENTS["tokens"] + + +def assert_ok(response, request): + data = assert_data(response) + data[request]["success"] is True + data[request]["message"] is not None + data[request]["code"] == 200 + + +def assert_errorcode(response, request, code): + data = assert_data(response) + data[request]["success"] is False + data[request]["message"] is not None + data[request]["code"] == code + + +def assert_empty(response): + assert response.status_code == 200 + assert response.json().get("data") is None + + +def assert_data(response): + assert response.status_code == 200 + data = response.json().get("data") + assert data is not None + assert "api" in data.keys() + return data["api"] + + +API_DEVICES_QUERY = """ +devices { + creationDate + isCaller + name +} +""" + + +def request_devices(client): + return client.post( + "/graphql", + json={"query": generate_api_query([API_DEVICES_QUERY])}, + ) + + +def graphql_get_devices(client): + response = request_devices(client) + data = assert_data(response) + devices = data["devices"] + assert devices is not None + return devices + + +def set_client_token(client, token): + client.headers.update({"Authorization": "Bearer " + token}) + + +def assert_token_valid(client, token): + set_client_token(client, token) + assert graphql_get_devices(client) is not None + + +def assert_same(graphql_devices, abstract_devices): + """Orderless comparison""" + assert len(graphql_devices) == len(abstract_devices) + for original_device in abstract_devices: + assert original_device["name"] in [device["name"] for device in graphql_devices] + for device in graphql_devices: + if device["name"] == original_device["name"]: + assert device["creationDate"] == original_device["date"].isoformat() + + +def assert_original(client): + devices = graphql_get_devices(client) + assert_original_devices(devices) + + +def assert_original_devices(devices): + assert_same(devices, ORIGINAL_DEVICES) + + for device in devices: + if device["name"] == DEVICE_WE_AUTH_TESTS_WITH["name"]: + assert device["isCaller"] is True + else: + assert device["isCaller"] is False diff --git a/tests/test_graphql/common.py b/tests/test_graphql/common.py new file mode 100644 index 0000000..d473433 --- /dev/null +++ b/tests/test_graphql/common.py @@ -0,0 +1,88 @@ +from tests.common import generate_api_query +from tests.conftest import TOKENS_FILE_CONTENTS, DEVICE_WE_AUTH_TESTS_WITH + +ORIGINAL_DEVICES = TOKENS_FILE_CONTENTS["tokens"] + + +def assert_ok(response, request): + data = assert_data(response) + data[request]["success"] is True + data[request]["message"] is not None + data[request]["code"] == 200 + + +def assert_errorcode(response, request, code): + data = assert_data(response) + data[request]["success"] is False + data[request]["message"] is not None + data[request]["code"] == code + + +def assert_empty(response): + assert response.status_code == 200 + assert response.json().get("data") is None + + +def assert_data(response): + assert response.status_code == 200 + data = response.json().get("data") + assert data is not None + return data + + +API_DEVICES_QUERY = """ +devices { + creationDate + isCaller + name +} +""" + + +def request_devices(client): + return client.post( + "/graphql", + json={"query": generate_api_query([API_DEVICES_QUERY])}, + ) + + +def graphql_get_devices(client): + response = request_devices(client) + data = assert_data(response) + devices = data["api"]["devices"] + assert devices is not None + return devices + + +def set_client_token(client, token): + client.headers.update({"Authorization": "Bearer " + token}) + + +def assert_token_valid(client, token): + set_client_token(client, token) + assert graphql_get_devices(client) is not None + + +def assert_same(graphql_devices, abstract_devices): + """Orderless comparison""" + assert len(graphql_devices) == len(abstract_devices) + for original_device in abstract_devices: + assert original_device["name"] in [device["name"] for device in graphql_devices] + for device in graphql_devices: + if device["name"] == original_device["name"]: + assert device["creationDate"] == original_device["date"].isoformat() + + +def assert_original(client): + devices = graphql_get_devices(client) + assert_original_devices(devices) + + +def assert_original_devices(devices): + assert_same(devices, ORIGINAL_DEVICES) + + for device in devices: + if device["name"] == DEVICE_WE_AUTH_TESTS_WITH["name"]: + assert device["isCaller"] is True + else: + assert device["isCaller"] is False diff --git a/tests/test_graphql/data/gitkeep b/tests/test_graphql/data/gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_graphql/data/tokens.json b/tests/test_graphql/data/tokens.json deleted file mode 100644 index 9be9d02..0000000 --- a/tests/test_graphql/data/tokens.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token", - "date": "2022-01-14 08:31:10.789314" - }, - { - "token": "TEST_TOKEN2", - "name": "test_token2", - "date": "2022-01-14 08:31:10.789314" - } - ] -} \ No newline at end of file diff --git a/tests/test_graphql/test_api.py b/tests/test_graphql/test_api.py index 16c7c4d..c252d44 100644 --- a/tests/test_graphql/test_api.py +++ b/tests/test_graphql/test_api.py @@ -3,25 +3,11 @@ # pylint: disable=missing-function-docstring from tests.common import generate_api_query +from tests.test_graphql.common import assert_original_devices from tests.test_graphql.test_api_devices import API_DEVICES_QUERY from tests.test_graphql.test_api_recovery import API_RECOVERY_QUERY from tests.test_graphql.test_api_version import API_VERSION_QUERY -TOKENS_FILE_CONTETS = { - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token", - "date": "2022-01-14 08:31:10.789314", - }, - { - "token": "TEST_TOKEN2", - "name": "test_token2", - "date": "2022-01-14 08:31:10.789314", - }, - ] -} - def test_graphql_get_entire_api_data(authorized_client, tokens_file): response = authorized_client.post( @@ -35,20 +21,11 @@ def test_graphql_get_entire_api_data(authorized_client, tokens_file): assert response.status_code == 200 assert response.json().get("data") is not None assert "version" in response.json()["data"]["api"] - assert response.json()["data"]["api"]["devices"] is not None - assert len(response.json()["data"]["api"]["devices"]) == 2 - assert ( - response.json()["data"]["api"]["devices"][0]["creationDate"] - == "2022-01-14T08:31:10.789314" - ) - assert response.json()["data"]["api"]["devices"][0]["isCaller"] is True - assert response.json()["data"]["api"]["devices"][0]["name"] == "test_token" - assert ( - response.json()["data"]["api"]["devices"][1]["creationDate"] - == "2022-01-14T08:31:10.789314" - ) - assert response.json()["data"]["api"]["devices"][1]["isCaller"] is False - assert response.json()["data"]["api"]["devices"][1]["name"] == "test_token2" + + devices = response.json()["data"]["api"]["devices"] + assert devices is not None + assert_original_devices(devices) + assert response.json()["data"]["api"]["recoveryKey"] is not None assert response.json()["data"]["api"]["recoveryKey"]["exists"] is False assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False diff --git a/tests/test_graphql/test_api_devices.py b/tests/test_graphql/test_api_devices.py index cd76ef7..b24bc7f 100644 --- a/tests/test_graphql/test_api_devices.py +++ b/tests/test_graphql/test_api_devices.py @@ -1,76 +1,77 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: disable=missing-function-docstring -import datetime -import pytest -from mnemonic import Mnemonic - -from selfprivacy_api.repositories.tokens.json_tokens_repository import ( - JsonTokensRepository, +from tests.common import ( + RECOVERY_KEY_VALIDATION_DATETIME, + DEVICE_KEY_VALIDATION_DATETIME, + NearFuture, + generate_api_query, +) +from tests.conftest import DEVICE_WE_AUTH_TESTS_WITH, TOKENS_FILE_CONTENTS +from tests.test_graphql.api_common import ( + assert_data, + assert_empty, + assert_ok, + assert_errorcode, + assert_token_valid, + assert_original, + assert_same, + graphql_get_devices, + request_devices, + set_client_token, + API_DEVICES_QUERY, + ORIGINAL_DEVICES, ) -from selfprivacy_api.models.tokens.token import Token -from tests.common import generate_api_query, read_json, write_json -TOKENS_FILE_CONTETS = { - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token", - "date": "2022-01-14 08:31:10.789314", +def graphql_get_caller_token_info(client): + devices = graphql_get_devices(client) + for device in devices: + if device["isCaller"] is True: + return device + + +def graphql_get_new_device_key(authorized_client) -> str: + response = authorized_client.post( + "/graphql", + json={"query": NEW_DEVICE_KEY_MUTATION}, + ) + assert_ok(response, "getNewDeviceApiKey") + + key = response.json()["data"]["api"]["getNewDeviceApiKey"]["key"] + assert key.split(" ").__len__() == 12 + return key + + +def graphql_try_auth_new_device(client, mnemonic_key, device_name): + return client.post( + "/graphql", + json={ + "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, + "variables": { + "input": { + "key": mnemonic_key, + "deviceName": device_name, + } + }, }, - { - "token": "TEST_TOKEN2", - "name": "test_token2", - "date": "2022-01-14 08:31:10.789314", - }, - ] -} - -API_DEVICES_QUERY = """ -devices { - creationDate - isCaller - name -} -""" + ) -@pytest.fixture -def token_repo(): - return JsonTokensRepository() +def graphql_authorize_new_device(client, mnemonic_key, device_name) -> str: + response = graphql_try_auth_new_device(client, mnemonic_key, "new_device") + assert_ok(response, "authorizeWithNewDeviceApiKey") + token = response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["token"] + assert_token_valid(client, token) def test_graphql_tokens_info(authorized_client, tokens_file): - response = authorized_client.post( - "/graphql", - json={"query": generate_api_query([API_DEVICES_QUERY])}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["devices"] is not None - assert len(response.json()["data"]["api"]["devices"]) == 2 - assert ( - response.json()["data"]["api"]["devices"][0]["creationDate"] - == "2022-01-14T08:31:10.789314" - ) - assert response.json()["data"]["api"]["devices"][0]["isCaller"] is True - assert response.json()["data"]["api"]["devices"][0]["name"] == "test_token" - assert ( - response.json()["data"]["api"]["devices"][1]["creationDate"] - == "2022-01-14T08:31:10.789314" - ) - assert response.json()["data"]["api"]["devices"][1]["isCaller"] is False - assert response.json()["data"]["api"]["devices"][1]["name"] == "test_token2" + assert_original(authorized_client) def test_graphql_tokens_info_unauthorized(client, tokens_file): - response = client.post( - "/graphql", - json={"query": generate_api_query([API_DEVICES_QUERY])}, - ) - assert response.status_code == 200 - assert response.json()["data"] is None + response = request_devices(client) + assert_empty(response) DELETE_TOKEN_MUTATION = """ @@ -96,34 +97,27 @@ def test_graphql_delete_token_unauthorized(client, tokens_file): }, }, ) - assert response.status_code == 200 - assert response.json()["data"] is None + assert_empty(response) def test_graphql_delete_token(authorized_client, tokens_file): + test_devices = ORIGINAL_DEVICES.copy() + device_to_delete = test_devices.pop(1) + assert device_to_delete != DEVICE_WE_AUTH_TESTS_WITH + response = authorized_client.post( "/graphql", json={ "query": DELETE_TOKEN_MUTATION, "variables": { - "device": "test_token2", + "device": device_to_delete["name"], }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["deleteDeviceApiToken"]["success"] is True - assert response.json()["data"]["api"]["deleteDeviceApiToken"]["message"] is not None - assert response.json()["data"]["api"]["deleteDeviceApiToken"]["code"] == 200 - assert read_json(tokens_file) == { - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token", - "date": "2022-01-14 08:31:10.789314", - } - ] - } + assert_ok(response, "deleteDeviceApiToken") + + devices = graphql_get_devices(authorized_client) + assert_same(devices, test_devices) def test_graphql_delete_self_token(authorized_client, tokens_file): @@ -132,16 +126,12 @@ def test_graphql_delete_self_token(authorized_client, tokens_file): json={ "query": DELETE_TOKEN_MUTATION, "variables": { - "device": "test_token", + "device": DEVICE_WE_AUTH_TESTS_WITH["name"], }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["deleteDeviceApiToken"]["success"] is False - assert response.json()["data"]["api"]["deleteDeviceApiToken"]["message"] is not None - assert response.json()["data"]["api"]["deleteDeviceApiToken"]["code"] == 400 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_errorcode(response, "deleteDeviceApiToken", 400) + assert_original(authorized_client) def test_graphql_delete_nonexistent_token( @@ -157,12 +147,9 @@ def test_graphql_delete_nonexistent_token( }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["deleteDeviceApiToken"]["success"] is False - assert response.json()["data"]["api"]["deleteDeviceApiToken"]["message"] is not None - assert response.json()["data"]["api"]["deleteDeviceApiToken"]["code"] == 404 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_errorcode(response, "deleteDeviceApiToken", 404) + + assert_original(authorized_client) REFRESH_TOKEN_MUTATION = """ @@ -184,32 +171,22 @@ def test_graphql_refresh_token_unauthorized(client, tokens_file): "/graphql", json={"query": REFRESH_TOKEN_MUTATION}, ) - assert response.status_code == 200 - assert response.json()["data"] is None + assert_empty(response) -def test_graphql_refresh_token( - authorized_client, - tokens_file, - token_repo, -): +def test_graphql_refresh_token(authorized_client, client, tokens_file): + caller_name_and_date = graphql_get_caller_token_info(authorized_client) response = authorized_client.post( "/graphql", json={"query": REFRESH_TOKEN_MUTATION}, ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["refreshDeviceApiToken"]["success"] is True - assert ( - response.json()["data"]["api"]["refreshDeviceApiToken"]["message"] is not None - ) - assert response.json()["data"]["api"]["refreshDeviceApiToken"]["code"] == 200 - token = token_repo.get_token_by_name("test_token") - assert token == Token( - token=response.json()["data"]["api"]["refreshDeviceApiToken"]["token"], - device_name="test_token", - created_at=datetime.datetime(2022, 1, 14, 8, 31, 10, 789314), - ) + assert_ok(response, "refreshDeviceApiToken") + + new_token = response.json()["data"]["api"]["refreshDeviceApiToken"]["token"] + assert_token_valid(client, new_token) + + set_client_token(client, new_token) + assert graphql_get_caller_token_info(client) == caller_name_and_date NEW_DEVICE_KEY_MUTATION = """ @@ -234,33 +211,7 @@ def test_graphql_get_new_device_auth_key_unauthorized( "/graphql", json={"query": NEW_DEVICE_KEY_MUTATION}, ) - assert response.status_code == 200 - assert response.json()["data"] is None - - -def test_graphql_get_new_device_auth_key( - authorized_client, - tokens_file, -): - response = authorized_client.post( - "/graphql", - json={"query": NEW_DEVICE_KEY_MUTATION}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200 - assert ( - response.json()["data"]["api"]["getNewDeviceApiKey"]["key"].split(" ").__len__() - == 12 - ) - token = ( - Mnemonic(language="english") - .to_entropy(response.json()["data"]["api"]["getNewDeviceApiKey"]["key"]) - .hex() - ) - assert read_json(tokens_file)["new_device"]["token"] == token + assert_empty(response) INVALIDATE_NEW_DEVICE_KEY_MUTATION = """ @@ -289,48 +240,20 @@ def test_graphql_invalidate_new_device_token_unauthorized( }, }, ) - assert response.status_code == 200 - assert response.json()["data"] is None + assert_empty(response) -def test_graphql_get_and_delete_new_device_key( - authorized_client, - tokens_file, -): - response = authorized_client.post( - "/graphql", - json={"query": NEW_DEVICE_KEY_MUTATION}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200 - assert ( - response.json()["data"]["api"]["getNewDeviceApiKey"]["key"].split(" ").__len__() - == 12 - ) - token = ( - Mnemonic(language="english") - .to_entropy(response.json()["data"]["api"]["getNewDeviceApiKey"]["key"]) - .hex() - ) - assert read_json(tokens_file)["new_device"]["token"] == token +def test_graphql_get_and_delete_new_device_key(client, authorized_client, tokens_file): + mnemonic_key = graphql_get_new_device_key(authorized_client) + response = authorized_client.post( "/graphql", json={"query": INVALIDATE_NEW_DEVICE_KEY_MUTATION}, ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert ( - response.json()["data"]["api"]["invalidateNewDeviceApiKey"]["success"] is True - ) - assert ( - response.json()["data"]["api"]["invalidateNewDeviceApiKey"]["message"] - is not None - ) - assert response.json()["data"]["api"]["invalidateNewDeviceApiKey"]["code"] == 200 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_ok(response, "invalidateNewDeviceApiKey") + + response = graphql_try_auth_new_device(client, mnemonic_key, "new_device") + assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404) AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION = """ @@ -347,209 +270,46 @@ mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) { """ -def test_graphql_get_and_authorize_new_device( - client, - authorized_client, - tokens_file, -): - response = authorized_client.post( - "/graphql", - json={"query": NEW_DEVICE_KEY_MUTATION}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200 - mnemonic_key = response.json()["data"]["api"]["getNewDeviceApiKey"]["key"] - assert mnemonic_key.split(" ").__len__() == 12 - key = Mnemonic(language="english").to_entropy(mnemonic_key).hex() - assert read_json(tokens_file)["new_device"]["token"] == key - response = client.post( - "/graphql", - json={ - "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, - "variables": { - "input": { - "key": mnemonic_key, - "deviceName": "new_device", - } - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert ( - response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"] - is True - ) - assert ( - response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"] - is not None - ) - assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 200 - token = response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["token"] - assert read_json(tokens_file)["tokens"][2]["token"] == token - assert read_json(tokens_file)["tokens"][2]["name"] == "new_device" +def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_file): + mnemonic_key = graphql_get_new_device_key(authorized_client) + old_devices = graphql_get_devices(authorized_client) + + graphql_authorize_new_device(client, mnemonic_key, "new_device") + new_devices = graphql_get_devices(authorized_client) + + assert len(new_devices) == len(old_devices) + 1 + assert "new_device" in [device["name"] for device in new_devices] def test_graphql_authorize_new_device_with_invalid_key( - client, - tokens_file, + client, authorized_client, tokens_file ): - response = client.post( - "/graphql", - json={ - "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, - "variables": { - "input": { - "key": "invalid_token", - "deviceName": "test_token", - } - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert ( - response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"] - is False - ) - assert ( - response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"] - is not None - ) - assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 404 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + response = graphql_try_auth_new_device(client, "invalid_token", "new_device") + assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404) + + assert_original(authorized_client) -def test_graphql_get_and_authorize_used_key( - client, - authorized_client, - tokens_file, -): - response = authorized_client.post( - "/graphql", - json={"query": NEW_DEVICE_KEY_MUTATION}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200 - mnemonic_key = response.json()["data"]["api"]["getNewDeviceApiKey"]["key"] - assert mnemonic_key.split(" ").__len__() == 12 - key = Mnemonic(language="english").to_entropy(mnemonic_key).hex() - assert read_json(tokens_file)["new_device"]["token"] == key - response = client.post( - "/graphql", - json={ - "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, - "variables": { - "input": { - "key": mnemonic_key, - "deviceName": "new_token", - } - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert ( - response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"] - is True - ) - assert ( - response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"] - is not None - ) - assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 200 - assert ( - read_json(tokens_file)["tokens"][2]["token"] - == response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["token"] - ) - assert read_json(tokens_file)["tokens"][2]["name"] == "new_token" +def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_file): + mnemonic_key = graphql_get_new_device_key(authorized_client) - response = client.post( - "/graphql", - json={ - "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, - "variables": { - "input": { - "key": NEW_DEVICE_KEY_MUTATION, - "deviceName": "test_token2", - } - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert ( - response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"] - is False - ) - assert ( - response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"] - is not None - ) - assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 404 - assert read_json(tokens_file)["tokens"].__len__() == 3 + graphql_authorize_new_device(client, mnemonic_key, "new_device") + devices = graphql_get_devices(authorized_client) + + response = graphql_try_auth_new_device(client, mnemonic_key, "new_device2") + assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404) + + assert graphql_get_devices(authorized_client) == devices def test_graphql_get_and_authorize_key_after_12_minutes( - client, - authorized_client, - tokens_file, + client, authorized_client, tokens_file, mocker ): - response = authorized_client.post( - "/graphql", - json={"query": NEW_DEVICE_KEY_MUTATION}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None - assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200 - assert ( - response.json()["data"]["api"]["getNewDeviceApiKey"]["key"].split(" ").__len__() - == 12 - ) - key = ( - Mnemonic(language="english") - .to_entropy(response.json()["data"]["api"]["getNewDeviceApiKey"]["key"]) - .hex() - ) - assert read_json(tokens_file)["new_device"]["token"] == key + mnemonic_key = graphql_get_new_device_key(authorized_client) + mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture) - file_data = read_json(tokens_file) - file_data["new_device"]["expiration"] = str( - datetime.datetime.now() - datetime.timedelta(minutes=13) - ) - write_json(tokens_file, file_data) - - response = client.post( - "/graphql", - json={ - "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, - "variables": { - "input": { - "key": key, - "deviceName": "test_token", - } - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert ( - response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"] - is False - ) - assert ( - response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"] - is not None - ) - assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 404 + response = graphql_try_auth_new_device(client, mnemonic_key, "new_device") + assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404) def test_graphql_authorize_without_token( @@ -567,5 +327,4 @@ def test_graphql_authorize_without_token( }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) diff --git a/tests/test_graphql/test_api_recovery.py b/tests/test_graphql/test_api_recovery.py index 87df666..19f8a3d 100644 --- a/tests/test_graphql/test_api_recovery.py +++ b/tests/test_graphql/test_api_recovery.py @@ -1,24 +1,28 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: disable=missing-function-docstring -import datetime -from tests.common import generate_api_query, mnemonic_to_hex, read_json, write_json +from tests.common import ( + generate_api_query, + assert_recovery_recent, + NearFuture, + RECOVERY_KEY_VALIDATION_DATETIME, +) -TOKENS_FILE_CONTETS = { - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token", - "date": "2022-01-14 08:31:10.789314", - }, - { - "token": "TEST_TOKEN2", - "name": "test_token2", - "date": "2022-01-14 08:31:10.789314", - }, - ] -} +# Graphql API's output should be timezone-naive +from tests.common import five_minutes_into_future_naive as five_minutes_into_future +from tests.common import five_minutes_into_past_naive as five_minutes_into_past + +from tests.test_graphql.api_common import ( + assert_empty, + assert_data, + assert_ok, + assert_errorcode, + assert_token_valid, + assert_original, + graphql_get_devices, + set_client_token, +) API_RECOVERY_QUERY = """ recoveryKey { @@ -31,28 +35,85 @@ recoveryKey { """ -def test_graphql_recovery_key_status_unauthorized(client, tokens_file): - response = client.post( +def request_recovery_status(client): + return client.post( "/graphql", json={"query": generate_api_query([API_RECOVERY_QUERY])}, ) - assert response.status_code == 200 - assert response.json().get("data") is None + + +def graphql_recovery_status(client): + response = request_recovery_status(client) + data = assert_data(response) + + status = data["recoveryKey"] + assert status is not None + return status + + +def request_make_new_recovery_key(client, expires_at=None, uses=None): + json = {"query": API_RECOVERY_KEY_GENERATE_MUTATION} + limits = {} + + if expires_at is not None: + limits["expirationDate"] = expires_at.isoformat() + if uses is not None: + limits["uses"] = uses + + if limits != {}: + json["variables"] = {"limits": limits} + + response = client.post("/graphql", json=json) + return response + + +def graphql_make_new_recovery_key(client, expires_at=None, uses=None): + response = request_make_new_recovery_key(client, expires_at, uses) + assert_ok(response, "getNewRecoveryApiKey") + key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] + assert key is not None + assert key.split(" ").__len__() == 18 + return key + + +def request_recovery_auth(client, key, device_name): + return client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_USE_MUTATION, + "variables": { + "input": { + "key": key, + "deviceName": device_name, + }, + }, + }, + ) + + +def graphql_use_recovery_key(client, key, device_name): + response = request_recovery_auth(client, key, device_name) + assert_ok(response, "useRecoveryApiKey") + token = response.json()["data"]["api"]["useRecoveryApiKey"]["token"] + assert token is not None + assert_token_valid(client, token) + set_client_token(client, token) + assert device_name in [device["name"] for device in graphql_get_devices(client)] + return token + + +def test_graphql_recovery_key_status_unauthorized(client, tokens_file): + response = request_recovery_status(client) + assert_empty(response) def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_file): - response = authorized_client.post( - "/graphql", - json={"query": generate_api_query([API_RECOVERY_QUERY])}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["recoveryKey"] is not None - assert response.json()["data"]["api"]["recoveryKey"]["exists"] is False - assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False - assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is None - assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None - assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None + status = graphql_recovery_status(authorized_client) + assert status["exists"] is False + assert status["valid"] is False + assert status["creationDate"] is None + assert status["expirationDate"] is None + assert status["usesLeft"] is None API_RECOVERY_KEY_GENERATE_MUTATION = """ @@ -83,281 +144,71 @@ mutation TestUseRecoveryKey($input: UseRecoveryKeyInput!) { def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_GENERATE_MUTATION, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is True - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 200 - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is not None - assert ( - response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] - .split(" ") - .__len__() - == 18 - ) - assert read_json(tokens_file)["recovery_token"] is not None - time_generated = read_json(tokens_file)["recovery_token"]["date"] - assert time_generated is not None - key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] - assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - - datetime.timedelta(seconds=5) - < datetime.datetime.now() - ) + key = graphql_make_new_recovery_key(authorized_client) - # Try to get token status - response = authorized_client.post( - "/graphql", - json={"query": generate_api_query([API_RECOVERY_QUERY])}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["recoveryKey"] is not None - assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True - assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True - assert response.json()["data"]["api"]["recoveryKey"][ - "creationDate" - ] == time_generated.replace("Z", "") - assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None - assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None + status = graphql_recovery_status(authorized_client) + assert status["exists"] is True + assert status["valid"] is True + assert_recovery_recent(status["creationDate"]) + assert status["expirationDate"] is None + assert status["usesLeft"] is None - # Try to use token - response = client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_USE_MUTATION, - "variables": { - "input": { - "key": key, - "deviceName": "new_test_token", - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True - assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200 - assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None - assert ( - response.json()["data"]["api"]["useRecoveryApiKey"]["token"] - == read_json(tokens_file)["tokens"][2]["token"] - ) - assert read_json(tokens_file)["tokens"][2]["name"] == "new_test_token" - - # Try to use token again - response = client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_USE_MUTATION, - "variables": { - "input": { - "key": key, - "deviceName": "new_test_token2", - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True - assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200 - assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None - assert ( - response.json()["data"]["api"]["useRecoveryApiKey"]["token"] - == read_json(tokens_file)["tokens"][3]["token"] - ) - assert read_json(tokens_file)["tokens"][3]["name"] == "new_test_token2" + graphql_use_recovery_key(client, key, "new_test_token") + # And again + graphql_use_recovery_key(client, key, "new_test_token2") def test_graphql_generate_recovery_key_with_expiration_date( client, authorized_client, tokens_file ): - expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5) - expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f") - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_GENERATE_MUTATION, - "variables": { - "limits": { - "expirationDate": expiration_date_str, - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is True - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 200 - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is not None - assert ( - response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] - .split(" ") - .__len__() - == 18 - ) - assert read_json(tokens_file)["recovery_token"] is not None + expiration_date = five_minutes_into_future() + key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date) - key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] - assert read_json(tokens_file)["recovery_token"]["expiration"] == expiration_date_str - assert read_json(tokens_file)["recovery_token"]["token"] == mnemonic_to_hex(key) + status = graphql_recovery_status(authorized_client) + assert status["exists"] is True + assert status["valid"] is True + assert_recovery_recent(status["creationDate"]) + assert status["expirationDate"] == expiration_date.isoformat() + assert status["usesLeft"] is None - time_generated = read_json(tokens_file)["recovery_token"]["date"] - assert time_generated is not None - assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - - datetime.timedelta(seconds=5) - < datetime.datetime.now() - ) + graphql_use_recovery_key(client, key, "new_test_token") + # And again + graphql_use_recovery_key(client, key, "new_test_token2") - # Try to get token status - response = authorized_client.post( - "/graphql", - json={"query": generate_api_query([API_RECOVERY_QUERY])}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["recoveryKey"] is not None - assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True - assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True - assert response.json()["data"]["api"]["recoveryKey"][ - "creationDate" - ] == time_generated.replace("Z", "") - assert ( - response.json()["data"]["api"]["recoveryKey"]["expirationDate"] - == expiration_date_str - ) - assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None - # Try to use token - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_USE_MUTATION, - "variables": { - "input": { - "key": key, - "deviceName": "new_test_token", - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True - assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200 - assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None - assert ( - response.json()["data"]["api"]["useRecoveryApiKey"]["token"] - == read_json(tokens_file)["tokens"][2]["token"] - ) +def test_graphql_use_recovery_key_after_expiration( + client, authorized_client, tokens_file, mocker +): + expiration_date = five_minutes_into_future() + key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date) - # Try to use token again - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_USE_MUTATION, - "variables": { - "input": { - "key": key, - "deviceName": "new_test_token2", - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True - assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200 - assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None - assert ( - response.json()["data"]["api"]["useRecoveryApiKey"]["token"] - == read_json(tokens_file)["tokens"][3]["token"] - ) + # Timewarp to after it expires + mock = mocker.patch(RECOVERY_KEY_VALIDATION_DATETIME, NearFuture) - # Try to use token after expiration date - new_data = read_json(tokens_file) - new_data["recovery_token"]["expiration"] = ( - datetime.datetime.now() - datetime.timedelta(minutes=5) - ).strftime("%Y-%m-%dT%H:%M:%S.%f") - write_json(tokens_file, new_data) - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_USE_MUTATION, - "variables": { - "input": { - "key": key, - "deviceName": "new_test_token3", - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is False - assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 404 + response = request_recovery_auth(client, key, "new_test_token3") + assert_errorcode(response, "useRecoveryApiKey", 404) assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is None + assert_original(authorized_client) - assert read_json(tokens_file)["tokens"] == new_data["tokens"] - - # Try to get token status - response = authorized_client.post( - "/graphql", - json={"query": generate_api_query([API_RECOVERY_QUERY])}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["recoveryKey"] is not None - assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True - assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False - assert ( - response.json()["data"]["api"]["recoveryKey"]["creationDate"] == time_generated - ) - assert ( - response.json()["data"]["api"]["recoveryKey"]["expirationDate"] - == new_data["recovery_token"]["expiration"] - ) - assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None + status = graphql_recovery_status(authorized_client) + assert status["exists"] is True + assert status["valid"] is False + assert_recovery_recent(status["creationDate"]) + assert status["expirationDate"] == expiration_date.isoformat() + assert status["usesLeft"] is None def test_graphql_generate_recovery_key_with_expiration_in_the_past( authorized_client, tokens_file ): - expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5) - expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f") - - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_GENERATE_MUTATION, - "variables": { - "limits": { - "expirationDate": expiration_date_str, - }, - }, - }, + expiration_date = five_minutes_into_past() + response = request_make_new_recovery_key( + authorized_client, expires_at=expiration_date ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is False - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 400 + + assert_errorcode(response, "getNewRecoveryApiKey", 400) assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None - assert "recovery_token" not in read_json(tokens_file) + assert graphql_recovery_status(authorized_client)["exists"] is False def test_graphql_generate_recovery_key_with_invalid_time_format( @@ -377,183 +228,57 @@ def test_graphql_generate_recovery_key_with_invalid_time_format( }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is None - - assert "recovery_token" not in read_json(tokens_file) + assert_empty(response) + assert graphql_recovery_status(authorized_client)["exists"] is False def test_graphql_generate_recovery_key_with_limited_uses( - authorized_client, tokens_file + authorized_client, client, tokens_file ): - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_GENERATE_MUTATION, - "variables": { - "limits": { - "expirationDate": None, - "uses": 2, - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is True - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 200 - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is not None + mnemonic_key = graphql_make_new_recovery_key(authorized_client, uses=2) - mnemonic_key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] - key = mnemonic_to_hex(mnemonic_key) + status = graphql_recovery_status(authorized_client) + assert status["exists"] is True + assert status["valid"] is True + assert status["creationDate"] is not None + assert status["expirationDate"] is None + assert status["usesLeft"] == 2 - assert read_json(tokens_file)["recovery_token"]["token"] == key - assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2 + graphql_use_recovery_key(client, mnemonic_key, "new_test_token1") - # Try to get token status - response = authorized_client.post( - "/graphql", - json={"query": generate_api_query([API_RECOVERY_QUERY])}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["recoveryKey"] is not None - assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True - assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True - assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is not None - assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None - assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] == 2 + status = graphql_recovery_status(authorized_client) + assert status["exists"] is True + assert status["valid"] is True + assert status["creationDate"] is not None + assert status["expirationDate"] is None + assert status["usesLeft"] == 1 - # Try to use token - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_USE_MUTATION, - "variables": { - "input": { - "key": mnemonic_key, - "deviceName": "test_token1", - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True - assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200 - assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None + graphql_use_recovery_key(client, mnemonic_key, "new_test_token2") - # Try to get token status - response = authorized_client.post( - "/graphql", - json={"query": generate_api_query([API_RECOVERY_QUERY])}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["recoveryKey"] is not None - assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True - assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True - assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is not None - assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None - assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] == 1 + status = graphql_recovery_status(authorized_client) + assert status["exists"] is True + assert status["valid"] is False + assert status["creationDate"] is not None + assert status["expirationDate"] is None + assert status["usesLeft"] == 0 - # Try to use token - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_USE_MUTATION, - "variables": { - "input": { - "key": mnemonic_key, - "deviceName": "test_token2", - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True - assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200 - assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None - - # Try to get token status - response = authorized_client.post( - "/graphql", - json={"query": generate_api_query([API_RECOVERY_QUERY])}, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["recoveryKey"] is not None - assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True - assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False - assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is not None - assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None - assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] == 0 - - # Try to use token - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_USE_MUTATION, - "variables": { - "input": { - "key": mnemonic_key, - "deviceName": "test_token3", - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is False - assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 404 - assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is None + response = request_recovery_auth(client, mnemonic_key, "new_test_token3") + assert_errorcode(response, "useRecoveryApiKey", 404) def test_graphql_generate_recovery_key_with_negative_uses( authorized_client, tokens_file ): - # Try to get token status - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_GENERATE_MUTATION, - "variables": { - "limits": { - "uses": -1, - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is False - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 400 + response = request_make_new_recovery_key(authorized_client, uses=-1) + + assert_errorcode(response, "getNewRecoveryApiKey", 400) assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_file): - # Try to get token status - response = authorized_client.post( - "/graphql", - json={ - "query": API_RECOVERY_KEY_GENERATE_MUTATION, - "variables": { - "limits": { - "uses": 0, - }, - }, - }, - ) - assert response.status_code == 200 - assert response.json().get("data") is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is False - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None - assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 400 + response = request_make_new_recovery_key(authorized_client, uses=0) + + assert_errorcode(response, "getNewRecoveryApiKey", 400) assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None + assert graphql_recovery_status(authorized_client)["exists"] is False diff --git a/tests/test_graphql/test_repository/test_json_tokens_repository.py b/tests/test_graphql/test_repository/test_json_tokens_repository.py index af8c844..23df9df 100644 --- a/tests/test_graphql/test_repository/test_json_tokens_repository.py +++ b/tests/test_graphql/test_repository/test_json_tokens_repository.py @@ -25,7 +25,6 @@ from test_tokens_repository import ( mock_recovery_key_generate, mock_generate_token, mock_new_device_key_generate, - empty_keys, ) ORIGINAL_TOKEN_CONTENT = [ @@ -51,6 +50,18 @@ ORIGINAL_TOKEN_CONTENT = [ }, ] +EMPTY_KEYS_JSON = """ +{ + "tokens": [ + { + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + "name": "primary_token", + "date": "2022-07-15 17:41:31.675698" + } + ] +} +""" + @pytest.fixture def tokens(mocker, datadir): @@ -59,6 +70,22 @@ def tokens(mocker, datadir): return datadir +@pytest.fixture +def empty_keys(mocker, tmpdir): + tokens_file = tmpdir / "empty_keys.json" + with open(tokens_file, "w") as file: + file.write(EMPTY_KEYS_JSON) + mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokens_file) + assert read_json(tokens_file)["tokens"] == [ + { + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + "name": "primary_token", + "date": "2022-07-15 17:41:31.675698", + } + ] + return tmpdir + + @pytest.fixture def null_keys(mocker, datadir): mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json") diff --git a/tests/test_graphql/test_repository/test_json_tokens_repository/empty_keys.json b/tests/test_graphql/test_repository/test_json_tokens_repository/empty_keys.json deleted file mode 100644 index 2131ddf..0000000 --- a/tests/test_graphql/test_repository/test_json_tokens_repository/empty_keys.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "tokens": [ - { - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - "name": "primary_token", - "date": "2022-07-15 17:41:31.675698" - } - ] -} diff --git a/tests/test_graphql/test_repository/test_tokens_repository.py b/tests/test_graphql/test_repository/test_tokens_repository.py index 020a868..eb5e7cb 100644 --- a/tests/test_graphql/test_repository/test_tokens_repository.py +++ b/tests/test_graphql/test_repository/test_tokens_repository.py @@ -2,7 +2,7 @@ # pylint: disable=unused-argument # pylint: disable=missing-function-docstring -from datetime import datetime, timedelta +from datetime import datetime, timezone from mnemonic import Mnemonic import pytest @@ -16,13 +16,18 @@ from selfprivacy_api.repositories.tokens.exceptions import ( TokenNotFound, NewDeviceKeyNotFound, ) + from selfprivacy_api.repositories.tokens.json_tokens_repository import ( JsonTokensRepository, ) from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( RedisTokensRepository, ) -from tests.common import read_json +from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( + AbstractTokensRepository, +) + +from tests.common import five_minutes_into_past, five_minutes_into_future ORIGINAL_DEVICE_NAMES = [ @@ -32,24 +37,15 @@ ORIGINAL_DEVICE_NAMES = [ "forth_token", ] +TEST_DATE = datetime(2022, 7, 15, 17, 41, 31, 675698, timezone.utc) +# tokens are not tz-aware +TOKEN_TEST_DATE = datetime(2022, 7, 15, 17, 41, 31, 675698) + def mnemonic_from_hex(hexkey): return Mnemonic(language="english").to_mnemonic(bytes.fromhex(hexkey)) -@pytest.fixture -def empty_keys(mocker, datadir): - mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "empty_keys.json") - assert read_json(datadir / "empty_keys.json")["tokens"] == [ - { - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - "name": "primary_token", - "date": "2022-07-15 17:41:31.675698", - } - ] - return datadir - - @pytest.fixture def mock_new_device_key_generate(mocker): mock = mocker.patch( @@ -57,8 +53,8 @@ def mock_new_device_key_generate(mocker): autospec=True, return_value=NewDeviceKey( key="43478d05b35e4781598acd76e33832bb", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + created_at=TEST_DATE, + expires_at=TEST_DATE, ), ) return mock @@ -72,8 +68,8 @@ def mock_new_device_key_generate_for_mnemonic(mocker): autospec=True, return_value=NewDeviceKey( key="2237238de23dc71ab558e317bdb8ff8e", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + created_at=TEST_DATE, + expires_at=TEST_DATE, ), ) return mock @@ -100,7 +96,7 @@ def mock_recovery_key_generate_invalid(mocker): autospec=True, return_value=RecoveryKey( key="889bf49c1d3199d71a2e704718772bd53a422020334db051", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + created_at=TEST_DATE, expires_at=None, uses_left=0, ), @@ -116,7 +112,7 @@ def mock_token_generate(mocker): return_value=Token( token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", device_name="IamNewDevice", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + created_at=TOKEN_TEST_DATE, ), ) return mock @@ -129,7 +125,7 @@ def mock_recovery_key_generate(mocker): autospec=True, return_value=RecoveryKey( key="889bf49c1d3199d71a2e704718772bd53a422020334db051", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + created_at=TEST_DATE, expires_at=None, uses_left=1, ), @@ -137,23 +133,6 @@ def mock_recovery_key_generate(mocker): return mock -@pytest.fixture -def empty_json_repo(empty_keys): - repo = JsonTokensRepository() - for token in repo.get_tokens(): - repo.delete_token(token) - assert repo.get_tokens() == [] - return repo - - -@pytest.fixture -def empty_redis_repo(): - repo = RedisTokensRepository() - repo.reset() - assert repo.get_tokens() == [] - return repo - - @pytest.fixture(params=["json", "redis"]) def empty_repo(request, empty_json_repo, empty_redis_repo): if request.param == "json": @@ -250,13 +229,13 @@ def test_create_token(empty_repo, mock_token_generate): assert repo.create_token(device_name="IamNewDevice") == Token( token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", device_name="IamNewDevice", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + created_at=TOKEN_TEST_DATE, ) assert repo.get_tokens() == [ Token( token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", device_name="IamNewDevice", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + created_at=TOKEN_TEST_DATE, ) ] @@ -292,7 +271,7 @@ def test_delete_not_found_token(some_tokens_repo): input_token = Token( token="imbadtoken", device_name="primary_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + created_at=TEST_DATE, ) with pytest.raises(TokenNotFound): assert repo.delete_token(input_token) is None @@ -321,7 +300,7 @@ def test_refresh_not_found_token(some_tokens_repo, mock_token_generate): input_token = Token( token="idontknowwhoiam", device_name="tellmewhoiam?", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + created_at=TEST_DATE, ) with pytest.raises(TokenNotFound): @@ -345,7 +324,7 @@ def test_create_get_recovery_key(some_tokens_repo, mock_recovery_key_generate): assert repo.create_recovery_key(uses_left=1, expiration=None) is not None assert repo.get_recovery_key() == RecoveryKey( key="889bf49c1d3199d71a2e704718772bd53a422020334db051", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + created_at=TEST_DATE, expires_at=None, uses_left=1, ) @@ -384,10 +363,13 @@ def test_use_mnemonic_expired_recovery_key( some_tokens_repo, ): repo = some_tokens_repo - expiration = datetime.now() - timedelta(minutes=5) + expiration = five_minutes_into_past() assert repo.create_recovery_key(uses_left=2, expiration=expiration) is not None recovery_key = repo.get_recovery_key() - assert recovery_key.expires_at == expiration + # TODO: do not ignore timezone once json backend is deleted + assert recovery_key.expires_at.replace(tzinfo=None) == expiration.replace( + tzinfo=None + ) assert not repo.is_recovery_key_valid() with pytest.raises(RecoveryKeyNotFound): @@ -484,8 +466,8 @@ def test_get_new_device_key(some_tokens_repo, mock_new_device_key_generate): assert repo.get_new_device_key() == NewDeviceKey( key="43478d05b35e4781598acd76e33832bb", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + created_at=TEST_DATE, + expires_at=TEST_DATE, ) @@ -561,7 +543,7 @@ def test_use_mnemonic_expired_new_device_key( some_tokens_repo, ): repo = some_tokens_repo - expiration = datetime.now() - timedelta(minutes=5) + expiration = five_minutes_into_past() key = repo.get_new_device_key() assert key is not None @@ -588,3 +570,36 @@ def test_use_mnemonic_new_device_key_when_empty(empty_repo): ) is None ) + + +def assert_identical( + repo_a: AbstractTokensRepository, repo_b: AbstractTokensRepository +): + tokens_a = repo_a.get_tokens() + tokens_b = repo_b.get_tokens() + assert len(tokens_a) == len(tokens_b) + for token in tokens_a: + assert token in tokens_b + assert repo_a.get_recovery_key() == repo_b.get_recovery_key() + assert repo_a._get_stored_new_device_key() == repo_b._get_stored_new_device_key() + + +def clone_to_redis(repo: JsonTokensRepository): + other_repo = RedisTokensRepository() + other_repo.clone(repo) + assert_identical(repo, other_repo) + + +# we cannot easily parametrize this unfortunately, since some_tokens and empty_repo cannot coexist +def test_clone_json_to_redis_empty(empty_repo): + repo = empty_repo + if isinstance(repo, JsonTokensRepository): + clone_to_redis(repo) + + +def test_clone_json_to_redis_full(some_tokens_repo): + repo = some_tokens_repo + if isinstance(repo, JsonTokensRepository): + repo.get_new_device_key() + repo.create_recovery_key(five_minutes_into_future(), 2) + clone_to_redis(repo) diff --git a/tests/test_graphql/test_repository/test_tokens_repository/empty_keys.json b/tests/test_graphql/test_repository/test_tokens_repository/empty_keys.json deleted file mode 100644 index 2131ddf..0000000 --- a/tests/test_graphql/test_repository/test_tokens_repository/empty_keys.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "tokens": [ - { - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - "name": "primary_token", - "date": "2022-07-15 17:41:31.675698" - } - ] -} diff --git a/tests/test_graphql/test_ssh.py b/tests/test_graphql/test_ssh.py index 5f888c8..eabf049 100644 --- a/tests/test_graphql/test_ssh.py +++ b/tests/test_graphql/test_ssh.py @@ -3,6 +3,7 @@ import pytest from tests.common import read_json +from tests.test_graphql.common import assert_empty class ProcessMock: @@ -72,8 +73,7 @@ def test_graphql_add_ssh_key_unauthorized(client, some_users, mock_subprocess_po }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_add_ssh_key(authorized_client, some_users, mock_subprocess_popen): @@ -231,8 +231,7 @@ def test_graphql_remove_ssh_key_unauthorized(client, some_users, mock_subprocess }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_remove_ssh_key(authorized_client, some_users, mock_subprocess_popen): diff --git a/tests/test_graphql/test_system.py b/tests/test_graphql/test_system.py index 3de4816..ed00268 100644 --- a/tests/test_graphql/test_system.py +++ b/tests/test_graphql/test_system.py @@ -5,6 +5,7 @@ import os import pytest from tests.common import generate_system_query, read_json +from tests.test_graphql.common import assert_empty @pytest.fixture @@ -144,8 +145,7 @@ def test_graphql_get_python_version_wrong_auth( "query": generate_system_query([API_PYTHON_VERSION_INFO]), }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_get_python_version(authorized_client, mock_subprocess_check_output): @@ -181,8 +181,7 @@ def test_graphql_get_system_version_unauthorized( }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) assert mock_subprocess_check_output.call_count == 0 @@ -348,8 +347,7 @@ def test_graphql_get_timezone_unauthorized(client, turned_on): "query": generate_system_query([API_GET_TIMEZONE]), }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_get_timezone(authorized_client, turned_on): @@ -405,8 +403,7 @@ def test_graphql_change_timezone_unauthorized(client, turned_on): }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_change_timezone(authorized_client, turned_on): @@ -515,8 +512,7 @@ def test_graphql_get_auto_upgrade_unauthorized(client, turned_on): "query": generate_system_query([API_GET_AUTO_UPGRADE_SETTINGS_QUERY]), }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_get_auto_upgrade(authorized_client, turned_on): @@ -624,8 +620,7 @@ def test_graphql_change_auto_upgrade_unauthorized(client, turned_on): }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_change_auto_upgrade(authorized_client, turned_on): @@ -932,8 +927,7 @@ def test_graphql_pull_system_configuration_unauthorized(client, mock_subprocess_ }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) assert mock_subprocess_popen.call_count == 0 diff --git a/tests/test_graphql/test_users.py b/tests/test_graphql/test_users.py index 9554195..af40981 100644 --- a/tests/test_graphql/test_users.py +++ b/tests/test_graphql/test_users.py @@ -6,6 +6,7 @@ from tests.common import ( generate_users_query, read_json, ) +from tests.test_graphql.common import assert_empty invalid_usernames = [ "messagebus", @@ -125,8 +126,7 @@ def test_graphql_get_users_unauthorized(client, some_users, mock_subprocess_pope "query": generate_users_query([API_USERS_INFO]), }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_get_some_users(authorized_client, some_users, mock_subprocess_popen): @@ -192,8 +192,7 @@ def test_graphql_get_one_user_unauthorized(client, one_user, mock_subprocess_pop }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen): @@ -323,8 +322,7 @@ def test_graphql_add_user_unauthorize(client, one_user, mock_subprocess_popen): }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_add_user(authorized_client, one_user, mock_subprocess_popen): @@ -576,8 +574,7 @@ def test_graphql_delete_user_unauthorized(client, some_users, mock_subprocess_po "variables": {"username": "user1"}, }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_delete_user(authorized_client, some_users, mock_subprocess_popen): @@ -683,8 +680,7 @@ def test_graphql_update_user_unauthorized(client, some_users, mock_subprocess_po }, }, ) - assert response.status_code == 200 - assert response.json().get("data") is None + assert_empty(response) def test_graphql_update_user(authorized_client, some_users, mock_subprocess_popen): diff --git a/tests/test_rest_endpoints/data/tokens.json b/tests/test_rest_endpoints/data/tokens.json deleted file mode 100644 index 9be9d02..0000000 --- a/tests/test_rest_endpoints/data/tokens.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token", - "date": "2022-01-14 08:31:10.789314" - }, - { - "token": "TEST_TOKEN2", - "name": "test_token2", - "date": "2022-01-14 08:31:10.789314" - } - ] -} \ No newline at end of file diff --git a/tests/test_rest_endpoints/services/data/gitkeep b/tests/test_rest_endpoints/services/data/gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_rest_endpoints/services/data/tokens.json b/tests/test_rest_endpoints/services/data/tokens.json deleted file mode 100644 index 9d35420..0000000 --- a/tests/test_rest_endpoints/services/data/tokens.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "Test Token", - "date": "2022-01-14 08:31:10.789314" - } - ] -} \ No newline at end of file diff --git a/tests/test_rest_endpoints/test_auth.py b/tests/test_rest_endpoints/test_auth.py index 12de0cf..d62fa18 100644 --- a/tests/test_rest_endpoints/test_auth.py +++ b/tests/test_rest_endpoints/test_auth.py @@ -3,31 +3,16 @@ # pylint: disable=missing-function-docstring import datetime import pytest -from mnemonic import Mnemonic -from selfprivacy_api.repositories.tokens.json_tokens_repository import ( - JsonTokensRepository, +from tests.conftest import TOKENS_FILE_CONTENTS +from tests.common import ( + RECOVERY_KEY_VALIDATION_DATETIME, + DEVICE_KEY_VALIDATION_DATETIME, + NearFuture, + assert_recovery_recent, ) - -TOKEN_REPO = JsonTokensRepository() - -from tests.common import read_json, write_json - - -TOKENS_FILE_CONTETS = { - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token", - "date": "2022-01-14 08:31:10.789314", - }, - { - "token": "TEST_TOKEN2", - "name": "test_token2", - "date": "2022-01-14 08:31:10.789314", - }, - ] -} +from tests.common import five_minutes_into_future_naive as five_minutes_into_future +from tests.common import five_minutes_into_past_naive as five_minutes_into_past DATE_FORMATS = [ "%Y-%m-%dT%H:%M:%S.%fZ", @@ -37,10 +22,99 @@ DATE_FORMATS = [ ] -def test_get_tokens_info(authorized_client, tokens_file): - response = authorized_client.get("/auth/tokens") +def assert_original(client): + new_tokens = rest_get_tokens_info(client) + + for token in TOKENS_FILE_CONTENTS["tokens"]: + assert_token_valid(client, token["token"]) + for new_token in new_tokens: + if new_token["name"] == token["name"]: + assert ( + datetime.datetime.fromisoformat(new_token["date"]) == token["date"] + ) + assert_no_recovery(client) + + +def assert_token_valid(client, token): + client.headers.update({"Authorization": "Bearer " + token}) + assert rest_get_tokens_info(client) is not None + + +def rest_get_tokens_info(client): + response = client.get("/auth/tokens") assert response.status_code == 200 - assert response.json() == [ + return response.json() + + +def rest_try_authorize_new_device(client, token, device_name): + response = client.post( + "/auth/new_device/authorize", + json={ + "token": token, + "device": device_name, + }, + ) + return response + + +def rest_make_recovery_token(client, expires_at=None, timeformat=None, uses=None): + json = {} + + if expires_at is not None: + assert timeformat is not None + expires_at_str = expires_at.strftime(timeformat) + json["expiration"] = expires_at_str + + if uses is not None: + json["uses"] = uses + + if json == {}: + response = client.post("/auth/recovery_token") + else: + response = client.post( + "/auth/recovery_token", + json=json, + ) + + if not response.status_code == 200: + raise ValueError(response.reason, response.text, response.json()["detail"]) + assert response.status_code == 200 + assert "token" in response.json() + return response.json()["token"] + + +def rest_get_recovery_status(client): + response = client.get("/auth/recovery_token") + assert response.status_code == 200 + return response.json() + + +def rest_get_recovery_date(client): + status = rest_get_recovery_status(client) + assert "date" in status + return status["date"] + + +def assert_no_recovery(client): + assert not rest_get_recovery_status(client)["exists"] + + +def rest_recover_with_mnemonic(client, mnemonic_token, device_name): + recovery_response = client.post( + "/auth/recovery_token/use", + json={"token": mnemonic_token, "device": device_name}, + ) + assert recovery_response.status_code == 200 + new_token = recovery_response.json()["token"] + assert_token_valid(client, new_token) + return new_token + + +# Tokens + + +def test_get_tokens_info(authorized_client, tokens_file): + assert sorted(rest_get_tokens_info(authorized_client), key=lambda x: x["name"]) == [ {"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True}, { "name": "test_token2", @@ -55,10 +129,10 @@ def test_get_tokens_unauthorized(client, tokens_file): assert response.status_code == 401 -def test_delete_token_unauthorized(client, tokens_file): +def test_delete_token_unauthorized(client, authorized_client, tokens_file): response = client.delete("/auth/tokens") assert response.status_code == 401 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_delete_token(authorized_client, tokens_file): @@ -66,15 +140,9 @@ def test_delete_token(authorized_client, tokens_file): "/auth/tokens", json={"token_name": "test_token2"} ) assert response.status_code == 200 - assert read_json(tokens_file) == { - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token", - "date": "2022-01-14 08:31:10.789314", - } - ] - } + assert rest_get_tokens_info(authorized_client) == [ + {"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True} + ] def test_delete_self_token(authorized_client, tokens_file): @@ -82,7 +150,7 @@ def test_delete_self_token(authorized_client, tokens_file): "/auth/tokens", json={"token_name": "test_token"} ) assert response.status_code == 400 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_delete_nonexistent_token(authorized_client, tokens_file): @@ -90,131 +158,103 @@ def test_delete_nonexistent_token(authorized_client, tokens_file): "/auth/tokens", json={"token_name": "test_token3"} ) assert response.status_code == 404 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) -def test_refresh_token_unauthorized(client, tokens_file): +def test_refresh_token_unauthorized(client, authorized_client, tokens_file): response = client.post("/auth/tokens") assert response.status_code == 401 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_refresh_token(authorized_client, tokens_file): response = authorized_client.post("/auth/tokens") assert response.status_code == 200 new_token = response.json()["token"] - assert TOKEN_REPO.get_token_by_token_string(new_token) is not None + assert_token_valid(authorized_client, new_token) -# new device +# New device -def test_get_new_device_auth_token_unauthorized(client, tokens_file): +def test_get_new_device_auth_token_unauthorized(client, authorized_client, tokens_file): response = client.post("/auth/new_device") assert response.status_code == 401 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert "token" not in response.json() + assert "detail" in response.json() + # We only can check existence of a token we know. -def test_get_new_device_auth_token(authorized_client, tokens_file): - response = authorized_client.post("/auth/new_device") +def test_get_and_delete_new_device_token(client, authorized_client, tokens_file): + token = rest_get_new_device_token(authorized_client) + response = authorized_client.delete("/auth/new_device", json={"token": token}) assert response.status_code == 200 - assert "token" in response.json() - token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex() - assert read_json(tokens_file)["new_device"]["token"] == token + assert rest_try_authorize_new_device(client, token, "new_device").status_code == 404 -def test_get_and_delete_new_device_token(authorized_client, tokens_file): - response = authorized_client.post("/auth/new_device") - assert response.status_code == 200 - assert "token" in response.json() - token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex() - assert read_json(tokens_file)["new_device"]["token"] == token - response = authorized_client.delete( - "/auth/new_device", json={"token": response.json()["token"]} - ) - assert response.status_code == 200 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS - - -def test_delete_token_unauthenticated(client, tokens_file): - response = client.delete("/auth/new_device") +def test_delete_token_unauthenticated(client, authorized_client, tokens_file): + token = rest_get_new_device_token(authorized_client) + response = client.delete("/auth/new_device", json={"token": token}) assert response.status_code == 401 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert rest_try_authorize_new_device(client, token, "new_device").status_code == 200 + + +def rest_get_new_device_token(client): + response = client.post("/auth/new_device") + assert response.status_code == 200 + assert "token" in response.json() + return response.json()["token"] def test_get_and_authorize_new_device(client, authorized_client, tokens_file): - response = authorized_client.post("/auth/new_device") + token = rest_get_new_device_token(authorized_client) + response = rest_try_authorize_new_device(client, token, "new_device") assert response.status_code == 200 - assert "token" in response.json() - token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex() - assert read_json(tokens_file)["new_device"]["token"] == token - response = client.post( - "/auth/new_device/authorize", - json={"token": response.json()["token"], "device": "new_device"}, - ) - assert response.status_code == 200 - assert read_json(tokens_file)["tokens"][2]["token"] == response.json()["token"] - assert read_json(tokens_file)["tokens"][2]["name"] == "new_device" + assert_token_valid(authorized_client, response.json()["token"]) -def test_authorize_new_device_with_invalid_token(client, tokens_file): - response = client.post( - "/auth/new_device/authorize", - json={"token": "invalid_token", "device": "new_device"}, - ) +def test_authorize_new_device_with_invalid_token( + client, authorized_client, tokens_file +): + response = rest_try_authorize_new_device(client, "invalid_token", "new_device") assert response.status_code == 404 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_get_and_authorize_used_token(client, authorized_client, tokens_file): - response = authorized_client.post("/auth/new_device") - assert response.status_code == 200 - assert "token" in response.json() - token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex() - assert read_json(tokens_file)["new_device"]["token"] == token - response = client.post( - "/auth/new_device/authorize", - json={"token": response.json()["token"], "device": "new_device"}, + token_to_be_used_2_times = rest_get_new_device_token(authorized_client) + response = rest_try_authorize_new_device( + client, token_to_be_used_2_times, "new_device" ) assert response.status_code == 200 - assert read_json(tokens_file)["tokens"][2]["token"] == response.json()["token"] - assert read_json(tokens_file)["tokens"][2]["name"] == "new_device" - response = client.post( - "/auth/new_device/authorize", - json={"token": response.json()["token"], "device": "new_device"}, + assert_token_valid(authorized_client, response.json()["token"]) + + response = rest_try_authorize_new_device( + client, token_to_be_used_2_times, "new_device" ) assert response.status_code == 404 def test_get_and_authorize_token_after_12_minutes( - client, authorized_client, tokens_file + client, authorized_client, tokens_file, mocker ): - response = authorized_client.post("/auth/new_device") - assert response.status_code == 200 - assert "token" in response.json() - token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex() - assert read_json(tokens_file)["new_device"]["token"] == token + token = rest_get_new_device_token(authorized_client) - file_data = read_json(tokens_file) - file_data["new_device"]["expiration"] = str( - datetime.datetime.now() - datetime.timedelta(minutes=13) - ) - write_json(tokens_file, file_data) + # TARDIS sounds + mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture) - response = client.post( - "/auth/new_device/authorize", - json={"token": response.json()["token"], "device": "new_device"}, - ) + response = rest_try_authorize_new_device(client, token, "new_device") assert response.status_code == 404 + assert_original(authorized_client) -def test_authorize_without_token(client, tokens_file): +def test_authorize_without_token(client, authorized_client, tokens_file): response = client.post( "/auth/new_device/authorize", json={"device": "new_device"}, ) assert response.status_code == 422 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) # Recovery tokens @@ -240,10 +280,10 @@ def test_authorize_without_token(client, tokens_file): # - if request is invalid, returns 400 -def test_get_recovery_token_status_unauthorized(client, tokens_file): +def test_get_recovery_token_status_unauthorized(client, authorized_client, tokens_file): response = client.get("/auth/recovery_token") assert response.status_code == 401 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_get_recovery_token_when_none_exists(authorized_client, tokens_file): @@ -256,31 +296,17 @@ def test_get_recovery_token_when_none_exists(authorized_client, tokens_file): "expiration": None, "uses_left": None, } - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert_original(authorized_client) def test_generate_recovery_token(authorized_client, client, tokens_file): # Generate token without expiration and uses_left - response = authorized_client.post("/auth/recovery_token") - assert response.status_code == 200 - assert "token" in response.json() - mnemonic_token = response.json()["token"] - token = Mnemonic(language="english").to_entropy(mnemonic_token).hex() - assert read_json(tokens_file)["recovery_token"]["token"] == token + mnemonic_token = rest_make_recovery_token(authorized_client) - time_generated = read_json(tokens_file)["recovery_token"]["date"] - assert time_generated is not None - # Assert that the token was generated near the current time - assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - - datetime.timedelta(seconds=5) - < datetime.datetime.now() - ) + time_generated = rest_get_recovery_date(authorized_client) + assert_recovery_recent(time_generated) - # Try to get token status - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { + assert rest_get_recovery_status(authorized_client) == { "exists": True, "valid": True, "date": time_generated, @@ -288,112 +314,49 @@ def test_generate_recovery_token(authorized_client, client, tokens_file): "uses_left": None, } - # Try to use the token - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][2]["token"] == new_token - assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device" - - # Try to use token again - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device2"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][3]["token"] == new_token - assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2" + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device") + # And again + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2") @pytest.mark.parametrize("timeformat", DATE_FORMATS) def test_generate_recovery_token_with_expiration_date( - authorized_client, client, tokens_file, timeformat + authorized_client, client, tokens_file, timeformat, mocker ): # Generate token with expiration date # Generate expiration date in the future - expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5) - expiration_date_str = expiration_date.strftime(timeformat) - response = authorized_client.post( - "/auth/recovery_token", - json={"expiration": expiration_date_str}, - ) - assert response.status_code == 200 - assert "token" in response.json() - mnemonic_token = response.json()["token"] - token = Mnemonic(language="english").to_entropy(mnemonic_token).hex() - assert read_json(tokens_file)["recovery_token"]["token"] == token - assert datetime.datetime.strptime( - read_json(tokens_file)["recovery_token"]["expiration"], "%Y-%m-%dT%H:%M:%S.%f" - ) == datetime.datetime.strptime(expiration_date_str, timeformat) - - time_generated = read_json(tokens_file)["recovery_token"]["date"] - assert time_generated is not None - # Assert that the token was generated near the current time - assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - - datetime.timedelta(seconds=5) - < datetime.datetime.now() + expiration_date = five_minutes_into_future() + mnemonic_token = rest_make_recovery_token( + authorized_client, expires_at=expiration_date, timeformat=timeformat ) - # Try to get token status - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { + time_generated = rest_get_recovery_date(authorized_client) + assert_recovery_recent(time_generated) + + assert rest_get_recovery_status(authorized_client) == { "exists": True, "valid": True, "date": time_generated, - "expiration": expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f"), + "expiration": expiration_date.isoformat(), "uses_left": None, } - # Try to use the token - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][2]["token"] == new_token - assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device" - - # Try to use token again - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device2"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][3]["token"] == new_token - assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2" + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device") + # And again + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2") # Try to use token after expiration date - new_data = read_json(tokens_file) - new_data["recovery_token"]["expiration"] = datetime.datetime.now().strftime( - "%Y-%m-%dT%H:%M:%S.%f" - ) - write_json(tokens_file, new_data) + mock = mocker.patch(RECOVERY_KEY_VALIDATION_DATETIME, NearFuture) + device_name = "recovery_device3" recovery_response = client.post( "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device3"}, + json={"token": mnemonic_token, "device": device_name}, ) assert recovery_response.status_code == 404 - # Assert that the token was not created in JSON - assert read_json(tokens_file)["tokens"] == new_data["tokens"] - - # Get the status of the token - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { - "exists": True, - "valid": False, - "date": time_generated, - "expiration": new_data["recovery_token"]["expiration"], - "uses_left": None, - } + # Assert that the token was not created + assert device_name not in [ + token["name"] for token in rest_get_tokens_info(authorized_client) + ] @pytest.mark.parametrize("timeformat", DATE_FORMATS) @@ -401,14 +364,14 @@ def test_generate_recovery_token_with_expiration_in_the_past( authorized_client, tokens_file, timeformat ): # Server must return 400 if expiration date is in the past - expiration_date = datetime.datetime.utcnow() - datetime.timedelta(minutes=5) + expiration_date = five_minutes_into_past() expiration_date_str = expiration_date.strftime(timeformat) response = authorized_client.post( "/auth/recovery_token", json={"expiration": expiration_date_str}, ) assert response.status_code == 400 - assert "recovery_token" not in read_json(tokens_file) + assert_no_recovery(authorized_client) def test_generate_recovery_token_with_invalid_time_format( @@ -421,37 +384,19 @@ def test_generate_recovery_token_with_invalid_time_format( json={"expiration": expiration_date}, ) assert response.status_code == 422 - assert "recovery_token" not in read_json(tokens_file) + assert_no_recovery(authorized_client) def test_generate_recovery_token_with_limited_uses( authorized_client, client, tokens_file ): # Generate token with limited uses - response = authorized_client.post( - "/auth/recovery_token", - json={"uses": 2}, - ) - assert response.status_code == 200 - assert "token" in response.json() - mnemonic_token = response.json()["token"] - token = Mnemonic(language="english").to_entropy(mnemonic_token).hex() - assert read_json(tokens_file)["recovery_token"]["token"] == token - assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2 + mnemonic_token = rest_make_recovery_token(authorized_client, uses=2) - # Get the date of the token - time_generated = read_json(tokens_file)["recovery_token"]["date"] - assert time_generated is not None - assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - - datetime.timedelta(seconds=5) - < datetime.datetime.now() - ) + time_generated = rest_get_recovery_date(authorized_client) + assert_recovery_recent(time_generated) - # Try to get token status - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { + assert rest_get_recovery_status(authorized_client) == { "exists": True, "valid": True, "date": time_generated, @@ -460,21 +405,9 @@ def test_generate_recovery_token_with_limited_uses( } # Try to use the token - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][2]["token"] == new_token - assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device" + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device") - assert read_json(tokens_file)["recovery_token"]["uses_left"] == 1 - - # Get the status of the token - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { + assert rest_get_recovery_status(authorized_client) == { "exists": True, "valid": True, "date": time_generated, @@ -483,19 +416,9 @@ def test_generate_recovery_token_with_limited_uses( } # Try to use token again - recovery_response = client.post( - "/auth/recovery_token/use", - json={"token": mnemonic_token, "device": "recovery_device2"}, - ) - assert recovery_response.status_code == 200 - new_token = recovery_response.json()["token"] - assert read_json(tokens_file)["tokens"][3]["token"] == new_token - assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2" + rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2") - # Get the status of the token - response = authorized_client.get("/auth/recovery_token") - assert response.status_code == 200 - assert response.json() == { + assert rest_get_recovery_status(authorized_client) == { "exists": True, "valid": False, "date": time_generated, @@ -510,8 +433,6 @@ def test_generate_recovery_token_with_limited_uses( ) assert recovery_response.status_code == 404 - assert read_json(tokens_file)["recovery_token"]["uses_left"] == 0 - def test_generate_recovery_token_with_negative_uses( authorized_client, client, tokens_file @@ -522,7 +443,7 @@ def test_generate_recovery_token_with_negative_uses( json={"uses": -2}, ) assert response.status_code == 400 - assert "recovery_token" not in read_json(tokens_file) + assert_no_recovery(authorized_client) def test_generate_recovery_token_with_zero_uses(authorized_client, client, tokens_file): @@ -532,4 +453,4 @@ def test_generate_recovery_token_with_zero_uses(authorized_client, client, token json={"uses": 0}, ) assert response.status_code == 400 - assert "recovery_token" not in read_json(tokens_file) + assert_no_recovery(authorized_client)