refactor: implemented a json repository

pull/18/head
def 2022-10-27 18:16:22 +04:00
parent e32b95679b
commit bf3d921e2d
5 changed files with 163 additions and 22 deletions

View File

@ -3,7 +3,6 @@ New device key used to obtain access token.
""" """
from datetime import datetime, timedelta from datetime import datetime, timedelta
import secrets import secrets
from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
from mnemonic import Mnemonic from mnemonic import Mnemonic
@ -17,13 +16,13 @@ class NewDeviceKey(BaseModel):
key: str key: str
created_at: datetime created_at: datetime
expires_at: Optional[datetime] expires_at: datetime
def is_valid(self) -> bool: def is_valid(self) -> bool:
""" """
Check if the recovery key is valid. Check if the recovery key is valid.
""" """
if self.expires_at is not None and self.expires_at < datetime.now(): if self.expires_at < datetime.now():
return False return False
return True return True

View File

@ -5,7 +5,6 @@ Access token has a token string, device name and date of creation.
""" """
from datetime import datetime from datetime import datetime
import secrets import secrets
from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
@ -21,7 +20,7 @@ class Token(BaseModel):
created_at: datetime created_at: datetime
@staticmethod @staticmethod
def generate(name: str) -> "Token": def generate(device_name: str) -> "Token":
""" """
Factory to generate a random token. Factory to generate a random token.
""" """
@ -29,6 +28,6 @@ class Token(BaseModel):
token = secrets.token_urlsafe(32) token = secrets.token_urlsafe(32)
return Token( return Token(
token=token, token=token,
device_name=name, device_name=device_name,
created_at=creation_date, created_at=creation_date,
) )

View File

@ -1,6 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime from datetime import datetime
from typing import List, Optional from typing import Optional
from selfprivacy_api.models.tokens.token import Token from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
@ -11,32 +11,26 @@ class AbstractTokensRepository(ABC):
@abstractmethod @abstractmethod
def get_token_by_token_string(self, token_string: str) -> Optional[Token]: def get_token_by_token_string(self, token_string: str) -> Optional[Token]:
"""Get the token by token""" """Get the token by token"""
...
@abstractmethod @abstractmethod
def get_token_by_name(self, token_name: str) -> Optional[Token]: def get_token_by_name(self, token_name: str) -> Optional[Token]:
"""Get the token by name""" """Get the token by name"""
...
@abstractmethod @abstractmethod
def get_tokens(self) -> list[Token]: def get_tokens(self) -> list[Token]:
"""Get the tokens""" """Get the tokens"""
...
@abstractmethod @abstractmethod
def create_token(self, name: str) -> Token: def create_token(self, name: str) -> Token:
"""Create new token""" """Create new token"""
...
@abstractmethod @abstractmethod
def delete_token(self, token: Token) -> None: def delete_token(self, input_token: Token) -> None:
"""Delete the token""" """Delete the token"""
...
@abstractmethod @abstractmethod
def refresh_token(self, token: Token) -> Token: def refresh_token(self, input_token: Token) -> Token:
"""Refresh the token""" """Refresh the token"""
...
def is_token_valid(self, token_string: str) -> bool: def is_token_valid(self, token_string: str) -> bool:
"""Check if the token is valid""" """Check if the token is valid"""
@ -62,7 +56,6 @@ class AbstractTokensRepository(ABC):
@abstractmethod @abstractmethod
def get_recovery_key(self) -> Optional[RecoveryKey]: def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key""" """Get the recovery key"""
...
@abstractmethod @abstractmethod
def create_recovery_key( def create_recovery_key(
@ -71,12 +64,10 @@ class AbstractTokensRepository(ABC):
uses_left: Optional[int], uses_left: Optional[int],
) -> RecoveryKey: ) -> RecoveryKey:
"""Create the recovery key""" """Create the recovery key"""
...
@abstractmethod @abstractmethod
def use_mnemonic_recovery_key(self, mnemonic_phrase: str, name: str) -> Token: def use_mnemonic_recovery_key(self, mnemonic_phrase: str, name: str) -> Token:
"""Use the mnemonic recovery key and create a new token with the given name""" """Use the mnemonic recovery key and create a new token with the given name"""
...
def is_recovery_key_valid(self) -> bool: def is_recovery_key_valid(self) -> bool:
"""Check if the recovery key is valid""" """Check if the recovery key is valid"""
@ -88,14 +79,11 @@ class AbstractTokensRepository(ABC):
@abstractmethod @abstractmethod
def get_new_device_key(self) -> NewDeviceKey: def get_new_device_key(self) -> NewDeviceKey:
"""Creates and returns the new device key""" """Creates and returns the new device key"""
...
@abstractmethod @abstractmethod
def delete_new_device_key(self) -> None: def delete_new_device_key(self) -> None:
"""Delete the new device key""" """Delete the new device key"""
...
@abstractmethod @abstractmethod
def use_mnemonic_new_device_key(self, mnemonic_phrase: str, name: str) -> None: def use_mnemonic_new_device_key(self, mnemonic_phrase: str, name: str) -> None:
"""Use the mnemonic new device key""" """Use the mnemonic new device key"""
...

View File

@ -0,0 +1,2 @@
class TokenNotFoundError(Exception):
"""Token not found!"""

View File

@ -1,7 +1,160 @@
"""
temporary legacy
"""
from secrets import token_bytes
from typing import Optional
from datetime import datetime
from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
from selfprivacy_api.repositories.tokens.exceptions import TokenNotFoundError
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository, AbstractTokensRepository,
) )
class JsonTokensRepository(AbstractTokensRepository): class JsonTokensRepository(AbstractTokensRepository):
pass def get_token_by_token_string(self, token_string: str) -> Optional[Token]:
"""Get the token by token"""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
if userdata_token["token"] == token_string:
return Token(
token=token_string,
device_name=userdata_token["name"],
created_at=userdata_token["date"],
)
raise TokenNotFoundError("Token not found!")
def get_token_by_name(self, token_name: str) -> Optional[Token]:
"""Get the token by name"""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
if userdata_token["name"] == token_name:
return Token(
token=userdata_token["token"],
device_name=token_name,
created_at=userdata_token["date"],
)
raise TokenNotFoundError("Token not found!")
def get_tokens(self) -> list[Token]:
"""Get the tokens"""
tokens_list = []
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
tokens_list.append(
Token(
token=userdata_token.token,
device_name=userdata_token.name,
created_at=userdata_token.date,
)
)
return tokens_list
def create_token(self, name: str) -> Token:
"""Create new token"""
new_token = Token.generate(device_name=name)
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["tokens"].append(
{
"token": new_token.token,
"name": new_token.device_name,
"date": new_token.created_at,
}
)
return new_token
def delete_token(self, input_token: Token) -> None:
"""Delete the token"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
if userdata_token["token"] == input_token:
tokens_file["tokens"].remove(
userdata_token
) # Allah, i pray it works
def refresh_token(self, input_token: Token) -> Token:
"""Change the token field of the existing token"""
new_token = Token.generate(device_name=input_token.device_name)
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
if userdata_token["token"] == input_token.token:
userdata_token["token"] = new_token.token
userdata_token["data"] = new_token.created_at
return new_token
raise TokenNotFoundError("Token not found!")
def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key"""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
if tokens_file["recovery_token"] is None:
return
recovery_key = RecoveryKey(
key=tokens_file["recovery_token"]["token"],
created_at=tokens_file["recovery_token"]["date"],
expires_at=tokens_file["recovery_token"]["expitation"],
uses_left=tokens_file["recovery_token"]["uses_left"],
)
return recovery_key
def create_recovery_key(
self,
expiration: Optional[datetime],
uses_left: Optional[int],
) -> RecoveryKey:
"""Create the recovery key"""
recovery_key = RecoveryKey.generate(expiration=None, uses_left=None)
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["recovery_key"] = {
"token": recovery_key.key,
"date": recovery_key.created_at,
"expiration": recovery_key.expires_at,
"uses_left": recovery_key.uses_left,
}
return recovery_key
def use_mnemonic_recovery_key(self, mnemonic_phrase: str, name: str) -> Token:
"""Use the mnemonic recovery key and create a new token with the given name"""
...
def get_new_device_key(self) -> NewDeviceKey:
"""Creates and returns the new device key"""
new_device_key = NewDeviceKey.generate()
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["new_device"] = {
"token": new_device_key.key,
"data": new_device_key.created_at,
"expiration": new_device_key.expires_at,
}
return new_device_key
def delete_new_device_key(self) -> None:
"""Delete the new device key"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file.pop("new_device")
def use_mnemonic_new_device_key(self, mnemonic_phrase: str, name: str) -> None:
"""Use the mnemonic new device key"""
...