diff --git a/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py b/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py index 3cf6e1d..a67d62d 100644 --- a/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py +++ b/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py @@ -1,36 +1,64 @@ from abc import ABC, abstractmethod from datetime import datetime from typing import Optional +from mnemonic import Mnemonic from selfprivacy_api.models.tokens.token import Token +from selfprivacy_api.repositories.tokens.exceptions import ( + TokenNotFound, + InvalidMnemonic, + RecoveryKeyNotFound, + NewDeviceKeyNotFound, +) from selfprivacy_api.models.tokens.recovery_key import RecoveryKey from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey class AbstractTokensRepository(ABC): - @abstractmethod def get_token_by_token_string(self, token_string: str) -> Optional[Token]: """Get the token by token""" + tokens = self.get_tokens() + for token in tokens: + if token.token == token_string: + return token + + raise TokenNotFound("Token not found!") - @abstractmethod def get_token_by_name(self, token_name: str) -> Optional[Token]: """Get the token by name""" + tokens = self.get_tokens() + for token in tokens: + if token.device_name == token_name: + return token + + raise TokenNotFound("Token not found!") @abstractmethod def get_tokens(self) -> list[Token]: """Get the tokens""" - @abstractmethod def create_token(self, device_name: str) -> Token: """Create new token""" + new_token = Token.generate(device_name) + + self._store_token(new_token) + + return new_token @abstractmethod def delete_token(self, input_token: Token) -> None: """Delete the token""" - @abstractmethod def refresh_token(self, input_token: Token) -> Token: - """Refresh the token""" + """Change the token field of the existing token""" + new_token = Token.generate(device_name=input_token.device_name) + + if input_token in self.get_tokens(): + self.delete_token(input_token) + self._store_token(new_token) + return new_token + + raise TokenNotFound("Token not found!") def is_token_valid(self, token_string: str) -> bool: """Check if the token is valid""" @@ -65,11 +93,22 @@ class AbstractTokensRepository(ABC): ) -> RecoveryKey: """Create the recovery key""" - @abstractmethod def use_mnemonic_recovery_key( self, mnemonic_phrase: str, device_name: str ) -> Token: """Use the mnemonic recovery key and create a new token with the given name""" + if not self.is_recovery_key_valid(): + raise RecoveryKeyNotFound("Recovery key not found") + + recovery_hex_key = self.get_recovery_key().key + if not self._assert_mnemonic(recovery_hex_key, mnemonic_phrase): + raise RecoveryKeyNotFound("Recovery key not found") + + new_token = self.create_token(device_name=device_name) + + self._decrement_recovery_token() + + return new_token def is_recovery_key_valid(self) -> bool: """Check if the recovery key is valid""" @@ -86,8 +125,41 @@ class AbstractTokensRepository(ABC): def delete_new_device_key(self) -> None: """Delete the new device key""" - @abstractmethod def use_mnemonic_new_device_key( self, mnemonic_phrase: str, device_name: str ) -> Token: """Use the mnemonic new device key""" + new_device_key = self._get_stored_new_device_key() + if not new_device_key: + raise NewDeviceKeyNotFound + + if not self._assert_mnemonic(new_device_key.key, mnemonic_phrase): + raise NewDeviceKeyNotFound("Phrase is not token!") + + new_token = self.create_token(device_name=device_name) + self.delete_new_device_key() + + return new_token + + @abstractmethod + def _store_token(self, new_token: Token): + """Store a token directly""" + + @abstractmethod + def _decrement_recovery_token(self): + """Decrement recovery key use count by one""" + + @abstractmethod + def _get_stored_new_device_key(self) -> Optional[NewDeviceKey]: + """Retrieves new device key that is already stored.""" + + # TODO: find a proper place for it + def _assert_mnemonic(self, hex_key: str, mnemonic_phrase: str): + """Return true if hex string matches the phrase, false otherwise + Raise an InvalidMnemonic error if not mnemonic""" + recovery_token = bytes.fromhex(hex_key) + if not Mnemonic(language="english").check(mnemonic_phrase): + raise InvalidMnemonic("Phrase is not mnemonic!") + + phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase) + return phrase_bytes == recovery_token diff --git a/selfprivacy_api/repositories/tokens/json_tokens_repository.py b/selfprivacy_api/repositories/tokens/json_tokens_repository.py index aad3158..b4c0ab2 100644 --- a/selfprivacy_api/repositories/tokens/json_tokens_repository.py +++ b/selfprivacy_api/repositories/tokens/json_tokens_repository.py @@ -3,7 +3,6 @@ temporary legacy """ from typing import Optional from datetime import datetime -from mnemonic import Mnemonic from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData from selfprivacy_api.models.tokens.token import Token @@ -11,9 +10,6 @@ from selfprivacy_api.models.tokens.recovery_key import RecoveryKey from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey from selfprivacy_api.repositories.tokens.exceptions import ( TokenNotFound, - RecoveryKeyNotFound, - InvalidMnemonic, - NewDeviceKeyNotFound, ) from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( AbstractTokensRepository, @@ -23,34 +19,6 @@ DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" class JsonTokensRepository(AbstractTokensRepository): - def get_token_by_token_string(self, token_string: str) -> Optional[Token]: - """Get the token by token""" - with ReadUserData(UserDataFiles.TOKENS) as tokens_file: - for userdata_token in tokens_file["tokens"]: - if userdata_token["token"] == token_string: - - return Token( - token=token_string, - device_name=userdata_token["name"], - created_at=userdata_token["date"], - ) - - raise TokenNotFound("Token not found!") - - def get_token_by_name(self, token_name: str) -> Optional[Token]: - """Get the token by name""" - with ReadUserData(UserDataFiles.TOKENS) as tokens_file: - for userdata_token in tokens_file["tokens"]: - if userdata_token["name"] == token_name: - - return Token( - token=userdata_token["token"], - device_name=token_name, - created_at=userdata_token["date"], - ) - - raise TokenNotFound("Token not found!") - def get_tokens(self) -> list[Token]: """Get the tokens""" tokens_list = [] @@ -67,10 +35,8 @@ class JsonTokensRepository(AbstractTokensRepository): return tokens_list - def create_token(self, device_name: str) -> Token: - """Create new token""" - new_token = Token.generate(device_name) - + def _store_token(self, new_token: Token): + """Store a token directly""" with WriteUserData(UserDataFiles.TOKENS) as tokens_file: tokens_file["tokens"].append( { @@ -79,7 +45,6 @@ class JsonTokensRepository(AbstractTokensRepository): "date": new_token.created_at.strftime(DATETIME_FORMAT), } ) - return new_token def delete_token(self, input_token: Token) -> None: """Delete the token""" @@ -91,23 +56,6 @@ class JsonTokensRepository(AbstractTokensRepository): raise TokenNotFound("Token not found!") - def refresh_token(self, input_token: Token) -> Token: - """Change the token field of the existing token""" - new_token = Token.generate(device_name=input_token.device_name) - - with WriteUserData(UserDataFiles.TOKENS) as tokens_file: - for userdata_token in tokens_file["tokens"]: - - if userdata_token["name"] == input_token.device_name: - userdata_token["token"] = new_token.token - userdata_token["date"] = ( - new_token.created_at.strftime(DATETIME_FORMAT), - ) - - return new_token - - raise TokenNotFound("Token not found!") - def get_recovery_key(self) -> Optional[RecoveryKey]: """Get the recovery key""" with ReadUserData(UserDataFiles.TOKENS) as tokens_file: @@ -146,45 +94,11 @@ class JsonTokensRepository(AbstractTokensRepository): return recovery_key - def use_mnemonic_recovery_key( - self, mnemonic_phrase: str, device_name: str - ) -> Token: - """Use the mnemonic recovery key and create a new token with the given name""" - recovery_key = self.get_recovery_key() - - if recovery_key is None: - raise RecoveryKeyNotFound("Recovery key not found") - - if not recovery_key.is_valid(): - raise RecoveryKeyNotFound("Recovery key not found") - - recovery_token = bytes.fromhex(recovery_key.key) - - if not Mnemonic(language="english").check(mnemonic_phrase): - raise InvalidMnemonic("Phrase is not mnemonic!") - - phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase) - if phrase_bytes != recovery_token: - raise RecoveryKeyNotFound("Recovery key not found") - - new_token = Token.generate(device_name=device_name) - - with WriteUserData(UserDataFiles.TOKENS) as tokens: - tokens["tokens"].append( - { - "token": new_token.token, - "name": new_token.device_name, - "date": new_token.created_at.strftime(DATETIME_FORMAT), - } - ) - - if "recovery_token" in tokens: - if ( - "uses_left" in tokens["recovery_token"] - and tokens["recovery_token"]["uses_left"] is not None - ): - tokens["recovery_token"]["uses_left"] -= 1 - return new_token + def _decrement_recovery_token(self): + """Decrement recovery key use count by one""" + if self.is_recovery_key_valid(): + with WriteUserData(UserDataFiles.TOKENS) as tokens: + tokens["recovery_token"]["uses_left"] -= 1 def get_new_device_key(self) -> NewDeviceKey: """Creates and returns the new device key""" @@ -206,33 +120,15 @@ class JsonTokensRepository(AbstractTokensRepository): del tokens_file["new_device"] return - def use_mnemonic_new_device_key( - self, mnemonic_phrase: str, device_name: str - ) -> Token: - """Use the mnemonic new device key""" - + def _get_stored_new_device_key(self) -> Optional[NewDeviceKey]: + """Retrieves new device key that is already stored.""" with ReadUserData(UserDataFiles.TOKENS) as tokens_file: if "new_device" not in tokens_file or tokens_file["new_device"] is None: - raise NewDeviceKeyNotFound("New device key not found") + return new_device_key = NewDeviceKey( key=tokens_file["new_device"]["token"], created_at=tokens_file["new_device"]["date"], expires_at=tokens_file["new_device"]["expiration"], ) - - token = bytes.fromhex(new_device_key.key) - - if not Mnemonic(language="english").check(mnemonic_phrase): - raise InvalidMnemonic("Phrase is not mnemonic!") - - phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase) - if bytes(phrase_bytes) != bytes(token): - raise NewDeviceKeyNotFound("Phrase is not token!") - - new_token = Token.generate(device_name=device_name) - with WriteUserData(UserDataFiles.TOKENS) as tokens: - if "new_device" in tokens: - del tokens["new_device"] - - return new_token + return new_device_key diff --git a/selfprivacy_api/repositories/tokens/redis_tokens_repository.py b/selfprivacy_api/repositories/tokens/redis_tokens_repository.py index 0186c11..b1fb4b0 100644 --- a/selfprivacy_api/repositories/tokens/redis_tokens_repository.py +++ b/selfprivacy_api/repositories/tokens/redis_tokens_repository.py @@ -1,9 +1,21 @@ """ Token repository using Redis as backend. """ +from typing import Optional +from datetime import datetime + from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( AbstractTokensRepository, ) +from selfprivacy_api.utils.redis_pool import RedisPool +from selfprivacy_api.models.tokens.token import Token +from selfprivacy_api.models.tokens.recovery_key import RecoveryKey +from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey +from selfprivacy_api.repositories.tokens.exceptions import TokenNotFound + +TOKENS_PREFIX = "token_repo:tokens:" +NEW_DEVICE_KEY_REDIS_KEY = "token_repo:new_device_key" +RECOVERY_KEY_REDIS_KEY = "token_repo:recovery_key" class RedisTokensRepository(AbstractTokensRepository): @@ -11,5 +23,125 @@ class RedisTokensRepository(AbstractTokensRepository): Token repository using Redis as a backend """ - def __init__(self) -> None: - raise NotImplementedError + def __init__(self): + self.connection = RedisPool().get_connection() + + @staticmethod + def token_key_for_device(device_name: str): + return TOKENS_PREFIX + str(hash(device_name)) + + def get_tokens(self) -> list[Token]: + """Get the tokens""" + r = self.connection + token_keys = r.keys(TOKENS_PREFIX + "*") + return [self._token_from_hash(key) for key in token_keys] + + def delete_token(self, input_token: Token) -> None: + """Delete the token""" + r = self.connection + key = RedisTokensRepository._token_redis_key(input_token) + if input_token not in self.get_tokens(): + raise TokenNotFound + r.delete(key) + + def reset(self): + for token in self.get_tokens(): + self.delete_token(token) + self.delete_new_device_key() + r = self.connection + r.delete(RECOVERY_KEY_REDIS_KEY) + + def get_recovery_key(self) -> Optional[RecoveryKey]: + """Get the recovery key""" + r = self.connection + if r.exists(RECOVERY_KEY_REDIS_KEY): + return self._recovery_key_from_hash(RECOVERY_KEY_REDIS_KEY) + return None + + def create_recovery_key( + self, + expiration: Optional[datetime], + uses_left: Optional[int], + ) -> RecoveryKey: + """Create the recovery key""" + recovery_key = RecoveryKey.generate(expiration=expiration, uses_left=uses_left) + self._store_model_as_hash(RECOVERY_KEY_REDIS_KEY, recovery_key) + return recovery_key + + def get_new_device_key(self) -> NewDeviceKey: + """Creates and returns the new device key""" + new_device_key = NewDeviceKey.generate() + self._store_model_as_hash(NEW_DEVICE_KEY_REDIS_KEY, new_device_key) + return new_device_key + + def delete_new_device_key(self) -> None: + """Delete the new device key""" + r = self.connection + r.delete(NEW_DEVICE_KEY_REDIS_KEY) + + @staticmethod + def _token_redis_key(token: Token) -> str: + return RedisTokensRepository.token_key_for_device(token.device_name) + + def _store_token(self, new_token: Token): + """Store a token directly""" + key = RedisTokensRepository._token_redis_key(new_token) + self._store_model_as_hash(key, new_token) + + def _decrement_recovery_token(self): + """Decrement recovery key use count by one""" + if self.is_recovery_key_valid(): + uses_left = self.get_recovery_key().uses_left + r = self.connection + r.hset(RECOVERY_KEY_REDIS_KEY, "uses_left", uses_left - 1) + + def _get_stored_new_device_key(self) -> Optional[NewDeviceKey]: + """Retrieves new device key that is already stored.""" + return self._new_device_key_from_hash(NEW_DEVICE_KEY_REDIS_KEY) + + @staticmethod + def _is_date_key(key: str): + return key in [ + "created_at", + "expires_at", + ] + + @staticmethod + def _prepare_model_dict(d: dict): + date_keys = [key for key in d.keys() if RedisTokensRepository._is_date_key(key)] + for date in date_keys: + if d[date] != "None": + d[date] = datetime.fromisoformat(d[date]) + for key in d.keys(): + if d[key] == "None": + d[key] = None + + def _model_dict_from_hash(self, redis_key: str) -> Optional[dict]: + r = self.connection + if r.exists(redis_key): + token_dict = r.hgetall(redis_key) + RedisTokensRepository._prepare_model_dict(token_dict) + return token_dict + return None + + def _hash_as_model(self, redis_key: str, model_class): + token_dict = self._model_dict_from_hash(redis_key) + if token_dict is not None: + return model_class(**token_dict) + return None + + def _token_from_hash(self, redis_key: str) -> Optional[Token]: + return self._hash_as_model(redis_key, Token) + + def _recovery_key_from_hash(self, redis_key: str) -> Optional[RecoveryKey]: + return self._hash_as_model(redis_key, RecoveryKey) + + def _new_device_key_from_hash(self, redis_key: str) -> Optional[NewDeviceKey]: + return self._hash_as_model(redis_key, NewDeviceKey) + + def _store_model_as_hash(self, redis_key, model): + r = self.connection + for key, value in model.dict().items(): + if isinstance(value, datetime): + value = value.isoformat() + r.hset(redis_key, key, str(value)) diff --git a/tests/test_graphql/test_repository/test_json_tokens_repository.py b/tests/test_graphql/test_repository/test_json_tokens_repository.py new file mode 100644 index 0000000..af8c844 --- /dev/null +++ b/tests/test_graphql/test_repository/test_json_tokens_repository.py @@ -0,0 +1,218 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=missing-function-docstring +""" +tests that restrict json token repository implementation +""" + +import pytest + + +from datetime import datetime + +from selfprivacy_api.models.tokens.token import Token +from selfprivacy_api.repositories.tokens.exceptions import ( + TokenNotFound, + RecoveryKeyNotFound, + NewDeviceKeyNotFound, +) +from selfprivacy_api.repositories.tokens.json_tokens_repository import ( + JsonTokensRepository, +) + +from tests.common import read_json +from test_tokens_repository import ( + mock_recovery_key_generate, + mock_generate_token, + mock_new_device_key_generate, + empty_keys, +) + +ORIGINAL_TOKEN_CONTENT = [ + { + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + "name": "primary_token", + "date": "2022-07-15 17:41:31.675698", + }, + { + "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", + "name": "second_token", + "date": "2022-07-15 17:41:31.675698Z", + }, + { + "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", + "name": "third_token", + "date": "2022-07-15T17:41:31.675698Z", + }, + { + "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", + "name": "forth_token", + "date": "2022-07-15T17:41:31.675698", + }, +] + + +@pytest.fixture +def tokens(mocker, datadir): + mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "tokens.json") + assert read_json(datadir / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT + return datadir + + +@pytest.fixture +def null_keys(mocker, datadir): + mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json") + assert read_json(datadir / "null_keys.json")["recovery_token"] is None + assert read_json(datadir / "null_keys.json")["new_device"] is None + return datadir + + +def test_delete_token(tokens): + repo = JsonTokensRepository() + input_token = Token( + token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + device_name="primary_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) + + repo.delete_token(input_token) + assert read_json(tokens / "tokens.json")["tokens"] == [ + { + "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", + "name": "second_token", + "date": "2022-07-15 17:41:31.675698Z", + }, + { + "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", + "name": "third_token", + "date": "2022-07-15T17:41:31.675698Z", + }, + { + "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", + "name": "forth_token", + "date": "2022-07-15T17:41:31.675698", + }, + ] + + +def test_delete_not_found_token(tokens): + repo = JsonTokensRepository() + input_token = Token( + token="imbadtoken", + device_name="primary_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) + with pytest.raises(TokenNotFound): + assert repo.delete_token(input_token) is None + + assert read_json(tokens / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT + + +def test_create_recovery_key(tokens, mock_recovery_key_generate): + repo = JsonTokensRepository() + + assert repo.create_recovery_key(uses_left=1, expiration=None) is not None + assert read_json(tokens / "tokens.json")["recovery_token"] == { + "token": "889bf49c1d3199d71a2e704718772bd53a422020334db051", + "date": "2022-07-15T17:41:31.675698", + "expiration": None, + "uses_left": 1, + } + + +def test_use_mnemonic_recovery_key_when_null(null_keys): + repo = JsonTokensRepository() + + with pytest.raises(RecoveryKeyNotFound): + assert ( + repo.use_mnemonic_recovery_key( + mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", + device_name="primary_token", + ) + is None + ) + + +def test_use_mnemonic_recovery_key(tokens, mock_generate_token): + repo = JsonTokensRepository() + + assert repo.use_mnemonic_recovery_key( + mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park", + device_name="newdevice", + ) == Token( + token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4", + device_name="newdevice", + created_at=datetime(2022, 11, 14, 6, 6, 32, 777123), + ) + + assert read_json(tokens / "tokens.json")["tokens"] == [ + { + "date": "2022-07-15 17:41:31.675698", + "name": "primary_token", + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + }, + { + "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", + "name": "second_token", + "date": "2022-07-15 17:41:31.675698Z", + }, + { + "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", + "name": "third_token", + "date": "2022-07-15T17:41:31.675698Z", + }, + { + "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", + "name": "forth_token", + "date": "2022-07-15T17:41:31.675698", + }, + { + "date": "2022-11-14T06:06:32.777123", + "name": "newdevice", + "token": "ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4", + }, + ] + assert read_json(tokens / "tokens.json")["recovery_token"] == { + "date": "2022-11-11T11:48:54.228038", + "expiration": None, + "token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54", + "uses_left": 1, + } + + +def test_get_new_device_key(tokens, mock_new_device_key_generate): + repo = JsonTokensRepository() + + assert repo.get_new_device_key() is not None + assert read_json(tokens / "tokens.json")["new_device"] == { + "date": "2022-07-15T17:41:31.675698", + "expiration": "2022-07-15T17:41:31.675698", + "token": "43478d05b35e4781598acd76e33832bb", + } + + +def test_delete_new_device_key(tokens): + repo = JsonTokensRepository() + + assert repo.delete_new_device_key() is None + assert "new_device" not in read_json(tokens / "tokens.json") + + +def test_delete_new_device_key_when_empty(empty_keys): + repo = JsonTokensRepository() + + repo.delete_new_device_key() + assert "new_device" not in read_json(empty_keys / "empty_keys.json") + + +def test_use_mnemonic_new_device_key_when_null(null_keys): + repo = JsonTokensRepository() + + with pytest.raises(NewDeviceKeyNotFound): + assert ( + repo.use_mnemonic_new_device_key( + device_name="imnew", + mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", + ) + is None + ) diff --git a/tests/test_graphql/test_repository/test_json_tokens_repository/empty_keys.json b/tests/test_graphql/test_repository/test_json_tokens_repository/empty_keys.json new file mode 100644 index 0000000..2131ddf --- /dev/null +++ b/tests/test_graphql/test_repository/test_json_tokens_repository/empty_keys.json @@ -0,0 +1,9 @@ +{ + "tokens": [ + { + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + "name": "primary_token", + "date": "2022-07-15 17:41:31.675698" + } + ] +} diff --git a/tests/test_graphql/test_repository/test_tokens_repository/null_keys.json b/tests/test_graphql/test_repository/test_json_tokens_repository/null_keys.json similarity index 100% rename from tests/test_graphql/test_repository/test_tokens_repository/null_keys.json rename to tests/test_graphql/test_repository/test_json_tokens_repository/null_keys.json diff --git a/tests/test_graphql/test_repository/test_tokens_repository/tokens.json b/tests/test_graphql/test_repository/test_json_tokens_repository/tokens.json similarity index 100% rename from tests/test_graphql/test_repository/test_tokens_repository/tokens.json rename to tests/test_graphql/test_repository/test_json_tokens_repository/tokens.json diff --git a/tests/test_graphql/test_repository/test_tokens_repository.py b/tests/test_graphql/test_repository/test_tokens_repository.py index 878e242..43f7626 100644 --- a/tests/test_graphql/test_repository/test_tokens_repository.py +++ b/tests/test_graphql/test_repository/test_tokens_repository.py @@ -2,7 +2,7 @@ # pylint: disable=unused-argument # pylint: disable=missing-function-docstring -from datetime import datetime, timezone +from datetime import datetime import pytest @@ -18,40 +18,20 @@ from selfprivacy_api.repositories.tokens.exceptions import ( from selfprivacy_api.repositories.tokens.json_tokens_repository import ( JsonTokensRepository, ) +from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( + RedisTokensRepository, +) from tests.common import read_json -ORIGINAL_TOKEN_CONTENT = [ - { - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - "name": "primary_token", - "date": "2022-07-15 17:41:31.675698", - }, - { - "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", - "name": "second_token", - "date": "2022-07-15 17:41:31.675698Z", - }, - { - "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", - "name": "third_token", - "date": "2022-07-15T17:41:31.675698Z", - }, - { - "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", - "name": "forth_token", - "date": "2022-07-15T17:41:31.675698", - }, +ORIGINAL_DEVICE_NAMES = [ + "primary_token", + "second_token", + "third_token", + "forth_token", ] -@pytest.fixture -def tokens(mocker, datadir): - mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "tokens.json") - assert read_json(datadir / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT - return datadir - - @pytest.fixture def empty_keys(mocker, datadir): mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "empty_keys.json") @@ -65,23 +45,10 @@ def empty_keys(mocker, datadir): return datadir -@pytest.fixture -def null_keys(mocker, datadir): - mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json") - assert read_json(datadir / "null_keys.json")["recovery_token"] is None - assert read_json(datadir / "null_keys.json")["new_device"] is None - return datadir - - -class RecoveryKeyMockReturnNotValid: - def is_valid() -> bool: - return False - - @pytest.fixture def mock_new_device_key_generate(mocker): mock = mocker.patch( - "selfprivacy_api.repositories.tokens.json_tokens_repository.NewDeviceKey.generate", + "selfprivacy_api.models.tokens.new_device_key.NewDeviceKey.generate", autospec=True, return_value=NewDeviceKey( key="43478d05b35e4781598acd76e33832bb", @@ -92,10 +59,25 @@ def mock_new_device_key_generate(mocker): return mock +# mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", +@pytest.fixture +def mock_new_device_key_generate_for_mnemonic(mocker): + mock = mocker.patch( + "selfprivacy_api.models.tokens.new_device_key.NewDeviceKey.generate", + autospec=True, + return_value=NewDeviceKey( + key="2237238de23dc71ab558e317bdb8ff8e", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ), + ) + return mock + + @pytest.fixture def mock_generate_token(mocker): mock = mocker.patch( - "selfprivacy_api.repositories.tokens.json_tokens_repository.Token.generate", + "selfprivacy_api.models.tokens.token.Token.generate", autospec=True, return_value=Token( token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4", @@ -107,11 +89,16 @@ def mock_generate_token(mocker): @pytest.fixture -def mock_get_recovery_key_return_not_valid(mocker): +def mock_recovery_key_generate_invalid(mocker): mock = mocker.patch( - "selfprivacy_api.repositories.tokens.json_tokens_repository.JsonTokensRepository.get_recovery_key", + "selfprivacy_api.models.tokens.recovery_key.RecoveryKey.generate", autospec=True, - return_value=RecoveryKeyMockReturnNotValid, + return_value=RecoveryKey( + key="889bf49c1d3199d71a2e704718772bd53a422020334db051", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + expires_at=None, + uses_left=0, + ), ) return mock @@ -119,7 +106,7 @@ def mock_get_recovery_key_return_not_valid(mocker): @pytest.fixture def mock_token_generate(mocker): mock = mocker.patch( - "selfprivacy_api.repositories.tokens.json_tokens_repository.Token.generate", + "selfprivacy_api.models.tokens.token.Token.generate", autospec=True, return_value=Token( token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", @@ -133,7 +120,7 @@ def mock_token_generate(mocker): @pytest.fixture def mock_recovery_key_generate(mocker): mock = mocker.patch( - "selfprivacy_api.repositories.tokens.json_tokens_repository.RecoveryKey.generate", + "selfprivacy_api.models.tokens.recovery_key.RecoveryKey.generate", autospec=True, return_value=RecoveryKey( key="889bf49c1d3199d71a2e704718772bd53a422020334db051", @@ -145,127 +132,140 @@ def mock_recovery_key_generate(mocker): return mock +@pytest.fixture +def mock_recovery_key_generate_for_mnemonic(mocker): + mock = mocker.patch( + "selfprivacy_api.models.tokens.recovery_key.RecoveryKey.generate", + autospec=True, + return_value=RecoveryKey( + key="ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + expires_at=None, + uses_left=1, + ), + ) + return mock + + +@pytest.fixture +def empty_json_repo(empty_keys): + repo = JsonTokensRepository() + for token in repo.get_tokens(): + repo.delete_token(token) + assert repo.get_tokens() == [] + return repo + + +@pytest.fixture +def empty_redis_repo(): + repo = RedisTokensRepository() + repo.reset() + assert repo.get_tokens() == [] + return repo + + +@pytest.fixture(params=["json", "redis"]) +def empty_repo(request, empty_json_repo, empty_redis_repo): + if request.param == "json": + return empty_json_repo + if request.param == "redis": + return empty_redis_repo + # return empty_json_repo + else: + raise NotImplementedError + + +@pytest.fixture +def some_tokens_repo(empty_repo): + for name in ORIGINAL_DEVICE_NAMES: + empty_repo.create_token(name) + assert len(empty_repo.get_tokens()) == len(ORIGINAL_DEVICE_NAMES) + for name in ORIGINAL_DEVICE_NAMES: + assert empty_repo.get_token_by_name(name) is not None + assert empty_repo.get_new_device_key() is not None + return empty_repo + + ############### # Test tokens # ############### -def test_get_token_by_token_string(tokens): - repo = JsonTokensRepository() +def test_get_token_by_token_string(some_tokens_repo): + repo = some_tokens_repo + test_token = repo.get_tokens()[2] - assert repo.get_token_by_token_string( - token_string="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI" - ) == Token( - token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - device_name="primary_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - ) + assert repo.get_token_by_token_string(token_string=test_token.token) == test_token -def test_get_token_by_non_existent_token_string(tokens): - repo = JsonTokensRepository() +def test_get_token_by_non_existent_token_string(some_tokens_repo): + repo = some_tokens_repo with pytest.raises(TokenNotFound): assert repo.get_token_by_token_string(token_string="iamBadtoken") is None -def test_get_token_by_name(tokens): - repo = JsonTokensRepository() +def test_get_token_by_name(some_tokens_repo): + repo = some_tokens_repo - assert repo.get_token_by_name(token_name="primary_token") is not None - assert repo.get_token_by_name(token_name="primary_token") == Token( - token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - device_name="primary_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - ) + token = repo.get_token_by_name(token_name="primary_token") + assert token is not None + assert token.device_name == "primary_token" + assert token in repo.get_tokens() -def test_get_token_by_non_existent_name(tokens): - repo = JsonTokensRepository() +def test_get_token_by_non_existent_name(some_tokens_repo): + repo = some_tokens_repo with pytest.raises(TokenNotFound): assert repo.get_token_by_name(token_name="badname") is None -def test_get_tokens(tokens): - repo = JsonTokensRepository() - - assert repo.get_tokens() == [ - Token( - token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - device_name="primary_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - ), - Token( - token="3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", - device_name="second_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698, tzinfo=timezone.utc), - ), - Token( - token="LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", - device_name="third_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698, tzinfo=timezone.utc), - ), - Token( - token="dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", - device_name="forth_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - ), - ] +def test_get_tokens(some_tokens_repo): + repo = some_tokens_repo + tokenstrings = [] + # we cannot insert tokens directly via api, so we check meta-properties instead + for token in repo.get_tokens(): + len(token.token) == 43 # assuming secrets.token_urlsafe + assert token.token not in tokenstrings + tokenstrings.append(token.token) + assert token.created_at.day == datetime.today().day -def test_get_tokens_when_one(empty_keys): - repo = JsonTokensRepository() - - assert repo.get_tokens() == [ - Token( - token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - device_name="primary_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - ) - ] - - -def test_create_token(tokens, mock_token_generate): - repo = JsonTokensRepository() +def test_create_token(empty_repo, mock_token_generate): + repo = empty_repo assert repo.create_token(device_name="IamNewDevice") == Token( token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", device_name="IamNewDevice", created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), ) - - -def test_delete_token(tokens): - repo = JsonTokensRepository() - input_token = Token( - token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - device_name="primary_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - ) - - repo.delete_token(input_token) - assert read_json(tokens / "tokens.json")["tokens"] == [ - { - "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", - "name": "second_token", - "date": "2022-07-15 17:41:31.675698Z", - }, - { - "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", - "name": "third_token", - "date": "2022-07-15T17:41:31.675698Z", - }, - { - "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", - "name": "forth_token", - "date": "2022-07-15T17:41:31.675698", - }, + assert repo.get_tokens() == [ + Token( + token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", + device_name="IamNewDevice", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) ] -def test_delete_not_found_token(tokens): - repo = JsonTokensRepository() +def test_delete_token(some_tokens_repo): + repo = some_tokens_repo + original_tokens = repo.get_tokens() + input_token = original_tokens[1] + + repo.delete_token(input_token) + + tokens_after_delete = repo.get_tokens() + for token in original_tokens: + if token != input_token: + assert token in tokens_after_delete + assert len(original_tokens) == len(tokens_after_delete) + 1 + + +def test_delete_not_found_token(some_tokens_repo): + repo = some_tokens_repo + initial_tokens = repo.get_tokens() input_token = Token( token="imbadtoken", device_name="primary_token", @@ -274,16 +274,15 @@ def test_delete_not_found_token(tokens): with pytest.raises(TokenNotFound): assert repo.delete_token(input_token) is None - assert read_json(tokens / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT + new_tokens = repo.get_tokens() + assert len(new_tokens) == len(initial_tokens) + for token in initial_tokens: + assert token in new_tokens -def test_refresh_token(tokens, mock_token_generate): - repo = JsonTokensRepository() - input_token = Token( - token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - device_name="primary_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - ) +def test_refresh_token(some_tokens_repo, mock_token_generate): + repo = some_tokens_repo + input_token = some_tokens_repo.get_tokens()[0] assert repo.refresh_token(input_token) == Token( token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", @@ -292,8 +291,8 @@ def test_refresh_token(tokens, mock_token_generate): ) -def test_refresh_not_found_token(tokens, mock_token_generate): - repo = JsonTokensRepository() +def test_refresh_not_found_token(some_tokens_repo, mock_token_generate): + repo = some_tokens_repo input_token = Token( token="idontknowwhoiam", device_name="tellmewhoiam?", @@ -309,39 +308,26 @@ def test_refresh_not_found_token(tokens, mock_token_generate): ################ -def test_get_recovery_key(tokens): - repo = JsonTokensRepository() - - assert repo.get_recovery_key() == RecoveryKey( - key="ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54", - created_at=datetime(2022, 11, 11, 11, 48, 54, 228038), - expires_at=None, - uses_left=2, - ) - - -def test_get_recovery_key_when_empty(empty_keys): - repo = JsonTokensRepository() +def test_get_recovery_key_when_empty(empty_repo): + repo = empty_repo assert repo.get_recovery_key() is None -def test_create_recovery_key(tokens, mock_recovery_key_generate): - repo = JsonTokensRepository() +def test_create_get_recovery_key(some_tokens_repo, mock_recovery_key_generate): + repo = some_tokens_repo assert repo.create_recovery_key(uses_left=1, expiration=None) is not None - assert read_json(tokens / "tokens.json")["recovery_token"] == { - "token": "889bf49c1d3199d71a2e704718772bd53a422020334db051", - "date": "2022-07-15T17:41:31.675698", - "expiration": None, - "uses_left": 1, - } + assert repo.get_recovery_key() == RecoveryKey( + key="889bf49c1d3199d71a2e704718772bd53a422020334db051", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + expires_at=None, + uses_left=1, + ) -def test_use_mnemonic_recovery_key_when_empty( - empty_keys, mock_recovery_key_generate, mock_token_generate -): - repo = JsonTokensRepository() +def test_use_mnemonic_recovery_key_when_empty(empty_repo): + repo = empty_repo with pytest.raises(RecoveryKeyNotFound): assert ( @@ -354,9 +340,10 @@ def test_use_mnemonic_recovery_key_when_empty( def test_use_mnemonic_not_valid_recovery_key( - tokens, mock_get_recovery_key_return_not_valid + some_tokens_repo, mock_recovery_key_generate_invalid ): - repo = JsonTokensRepository() + repo = some_tokens_repo + assert repo.create_recovery_key(uses_left=0, expiration=None) is not None with pytest.raises(RecoveryKeyNotFound): assert ( @@ -368,8 +355,9 @@ def test_use_mnemonic_not_valid_recovery_key( ) -def test_use_mnemonic_not_mnemonic_recovery_key(tokens): - repo = JsonTokensRepository() +def test_use_mnemonic_not_mnemonic_recovery_key(some_tokens_repo): + repo = some_tokens_repo + assert repo.create_recovery_key(uses_left=1, expiration=None) is not None with pytest.raises(InvalidMnemonic): assert ( @@ -381,8 +369,9 @@ def test_use_mnemonic_not_mnemonic_recovery_key(tokens): ) -def test_use_not_mnemonic_recovery_key(tokens): - repo = JsonTokensRepository() +def test_use_not_mnemonic_recovery_key(some_tokens_repo): + repo = some_tokens_repo + assert repo.create_recovery_key(uses_left=1, expiration=None) is not None with pytest.raises(InvalidMnemonic): assert ( @@ -394,8 +383,9 @@ def test_use_not_mnemonic_recovery_key(tokens): ) -def test_use_not_found_mnemonic_recovery_key(tokens): - repo = JsonTokensRepository() +def test_use_not_found_mnemonic_recovery_key(some_tokens_repo): + repo = some_tokens_repo + assert repo.create_recovery_key(uses_left=1, expiration=None) is not None with pytest.raises(RecoveryKeyNotFound): assert ( @@ -407,8 +397,8 @@ def test_use_not_found_mnemonic_recovery_key(tokens): ) -def test_use_menemonic_recovery_key_when_empty(empty_keys): - repo = JsonTokensRepository() +def test_use_mnemonic_recovery_key_when_empty(empty_repo): + repo = empty_repo with pytest.raises(RecoveryKeyNotFound): assert ( @@ -420,65 +410,34 @@ def test_use_menemonic_recovery_key_when_empty(empty_keys): ) -def test_use_menemonic_recovery_key_when_null(null_keys): - repo = JsonTokensRepository() +# agnostic test mixed with an implementation test +def test_use_mnemonic_recovery_key( + some_tokens_repo, mock_recovery_key_generate_for_mnemonic, mock_generate_token +): + repo = some_tokens_repo + assert repo.create_recovery_key(uses_left=1, expiration=None) is not None - with pytest.raises(RecoveryKeyNotFound): - assert ( - repo.use_mnemonic_recovery_key( - mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", - device_name="primary_token", - ) - is None - ) - - -def test_use_mnemonic_recovery_key(tokens, mock_generate_token): - repo = JsonTokensRepository() - - assert repo.use_mnemonic_recovery_key( - mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park", - device_name="newdevice", - ) == Token( + test_token = Token( token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4", device_name="newdevice", created_at=datetime(2022, 11, 14, 6, 6, 32, 777123), ) - assert read_json(tokens / "tokens.json")["tokens"] == [ - { - "date": "2022-07-15 17:41:31.675698", - "name": "primary_token", - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - }, - { - "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", - "name": "second_token", - "date": "2022-07-15 17:41:31.675698Z", - }, - { - "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", - "name": "third_token", - "date": "2022-07-15T17:41:31.675698Z", - }, - { - "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", - "name": "forth_token", - "date": "2022-07-15T17:41:31.675698", - }, - { - "date": "2022-11-14T06:06:32.777123", - "name": "newdevice", - "token": "ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4", - }, - ] + assert ( + repo.use_mnemonic_recovery_key( + mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park", + device_name="newdevice", + ) + == test_token + ) - assert read_json(tokens / "tokens.json")["recovery_token"] == { - "date": "2022-11-11T11:48:54.228038", - "expiration": None, - "token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54", - "uses_left": 1, - } + assert test_token in repo.get_tokens() + assert repo.get_recovery_key() == RecoveryKey( + key="ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + expires_at=None, + uses_left=0, + ) ################## @@ -486,35 +445,31 @@ def test_use_mnemonic_recovery_key(tokens, mock_generate_token): ################## -def test_get_new_device_key(tokens, mock_new_device_key_generate): - repo = JsonTokensRepository() +def test_get_new_device_key(some_tokens_repo, mock_new_device_key_generate): + repo = some_tokens_repo - assert repo.get_new_device_key() is not None - assert read_json(tokens / "tokens.json")["new_device"] == { - "date": "2022-07-15T17:41:31.675698", - "expiration": "2022-07-15T17:41:31.675698", - "token": "43478d05b35e4781598acd76e33832bb", - } + assert repo.get_new_device_key() == NewDeviceKey( + key="43478d05b35e4781598acd76e33832bb", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) -def test_delete_new_device_key(tokens): - repo = JsonTokensRepository() +def test_delete_new_device_key(some_tokens_repo): + repo = some_tokens_repo assert repo.delete_new_device_key() is None - assert "new_device" not in read_json(tokens / "tokens.json") + # we cannot say if there is ot not without creating it? -def test_delete_new_device_key_when_empty(empty_keys): - repo = JsonTokensRepository() +def test_delete_new_device_key_when_empty(empty_repo): + repo = empty_repo - repo.delete_new_device_key() - assert "new_device" not in read_json(empty_keys / "empty_keys.json") + assert repo.delete_new_device_key() is None -def test_use_invalid_mnemonic_new_device_key( - tokens, mock_new_device_key_generate, datadir, mock_token_generate -): - repo = JsonTokensRepository() +def test_use_invalid_mnemonic_new_device_key(some_tokens_repo): + repo = some_tokens_repo with pytest.raises(InvalidMnemonic): assert ( @@ -527,9 +482,10 @@ def test_use_invalid_mnemonic_new_device_key( def test_use_not_exists_mnemonic_new_device_key( - tokens, mock_new_device_key_generate, mock_token_generate + empty_repo, mock_new_device_key_generate ): - repo = JsonTokensRepository() + repo = empty_repo + assert repo.get_new_device_key() is not None with pytest.raises(NewDeviceKeyNotFound): assert ( @@ -542,23 +498,20 @@ def test_use_not_exists_mnemonic_new_device_key( def test_use_mnemonic_new_device_key( - tokens, mock_new_device_key_generate, mock_token_generate + empty_repo, mock_new_device_key_generate_for_mnemonic ): - repo = JsonTokensRepository() + repo = empty_repo + assert repo.get_new_device_key() is not None - assert ( - repo.use_mnemonic_new_device_key( - device_name="imnew", - mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", - ) - is not None + new_token = repo.use_mnemonic_new_device_key( + device_name="imnew", + mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", ) - # assert read_json(datadir / "tokens.json")["new_device"] == [] + assert new_token.device_name == "imnew" + assert new_token in repo.get_tokens() -def test_use_mnemonic_new_device_key_when_empty(empty_keys): - repo = JsonTokensRepository() - + # we must delete the key after use with pytest.raises(NewDeviceKeyNotFound): assert ( repo.use_mnemonic_new_device_key( @@ -569,8 +522,8 @@ def test_use_mnemonic_new_device_key_when_empty(empty_keys): ) -def test_use_mnemonic_new_device_key_when_null(null_keys): - repo = JsonTokensRepository() +def test_use_mnemonic_new_device_key_when_empty(empty_repo): + repo = empty_repo with pytest.raises(NewDeviceKeyNotFound): assert (