diff --git a/selfprivacy_api/migrations/__init__.py b/selfprivacy_api/migrations/__init__.py index 8209198..b051f04 100644 --- a/selfprivacy_api/migrations/__init__.py +++ b/selfprivacy_api/migrations/__init__.py @@ -8,7 +8,9 @@ at api.skippedMigrations in userdata.json and populating it with IDs of the migrations to skip. Adding DISABLE_ALL to that array disables the migrations module entirely. """ -from selfprivacy_api.migrations.check_for_failed_binds_migration import CheckForFailedBindsMigration +from selfprivacy_api.migrations.check_for_failed_binds_migration import ( + CheckForFailedBindsMigration, +) from selfprivacy_api.utils import ReadUserData from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson diff --git a/selfprivacy_api/models/__init__.py b/selfprivacy_api/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/selfprivacy_api/models/tokens/new_device_key.py b/selfprivacy_api/models/tokens/new_device_key.py new file mode 100644 index 0000000..08941b7 --- /dev/null +++ b/selfprivacy_api/models/tokens/new_device_key.py @@ -0,0 +1,47 @@ +""" +New device key used to obtain access token. +""" +from datetime import datetime, timedelta +import secrets +from typing import Optional +from pydantic import BaseModel +from mnemonic import Mnemonic + + +class NewDeviceKey(BaseModel): + """ + Recovery key used to obtain access token. + + Recovery key has a key string, date of creation, date of expiration. + """ + + key: str + created_at: datetime + expires_at: Optional[datetime] + + def is_valid(self) -> bool: + """ + Check if the recovery key is valid. + """ + if self.expires_at is not None and self.expires_at < datetime.now(): + return False + return True + + def as_mnemonic(self) -> str: + """ + Get the recovery key as a mnemonic. + """ + return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key)) + + @staticmethod + def generate() -> "NewDeviceKey": + """ + Factory to generate a random token. + """ + creation_date = datetime.now() + key = secrets.token_bytes(16).hex() + return NewDeviceKey( + key=key, + created_at=creation_date, + expires_at=datetime.now() + timedelta(minutes=10), + ) diff --git a/selfprivacy_api/models/tokens/recovery_key.py b/selfprivacy_api/models/tokens/recovery_key.py new file mode 100644 index 0000000..098aceb --- /dev/null +++ b/selfprivacy_api/models/tokens/recovery_key.py @@ -0,0 +1,56 @@ +""" +Recovery key used to obtain access token. + +Recovery key has a token string, date of creation, optional date of expiration and optional count of uses left. +""" +from datetime import datetime +import secrets +from typing import Optional +from pydantic import BaseModel +from mnemonic import Mnemonic + + +class RecoveryKey(BaseModel): + """ + Recovery key used to obtain access token. + + Recovery key has a key string, date of creation, optional date of expiration and optional count of uses left. + """ + + key: str + created_at: datetime + expires_at: Optional[datetime] + uses_left: Optional[int] + + def is_valid(self) -> bool: + """ + Check if the recovery key is valid. + """ + if self.expires_at is not None and self.expires_at < datetime.now(): + return False + if self.uses_left is not None and self.uses_left <= 0: + return False + return True + + def as_mnemonic(self) -> str: + """ + Get the recovery key as a mnemonic. + """ + return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key)) + + @staticmethod + def generate( + expiration: Optional[datetime], + uses_left: Optional[int], + ) -> "RecoveryKey": + """ + Factory to generate a random token. + """ + creation_date = datetime.now() + key = secrets.token_bytes(24).hex() + return RecoveryKey( + key=key, + created_at=creation_date, + expires_at=expiration, + uses_left=uses_left, + ) diff --git a/selfprivacy_api/models/tokens/token.py b/selfprivacy_api/models/tokens/token.py new file mode 100644 index 0000000..4a5cd7f --- /dev/null +++ b/selfprivacy_api/models/tokens/token.py @@ -0,0 +1,34 @@ +""" +Model of the access token. + +Access token has a token string, device name and date of creation. +""" +from datetime import datetime +import secrets +from typing import Optional +from pydantic import BaseModel + + +class Token(BaseModel): + """ + Model of the access token. + + Access token has a token string, device name and date of creation. + """ + + token: str + device_name: str + created_at: datetime + + @staticmethod + def generate(name: str) -> "Token": + """ + Factory to generate a random token. + """ + creation_date = datetime.now() + token = secrets.token_urlsafe(32) + return Token( + token=token, + device_name=name, + created_at=creation_date, + ) diff --git a/selfprivacy_api/repositories/__init__.py b/selfprivacy_api/repositories/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/selfprivacy_api/repositories/tokens/__init__.py b/selfprivacy_api/repositories/tokens/__init__.py new file mode 100644 index 0000000..9941bdc --- /dev/null +++ b/selfprivacy_api/repositories/tokens/__init__.py @@ -0,0 +1,8 @@ +from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( + AbstractTokensRepository, +) +from selfprivacy_api.repositories.tokens.json_tokens_repository import ( + JsonTokensRepository, +) + +repository = JsonTokensRepository() diff --git a/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py b/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py new file mode 100644 index 0000000..1073ca7 --- /dev/null +++ b/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py @@ -0,0 +1,101 @@ +from abc import ABC, abstractmethod +from datetime import datetime +from typing import List, Optional + +from selfprivacy_api.models.tokens.token import Token +from selfprivacy_api.models.tokens.recovery_key import RecoveryKey +from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey + + +class AbstractTokensRepository(ABC): + @abstractmethod + def get_token_by_token_string(self, token_string: str) -> Optional[Token]: + """Get the token by token""" + ... + + @abstractmethod + def get_token_by_name(self, token_name: str) -> Optional[Token]: + """Get the token by name""" + ... + + @abstractmethod + def get_tokens(self) -> list[Token]: + """Get the tokens""" + ... + + @abstractmethod + def create_token(self, name: str) -> Token: + """Create new token""" + ... + + @abstractmethod + def delete_token(self, token: Token) -> None: + """Delete the token""" + ... + + @abstractmethod + def refresh_token(self, token: Token) -> Token: + """Refresh the token""" + ... + + def is_token_valid(self, token_string: str) -> bool: + """Check if the token is valid""" + token = self.get_token_by_token_string(token_string) + if token is None: + return False + return True + + def is_token_name_exists(self, token_name: str) -> bool: + """Check if the token name exists""" + token = self.get_token_by_name(token_name) + if token is None: + return False + return True + + def is_token_name_pair_valid(self, token_name: str, token_string: str) -> bool: + """Check if the token name and token are valid""" + token = self.get_token_by_name(token_name) + if token is None: + return False + return token.token == token_string + + @abstractmethod + def get_recovery_key(self) -> Optional[RecoveryKey]: + """Get the recovery key""" + ... + + @abstractmethod + def create_recovery_key( + self, + expiration: Optional[datetime], + uses_left: Optional[int], + ) -> RecoveryKey: + """Create the recovery key""" + ... + + @abstractmethod + def use_mnemonic_recovery_key(self, mnemonic_phrase: str, name: str) -> Token: + """Use the mnemonic recovery key and create a new token with the given name""" + ... + + def is_recovery_key_valid(self) -> bool: + """Check if the recovery key is valid""" + recovery_key = self.get_recovery_key() + if recovery_key is None: + return False + return recovery_key.is_valid() + + @abstractmethod + def get_new_device_key(self) -> NewDeviceKey: + """Creates and returns the new device key""" + ... + + @abstractmethod + def delete_new_device_key(self) -> None: + """Delete the new device key""" + ... + + @abstractmethod + def use_mnemonic_new_device_key(self, mnemonic_phrase: str, name: str) -> None: + """Use the mnemonic new device key""" + ... diff --git a/selfprivacy_api/repositories/tokens/json_tokens_repository.py b/selfprivacy_api/repositories/tokens/json_tokens_repository.py new file mode 100644 index 0000000..7302096 --- /dev/null +++ b/selfprivacy_api/repositories/tokens/json_tokens_repository.py @@ -0,0 +1,7 @@ +from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( + AbstractTokensRepository, +) + + +class JsonTokensRepository(AbstractTokensRepository): + pass diff --git a/selfprivacy_api/repositories/tokens/redis_tokens_repository.py b/selfprivacy_api/repositories/tokens/redis_tokens_repository.py new file mode 100644 index 0000000..0186c11 --- /dev/null +++ b/selfprivacy_api/repositories/tokens/redis_tokens_repository.py @@ -0,0 +1,15 @@ +""" +Token repository using Redis as backend. +""" +from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( + AbstractTokensRepository, +) + + +class RedisTokensRepository(AbstractTokensRepository): + """ + Token repository using Redis as a backend + """ + + def __init__(self) -> None: + raise NotImplementedError