From bf3d921e2d08b0db88ee9e2e4bc7d0aa3e29df8b Mon Sep 17 00:00:00 2001 From: def Date: Thu, 27 Oct 2022 18:16:22 +0400 Subject: [PATCH] refactor: implemented a json repository --- .../models/tokens/new_device_key.py | 5 +- selfprivacy_api/models/tokens/token.py | 5 +- .../tokens/abstract_tokens_repository.py | 18 +- .../repositories/tokens/exceptions.py | 2 + .../tokens/json_tokens_repository.py | 155 +++++++++++++++++- 5 files changed, 163 insertions(+), 22 deletions(-) create mode 100644 selfprivacy_api/repositories/tokens/exceptions.py diff --git a/selfprivacy_api/models/tokens/new_device_key.py b/selfprivacy_api/models/tokens/new_device_key.py index 08941b7..dda926c 100644 --- a/selfprivacy_api/models/tokens/new_device_key.py +++ b/selfprivacy_api/models/tokens/new_device_key.py @@ -3,7 +3,6 @@ New device key used to obtain access token. """ from datetime import datetime, timedelta import secrets -from typing import Optional from pydantic import BaseModel from mnemonic import Mnemonic @@ -17,13 +16,13 @@ class NewDeviceKey(BaseModel): key: str created_at: datetime - expires_at: Optional[datetime] + expires_at: datetime def is_valid(self) -> bool: """ Check if the recovery key is valid. """ - if self.expires_at is not None and self.expires_at < datetime.now(): + if self.expires_at < datetime.now(): return False return True diff --git a/selfprivacy_api/models/tokens/token.py b/selfprivacy_api/models/tokens/token.py index 4a5cd7f..4c34f58 100644 --- a/selfprivacy_api/models/tokens/token.py +++ b/selfprivacy_api/models/tokens/token.py @@ -5,7 +5,6 @@ Access token has a token string, device name and date of creation. """ from datetime import datetime import secrets -from typing import Optional from pydantic import BaseModel @@ -21,7 +20,7 @@ class Token(BaseModel): created_at: datetime @staticmethod - def generate(name: str) -> "Token": + def generate(device_name: str) -> "Token": """ Factory to generate a random token. """ @@ -29,6 +28,6 @@ class Token(BaseModel): token = secrets.token_urlsafe(32) return Token( token=token, - device_name=name, + device_name=device_name, created_at=creation_date, ) diff --git a/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py b/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py index 1073ca7..3aac3d7 100644 --- a/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py +++ b/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import List, Optional +from typing import Optional from selfprivacy_api.models.tokens.token import Token from selfprivacy_api.models.tokens.recovery_key import RecoveryKey @@ -11,32 +11,26 @@ class AbstractTokensRepository(ABC): @abstractmethod def get_token_by_token_string(self, token_string: str) -> Optional[Token]: """Get the token by token""" - ... @abstractmethod def get_token_by_name(self, token_name: str) -> Optional[Token]: """Get the token by name""" - ... @abstractmethod def get_tokens(self) -> list[Token]: """Get the tokens""" - ... @abstractmethod def create_token(self, name: str) -> Token: """Create new token""" - ... @abstractmethod - def delete_token(self, token: Token) -> None: + def delete_token(self, input_token: Token) -> None: """Delete the token""" - ... @abstractmethod - def refresh_token(self, token: Token) -> Token: + def refresh_token(self, input_token: Token) -> Token: """Refresh the token""" - ... def is_token_valid(self, token_string: str) -> bool: """Check if the token is valid""" @@ -62,7 +56,6 @@ class AbstractTokensRepository(ABC): @abstractmethod def get_recovery_key(self) -> Optional[RecoveryKey]: """Get the recovery key""" - ... @abstractmethod def create_recovery_key( @@ -71,12 +64,10 @@ class AbstractTokensRepository(ABC): uses_left: Optional[int], ) -> RecoveryKey: """Create the recovery key""" - ... @abstractmethod def use_mnemonic_recovery_key(self, mnemonic_phrase: str, name: str) -> Token: """Use the mnemonic recovery key and create a new token with the given name""" - ... def is_recovery_key_valid(self) -> bool: """Check if the recovery key is valid""" @@ -88,14 +79,11 @@ class AbstractTokensRepository(ABC): @abstractmethod def get_new_device_key(self) -> NewDeviceKey: """Creates and returns the new device key""" - ... @abstractmethod def delete_new_device_key(self) -> None: """Delete the new device key""" - ... @abstractmethod def use_mnemonic_new_device_key(self, mnemonic_phrase: str, name: str) -> None: """Use the mnemonic new device key""" - ... diff --git a/selfprivacy_api/repositories/tokens/exceptions.py b/selfprivacy_api/repositories/tokens/exceptions.py new file mode 100644 index 0000000..5f26d46 --- /dev/null +++ b/selfprivacy_api/repositories/tokens/exceptions.py @@ -0,0 +1,2 @@ +class TokenNotFoundError(Exception): + """Token not found!""" diff --git a/selfprivacy_api/repositories/tokens/json_tokens_repository.py b/selfprivacy_api/repositories/tokens/json_tokens_repository.py index 7302096..90bc8ee 100644 --- a/selfprivacy_api/repositories/tokens/json_tokens_repository.py +++ b/selfprivacy_api/repositories/tokens/json_tokens_repository.py @@ -1,7 +1,160 @@ +""" +temporary legacy +""" +from secrets import token_bytes +from typing import Optional +from datetime import datetime + +from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData +from selfprivacy_api.models.tokens.token import Token +from selfprivacy_api.models.tokens.recovery_key import RecoveryKey +from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey +from selfprivacy_api.repositories.tokens.exceptions import TokenNotFoundError from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( AbstractTokensRepository, ) class JsonTokensRepository(AbstractTokensRepository): - pass + def get_token_by_token_string(self, token_string: str) -> Optional[Token]: + """Get the token by token""" + with ReadUserData(UserDataFiles.TOKENS) as tokens_file: + for userdata_token in tokens_file["tokens"]: + if userdata_token["token"] == token_string: + + return Token( + token=token_string, + device_name=userdata_token["name"], + created_at=userdata_token["date"], + ) + + raise TokenNotFoundError("Token not found!") + + def get_token_by_name(self, token_name: str) -> Optional[Token]: + """Get the token by name""" + with ReadUserData(UserDataFiles.TOKENS) as tokens_file: + for userdata_token in tokens_file["tokens"]: + if userdata_token["name"] == token_name: + + return Token( + token=userdata_token["token"], + device_name=token_name, + created_at=userdata_token["date"], + ) + + raise TokenNotFoundError("Token not found!") + + def get_tokens(self) -> list[Token]: + """Get the tokens""" + tokens_list = [] + + with ReadUserData(UserDataFiles.TOKENS) as tokens_file: + for userdata_token in tokens_file["tokens"]: + tokens_list.append( + Token( + token=userdata_token.token, + device_name=userdata_token.name, + created_at=userdata_token.date, + ) + ) + + return tokens_list + + def create_token(self, name: str) -> Token: + """Create new token""" + new_token = Token.generate(device_name=name) + + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + tokens_file["tokens"].append( + { + "token": new_token.token, + "name": new_token.device_name, + "date": new_token.created_at, + } + ) + return new_token + + def delete_token(self, input_token: Token) -> None: + """Delete the token""" + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + for userdata_token in tokens_file["tokens"]: + if userdata_token["token"] == input_token: + tokens_file["tokens"].remove( + userdata_token + ) # Allah, i pray it works + + def refresh_token(self, input_token: Token) -> Token: + """Change the token field of the existing token""" + new_token = Token.generate(device_name=input_token.device_name) + + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + for userdata_token in tokens_file["tokens"]: + + if userdata_token["token"] == input_token.token: + userdata_token["token"] = new_token.token + userdata_token["data"] = new_token.created_at + + return new_token + + raise TokenNotFoundError("Token not found!") + + def get_recovery_key(self) -> Optional[RecoveryKey]: + """Get the recovery key""" + with ReadUserData(UserDataFiles.TOKENS) as tokens_file: + + if tokens_file["recovery_token"] is None: + return + + recovery_key = RecoveryKey( + key=tokens_file["recovery_token"]["token"], + created_at=tokens_file["recovery_token"]["date"], + expires_at=tokens_file["recovery_token"]["expitation"], + uses_left=tokens_file["recovery_token"]["uses_left"], + ) + + return recovery_key + + def create_recovery_key( + self, + expiration: Optional[datetime], + uses_left: Optional[int], + ) -> RecoveryKey: + """Create the recovery key""" + + recovery_key = RecoveryKey.generate(expiration=None, uses_left=None) + + with ReadUserData(UserDataFiles.TOKENS) as tokens_file: + tokens_file["recovery_key"] = { + "token": recovery_key.key, + "date": recovery_key.created_at, + "expiration": recovery_key.expires_at, + "uses_left": recovery_key.uses_left, + } + + return recovery_key + + def use_mnemonic_recovery_key(self, mnemonic_phrase: str, name: str) -> Token: + """Use the mnemonic recovery key and create a new token with the given name""" + ... + + def get_new_device_key(self) -> NewDeviceKey: + """Creates and returns the new device key""" + new_device_key = NewDeviceKey.generate() + + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + tokens_file["new_device"] = { + "token": new_device_key.key, + "data": new_device_key.created_at, + "expiration": new_device_key.expires_at, + } + + return new_device_key + + def delete_new_device_key(self) -> None: + """Delete the new device key""" + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + tokens_file.pop("new_device") + + def use_mnemonic_new_device_key(self, mnemonic_phrase: str, name: str) -> None: + """Use the mnemonic new device key""" + ...