Compare commits

...

57 Commits

Author SHA1 Message Date
houkime 66480c9904 Merge pull request 'Implement redis token repository' (#26) from redis/token-repo into redis/connection-pool
Reviewed-on: #26
2022-12-21 19:39:14 +02:00
Houkime 5a25e2a270 feat(tokens-repo): getting stored device key 2022-12-16 13:05:06 +00:00
Houkime 0ae7c43ebf refactor(tokens-repo): break out generic hash_as_model casting 2022-12-16 13:05:00 +00:00
Houkime 6f6a9f5ef0 test(tokens-repo): do not require order in test_delete_not_found_token 2022-12-16 13:04:56 +00:00
Houkime fda5d315a9 fix(tokens-repo): return device key instead of NewDeviceKey class 2022-12-16 13:04:51 +00:00
Houkime 13e84e2697 feat(tokens-repo): recovery key uses decrement 2022-12-16 13:04:47 +00:00
Houkime eba1d01b3d feat(tokens-repo): recovery key creation 2022-12-16 13:04:43 +00:00
Houkime 8dfb3eb936 feat(tokens-repo): fuller reset 2022-12-16 13:04:39 +00:00
Houkime 4579fec569 feat(tokens-repo): get recovery key 2022-12-16 13:04:35 +00:00
Houkime 257096084f refactor(tokens-repo): split out date field detection 2022-12-16 13:04:26 +00:00
Houkime bf6c230ae0 fix(tokens-repo): raise token not found when deleting nonexistent token
even if device name exists
2022-12-16 13:04:26 +00:00
Houkime 95e200bfc5 feat(tokens-repo): reset function 2022-12-16 13:04:19 +00:00
Houkime 9ffd67fa19 feat(tokens-repo): get new device key 2022-12-16 13:04:14 +00:00
Houkime b98ccb88d1 refactor(tokens-repo): separate getting model dict 2022-12-16 13:04:04 +00:00
Houkime 3cb7f29593 refactor(tokens-repo): detach preparing a dict before a model cast 2022-12-16 13:03:57 +00:00
Houkime e504585437 test(tokens-repo): do not require order 2022-12-16 13:03:53 +00:00
Houkime 647e02f25b feat(tokens-repo): redis delete token 2022-12-16 13:03:46 +00:00
Houkime ba6a5261fa refactor(tokens-repo): redis token key func 2022-12-16 13:03:41 +00:00
Houkime d8e3cd67e0 feat(tokens-repo): redis store token 2022-12-16 13:03:36 +00:00
Houkime 256c16fa9f feat(tokens-repo): redis get tokens 2022-12-16 13:03:27 +00:00
Houkime f2fa47466b feat(tokens-repo):empty implementation of redis token repo
But it initializes and fails tests!
2022-12-16 13:03:27 +00:00
Houkime ca822cdf6f refactor(tokens-repository): move use_mnemonic_new_device_key() to abstract class 2022-12-16 13:03:27 +00:00
Houkime 2797c6f88f fix(tokens-repository): use_mnemonic_new_device_key() now stores a token 2022-12-16 13:03:27 +00:00
Houkime 4498003aca refactor(tokens-repository): dissect use_mnemonic_new_device_key() 2022-12-16 13:03:27 +00:00
Houkime 772c0dfc64 refactor(tokens-repository): move use_mnemonic_recovery_key() to abstract class 2022-12-16 13:03:27 +00:00
Houkime 671203e990 refactor(tokens-repository): dissect use_mnemonic_recovery_key() 2022-12-16 13:03:26 +00:00
Houkime 9a49067e53 refactor(tokens-repo): move token refreshing to parent class 2022-12-16 13:03:12 +00:00
Houkime 682cd4ae87 refactor(tokens-repo): move create_token to abstract class 2022-12-16 13:02:55 +00:00
Houkime 572ec75c39 refactor(tokens-repo): rewrite token refresh
now it is not json-dependent.
2022-12-16 13:02:43 +00:00
Houkime 27a7c24bc3 refactor(tokens-repo): separate token storing 2022-12-16 13:02:21 +00:00
Houkime 4e60d1d37a refactor(tokens-repo): move token getters to abstract class
Not performance-optimal, but not in critical path either.
100 tokens max irl?
2022-12-16 13:02:05 +00:00
Houkime ff264ec808 refactor(tokens-repo): simplify getting tokens
get_token_by_token_string and get_token_by_name are no longer tied to
json.
2022-12-16 13:01:28 +00:00
Houkime b856a2aad3 test(tokens-repo): re-add delete token test 2022-12-16 13:01:19 +00:00
Houkime 0d748d7ab1 test(tokens-repo): move original token content to json tests 2022-12-16 13:01:09 +00:00
Houkime c12dca9d9b refactor(tokens-repo): delete unused timezone import 2022-12-16 13:00:44 +00:00
Houkime 4492bbe995 test(tokens-repo): move null keys and tokens fixtures to json tests
and remove corresponding json files from the folder
2022-12-16 13:00:31 +00:00
Houkime 84bfa333fa test(tokens-repo): move new device key null test to json tests 2022-12-16 13:00:16 +00:00
Houkime be13d6163e test(tokens-repo): use a mnemonic device key on an empty repo 2022-12-16 13:00:07 +00:00
Houkime ce411e9291 test(tokens-repo): using a mnemonic device key 2022-12-16 12:59:57 +00:00
Houkime cf7b7eb8a7 test(tokens-repo): notfound mnemonic new device key 2022-12-16 12:59:49 +00:00
Houkime 3feebd5290 test(tokens-repo): invalid mnemonic new device key 2022-12-16 12:59:38 +00:00
Houkime 73584872f0 test(tokens-repo): agnosticise simple new device key tests
the state of json file is tested separately in test_json_tokens_repository.py
2022-12-16 12:59:27 +00:00
Houkime dc778b545e test(tokens-repo): get new device key 2022-12-16 12:59:16 +00:00
Houkime f96d8b7d7c test(tokens-repo): make another mock token generator agnostic 2022-12-16 12:58:50 +00:00
Houkime dd525fe723 test(tokens-repo): agnostic use recovery token
converted json-reading asserts to backend-agnostic asserts
2022-12-16 12:58:27 +00:00
Houkime b9c570720b test(tokens-repo): move null recovery token test to json tests
Because the null state seems to be specific to json and not reproducible
in abstract case.
2022-12-16 12:57:14 +00:00
Houkime 732e72d414 test(tokens-repo): mnemonic non-null invalid 2022-12-16 12:56:50 +00:00
Houkime 6f400911fc test(tokens-repo): agnostic recovery keys testing 2022-12-16 12:56:25 +00:00
Houkime c86eb8b786 test(tokens-repo): agnostic refresh token nonexistent 2022-12-16 12:56:04 +00:00
Houkime fa54220327 test(tokens-repo): agnostic refresh token 2022-12-16 12:55:46 +00:00
Houkime b43c4014e2 test(tokens-repo): agnostic delete not found 2022-12-16 12:55:28 +00:00
Houkime db55685488 test(tokens-repo): use 'repo' for consistency 2022-12-16 12:55:07 +00:00
Houkime 3921d9fe4c test(tokens-repo): agnostic token creation test 2022-12-16 12:54:46 +00:00
Houkime 2e2d344f43 test(tokens-repo): get_tokens metaproperties test 2022-12-16 12:54:25 +00:00
Houkime 55ad2484b8 test(tokens-repo): agnostic test for getting by name 2022-12-16 12:53:33 +00:00
Houkime 8a05a55b80 test(tokens-repo): parameterized fixture 2022-12-16 12:51:56 +00:00
Houkime 4cfe0515ea test(tokens-repo): split between abstract api and backend-specific tests 2022-12-16 12:50:08 +00:00
8 changed files with 674 additions and 394 deletions

View File: selfprivacy_api/repositories/tokens/abstract_tokens_repository.py

@@ -1,36 +1,64 @@
 from abc import ABC, abstractmethod
 from datetime import datetime
 from typing import Optional
+from mnemonic import Mnemonic
 
 from selfprivacy_api.models.tokens.token import Token
+from selfprivacy_api.repositories.tokens.exceptions import (
+    TokenNotFound,
+    InvalidMnemonic,
+    RecoveryKeyNotFound,
+    NewDeviceKeyNotFound,
+)
 from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
 from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
 
 
 class AbstractTokensRepository(ABC):
-    @abstractmethod
     def get_token_by_token_string(self, token_string: str) -> Optional[Token]:
         """Get the token by token"""
+        tokens = self.get_tokens()
+        for token in tokens:
+            if token.token == token_string:
+                return token
+
+        raise TokenNotFound("Token not found!")
 
-    @abstractmethod
     def get_token_by_name(self, token_name: str) -> Optional[Token]:
         """Get the token by name"""
+        tokens = self.get_tokens()
+        for token in tokens:
+            if token.device_name == token_name:
+                return token
+
+        raise TokenNotFound("Token not found!")
 
     @abstractmethod
     def get_tokens(self) -> list[Token]:
         """Get the tokens"""
 
-    @abstractmethod
     def create_token(self, device_name: str) -> Token:
         """Create new token"""
+        new_token = Token.generate(device_name)
+        self._store_token(new_token)
+
+        return new_token
 
     @abstractmethod
     def delete_token(self, input_token: Token) -> None:
         """Delete the token"""
 
-    @abstractmethod
     def refresh_token(self, input_token: Token) -> Token:
-        """Refresh the token"""
+        """Change the token field of the existing token"""
+        new_token = Token.generate(device_name=input_token.device_name)
+
+        if input_token in self.get_tokens():
+            self.delete_token(input_token)
+            self._store_token(new_token)
+            return new_token
+
+        raise TokenNotFound("Token not found!")
 
     def is_token_valid(self, token_string: str) -> bool:
         """Check if the token is valid"""
@@ -65,11 +93,22 @@ class AbstractTokensRepository(ABC):
     ) -> RecoveryKey:
         """Create the recovery key"""
 
-    @abstractmethod
     def use_mnemonic_recovery_key(
         self, mnemonic_phrase: str, device_name: str
     ) -> Token:
         """Use the mnemonic recovery key and create a new token with the given name"""
+        if not self.is_recovery_key_valid():
+            raise RecoveryKeyNotFound("Recovery key not found")
+
+        recovery_hex_key = self.get_recovery_key().key
+        if not self._assert_mnemonic(recovery_hex_key, mnemonic_phrase):
+            raise RecoveryKeyNotFound("Recovery key not found")
+
+        new_token = self.create_token(device_name=device_name)
+
+        self._decrement_recovery_token()
+
+        return new_token
 
     def is_recovery_key_valid(self) -> bool:
         """Check if the recovery key is valid"""
@@ -86,8 +125,41 @@ class AbstractTokensRepository(ABC):
     def delete_new_device_key(self) -> None:
         """Delete the new device key"""
 
-    @abstractmethod
     def use_mnemonic_new_device_key(
         self, mnemonic_phrase: str, device_name: str
     ) -> Token:
         """Use the mnemonic new device key"""
+        new_device_key = self._get_stored_new_device_key()
+        if not new_device_key:
+            raise NewDeviceKeyNotFound
+
+        if not self._assert_mnemonic(new_device_key.key, mnemonic_phrase):
+            raise NewDeviceKeyNotFound("Phrase is not token!")
+
+        new_token = self.create_token(device_name=device_name)
+        self.delete_new_device_key()
+
+        return new_token
+
+    @abstractmethod
+    def _store_token(self, new_token: Token):
+        """Store a token directly"""
+
+    @abstractmethod
+    def _decrement_recovery_token(self):
+        """Decrement recovery key use count by one"""
+
+    @abstractmethod
+    def _get_stored_new_device_key(self) -> Optional[NewDeviceKey]:
+        """Retrieves new device key that is already stored."""
+
+    # TODO: find a proper place for it
+    def _assert_mnemonic(self, hex_key: str, mnemonic_phrase: str):
+        """Return true if hex string matches the phrase, false otherwise
+        Raise an InvalidMnemonic error if not mnemonic"""
+        recovery_token = bytes.fromhex(hex_key)
+        if not Mnemonic(language="english").check(mnemonic_phrase):
+            raise InvalidMnemonic("Phrase is not mnemonic!")
+
+        phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
+        return phrase_bytes == recovery_token
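The refactor commits above move the token lifecycle into the abstract class, so a backend only has to implement the storage primitives (`get_tokens`, `delete_token`, `_store_token`, and so on). A minimal self-contained sketch of that template-method shape, using a simplified stand-in `Token` and a hypothetical in-memory backend rather than the project's real models:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from secrets import token_urlsafe
from typing import Optional


@dataclass
class Token:
    # Simplified stand-in for selfprivacy_api.models.tokens.token.Token
    token: str
    device_name: str
    created_at: datetime

    @staticmethod
    def generate(device_name: str) -> "Token":
        return Token(token_urlsafe(32), device_name, datetime.now())


class TokenNotFound(Exception):
    pass


class AbstractTokensRepository(ABC):
    # Generic lifecycle logic lives in the base class...
    def get_token_by_name(self, token_name: str) -> Optional[Token]:
        for token in self.get_tokens():
            if token.device_name == token_name:
                return token
        raise TokenNotFound("Token not found!")

    def create_token(self, device_name: str) -> Token:
        new_token = Token.generate(device_name)
        self._store_token(new_token)
        return new_token

    def refresh_token(self, input_token: Token) -> Token:
        new_token = Token.generate(device_name=input_token.device_name)
        if input_token in self.get_tokens():
            self.delete_token(input_token)
            self._store_token(new_token)
            return new_token
        raise TokenNotFound("Token not found!")

    # ...while only the storage primitives stay abstract.
    @abstractmethod
    def get_tokens(self) -> list[Token]:
        """Get the tokens"""

    @abstractmethod
    def delete_token(self, input_token: Token) -> None:
        """Delete the token"""

    @abstractmethod
    def _store_token(self, new_token: Token):
        """Store a token directly"""


class InMemoryTokensRepository(AbstractTokensRepository):
    # Hypothetical third backend: a plain dict keyed by device name.
    def __init__(self):
        self._tokens: dict[str, Token] = {}

    def get_tokens(self) -> list[Token]:
        return list(self._tokens.values())

    def delete_token(self, input_token: Token) -> None:
        if input_token not in self.get_tokens():
            raise TokenNotFound("Token not found!")
        del self._tokens[input_token.device_name]

    def _store_token(self, new_token: Token):
        self._tokens[new_token.device_name] = new_token


repo = InMemoryTokensRepository()
old = repo.create_token("laptop")
new = repo.refresh_token(old)
assert new.device_name == "laptop" and new.token != old.token
```

This mirrors why the json and redis repositories below shrink to storage code only, and why the backend-agnostic tests can run against either implementation.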

View File: selfprivacy_api/repositories/tokens/json_tokens_repository.py

@@ -3,7 +3,6 @@ temporary legacy
 """
 from typing import Optional
 from datetime import datetime
-from mnemonic import Mnemonic
 
 from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData
 from selfprivacy_api.models.tokens.token import Token
@@ -11,9 +10,6 @@ from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
 from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
 from selfprivacy_api.repositories.tokens.exceptions import (
     TokenNotFound,
-    RecoveryKeyNotFound,
-    InvalidMnemonic,
-    NewDeviceKeyNotFound,
 )
 from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
     AbstractTokensRepository,
@@ -23,34 +19,6 @@ DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
 
 
 class JsonTokensRepository(AbstractTokensRepository):
-    def get_token_by_token_string(self, token_string: str) -> Optional[Token]:
-        """Get the token by token"""
-        with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
-            for userdata_token in tokens_file["tokens"]:
-                if userdata_token["token"] == token_string:
-                    return Token(
-                        token=token_string,
-                        device_name=userdata_token["name"],
-                        created_at=userdata_token["date"],
-                    )
-
-        raise TokenNotFound("Token not found!")
-
-    def get_token_by_name(self, token_name: str) -> Optional[Token]:
-        """Get the token by name"""
-        with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
-            for userdata_token in tokens_file["tokens"]:
-                if userdata_token["name"] == token_name:
-                    return Token(
-                        token=userdata_token["token"],
-                        device_name=token_name,
-                        created_at=userdata_token["date"],
-                    )
-
-        raise TokenNotFound("Token not found!")
-
     def get_tokens(self) -> list[Token]:
         """Get the tokens"""
         tokens_list = []
@@ -67,10 +35,8 @@ class JsonTokensRepository(AbstractTokensRepository):
 
         return tokens_list
 
-    def create_token(self, device_name: str) -> Token:
-        """Create new token"""
-        new_token = Token.generate(device_name)
-
+    def _store_token(self, new_token: Token):
+        """Store a token directly"""
         with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
             tokens_file["tokens"].append(
                 {
@@ -79,7 +45,6 @@ class JsonTokensRepository(AbstractTokensRepository):
                     "date": new_token.created_at.strftime(DATETIME_FORMAT),
                 }
             )
-        return new_token
 
     def delete_token(self, input_token: Token) -> None:
         """Delete the token"""
@@ -91,23 +56,6 @@ class JsonTokensRepository(AbstractTokensRepository):
 
         raise TokenNotFound("Token not found!")
 
-    def refresh_token(self, input_token: Token) -> Token:
-        """Change the token field of the existing token"""
-        new_token = Token.generate(device_name=input_token.device_name)
-
-        with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
-            for userdata_token in tokens_file["tokens"]:
-                if userdata_token["name"] == input_token.device_name:
-                    userdata_token["token"] = new_token.token
-                    userdata_token["date"] = (
-                        new_token.created_at.strftime(DATETIME_FORMAT),
-                    )
-
-                    return new_token
-
-        raise TokenNotFound("Token not found!")
-
     def get_recovery_key(self) -> Optional[RecoveryKey]:
         """Get the recovery key"""
         with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
@@ -146,45 +94,11 @@ class JsonTokensRepository(AbstractTokensRepository):
 
         return recovery_key
 
-    def use_mnemonic_recovery_key(
-        self, mnemonic_phrase: str, device_name: str
-    ) -> Token:
-        """Use the mnemonic recovery key and create a new token with the given name"""
-        recovery_key = self.get_recovery_key()
-
-        if recovery_key is None:
-            raise RecoveryKeyNotFound("Recovery key not found")
-
-        if not recovery_key.is_valid():
-            raise RecoveryKeyNotFound("Recovery key not found")
-
-        recovery_token = bytes.fromhex(recovery_key.key)
-
-        if not Mnemonic(language="english").check(mnemonic_phrase):
-            raise InvalidMnemonic("Phrase is not mnemonic!")
-
-        phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
-        if phrase_bytes != recovery_token:
-            raise RecoveryKeyNotFound("Recovery key not found")
-
-        new_token = Token.generate(device_name=device_name)
-
-        with WriteUserData(UserDataFiles.TOKENS) as tokens:
-            tokens["tokens"].append(
-                {
-                    "token": new_token.token,
-                    "name": new_token.device_name,
-                    "date": new_token.created_at.strftime(DATETIME_FORMAT),
-                }
-            )
-
-            if "recovery_token" in tokens:
-                if (
-                    "uses_left" in tokens["recovery_token"]
-                    and tokens["recovery_token"]["uses_left"] is not None
-                ):
-                    tokens["recovery_token"]["uses_left"] -= 1
-        return new_token
+    def _decrement_recovery_token(self):
+        """Decrement recovery key use count by one"""
+        if self.is_recovery_key_valid():
+            with WriteUserData(UserDataFiles.TOKENS) as tokens:
+                tokens["recovery_token"]["uses_left"] -= 1
 
     def get_new_device_key(self) -> NewDeviceKey:
         """Creates and returns the new device key"""
@@ -206,33 +120,15 @@ class JsonTokensRepository(AbstractTokensRepository):
                 del tokens_file["new_device"]
             return
 
-    def use_mnemonic_new_device_key(
-        self, mnemonic_phrase: str, device_name: str
-    ) -> Token:
-        """Use the mnemonic new device key"""
+    def _get_stored_new_device_key(self) -> Optional[NewDeviceKey]:
+        """Retrieves new device key that is already stored."""
        with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
             if "new_device" not in tokens_file or tokens_file["new_device"] is None:
-                raise NewDeviceKeyNotFound("New device key not found")
+                return
 
             new_device_key = NewDeviceKey(
                 key=tokens_file["new_device"]["token"],
                 created_at=tokens_file["new_device"]["date"],
                 expires_at=tokens_file["new_device"]["expiration"],
             )
+            return new_device_key
 
-        token = bytes.fromhex(new_device_key.key)
-        if not Mnemonic(language="english").check(mnemonic_phrase):
-            raise InvalidMnemonic("Phrase is not mnemonic!")
-
-        phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
-        if bytes(phrase_bytes) != bytes(token):
-            raise NewDeviceKeyNotFound("Phrase is not token!")
-
-        new_token = Token.generate(device_name=device_name)
-        with WriteUserData(UserDataFiles.TOKENS) as tokens:
-            if "new_device" in tokens:
-                del tokens["new_device"]
-
-        return new_token
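The JSON backend serializes `created_at` with the module's `DATETIME_FORMAT` constant. A short sketch of the round trip that produces the `date` strings seen in the fixtures below:

```python
from datetime import datetime

# Same constant as in json_tokens_repository.py
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"

dt = datetime(2022, 7, 15, 17, 41, 31, 675698)
serialized = dt.strftime(DATETIME_FORMAT)
assert serialized == "2022-07-15T17:41:31.675698"

# Parsing with the same format restores the exact datetime, microseconds included.
assert datetime.strptime(serialized, DATETIME_FORMAT) == dt
```

Note that the fixture files below also contain space-separated and `Z`-suffixed variants of this timestamp, which this exact format string alone would not parse; the repository relies on the model layer to accept those looser forms.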

View File: selfprivacy_api/repositories/tokens/redis_tokens_repository.py

@@ -1,9 +1,21 @@
 """
 Token repository using Redis as backend.
 """
+from typing import Optional
+from datetime import datetime
+
 from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
     AbstractTokensRepository,
 )
+from selfprivacy_api.utils.redis_pool import RedisPool
+
+from selfprivacy_api.models.tokens.token import Token
+from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
+from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
+
+from selfprivacy_api.repositories.tokens.exceptions import TokenNotFound
+
+TOKENS_PREFIX = "token_repo:tokens:"
+NEW_DEVICE_KEY_REDIS_KEY = "token_repo:new_device_key"
+RECOVERY_KEY_REDIS_KEY = "token_repo:recovery_key"
 
 
 class RedisTokensRepository(AbstractTokensRepository):
@@ -11,5 +23,125 @@ class RedisTokensRepository(AbstractTokensRepository):
     Token repository using Redis as a backend
     """
 
-    def __init__(self) -> None:
-        raise NotImplementedError
+    def __init__(self):
+        self.connection = RedisPool().get_connection()
+
+    @staticmethod
+    def token_key_for_device(device_name: str):
+        return TOKENS_PREFIX + str(hash(device_name))
+
+    def get_tokens(self) -> list[Token]:
+        """Get the tokens"""
+        r = self.connection
+        token_keys = r.keys(TOKENS_PREFIX + "*")
+        return [self._token_from_hash(key) for key in token_keys]
+
+    def delete_token(self, input_token: Token) -> None:
+        """Delete the token"""
+        r = self.connection
+        key = RedisTokensRepository._token_redis_key(input_token)
+        if input_token not in self.get_tokens():
+            raise TokenNotFound
+        r.delete(key)
+
+    def reset(self):
+        for token in self.get_tokens():
+            self.delete_token(token)
+        self.delete_new_device_key()
+        r = self.connection
+        r.delete(RECOVERY_KEY_REDIS_KEY)
+
+    def get_recovery_key(self) -> Optional[RecoveryKey]:
+        """Get the recovery key"""
+        r = self.connection
+        if r.exists(RECOVERY_KEY_REDIS_KEY):
+            return self._recovery_key_from_hash(RECOVERY_KEY_REDIS_KEY)
+        return None
+
+    def create_recovery_key(
+        self,
+        expiration: Optional[datetime],
+        uses_left: Optional[int],
+    ) -> RecoveryKey:
+        """Create the recovery key"""
+        recovery_key = RecoveryKey.generate(expiration=expiration, uses_left=uses_left)
+        self._store_model_as_hash(RECOVERY_KEY_REDIS_KEY, recovery_key)
+        return recovery_key
+
+    def get_new_device_key(self) -> NewDeviceKey:
+        """Creates and returns the new device key"""
+        new_device_key = NewDeviceKey.generate()
+        self._store_model_as_hash(NEW_DEVICE_KEY_REDIS_KEY, new_device_key)
+        return new_device_key
+
+    def delete_new_device_key(self) -> None:
+        """Delete the new device key"""
+        r = self.connection
+        r.delete(NEW_DEVICE_KEY_REDIS_KEY)
+
+    @staticmethod
+    def _token_redis_key(token: Token) -> str:
+        return RedisTokensRepository.token_key_for_device(token.device_name)
+
+    def _store_token(self, new_token: Token):
+        """Store a token directly"""
+        key = RedisTokensRepository._token_redis_key(new_token)
+        self._store_model_as_hash(key, new_token)
+
+    def _decrement_recovery_token(self):
+        """Decrement recovery key use count by one"""
+        if self.is_recovery_key_valid():
+            uses_left = self.get_recovery_key().uses_left
+            r = self.connection
+            r.hset(RECOVERY_KEY_REDIS_KEY, "uses_left", uses_left - 1)
+
+    def _get_stored_new_device_key(self) -> Optional[NewDeviceKey]:
+        """Retrieves new device key that is already stored."""
+        return self._new_device_key_from_hash(NEW_DEVICE_KEY_REDIS_KEY)
+
+    @staticmethod
+    def _is_date_key(key: str):
+        return key in [
+            "created_at",
+            "expires_at",
+        ]
+
+    @staticmethod
+    def _prepare_model_dict(d: dict):
+        date_keys = [key for key in d.keys() if RedisTokensRepository._is_date_key(key)]
+        for date in date_keys:
+            if d[date] != "None":
+                d[date] = datetime.fromisoformat(d[date])
+        for key in d.keys():
+            if d[key] == "None":
+                d[key] = None
+
+    def _model_dict_from_hash(self, redis_key: str) -> Optional[dict]:
+        r = self.connection
+        if r.exists(redis_key):
+            token_dict = r.hgetall(redis_key)
+            RedisTokensRepository._prepare_model_dict(token_dict)
+            return token_dict
+        return None
+
+    def _hash_as_model(self, redis_key: str, model_class):
+        token_dict = self._model_dict_from_hash(redis_key)
+        if token_dict is not None:
+            return model_class(**token_dict)
+        return None
+
+    def _token_from_hash(self, redis_key: str) -> Optional[Token]:
+        return self._hash_as_model(redis_key, Token)
+
+    def _recovery_key_from_hash(self, redis_key: str) -> Optional[RecoveryKey]:
+        return self._hash_as_model(redis_key, RecoveryKey)
+
+    def _new_device_key_from_hash(self, redis_key: str) -> Optional[NewDeviceKey]:
+        return self._hash_as_model(redis_key, NewDeviceKey)
+
+    def _store_model_as_hash(self, redis_key, model):
+        r = self.connection
+        for key, value in model.dict().items():
+            if isinstance(value, datetime):
+                value = value.isoformat()
+            r.hset(redis_key, key, str(value))
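`_store_model_as_hash` flattens a model into string-valued Redis hash fields (datetimes via `isoformat()`, `None` as the literal string `"None"`), and `_prepare_model_dict` undoes that on read. A sketch of the same round trip, with a plain dict standing in for the Redis hash so it runs without a server:

```python
from datetime import datetime


def store_as_hash(model: dict) -> dict:
    # Mimics _store_model_as_hash: every field becomes a string.
    flat = {}
    for key, value in model.items():
        if isinstance(value, datetime):
            value = value.isoformat()
        flat[key] = str(value)
    return flat


def prepare_model_dict(d: dict) -> None:
    # Mimics _prepare_model_dict: revive date fields, map "None" back to None.
    for key in d:
        if key in ("created_at", "expires_at") and d[key] != "None":
            d[key] = datetime.fromisoformat(d[key])
    for key in d:
        if d[key] == "None":
            d[key] = None


recovery_key = {
    "key": "889bf49c1d3199d71a2e704718772bd53a422020334db051",
    "created_at": datetime(2022, 7, 15, 17, 41, 31, 675698),
    "expires_at": None,
    "uses_left": None,
}

flat = store_as_hash(recovery_key)
prepare_model_dict(flat)
assert flat["created_at"] == recovery_key["created_at"]
assert flat["expires_at"] is None and flat["uses_left"] is None
```

Non-date, non-`None` fields (such as an integer `uses_left`) stay strings after this step; the repository relies on the subsequent model cast in `_hash_as_model` to coerce them back to their declared types.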

View File: test_json_tokens_repository.py (new file)

@@ -0,0 +1,218 @@
+# pylint: disable=redefined-outer-name
+# pylint: disable=unused-argument
+# pylint: disable=missing-function-docstring
+"""
+tests that restrict json token repository implementation
+"""
+
+import pytest
+
+from datetime import datetime
+
+from selfprivacy_api.models.tokens.token import Token
+from selfprivacy_api.repositories.tokens.exceptions import (
+    TokenNotFound,
+    RecoveryKeyNotFound,
+    NewDeviceKeyNotFound,
+)
+from selfprivacy_api.repositories.tokens.json_tokens_repository import (
+    JsonTokensRepository,
+)
+
+from tests.common import read_json
+from test_tokens_repository import (
+    mock_recovery_key_generate,
+    mock_generate_token,
+    mock_new_device_key_generate,
+    empty_keys,
+)
+
+ORIGINAL_TOKEN_CONTENT = [
+    {
+        "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
+        "name": "primary_token",
+        "date": "2022-07-15 17:41:31.675698",
+    },
+    {
+        "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
+        "name": "second_token",
+        "date": "2022-07-15 17:41:31.675698Z",
+    },
+    {
+        "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
+        "name": "third_token",
+        "date": "2022-07-15T17:41:31.675698Z",
+    },
+    {
+        "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
+        "name": "forth_token",
+        "date": "2022-07-15T17:41:31.675698",
+    },
+]
+
+
+@pytest.fixture
+def tokens(mocker, datadir):
+    mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "tokens.json")
+    assert read_json(datadir / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
+    return datadir
+
+
+@pytest.fixture
+def null_keys(mocker, datadir):
+    mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json")
+    assert read_json(datadir / "null_keys.json")["recovery_token"] is None
+    assert read_json(datadir / "null_keys.json")["new_device"] is None
+    return datadir
+
+
+def test_delete_token(tokens):
+    repo = JsonTokensRepository()
+    input_token = Token(
+        token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
+        device_name="primary_token",
+        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
+    )
+
+    repo.delete_token(input_token)
+    assert read_json(tokens / "tokens.json")["tokens"] == [
+        {
+            "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
+            "name": "second_token",
+            "date": "2022-07-15 17:41:31.675698Z",
+        },
+        {
+            "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
+            "name": "third_token",
+            "date": "2022-07-15T17:41:31.675698Z",
+        },
+        {
+            "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
+            "name": "forth_token",
+            "date": "2022-07-15T17:41:31.675698",
+        },
+    ]
+
+
+def test_delete_not_found_token(tokens):
+    repo = JsonTokensRepository()
+    input_token = Token(
+        token="imbadtoken",
+        device_name="primary_token",
+        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
+    )
+    with pytest.raises(TokenNotFound):
+        assert repo.delete_token(input_token) is None
+
+    assert read_json(tokens / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
+
+
+def test_create_recovery_key(tokens, mock_recovery_key_generate):
+    repo = JsonTokensRepository()
+
+    assert repo.create_recovery_key(uses_left=1, expiration=None) is not None
+    assert read_json(tokens / "tokens.json")["recovery_token"] == {
+        "token": "889bf49c1d3199d71a2e704718772bd53a422020334db051",
+        "date": "2022-07-15T17:41:31.675698",
+        "expiration": None,
+        "uses_left": 1,
+    }
+
+
+def test_use_mnemonic_recovery_key_when_null(null_keys):
+    repo = JsonTokensRepository()
+
+    with pytest.raises(RecoveryKeyNotFound):
+        assert (
+            repo.use_mnemonic_recovery_key(
+                mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
+                device_name="primary_token",
+            )
+            is None
+        )
+
+
+def test_use_mnemonic_recovery_key(tokens, mock_generate_token):
+    repo = JsonTokensRepository()
+
+    assert repo.use_mnemonic_recovery_key(
+        mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park",
+        device_name="newdevice",
+    ) == Token(
+        token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
+        device_name="newdevice",
+        created_at=datetime(2022, 11, 14, 6, 6, 32, 777123),
+    )
+
+    assert read_json(tokens / "tokens.json")["tokens"] == [
+        {
+            "date": "2022-07-15 17:41:31.675698",
+            "name": "primary_token",
+            "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
+        },
+        {
+            "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
+            "name": "second_token",
+            "date": "2022-07-15 17:41:31.675698Z",
+        },
+        {
+            "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
+            "name": "third_token",
+            "date": "2022-07-15T17:41:31.675698Z",
+        },
+        {
+            "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
+            "name": "forth_token",
+            "date": "2022-07-15T17:41:31.675698",
+        },
+        {
+            "date": "2022-11-14T06:06:32.777123",
+            "name": "newdevice",
+            "token": "ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
+        },
+    ]
+    assert read_json(tokens / "tokens.json")["recovery_token"] == {
+        "date": "2022-11-11T11:48:54.228038",
+        "expiration": None,
+        "token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
+        "uses_left": 1,
+    }
+
+
+def test_get_new_device_key(tokens, mock_new_device_key_generate):
+    repo = JsonTokensRepository()
+
+    assert repo.get_new_device_key() is not None
+    assert read_json(tokens / "tokens.json")["new_device"] == {
+        "date": "2022-07-15T17:41:31.675698",
+        "expiration": "2022-07-15T17:41:31.675698",
+        "token": "43478d05b35e4781598acd76e33832bb",
+    }
+
+
+def test_delete_new_device_key(tokens):
+    repo = JsonTokensRepository()
+
+    assert repo.delete_new_device_key() is None
+    assert "new_device" not in read_json(tokens / "tokens.json")
+
+
+def test_delete_new_device_key_when_empty(empty_keys):
+    repo = JsonTokensRepository()
+
+    repo.delete_new_device_key()
+    assert "new_device" not in read_json(empty_keys / "empty_keys.json")
+
+
+def test_use_mnemonic_new_device_key_when_null(null_keys):
+    repo = JsonTokensRepository()
+
+    with pytest.raises(NewDeviceKeyNotFound):
+        assert (
+            repo.use_mnemonic_new_device_key(
+                device_name="imnew",
+                mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
+            )
+            is None
+        )

View File

@ -0,0 +1,9 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}

View File: test_tokens_repository.py

@ -2,7 +2,7 @@
# pylint: disable=unused-argument # pylint: disable=unused-argument
# pylint: disable=missing-function-docstring # pylint: disable=missing-function-docstring
from datetime import datetime, timezone from datetime import datetime
import pytest import pytest
@ -18,40 +18,20 @@ from selfprivacy_api.repositories.tokens.exceptions import (
from selfprivacy_api.repositories.tokens.json_tokens_repository import ( from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository, JsonTokensRepository,
) )
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from tests.common import read_json from tests.common import read_json
ORIGINAL_TOKEN_CONTENT = [ ORIGINAL_DEVICE_NAMES = [
{ "primary_token",
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", "second_token",
"name": "primary_token", "third_token",
"date": "2022-07-15 17:41:31.675698", "forth_token",
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z",
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z",
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698",
},
] ]
@pytest.fixture
def tokens(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "tokens.json")
assert read_json(datadir / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
return datadir
@pytest.fixture @pytest.fixture
def empty_keys(mocker, datadir): def empty_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "empty_keys.json") mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "empty_keys.json")
@ -65,23 +45,10 @@ def empty_keys(mocker, datadir):
return datadir return datadir
@pytest.fixture
def null_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json")
assert read_json(datadir / "null_keys.json")["recovery_token"] is None
assert read_json(datadir / "null_keys.json")["new_device"] is None
return datadir
class RecoveryKeyMockReturnNotValid:
def is_valid() -> bool:
return False
@pytest.fixture
def mock_new_device_key_generate(mocker):
    mock = mocker.patch(
        "selfprivacy_api.models.tokens.new_device_key.NewDeviceKey.generate",
        autospec=True,
        return_value=NewDeviceKey(
            key="43478d05b35e4781598acd76e33832bb",

@@ -92,10 +59,25 @@ def mock_new_device_key_generate(mocker):
    return mock


# mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
@pytest.fixture
def mock_new_device_key_generate_for_mnemonic(mocker):
    mock = mocker.patch(
        "selfprivacy_api.models.tokens.new_device_key.NewDeviceKey.generate",
        autospec=True,
        return_value=NewDeviceKey(
            key="2237238de23dc71ab558e317bdb8ff8e",
            created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
            expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
        ),
    )
    return mock

@pytest.fixture
def mock_generate_token(mocker):
    mock = mocker.patch(
        "selfprivacy_api.models.tokens.token.Token.generate",
        autospec=True,
        return_value=Token(
            token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",

@@ -107,11 +89,16 @@ def mock_generate_token(mocker):
@pytest.fixture
def mock_recovery_key_generate_invalid(mocker):
    mock = mocker.patch(
        "selfprivacy_api.models.tokens.recovery_key.RecoveryKey.generate",
        autospec=True,
        return_value=RecoveryKey(
            key="889bf49c1d3199d71a2e704718772bd53a422020334db051",
            created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
            expires_at=None,
            uses_left=0,
        ),
    )
    return mock

@@ -119,7 +106,7 @@ def mock_get_recovery_key_return_not_valid(mocker):
@pytest.fixture
def mock_token_generate(mocker):
    mock = mocker.patch(
        "selfprivacy_api.models.tokens.token.Token.generate",
        autospec=True,
        return_value=Token(
            token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",

@@ -133,7 +120,7 @@ def mock_token_generate(mocker):
@pytest.fixture
def mock_recovery_key_generate(mocker):
    mock = mocker.patch(
        "selfprivacy_api.models.tokens.recovery_key.RecoveryKey.generate",
        autospec=True,
        return_value=RecoveryKey(
            key="889bf49c1d3199d71a2e704718772bd53a422020334db051",

@@ -145,127 +132,140 @@ def mock_recovery_key_generate(mocker):
    return mock

@pytest.fixture
def mock_recovery_key_generate_for_mnemonic(mocker):
    mock = mocker.patch(
        "selfprivacy_api.models.tokens.recovery_key.RecoveryKey.generate",
        autospec=True,
        return_value=RecoveryKey(
            key="ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
            created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
            expires_at=None,
            uses_left=1,
        ),
    )
    return mock


@pytest.fixture
def empty_json_repo(empty_keys):
    repo = JsonTokensRepository()
    for token in repo.get_tokens():
        repo.delete_token(token)
    assert repo.get_tokens() == []
    return repo


@pytest.fixture
def empty_redis_repo():
    repo = RedisTokensRepository()
    repo.reset()
    assert repo.get_tokens() == []
    return repo


@pytest.fixture(params=["json", "redis"])
def empty_repo(request, empty_json_repo, empty_redis_repo):
    if request.param == "json":
        return empty_json_repo
    if request.param == "redis":
        return empty_redis_repo
        # return empty_json_repo
    else:
        raise NotImplementedError


@pytest.fixture
def some_tokens_repo(empty_repo):
    for name in ORIGINAL_DEVICE_NAMES:
        empty_repo.create_token(name)
    assert len(empty_repo.get_tokens()) == len(ORIGINAL_DEVICE_NAMES)
    for name in ORIGINAL_DEVICE_NAMES:
        assert empty_repo.get_token_by_name(name) is not None
    assert empty_repo.get_new_device_key() is not None
    return empty_repo

###############
# Test tokens #
###############


def test_get_token_by_token_string(some_tokens_repo):
    repo = some_tokens_repo
    test_token = repo.get_tokens()[2]

    assert repo.get_token_by_token_string(token_string=test_token.token) == test_token

def test_get_token_by_non_existent_token_string(some_tokens_repo):
    repo = some_tokens_repo

    with pytest.raises(TokenNotFound):
        assert repo.get_token_by_token_string(token_string="iamBadtoken") is None


def test_get_token_by_name(some_tokens_repo):
    repo = some_tokens_repo

    token = repo.get_token_by_name(token_name="primary_token")
    assert token is not None
    assert token.device_name == "primary_token"
    assert token in repo.get_tokens()


def test_get_token_by_non_existent_name(some_tokens_repo):
    repo = some_tokens_repo

    with pytest.raises(TokenNotFound):
        assert repo.get_token_by_name(token_name="badname") is None

def test_get_tokens(some_tokens_repo):
    repo = some_tokens_repo
    tokenstrings = []
    # we cannot insert tokens directly via api, so we check meta-properties instead
    for token in repo.get_tokens():
        assert len(token.token) == 43  # assuming secrets.token_urlsafe
        assert token.token not in tokenstrings
        tokenstrings.append(token.token)
        assert token.created_at.day == datetime.today().day

def test_create_token(empty_repo, mock_token_generate):
    repo = empty_repo

    assert repo.create_token(device_name="IamNewDevice") == Token(
        token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
        device_name="IamNewDevice",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )
    assert repo.get_tokens() == [
        Token(
            token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
            device_name="IamNewDevice",
            created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
        )
    ]

def test_delete_token(some_tokens_repo):
    repo = some_tokens_repo
    original_tokens = repo.get_tokens()
    input_token = original_tokens[1]

    repo.delete_token(input_token)

    tokens_after_delete = repo.get_tokens()
    for token in original_tokens:
        if token != input_token:
            assert token in tokens_after_delete
    assert len(original_tokens) == len(tokens_after_delete) + 1


def test_delete_not_found_token(some_tokens_repo):
    repo = some_tokens_repo
    initial_tokens = repo.get_tokens()
    input_token = Token(
        token="imbadtoken",
        device_name="primary_token",

@@ -274,16 +274,15 @@ def test_delete_not_found_token(tokens):
    with pytest.raises(TokenNotFound):
        assert repo.delete_token(input_token) is None

    new_tokens = repo.get_tokens()
    assert len(new_tokens) == len(initial_tokens)
    for token in initial_tokens:
        assert token in new_tokens

def test_refresh_token(some_tokens_repo, mock_token_generate):
    repo = some_tokens_repo
    input_token = some_tokens_repo.get_tokens()[0]

    assert repo.refresh_token(input_token) == Token(
        token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",

@@ -292,8 +291,8 @@ def test_refresh_token(tokens, mock_token_generate):
    )


def test_refresh_not_found_token(some_tokens_repo, mock_token_generate):
    repo = some_tokens_repo
    input_token = Token(
        token="idontknowwhoiam",
        device_name="tellmewhoiam?",

@@ -309,39 +308,26 @@ def test_refresh_not_found_token(tokens, mock_token_generate):
################


def test_get_recovery_key_when_empty(empty_repo):
    repo = empty_repo

    assert repo.get_recovery_key() is None


def test_create_get_recovery_key(some_tokens_repo, mock_recovery_key_generate):
    repo = some_tokens_repo

    assert repo.create_recovery_key(uses_left=1, expiration=None) is not None
    assert repo.get_recovery_key() == RecoveryKey(
        key="889bf49c1d3199d71a2e704718772bd53a422020334db051",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
        expires_at=None,
        uses_left=1,
    )

def test_use_mnemonic_recovery_key_when_empty(empty_repo):
    repo = empty_repo

    with pytest.raises(RecoveryKeyNotFound):
        assert (

@@ -354,9 +340,10 @@ def test_use_mnemonic_recovery_key_when_empty(
def test_use_mnemonic_not_valid_recovery_key(
    some_tokens_repo, mock_recovery_key_generate_invalid
):
    repo = some_tokens_repo
    assert repo.create_recovery_key(uses_left=0, expiration=None) is not None

    with pytest.raises(RecoveryKeyNotFound):
        assert (

@@ -368,8 +355,9 @@ def test_use_mnemonic_not_valid_recovery_key(
        )

def test_use_mnemonic_not_mnemonic_recovery_key(some_tokens_repo):
    repo = some_tokens_repo
    assert repo.create_recovery_key(uses_left=1, expiration=None) is not None

    with pytest.raises(InvalidMnemonic):
        assert (

@@ -381,8 +369,9 @@ def test_use_mnemonic_not_mnemonic_recovery_key(tokens):
        )


def test_use_not_mnemonic_recovery_key(some_tokens_repo):
    repo = some_tokens_repo
    assert repo.create_recovery_key(uses_left=1, expiration=None) is not None

    with pytest.raises(InvalidMnemonic):
        assert (

@@ -394,8 +383,9 @@ def test_use_not_mnemonic_recovery_key(tokens):
        )


def test_use_not_found_mnemonic_recovery_key(some_tokens_repo):
    repo = some_tokens_repo
    assert repo.create_recovery_key(uses_left=1, expiration=None) is not None

    with pytest.raises(RecoveryKeyNotFound):
        assert (

@@ -407,8 +397,8 @@ def test_use_not_found_mnemonic_recovery_key(tokens):
        )


def test_use_mnemonic_recovery_key_when_empty(empty_repo):
    repo = empty_repo

    with pytest.raises(RecoveryKeyNotFound):
        assert (

@@ -420,65 +410,34 @@ def test_use_menemonic_recovery_key_when_empty(empty_keys):
        )

# agnostic test mixed with an implementation test
def test_use_mnemonic_recovery_key(
    some_tokens_repo, mock_recovery_key_generate_for_mnemonic, mock_generate_token
):
    repo = some_tokens_repo
    assert repo.create_recovery_key(uses_left=1, expiration=None) is not None

    test_token = Token(
        token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
        device_name="newdevice",
        created_at=datetime(2022, 11, 14, 6, 6, 32, 777123),
    )

    assert (
        repo.use_mnemonic_recovery_key(
            mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park",
            device_name="newdevice",
        )
        == test_token
    )

    assert test_token in repo.get_tokens()
    assert repo.get_recovery_key() == RecoveryKey(
        key="ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
        expires_at=None,
        uses_left=0,
    )

##################

@@ -486,35 +445,31 @@ def test_use_mnemonic_recovery_key(tokens, mock_generate_token):
##################


def test_get_new_device_key(some_tokens_repo, mock_new_device_key_generate):
    repo = some_tokens_repo

    assert repo.get_new_device_key() == NewDeviceKey(
        key="43478d05b35e4781598acd76e33832bb",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
        expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )

def test_delete_new_device_key(some_tokens_repo):
    repo = some_tokens_repo

    assert repo.delete_new_device_key() is None
    # we cannot say if there is one or not without creating it


def test_delete_new_device_key_when_empty(empty_repo):
    repo = empty_repo

    assert repo.delete_new_device_key() is None

def test_use_invalid_mnemonic_new_device_key(some_tokens_repo):
    repo = some_tokens_repo

    with pytest.raises(InvalidMnemonic):
        assert (

@@ -527,9 +482,10 @@ def test_use_invalid_mnemonic_new_device_key(
def test_use_not_exists_mnemonic_new_device_key(
    empty_repo, mock_new_device_key_generate
):
    repo = empty_repo
    assert repo.get_new_device_key() is not None

    with pytest.raises(NewDeviceKeyNotFound):
        assert (

@@ -542,23 +498,20 @@ def test_use_not_exists_mnemonic_new_device_key(
def test_use_mnemonic_new_device_key(
    empty_repo, mock_new_device_key_generate_for_mnemonic
):
    repo = empty_repo
    assert repo.get_new_device_key() is not None

    new_token = repo.use_mnemonic_new_device_key(
        device_name="imnew",
        mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
    )
    # assert read_json(datadir / "tokens.json")["new_device"] == []
    assert new_token.device_name == "imnew"
    assert new_token in repo.get_tokens()

    # we must delete the key after use
    with pytest.raises(NewDeviceKeyNotFound):
        assert (
            repo.use_mnemonic_new_device_key(

@@ -569,8 +522,8 @@ def test_use_mnemonic_new_device_key_when_empty(empty_keys):
        )


def test_use_mnemonic_new_device_key_when_empty(empty_repo):
    repo = empty_repo

    with pytest.raises(NewDeviceKeyNotFound):
        assert (