293 lines
8.0 KiB
Python
293 lines
8.0 KiB
Python
# pylint: disable=redefined-outer-name
|
|
# pylint: disable=unused-argument
|
|
# pylint: disable=missing-function-docstring
|
|
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from pydantic import BaseModel
|
|
import pytest
|
|
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
|
|
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
|
|
|
|
from tests.common import read_json
|
|
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
|
|
JsonTokensRepository,
|
|
)
|
|
from selfprivacy_api.models.tokens.token import Token
|
|
from selfprivacy_api.repositories.tokens.exceptions import (
|
|
TokenNotFoundError,
|
|
RecoveryKeyNotFoundError,
|
|
MnemonicError,
|
|
RecoveryKeyIsNotValidError,
|
|
RecoveryTokenError,
|
|
)
|
|
|
|
|
|
class TokenMock(BaseModel):
    """Pydantic model with token fields and a canned ``generate`` factory.

    ``generate`` always produces the same hard-coded ``Token`` so that tests
    can compare against known values.
    """

    token: str
    device_name: str
    created_at: datetime

    @staticmethod
    def generate(device_name: str) -> "Token":
        """Return a fixed ``Token``; the ``device_name`` argument is ignored."""
        fixed_created_at = datetime(2022, 7, 15, 17, 41, 31, 675698)
        return Token(
            token="iamtoken",
            device_name="imnew",
            created_at=fixed_created_at,
        )
|
|
|
|
|
|
class NewDeviceKeyMock(BaseModel):
    """Pydantic model with new-device-key fields and a canned ``generate``.

    ``generate`` always produces the same hard-coded ``NewDeviceKey`` whose
    creation and expiry timestamps are identical.
    """

    key: str
    created_at: datetime
    expires_at: datetime

    @staticmethod
    def generate() -> "NewDeviceKey":
        """Return a fixed ``NewDeviceKey`` with key ``"imkey"``."""
        fixed_moment = datetime(2022, 7, 15, 17, 41, 31, 675698)
        return NewDeviceKey(
            key="imkey",
            created_at=fixed_moment,
            expires_at=fixed_moment,
        )
|
|
|
|
|
|
class RecoveryKeyMock(BaseModel):
    """Pydantic model with recovery-key fields and a canned ``generate``.

    ``generate`` always produces the same hard-coded ``RecoveryKey`` with no
    expiry and a single remaining use.
    """

    key: str
    created_at: datetime
    expires_at: Optional[datetime]
    uses_left: Optional[int]

    @staticmethod
    def generate(
        expiration: Optional[datetime],
        uses_left: Optional[int],
    ) -> "RecoveryKey":
        """Return a fixed ``RecoveryKey``; both arguments are ignored."""
        fixed_created_at = datetime(2022, 7, 15, 17, 41, 31, 675698)
        return RecoveryKey(
            key="imnewrecoverykey",
            created_at=fixed_created_at,
            expires_at=None,
            uses_left=1,
        )
|
|
|
|
|
|
@pytest.fixture
def mock_new_device_key_generate(mocker):
    """Patch ``NewDeviceKey`` as seen by the JSON tokens repository module.

    The name is replaced with an autospec'd mock whose return value is the
    ``NewDeviceKeyMock`` class; that mock is handed to the requesting test.
    """
    target = "selfprivacy_api.repositories.tokens.json_tokens_repository.NewDeviceKey"
    return mocker.patch(target, autospec=True, return_value=NewDeviceKeyMock)
|
|
|
|
|
|
@pytest.fixture
def mock_token_generate(mocker):
    """Patch ``Token`` as seen by the JSON tokens repository module.

    The name is replaced with an autospec'd mock whose return value is the
    ``TokenMock`` class; that mock is handed to the requesting test.
    """
    target = "selfprivacy_api.repositories.tokens.json_tokens_repository.Token"
    return mocker.patch(target, autospec=True, return_value=TokenMock)
|
|
|
|
|
|
@pytest.fixture
def mock_recovery_key_generate(mocker):
    """Patch ``RecoveryKey`` as seen by the JSON tokens repository module.

    The name is replaced with an autospec'd mock whose return value is the
    ``RecoveryKeyMock`` class; that mock is handed to the requesting test.
    """
    target = "selfprivacy_api.repositories.tokens.json_tokens_repository.RecoveryKey"
    return mocker.patch(target, autospec=True, return_value=RecoveryKeyMock)
|
|
|
|
|
|
@pytest.fixture
def tokens(mocker, datadir):
    """Point ``TOKENS_FILE`` at the data-dir ``tokens.json`` and sanity-check it.

    Returns ``datadir`` once the file's ``tokens`` list is confirmed to hold
    exactly the single ``primary_token`` entry.
    """
    tokens_file = datadir / "tokens.json"
    mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokens_file)

    expected_entries = [
        {
            "token": "iamtoken",
            "name": "primary_token",
            "date": "2022-07-15 17:41:31.675698",
        }
    ]
    assert read_json(tokens_file)["tokens"] == expected_entries
    return datadir
|
|
|
|
|
|
def test_get_token_by_token_string(tokens):
    """Looking up ``"iamtoken"`` returns the token stored in the fixture file."""
    storage = JsonTokensRepository()

    found = storage.get_token_by_token_string(token_string="iamtoken")
    assert found is not None

    expected = Token(
        token="iamtoken",
        device_name="primary_token",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )
    assert storage.get_token_by_token_string(token_string="iamtoken") == expected
|
|
|
|
|
|
def test_get_token_by_non_existent_token_string(tokens):
    """An unknown token string must raise ``TokenNotFoundError``."""
    repo = JsonTokensRepository()

    # Only the exception matters here. Asserting on the return value inside
    # ``pytest.raises`` is contradictory: a normal return already makes
    # ``pytest.raises`` fail the test, whatever value was returned.
    with pytest.raises(TokenNotFoundError):
        repo.get_token_by_token_string(token_string="iamBadtoken")
|
|
|
|
|
|
def test_get_token_by_name(tokens):
    """Looking up ``"primary_token"`` returns the token stored in the fixture file."""
    storage = JsonTokensRepository()

    found = storage.get_token_by_name(token_name="primary_token")
    assert found is not None

    expected = Token(
        token="iamtoken",
        device_name="primary_token",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )
    assert storage.get_token_by_name(token_name="primary_token") == expected
|
|
|
|
|
|
def test_get_token_by_non_existent_name(tokens):
    """An unknown device name must raise ``TokenNotFoundError``."""
    repo = JsonTokensRepository()

    # Only the exception matters here. Asserting on the return value inside
    # ``pytest.raises`` is contradictory: a normal return already makes
    # ``pytest.raises`` fail the test, whatever value was returned.
    with pytest.raises(TokenNotFoundError):
        repo.get_token_by_name(token_name="badname")
|
|
|
|
|
|
def test_get_tokens(tokens):
    """``get_tokens`` returns a list holding only the fixture's primary token."""
    storage = JsonTokensRepository()

    listing = storage.get_tokens()
    assert listing is not None

    primary = Token(
        token="iamtoken",
        device_name="primary_token",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )
    assert storage.get_tokens() == [primary]
|
|
|
|
|
|
def test_create_token(tokens, mock_token_generate):
    """``create_token`` returns the canned token while ``Token`` is patched.

    ``create_token`` is invoked twice, once per assertion.
    """
    storage = JsonTokensRepository()

    first_result = storage.create_token(device_name="imnew")
    assert first_result is not None

    expected = Token(
        token="iamtoken",
        device_name="imnew",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )
    assert storage.create_token(device_name="imnew") == expected
|
|
|
|
|
|
def test_delete_token(tokens, datadir):
    """Deleting the only stored token leaves an empty ``tokens`` list on disk."""
    storage = JsonTokensRepository()
    doomed = Token(
        token="iamtoken",
        device_name="primary_token",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )

    outcome = storage.delete_token(doomed)
    assert outcome is None

    remaining = read_json(datadir / "tokens.json")["tokens"]
    assert remaining == []
|
|
|
|
|
|
def test_delete_not_found_token(tokens, datadir):
    """Deleting a token that is not stored must raise ``TokenNotFoundError``."""
    repo = JsonTokensRepository()
    input_token = Token(
        token="imbadtoken",
        device_name="primary_token",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )

    # Only the exception matters here. Asserting on the return value inside
    # ``pytest.raises`` is contradictory: a normal return already makes
    # ``pytest.raises`` fail the test, whatever value was returned.
    with pytest.raises(TokenNotFoundError):
        repo.delete_token(input_token)
|
|
|
|
|
|
def test_refresh_token(tokens, mock_token_generate):
    """``refresh_token`` returns the canned token while ``Token`` is patched.

    ``refresh_token`` is invoked twice, once per assertion.
    """
    storage = JsonTokensRepository()
    stale = Token(
        token="imtoken",
        device_name="primary_token",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )

    first_result = storage.refresh_token(stale)
    assert first_result is not None

    expected = Token(
        token="iamtoken",
        device_name="imnew",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )
    assert storage.refresh_token(stale) == expected
|
|
|
|
|
|
def test_refresh_not_found_token(tokens, mock_token_generate):
    """Refreshing a token that is not stored must raise ``TokenNotFoundError``."""
    repo = JsonTokensRepository()
    input_token = Token(
        token="idontknowwhoiam",
        device_name="tellmewhoiam?",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )

    # Only the exception matters here. Asserting on the return value inside
    # ``pytest.raises`` is contradictory: a normal return already makes
    # ``pytest.raises`` fail the test, whatever value was returned.
    with pytest.raises(TokenNotFoundError):
        repo.refresh_token(input_token)
|
|
|
|
|
|
def test_get_recovery_key(tokens):
    """``get_recovery_key`` returns the expected unlimited, non-expiring key."""
    storage = JsonTokensRepository()

    fetched = storage.get_recovery_key()
    assert fetched is not None

    expected = RecoveryKey(
        key="iamtoken",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
        expires_at=None,
        uses_left=None,
    )
    assert storage.get_recovery_key() == expected
|
|
|
|
|
|
def test_create_recovery_key(tokens, mock_recovery_key_generate, datadir):
    """``create_recovery_key`` returns a value and updates ``recovery_token``."""
    storage = JsonTokensRepository()

    created = storage.create_recovery_key(uses_left=1, expiration=None)
    assert created is not None

    # NOTE(review): this compares the parsed JSON entry against a
    # ``RecoveryKey`` model instance -- confirm that equality is intended.
    stored_entry = read_json(datadir / "tokens.json")["recovery_token"]
    expected = RecoveryKey(
        key="imnewrecoverykey",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
        expires_at=None,
        uses_left=1,
    )
    assert stored_entry == expected
|
|
|
|
|
|
def test_get_new_device_key(tokens, mock_new_device_key_generate, datadir):
    """``get_new_device_key`` returns a value while ``NewDeviceKey`` is patched."""
    storage = JsonTokensRepository()

    issued = storage.get_new_device_key()
    assert issued is not None
    # assert read_json(datadir / "tokens.json")["new_device"] == RecoveryKey(
    #     key="imrecoverykey",
    #     created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    #     expires_at=None,
    #     uses_left=1,
    # )
|
|
|
|
|
|
# use_mnemonic_recovery_key
|
|
# use_mnemonic_new_device_key
|
|
|
|
|
|
def test_delete_new_device_key(tokens, datadir):
    """Deleting the new-device key returns ``None`` and leaves no ``new_device`` entry."""
    storage = JsonTokensRepository()

    outcome = storage.delete_new_device_key()
    assert outcome is None

    file_contents = read_json(datadir / "tokens.json")
    assert "new_device" not in file_contents
|
|
|
|
|
|
####################################################
|
|
|
|
|
|
def test_use_mnemonic_new_device_key(
    tokens, mock_new_device_key_generate, datadir, mock_token_generate
):
    """Using a mnemonic new-device key yields the canned token.

    Afterwards the ``new_device`` entry in ``tokens.json`` must equal ``[]``.
    """
    storage = JsonTokensRepository()

    issued = storage.use_mnemonic_new_device_key(
        device_name="imnew", mnemonic_phrase="oh-no"
    )
    expected = Token(
        token="iamtoken",
        device_name="imnew",
        created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
    )
    assert issued == expected

    new_device_entry = read_json(datadir / "tokens.json")["new_device"]
    assert new_device_entry == []
|
|
|
|
|
|
def use_mnemonic_recovery_key():
    """Empty placeholder that does nothing and returns ``None``.

    NOTE(review): presumably reserved for a future recovery-key test; under
    pytest's default naming rules it is not collected, because the name
    lacks the ``test_`` prefix -- confirm the intent.
    """
|