The starting point for Def

pull/18/head
Inex Code 2022-10-25 23:59:39 +03:00
parent 0a09a338b8
commit e32b95679b
10 changed files with 271 additions and 1 deletions

View File

@ -8,7 +8,9 @@ at api.skippedMigrations in userdata.json and populating it
with IDs of the migrations to skip.
Adding DISABLE_ALL to that array disables the migrations module entirely.
"""
from selfprivacy_api.migrations.check_for_failed_binds_migration import CheckForFailedBindsMigration
from selfprivacy_api.migrations.check_for_failed_binds_migration import (
CheckForFailedBindsMigration,
)
from selfprivacy_api.utils import ReadUserData
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson

View File

View File

@ -0,0 +1,47 @@
"""
New device key used to obtain access token.
"""
from datetime import datetime, timedelta
import secrets
from typing import Optional
from pydantic import BaseModel
from mnemonic import Mnemonic
class NewDeviceKey(BaseModel):
"""
Recovery key used to obtain access token.
Recovery key has a key string, date of creation, date of expiration.
"""
key: str
created_at: datetime
expires_at: Optional[datetime]
def is_valid(self) -> bool:
"""
Check if the recovery key is valid.
"""
if self.expires_at is not None and self.expires_at < datetime.now():
return False
return True
def as_mnemonic(self) -> str:
"""
Get the recovery key as a mnemonic.
"""
return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key))
@staticmethod
def generate() -> "NewDeviceKey":
"""
Factory to generate a random token.
"""
creation_date = datetime.now()
key = secrets.token_bytes(16).hex()
return NewDeviceKey(
key=key,
created_at=creation_date,
expires_at=datetime.now() + timedelta(minutes=10),
)

View File

@ -0,0 +1,56 @@
"""
Recovery key used to obtain access token.
Recovery key has a token string, date of creation, optional date of expiration and optional count of uses left.
"""
from datetime import datetime
import secrets
from typing import Optional
from pydantic import BaseModel
from mnemonic import Mnemonic
class RecoveryKey(BaseModel):
"""
Recovery key used to obtain access token.
Recovery key has a key string, date of creation, optional date of expiration and optional count of uses left.
"""
key: str
created_at: datetime
expires_at: Optional[datetime]
uses_left: Optional[int]
def is_valid(self) -> bool:
"""
Check if the recovery key is valid.
"""
if self.expires_at is not None and self.expires_at < datetime.now():
return False
if self.uses_left is not None and self.uses_left <= 0:
return False
return True
def as_mnemonic(self) -> str:
"""
Get the recovery key as a mnemonic.
"""
return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key))
@staticmethod
def generate(
expiration: Optional[datetime],
uses_left: Optional[int],
) -> "RecoveryKey":
"""
Factory to generate a random token.
"""
creation_date = datetime.now()
key = secrets.token_bytes(24).hex()
return RecoveryKey(
key=key,
created_at=creation_date,
expires_at=expiration,
uses_left=uses_left,
)

View File

@ -0,0 +1,34 @@
"""
Model of the access token.
Access token has a token string, device name and date of creation.
"""
from datetime import datetime
import secrets
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
"""
Model of the access token.
Access token has a token string, device name and date of creation.
"""
token: str
device_name: str
created_at: datetime
@staticmethod
def generate(name: str) -> "Token":
"""
Factory to generate a random token.
"""
creation_date = datetime.now()
token = secrets.token_urlsafe(32)
return Token(
token=token,
device_name=name,
created_at=creation_date,
)

View File

View File

@ -0,0 +1,8 @@
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
repository = JsonTokensRepository()

View File

@ -0,0 +1,101 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Optional
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
class AbstractTokensRepository(ABC):
@abstractmethod
def get_token_by_token_string(self, token_string: str) -> Optional[Token]:
"""Get the token by token"""
...
@abstractmethod
def get_token_by_name(self, token_name: str) -> Optional[Token]:
"""Get the token by name"""
...
@abstractmethod
def get_tokens(self) -> list[Token]:
"""Get the tokens"""
...
@abstractmethod
def create_token(self, name: str) -> Token:
"""Create new token"""
...
@abstractmethod
def delete_token(self, token: Token) -> None:
"""Delete the token"""
...
@abstractmethod
def refresh_token(self, token: Token) -> Token:
"""Refresh the token"""
...
def is_token_valid(self, token_string: str) -> bool:
"""Check if the token is valid"""
token = self.get_token_by_token_string(token_string)
if token is None:
return False
return True
def is_token_name_exists(self, token_name: str) -> bool:
"""Check if the token name exists"""
token = self.get_token_by_name(token_name)
if token is None:
return False
return True
def is_token_name_pair_valid(self, token_name: str, token_string: str) -> bool:
"""Check if the token name and token are valid"""
token = self.get_token_by_name(token_name)
if token is None:
return False
return token.token == token_string
@abstractmethod
def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key"""
...
@abstractmethod
def create_recovery_key(
self,
expiration: Optional[datetime],
uses_left: Optional[int],
) -> RecoveryKey:
"""Create the recovery key"""
...
@abstractmethod
def use_mnemonic_recovery_key(self, mnemonic_phrase: str, name: str) -> Token:
"""Use the mnemonic recovery key and create a new token with the given name"""
...
def is_recovery_key_valid(self) -> bool:
"""Check if the recovery key is valid"""
recovery_key = self.get_recovery_key()
if recovery_key is None:
return False
return recovery_key.is_valid()
@abstractmethod
def get_new_device_key(self) -> NewDeviceKey:
"""Creates and returns the new device key"""
...
@abstractmethod
def delete_new_device_key(self) -> None:
"""Delete the new device key"""
...
@abstractmethod
def use_mnemonic_new_device_key(self, mnemonic_phrase: str, name: str) -> None:
"""Use the mnemonic new device key"""
...

View File

@ -0,0 +1,7 @@
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
class JsonTokensRepository(AbstractTokensRepository):
pass

View File

@ -0,0 +1,15 @@
"""
Token repository using Redis as backend.
"""
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
class RedisTokensRepository(AbstractTokensRepository):
"""
Token repository using Redis as a backend
"""
def __init__(self) -> None:
raise NotImplementedError