refactor(repository): Tokens repository JSON backend #18
|
@ -8,7 +8,9 @@ at api.skippedMigrations in userdata.json and populating it
|
|||
with IDs of the migrations to skip.
|
||||
Adding DISABLE_ALL to that array disables the migrations module entirely.
|
||||
"""
|
||||
from selfprivacy_api.migrations.check_for_failed_binds_migration import CheckForFailedBindsMigration
|
||||
from selfprivacy_api.migrations.check_for_failed_binds_migration import (
|
||||
CheckForFailedBindsMigration,
|
||||
)
|
||||
from selfprivacy_api.utils import ReadUserData
|
||||
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
|
||||
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
"""
|
||||
New device key used to obtain access token.
|
||||
"""
|
||||
from datetime import datetime, timedelta
|
||||
import secrets
|
||||
from pydantic import BaseModel
|
||||
from mnemonic import Mnemonic
|
||||
|
||||
|
||||
class NewDeviceKey(BaseModel):
|
||||
"""
|
||||
Recovery key used to obtain access token.
|
||||
|
||||
Recovery key has a key string, date of creation, date of expiration.
|
||||
"""
|
||||
|
||||
key: str
|
||||
created_at: datetime
|
||||
expires_at: datetime
|
||||
|
||||
def is_valid(self) -> bool:
|
||||
"""
|
||||
Check if the recovery key is valid.
|
||||
"""
|
||||
if self.expires_at < datetime.now():
|
||||
return False
|
||||
return True
|
||||
|
||||
def as_mnemonic(self) -> str:
|
||||
"""
|
||||
Get the recovery key as a mnemonic.
|
||||
"""
|
||||
return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key))
|
||||
|
||||
@staticmethod
|
||||
def generate() -> "NewDeviceKey":
|
||||
"""
|
||||
Factory to generate a random token.
|
||||
"""
|
||||
creation_date = datetime.now()
|
||||
key = secrets.token_bytes(16).hex()
|
||||
return NewDeviceKey(
|
||||
key=key,
|
||||
created_at=creation_date,
|
||||
expires_at=datetime.now() + timedelta(minutes=10),
|
||||
)
|
|
@ -0,0 +1,56 @@
|
|||
"""
|
||||
Recovery key used to obtain access token.
|
||||
|
||||
Recovery key has a token string, date of creation, optional date of expiration and optional count of uses left.
|
||||
"""
|
||||
from datetime import datetime
|
||||
import secrets
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel
|
||||
from mnemonic import Mnemonic
|
||||
|
||||
|
||||
class RecoveryKey(BaseModel):
|
||||
"""
|
||||
Recovery key used to obtain access token.
|
||||
|
||||
Recovery key has a key string, date of creation, optional date of expiration and optional count of uses left.
|
||||
"""
|
||||
|
||||
key: str
|
||||
created_at: datetime
|
||||
expires_at: Optional[datetime]
|
||||
uses_left: Optional[int]
|
||||
|
||||
def is_valid(self) -> bool:
|
||||
"""
|
||||
Check if the recovery key is valid.
|
||||
"""
|
||||
if self.expires_at is not None and self.expires_at < datetime.now():
|
||||
return False
|
||||
if self.uses_left is not None and self.uses_left <= 0:
|
||||
return False
|
||||
return True
|
||||
|
||||
def as_mnemonic(self) -> str:
|
||||
"""
|
||||
Get the recovery key as a mnemonic.
|
||||
"""
|
||||
return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key))
|
||||
|
||||
@staticmethod
|
||||
def generate(
|
||||
expiration: Optional[datetime],
|
||||
uses_left: Optional[int],
|
||||
) -> "RecoveryKey":
|
||||
"""
|
||||
Factory to generate a random token.
|
||||
"""
|
||||
creation_date = datetime.now()
|
||||
key = secrets.token_bytes(24).hex()
|
||||
return RecoveryKey(
|
||||
key=key,
|
||||
created_at=creation_date,
|
||||
expires_at=expiration,
|
||||
uses_left=uses_left,
|
||||
)
|
|
@ -0,0 +1,33 @@
|
|||
"""
|
||||
Model of the access token.
|
||||
|
||||
Access token has a token string, device name and date of creation.
|
||||
"""
|
||||
from datetime import datetime
|
||||
import secrets
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class Token(BaseModel):
|
||||
"""
|
||||
Model of the access token.
|
||||
|
||||
Access token has a token string, device name and date of creation.
|
||||
"""
|
||||
|
||||
token: str
|
||||
device_name: str
|
||||
created_at: datetime
|
||||
|
||||
@staticmethod
|
||||
def generate(device_name: str) -> "Token":
|
||||
"""
|
||||
Factory to generate a random token.
|
||||
"""
|
||||
creation_date = datetime.now()
|
||||
token = secrets.token_urlsafe(32)
|
||||
return Token(
|
||||
token=token,
|
||||
device_name=device_name,
|
||||
created_at=creation_date,
|
||||
)
|
|
@ -0,0 +1,8 @@
|
|||
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
|
||||
AbstractTokensRepository,
|
||||
)
|
||||
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
|
||||
JsonTokensRepository,
|
||||
)
|
||||
|
||||
repository = JsonTokensRepository()
|
|
@ -0,0 +1,93 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from selfprivacy_api.models.tokens.token import Token
|
||||
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
|
||||
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
|
||||
|
||||
|
||||
class AbstractTokensRepository(ABC):
|
||||
@abstractmethod
|
||||
def get_token_by_token_string(self, token_string: str) -> Optional[Token]:
|
||||
"""Get the token by token"""
|
||||
|
||||
@abstractmethod
|
||||
def get_token_by_name(self, token_name: str) -> Optional[Token]:
|
||||
"""Get the token by name"""
|
||||
|
||||
@abstractmethod
|
||||
def get_tokens(self) -> list[Token]:
|
||||
"""Get the tokens"""
|
||||
|
||||
@abstractmethod
|
||||
def create_token(self, device_name: str) -> Token:
|
||||
"""Create new token"""
|
||||
|
||||
@abstractmethod
|
||||
def delete_token(self, input_token: Token) -> None:
|
||||
"""Delete the token"""
|
||||
|
||||
@abstractmethod
|
||||
def refresh_token(self, input_token: Token) -> Token:
|
||||
"""Refresh the token"""
|
||||
|
||||
def is_token_valid(self, token_string: str) -> bool:
|
||||
"""Check if the token is valid"""
|
||||
token = self.get_token_by_token_string(token_string)
|
||||
if token is None:
|
||||
return False
|
||||
return True
|
||||
|
||||
def is_token_name_exists(self, token_name: str) -> bool:
|
||||
"""Check if the token name exists"""
|
||||
token = self.get_token_by_name(token_name)
|
||||
if token is None:
|
||||
return False
|
||||
return True
|
||||
|
||||
def is_token_name_pair_valid(self, token_name: str, token_string: str) -> bool:
|
||||
"""Check if the token name and token are valid"""
|
||||
token = self.get_token_by_name(token_name)
|
||||
if token is None:
|
||||
return False
|
||||
return token.token == token_string
|
||||
|
||||
@abstractmethod
|
||||
def get_recovery_key(self) -> Optional[RecoveryKey]:
|
||||
"""Get the recovery key"""
|
||||
|
||||
@abstractmethod
|
||||
def create_recovery_key(
|
||||
self,
|
||||
expiration: Optional[datetime],
|
||||
uses_left: Optional[int],
|
||||
) -> RecoveryKey:
|
||||
"""Create the recovery key"""
|
||||
|
||||
@abstractmethod
|
||||
def use_mnemonic_recovery_key(
|
||||
self, mnemonic_phrase: str, device_name: str
|
||||
) -> Token:
|
||||
"""Use the mnemonic recovery key and create a new token with the given name"""
|
||||
|
||||
def is_recovery_key_valid(self) -> bool:
|
||||
"""Check if the recovery key is valid"""
|
||||
recovery_key = self.get_recovery_key()
|
||||
if recovery_key is None:
|
||||
return False
|
||||
return recovery_key.is_valid()
|
||||
|
||||
@abstractmethod
|
||||
def get_new_device_key(self) -> NewDeviceKey:
|
||||
"""Creates and returns the new device key"""
|
||||
|
||||
@abstractmethod
|
||||
def delete_new_device_key(self) -> None:
|
||||
"""Delete the new device key"""
|
||||
|
||||
@abstractmethod
|
||||
def use_mnemonic_new_device_key(
|
||||
self, mnemonic_phrase: str, device_name: str
|
||||
) -> Token:
|
||||
"""Use the mnemonic new device key"""
|
|
@ -0,0 +1,14 @@
|
|||
class TokenNotFound(Exception):
|
||||
"""Token not found!"""
|
||||
|
||||
|
||||
class RecoveryKeyNotFound(Exception):
|
||||
"""Recovery key not found!"""
|
||||
|
||||
|
||||
class InvalidMnemonic(Exception):
|
||||
"""Phrase is not mnemonic!"""
|
||||
|
||||
|
||||
class NewDeviceKeyNotFound(Exception):
|
||||
"""New device key not found!"""
|
|
@ -0,0 +1,238 @@
|
|||
"""
|
||||
temporary legacy
|
||||
"""
|
||||
from typing import Optional
|
||||
from datetime import datetime
|
||||
from mnemonic import Mnemonic
|
||||
|
||||
from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData
|
||||
from selfprivacy_api.models.tokens.token import Token
|
||||
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
|
||||
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
|
||||
from selfprivacy_api.repositories.tokens.exceptions import (
|
||||
TokenNotFound,
|
||||
RecoveryKeyNotFound,
|
||||
InvalidMnemonic,
|
||||
NewDeviceKeyNotFound,
|
||||
)
|
||||
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
|
||||
AbstractTokensRepository,
|
||||
)
|
||||
|
||||
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
|
||||
|
||||
|
||||
class JsonTokensRepository(AbstractTokensRepository):
|
||||
def get_token_by_token_string(self, token_string: str) -> Optional[Token]:
|
||||
"""Get the token by token"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
for userdata_token in tokens_file["tokens"]:
|
||||
if userdata_token["token"] == token_string:
|
||||
|
||||
return Token(
|
||||
token=token_string,
|
||||
device_name=userdata_token["name"],
|
||||
created_at=userdata_token["date"],
|
||||
)
|
||||
|
||||
raise TokenNotFound("Token not found!")
|
||||
|
||||
def get_token_by_name(self, token_name: str) -> Optional[Token]:
|
||||
"""Get the token by name"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
for userdata_token in tokens_file["tokens"]:
|
||||
if userdata_token["name"] == token_name:
|
||||
|
||||
return Token(
|
||||
token=userdata_token["token"],
|
||||
device_name=token_name,
|
||||
created_at=userdata_token["date"],
|
||||
)
|
||||
|
||||
raise TokenNotFound("Token not found!")
|
||||
|
||||
def get_tokens(self) -> list[Token]:
|
||||
"""Get the tokens"""
|
||||
def marked this conversation as resolved
|
||||
tokens_list = []
|
||||
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
for userdata_token in tokens_file["tokens"]:
|
||||
tokens_list.append(
|
||||
Token(
|
||||
token=userdata_token["token"],
|
||||
device_name=userdata_token["name"],
|
||||
created_at=userdata_token["date"],
|
||||
)
|
||||
)
|
||||
|
||||
return tokens_list
|
||||
|
||||
def create_token(self, device_name: str) -> Token:
|
||||
"""Create new token"""
|
||||
new_token = Token.generate(device_name)
|
||||
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
tokens_file["tokens"].append(
|
||||
{
|
||||
"token": new_token.token,
|
||||
"name": new_token.device_name,
|
||||
"date": new_token.created_at.strftime(DATETIME_FORMAT),
|
||||
}
|
||||
)
|
||||
return new_token
|
||||
|
||||
def delete_token(self, input_token: Token) -> None:
|
||||
def marked this conversation as resolved
inex
commented
Review
At SelfPrivacy, we do not engage in politics and religion. Pray to your unit tests that will check this code. At SelfPrivacy, we do not engage in politics and religion. Pray to your unit tests that will check this code.
|
||||
"""Delete the token"""
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
for userdata_token in tokens_file["tokens"]:
|
||||
if userdata_token["token"] == input_token.token:
|
||||
tokens_file["tokens"].remove(userdata_token)
|
||||
return
|
||||
|
||||
raise TokenNotFound("Token not found!")
|
||||
|
||||
def refresh_token(self, input_token: Token) -> Token:
|
||||
"""Change the token field of the existing token"""
|
||||
new_token = Token.generate(device_name=input_token.device_name)
|
||||
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
for userdata_token in tokens_file["tokens"]:
|
||||
|
||||
if userdata_token["name"] == input_token.device_name:
|
||||
userdata_token["token"] = new_token.token
|
||||
userdata_token["date"] = (
|
||||
new_token.created_at.strftime(DATETIME_FORMAT),
|
||||
)
|
||||
|
||||
return new_token
|
||||
|
||||
raise TokenNotFound("Token not found!")
|
||||
|
||||
def get_recovery_key(self) -> Optional[RecoveryKey]:
|
||||
"""Get the recovery key"""
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
|
||||
if (
|
||||
"recovery_token" not in tokens_file
|
||||
or tokens_file["recovery_token"] is None
|
||||
):
|
||||
return
|
||||
|
||||
recovery_key = RecoveryKey(
|
||||
key=tokens_file["recovery_token"].get("token"),
|
||||
created_at=tokens_file["recovery_token"].get("date"),
|
||||
expires_at=tokens_file["recovery_token"].get("expitation"),
|
||||
uses_left=tokens_file["recovery_token"].get("uses_left"),
|
||||
)
|
||||
|
||||
return recovery_key
|
||||
|
||||
def create_recovery_key(
|
||||
self,
|
||||
expiration: Optional[datetime],
|
||||
uses_left: Optional[int],
|
||||
) -> RecoveryKey:
|
||||
"""Create the recovery key"""
|
||||
|
||||
recovery_key = RecoveryKey.generate(expiration, uses_left)
|
||||
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
tokens_file["recovery_token"] = {
|
||||
"token": recovery_key.key,
|
||||
"date": recovery_key.created_at.strftime(DATETIME_FORMAT),
|
||||
"expiration": recovery_key.expires_at,
|
||||
"uses_left": recovery_key.uses_left,
|
||||
}
|
||||
|
||||
return recovery_key
|
||||
|
||||
def use_mnemonic_recovery_key(
|
||||
self, mnemonic_phrase: str, device_name: str
|
||||
) -> Token:
|
||||
"""Use the mnemonic recovery key and create a new token with the given name"""
|
||||
recovery_key = self.get_recovery_key()
|
||||
|
||||
if recovery_key is None:
|
||||
raise RecoveryKeyNotFound("Recovery key not found")
|
||||
|
||||
if not recovery_key.is_valid():
|
||||
raise RecoveryKeyNotFound("Recovery key not found")
|
||||
|
||||
recovery_token = bytes.fromhex(recovery_key.key)
|
||||
|
||||
if not Mnemonic(language="english").check(mnemonic_phrase):
|
||||
raise InvalidMnemonic("Phrase is not mnemonic!")
|
||||
|
||||
phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
|
||||
if phrase_bytes != recovery_token:
|
||||
raise RecoveryKeyNotFound("Recovery key not found")
|
||||
|
||||
new_token = Token.generate(device_name=device_name)
|
||||
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
||||
tokens["tokens"].append(
|
||||
{
|
||||
"token": new_token.token,
|
||||
"name": new_token.device_name,
|
||||
"date": new_token.created_at.strftime(DATETIME_FORMAT),
|
||||
}
|
||||
)
|
||||
|
||||
if "recovery_token" in tokens:
|
||||
if (
|
||||
"uses_left" in tokens["recovery_token"]
|
||||
and tokens["recovery_token"]["uses_left"] is not None
|
||||
):
|
||||
tokens["recovery_token"]["uses_left"] -= 1
|
||||
return new_token
|
||||
|
||||
def get_new_device_key(self) -> NewDeviceKey:
|
||||
"""Creates and returns the new device key"""
|
||||
new_device_key = NewDeviceKey.generate()
|
||||
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
tokens_file["new_device"] = {
|
||||
"token": new_device_key.key,
|
||||
"date": new_device_key.created_at.strftime(DATETIME_FORMAT),
|
||||
"expiration": new_device_key.expires_at.strftime(DATETIME_FORMAT),
|
||||
}
|
||||
|
||||
return new_device_key
|
||||
|
||||
def delete_new_device_key(self) -> None:
|
||||
"""Delete the new device key"""
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
if "new_device" in tokens_file:
|
||||
del tokens_file["new_device"]
|
||||
return
|
||||
|
||||
def use_mnemonic_new_device_key(
|
||||
self, mnemonic_phrase: str, device_name: str
|
||||
) -> Token:
|
||||
"""Use the mnemonic new device key"""
|
||||
|
||||
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
|
||||
if "new_device" not in tokens_file or tokens_file["new_device"] is None:
|
||||
raise NewDeviceKeyNotFound("New device key not found")
|
||||
|
||||
new_device_key = NewDeviceKey(
|
||||
key=tokens_file["new_device"]["token"],
|
||||
created_at=tokens_file["new_device"]["date"],
|
||||
expires_at=tokens_file["new_device"]["expiration"],
|
||||
)
|
||||
|
||||
token = bytes.fromhex(new_device_key.key)
|
||||
|
||||
if not Mnemonic(language="english").check(mnemonic_phrase):
|
||||
raise InvalidMnemonic("Phrase is not mnemonic!")
|
||||
|
||||
phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
|
||||
if bytes(phrase_bytes) != bytes(token):
|
||||
raise NewDeviceKeyNotFound("Phrase is not token!")
|
||||
|
||||
new_token = Token.generate(device_name=device_name)
|
||||
with WriteUserData(UserDataFiles.TOKENS) as tokens:
|
||||
if "new_device" in tokens:
|
||||
del tokens["new_device"]
|
||||
|
||||
return new_token
|
|
@ -0,0 +1,15 @@
|
|||
"""
|
||||
Token repository using Redis as backend.
|
||||
"""
|
||||
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
|
||||
AbstractTokensRepository,
|
||||
)
|
||||
|
||||
|
||||
class RedisTokensRepository(AbstractTokensRepository):
|
||||
"""
|
||||
Token repository using Redis as a backend
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
raise NotImplementedError
|
|
@ -0,0 +1,582 @@
|
|||
# pylint: disable=redefined-outer-name
|
||||
# pylint: disable=unused-argument
|
||||
# pylint: disable=missing-function-docstring
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
|
||||
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
|
||||
from selfprivacy_api.models.tokens.token import Token
|
||||
from selfprivacy_api.repositories.tokens.exceptions import (
|
||||
InvalidMnemonic,
|
||||
RecoveryKeyNotFound,
|
||||
TokenNotFound,
|
||||
NewDeviceKeyNotFound,
|
||||
)
|
||||
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
|
||||
JsonTokensRepository,
|
||||
)
|
||||
from tests.common import read_json
|
||||
|
||||
|
||||
ORIGINAL_TOKEN_CONTENT = [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698",
|
||||
},
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tokens(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "tokens.json")
|
||||
assert read_json(datadir / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
|
||||
return datadir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def empty_keys(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "empty_keys.json")
|
||||
assert read_json(datadir / "empty_keys.json")["tokens"] == [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698",
|
||||
}
|
||||
]
|
||||
return datadir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def null_keys(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json")
|
||||
assert read_json(datadir / "null_keys.json")["recovery_token"] is None
|
||||
assert read_json(datadir / "null_keys.json")["new_device"] is None
|
||||
return datadir
|
||||
|
||||
|
||||
class RecoveryKeyMockReturnNotValid:
|
||||
def is_valid() -> bool:
|
||||
return False
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_new_device_key_generate(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.repositories.tokens.json_tokens_repository.NewDeviceKey.generate",
|
||||
autospec=True,
|
||||
return_value=NewDeviceKey(
|
||||
key="43478d05b35e4781598acd76e33832bb",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
),
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_generate_token(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.repositories.tokens.json_tokens_repository.Token.generate",
|
||||
autospec=True,
|
||||
return_value=Token(
|
||||
token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
|
||||
device_name="newdevice",
|
||||
created_at=datetime(2022, 11, 14, 6, 6, 32, 777123),
|
||||
),
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_get_recovery_key_return_not_valid(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.repositories.tokens.json_tokens_repository.JsonTokensRepository.get_recovery_key",
|
||||
autospec=True,
|
||||
return_value=RecoveryKeyMockReturnNotValid,
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_token_generate(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.repositories.tokens.json_tokens_repository.Token.generate",
|
||||
autospec=True,
|
||||
return_value=Token(
|
||||
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
|
||||
device_name="IamNewDevice",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
),
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_recovery_key_generate(mocker):
|
||||
mock = mocker.patch(
|
||||
"selfprivacy_api.repositories.tokens.json_tokens_repository.RecoveryKey.generate",
|
||||
autospec=True,
|
||||
return_value=RecoveryKey(
|
||||
key="889bf49c1d3199d71a2e704718772bd53a422020334db051",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
expires_at=None,
|
||||
uses_left=1,
|
||||
),
|
||||
)
|
||||
return mock
|
||||
|
||||
|
||||
###############
|
||||
# Test tokens #
|
||||
###############
|
||||
|
||||
|
||||
def test_get_token_by_token_string(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.get_token_by_token_string(
|
||||
token_string="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI"
|
||||
) == Token(
|
||||
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
device_name="primary_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
)
|
||||
|
||||
|
||||
def test_get_token_by_non_existent_token_string(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(TokenNotFound):
|
||||
assert repo.get_token_by_token_string(token_string="iamBadtoken") is None
|
||||
|
||||
|
||||
def test_get_token_by_name(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.get_token_by_name(token_name="primary_token") is not None
|
||||
assert repo.get_token_by_name(token_name="primary_token") == Token(
|
||||
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
device_name="primary_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
)
|
||||
|
||||
|
||||
def test_get_token_by_non_existent_name(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(TokenNotFound):
|
||||
assert repo.get_token_by_name(token_name="badname") is None
|
||||
|
||||
|
||||
def test_get_tokens(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.get_tokens() == [
|
||||
Token(
|
||||
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
device_name="primary_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
),
|
||||
Token(
|
||||
token="3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
device_name="second_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698, tzinfo=timezone.utc),
|
||||
),
|
||||
Token(
|
||||
token="LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
device_name="third_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698, tzinfo=timezone.utc),
|
||||
),
|
||||
Token(
|
||||
token="dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
device_name="forth_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def test_get_tokens_when_one(empty_keys):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.get_tokens() == [
|
||||
Token(
|
||||
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
device_name="primary_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_create_token(tokens, mock_token_generate):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.create_token(device_name="IamNewDevice") == Token(
|
||||
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
|
||||
device_name="IamNewDevice",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
)
|
||||
|
||||
|
||||
def test_delete_token(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
input_token = Token(
|
||||
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
device_name="primary_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
)
|
||||
|
||||
repo.delete_token(input_token)
|
||||
assert read_json(tokens / "tokens.json")["tokens"] == [
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def test_delete_not_found_token(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
input_token = Token(
|
||||
token="imbadtoken",
|
||||
device_name="primary_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
)
|
||||
with pytest.raises(TokenNotFound):
|
||||
assert repo.delete_token(input_token) is None
|
||||
|
||||
assert read_json(tokens / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
|
||||
|
||||
|
||||
def test_refresh_token(tokens, mock_token_generate):
|
||||
repo = JsonTokensRepository()
|
||||
input_token = Token(
|
||||
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
device_name="primary_token",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
)
|
||||
|
||||
assert repo.refresh_token(input_token) == Token(
|
||||
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
|
||||
device_name="IamNewDevice",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
)
|
||||
|
||||
|
||||
def test_refresh_not_found_token(tokens, mock_token_generate):
|
||||
repo = JsonTokensRepository()
|
||||
input_token = Token(
|
||||
token="idontknowwhoiam",
|
||||
device_name="tellmewhoiam?",
|
||||
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
|
||||
)
|
||||
|
||||
with pytest.raises(TokenNotFound):
|
||||
assert repo.refresh_token(input_token) is None
|
||||
|
||||
|
||||
################
|
||||
# Recovery key #
|
||||
################
|
||||
|
||||
|
||||
def test_get_recovery_key(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.get_recovery_key() == RecoveryKey(
|
||||
key="ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
|
||||
created_at=datetime(2022, 11, 11, 11, 48, 54, 228038),
|
||||
expires_at=None,
|
||||
uses_left=2,
|
||||
)
|
||||
|
||||
|
||||
def test_get_recovery_key_when_empty(empty_keys):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.get_recovery_key() is None
|
||||
|
||||
|
||||
def test_create_recovery_key(tokens, mock_recovery_key_generate):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.create_recovery_key(uses_left=1, expiration=None) is not None
|
||||
assert read_json(tokens / "tokens.json")["recovery_token"] == {
|
||||
"token": "889bf49c1d3199d71a2e704718772bd53a422020334db051",
|
||||
"date": "2022-07-15T17:41:31.675698",
|
||||
"expiration": None,
|
||||
"uses_left": 1,
|
||||
}
|
||||
|
||||
|
||||
def test_use_mnemonic_recovery_key_when_empty(
|
||||
empty_keys, mock_recovery_key_generate, mock_token_generate
|
||||
):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(RecoveryKeyNotFound):
|
||||
assert (
|
||||
repo.use_mnemonic_recovery_key(
|
||||
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
|
||||
device_name="primary_token",
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_use_mnemonic_not_valid_recovery_key(
|
||||
tokens, mock_get_recovery_key_return_not_valid
|
||||
):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(RecoveryKeyNotFound):
|
||||
assert (
|
||||
repo.use_mnemonic_recovery_key(
|
||||
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
|
||||
device_name="primary_token",
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_use_mnemonic_not_mnemonic_recovery_key(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(InvalidMnemonic):
|
||||
assert (
|
||||
repo.use_mnemonic_recovery_key(
|
||||
mnemonic_phrase="sorry, it was joke",
|
||||
device_name="primary_token",
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_use_not_mnemonic_recovery_key(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(InvalidMnemonic):
|
||||
assert (
|
||||
repo.use_mnemonic_recovery_key(
|
||||
mnemonic_phrase="please come back",
|
||||
device_name="primary_token",
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_use_not_found_mnemonic_recovery_key(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(RecoveryKeyNotFound):
|
||||
assert (
|
||||
repo.use_mnemonic_recovery_key(
|
||||
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
|
||||
device_name="primary_token",
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_use_menemonic_recovery_key_when_empty(empty_keys):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(RecoveryKeyNotFound):
|
||||
assert (
|
||||
repo.use_mnemonic_recovery_key(
|
||||
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
|
||||
device_name="primary_token",
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_use_menemonic_recovery_key_when_null(null_keys):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(RecoveryKeyNotFound):
|
||||
assert (
|
||||
repo.use_mnemonic_recovery_key(
|
||||
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
|
||||
device_name="primary_token",
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_use_mnemonic_recovery_key(tokens, mock_generate_token):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.use_mnemonic_recovery_key(
|
||||
mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park",
|
||||
device_name="newdevice",
|
||||
) == Token(
|
||||
token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
|
||||
device_name="newdevice",
|
||||
created_at=datetime(2022, 11, 14, 6, 6, 32, 777123),
|
||||
)
|
||||
|
||||
assert read_json(tokens / "tokens.json")["tokens"] == [
|
||||
{
|
||||
"date": "2022-07-15 17:41:31.675698",
|
||||
"name": "primary_token",
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
},
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z",
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698",
|
||||
},
|
||||
{
|
||||
"date": "2022-11-14T06:06:32.777123",
|
||||
"name": "newdevice",
|
||||
"token": "ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
|
||||
},
|
||||
]
|
||||
|
||||
assert read_json(tokens / "tokens.json")["recovery_token"] == {
|
||||
"date": "2022-11-11T11:48:54.228038",
|
||||
"expiration": None,
|
||||
"token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
|
||||
"uses_left": 1,
|
||||
}
|
||||
|
||||
|
||||
##################
|
||||
# New device key #
|
||||
##################
|
||||
|
||||
|
||||
def test_get_new_device_key(tokens, mock_new_device_key_generate):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.get_new_device_key() is not None
|
||||
assert read_json(tokens / "tokens.json")["new_device"] == {
|
||||
"date": "2022-07-15T17:41:31.675698",
|
||||
"expiration": "2022-07-15T17:41:31.675698",
|
||||
"token": "43478d05b35e4781598acd76e33832bb",
|
||||
}
|
||||
|
||||
|
||||
def test_delete_new_device_key(tokens):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert repo.delete_new_device_key() is None
|
||||
assert "new_device" not in read_json(tokens / "tokens.json")
|
||||
|
||||
|
||||
def test_delete_new_device_key_when_empty(empty_keys):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
repo.delete_new_device_key()
|
||||
assert "new_device" not in read_json(empty_keys / "empty_keys.json")
|
||||
|
||||
|
||||
def test_use_invalid_mnemonic_new_device_key(
|
||||
tokens, mock_new_device_key_generate, datadir, mock_token_generate
|
||||
):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(InvalidMnemonic):
|
||||
assert (
|
||||
repo.use_mnemonic_new_device_key(
|
||||
device_name="imnew",
|
||||
mnemonic_phrase="oh-no",
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_use_not_exists_mnemonic_new_device_key(
|
||||
tokens, mock_new_device_key_generate, mock_token_generate
|
||||
):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(NewDeviceKeyNotFound):
|
||||
assert (
|
||||
repo.use_mnemonic_new_device_key(
|
||||
device_name="imnew",
|
||||
mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park",
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_use_mnemonic_new_device_key(
|
||||
tokens, mock_new_device_key_generate, mock_token_generate
|
||||
):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
assert (
|
||||
repo.use_mnemonic_new_device_key(
|
||||
device_name="imnew",
|
||||
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
|
||||
)
|
||||
is not None
|
||||
)
|
||||
# assert read_json(datadir / "tokens.json")["new_device"] == []
|
||||
|
||||
|
||||
def test_use_mnemonic_new_device_key_when_empty(empty_keys):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(NewDeviceKeyNotFound):
|
||||
assert (
|
||||
repo.use_mnemonic_new_device_key(
|
||||
device_name="imnew",
|
||||
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_use_mnemonic_new_device_key_when_null(null_keys):
|
||||
repo = JsonTokensRepository()
|
||||
|
||||
with pytest.raises(NewDeviceKeyNotFound):
|
||||
assert (
|
||||
repo.use_mnemonic_new_device_key(
|
||||
device_name="imnew",
|
||||
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
|
||||
)
|
||||
is None
|
||||
)
|
|
@ -0,0 +1,9 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698"
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698"
|
||||
},
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698"
|
||||
}
|
||||
],
|
||||
"recovery_token": null,
|
||||
"new_device": null
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
{
|
||||
"tokens": [
|
||||
{
|
||||
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
|
||||
"name": "primary_token",
|
||||
"date": "2022-07-15 17:41:31.675698"
|
||||
},
|
||||
{
|
||||
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
|
||||
"name": "second_token",
|
||||
"date": "2022-07-15 17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
|
||||
"name": "third_token",
|
||||
"date": "2022-07-15T17:41:31.675698Z"
|
||||
},
|
||||
{
|
||||
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
|
||||
"name": "forth_token",
|
||||
"date": "2022-07-15T17:41:31.675698"
|
||||
}
|
||||
],
|
||||
"recovery_token": {
|
||||
"token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
|
||||
"date": "2022-11-11T11:48:54.228038",
|
||||
"expiration": null,
|
||||
"uses_left": 2
|
||||
},
|
||||
"new_device": {
|
||||
"token": "2237238de23dc71ab558e317bdb8ff8e",
|
||||
"date": "2022-10-26 20:50:47.973212",
|
||||
"expiration": "2022-10-26 21:00:47.974153"
|
||||
}
|
||||
}
|
|
@ -516,7 +516,6 @@ def test_graphql_add_long_username(authorized_client, one_user, mock_subprocess_
|
|||
},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json().get("data") is not None
|
||||
|
||||
assert response.json()["data"]["createUser"]["message"] is not None
|
||||
|
|
Loading…
Reference in New Issue
Are you sure this notation will work? Not consistent with the previous functions.