Api fixes

graphql
Inex Code 2022-07-05 15:11:41 +03:00
parent 376bf1ef77
commit 5711cf66b0
6 changed files with 193 additions and 59 deletions

View File

@ -5,27 +5,50 @@ import typing
from flask import request
import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.mutations.mutation_interface import MutationReturnInterface
from selfprivacy_api.utils import parse_date
from selfprivacy_api.graphql.mutations.mutation_interface import GenericMutationReturn, MutationReturnInterface
from selfprivacy_api.utils.auth import (
generate_recovery_token
delete_new_device_auth_token,
delete_token,
generate_recovery_token,
get_new_device_auth_token,
is_token_name_exists,
is_token_name_pair_valid,
refresh_token,
use_mnemonic_recoverery_token,
use_new_device_auth_token
)
@strawberry.type
class ApiKeyMutationReturn(MutationReturnInterface):
key: typing.Optional[str]
@strawberry.type
class DeviceApiTokenMutationReturn(MutationReturnInterface):
token: typing.Optional[str]
@strawberry.input
class RecoveryKeyLimitsInput:
"""Recovery key limits input"""
expiration_date: typing.Optional[datetime.datetime]
uses: typing.Optional[int]
@strawberry.input
class UseRecoveryKeyInput:
"""Use recovery key input"""
key: str
deviceName: str
@strawberry.input
class UseNewDeviceKeyInput:
"""Use new device key input"""
key: str
deviceName: str
@strawberry.type
class ApiMutations:
@strawberry.mutation(permission_classes=[IsAuthenticated])
def getNewRecoveryApiKey(self, limits: RecoveryKeyLimitsInput) -> ApiKeyMutationReturn:
def get_new_recovery_api_key(self, limits: RecoveryKeyLimitsInput) -> ApiKeyMutationReturn:
"""Generate recovery key"""
if limits.expiration_date is not None:
if limits.expiration_date < datetime.datetime.now():
@ -50,3 +73,109 @@ class ApiMutations:
code=200,
key=key,
)
@strawberry.mutation()
def use_recovery_api_key(self, input: UseRecoveryKeyInput) -> DeviceApiTokenMutationReturn:
"""Use recovery key"""
token = use_mnemonic_recoverery_token(input.key, input.deviceName)
if token is None:
return DeviceApiTokenMutationReturn(
success=False,
message="Recovery key not found",
code=404,
token=None,
)
return DeviceApiTokenMutationReturn(
success=True,
message="Recovery key used",
code=200,
token=None,
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def refresh_device_api_token(self) -> DeviceApiTokenMutationReturn:
"""Refresh device api token"""
token = request.headers.get("Authorization").split(" ")[1] if request.headers.get("Authorization") is not None else None
if token is None:
return DeviceApiTokenMutationReturn(
success=False,
message="Token not found",
code=404,
token=None,
)
new_token = refresh_token(token)
if new_token is None:
return DeviceApiTokenMutationReturn(
success=False,
message="Token not found",
code=404,
token=None,
)
return DeviceApiTokenMutationReturn(
success=True,
message="Token refreshed",
code=200,
token=new_token,
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def delete_device_api_token(self, device: str) -> GenericMutationReturn:
"""Delete device api token"""
self_token = request.headers.get("Authorization").split(" ")[1] if request.headers.get("Authorization") is not None else None
if self_token is not None and is_token_name_pair_valid(device, self_token):
return GenericMutationReturn(
success=False,
message="Cannot delete caller's token",
code=400,
)
if not is_token_name_exists(device):
return GenericMutationReturn(
success=False,
message="Token not found",
code=404,
)
delete_token(device)
return GenericMutationReturn(
success=True,
message="Token deleted",
code=200,
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def get_new_device_api_key(self) -> ApiKeyMutationReturn:
"""Generate device api key"""
key = get_new_device_auth_token()
return ApiKeyMutationReturn(
success=True,
message="Device api key generated",
code=200,
key=key,
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def invalidate_new_device_api_key(self) -> GenericMutationReturn:
"""Invalidate new device api key"""
delete_new_device_auth_token()
return GenericMutationReturn(
success=True,
message="New device key deleted",
code=200,
)
@strawberry.mutation()
def authorize_with_new_device_api_key(self, input: UseNewDeviceKeyInput) -> DeviceApiTokenMutationReturn:
"""Authorize with new device api key"""
token = use_new_device_auth_token(input.key, input.deviceName)
if token is None:
return DeviceApiTokenMutationReturn(
success=False,
message="Token not found",
code=404,
token=None,
)
return DeviceApiTokenMutationReturn(
success=True,
message="Token used",
code=200,
token=token,
)

View File

@ -5,3 +5,7 @@ class MutationReturnInterface:
success: bool
message: str
code: int
@strawberry.type
class GenericMutationReturn(MutationReturnInterface):
pass

View File

View File

@ -133,7 +133,7 @@ def delete_token(token_name):
tokens["tokens"] = [t for t in tokens["tokens"] if t["name"] != token_name]
def refresh_token(token):
def refresh_token(token: str) -> typing.Optional[str]:
"""Change the token field of the existing token"""
new_token = _generate_token()
with WriteUserData(UserDataFiles.TOKENS) as tokens:
@ -259,7 +259,7 @@ def use_mnemonic_recoverery_token(mnemonic_phrase, name):
return token
def get_new_device_auth_token():
def get_new_device_auth_token() -> str:
"""Generate a new device auth token which is valid for 10 minutes
and return a mnemonic phrase representation
Write token to the new_device of the tokens.json file.

View File

@ -88,7 +88,7 @@ def test_graphql_delete_token(authorized_client, tokens_file):
json={
"query": DELETE_TOKEN_MUTATION,
"variables": {
"device": "test_token",
"device": "test_token2",
},
},
)
@ -101,7 +101,7 @@ def test_graphql_delete_token(authorized_client, tokens_file):
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token2",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
}
]
@ -147,6 +147,7 @@ mutation RefreshToken {
success
message
code
token
}
}
"""
@ -173,14 +174,10 @@ def test_graphql_refresh_token(authorized_client, tokens_file):
assert response.json["data"]["refreshDeviceApiToken"]["success"] is True
assert response.json["data"]["refreshDeviceApiToken"]["message"] is not None
assert response.json["data"]["refreshDeviceApiToken"]["code"] == 200
assert read_json(tokens_file) == {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
}
]
assert read_json(tokens_file)["tokens"][0] == {
"token": response.json["data"]["refreshDeviceApiToken"]["token"],
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
}
NEW_DEVICE_KEY_MUTATION = """
@ -213,11 +210,11 @@ def test_graphql_get_new_device_auth_key(authorized_client, tokens_file):
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewDeviceApiToken"]["success"] is True
assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None
assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200
assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12
token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex()
assert response.json["data"]["getNewDeviceApiKey"]["success"] is True
assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200
assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12
token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
INVALIDATE_NEW_DEVICE_KEY_MUTATION = """
@ -252,11 +249,11 @@ def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file):
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewDeviceApiToken"]["success"] is True
assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None
assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200
assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12
token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex()
assert response.json["data"]["getNewDeviceApiKey"]["success"] is True
assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200
assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12
token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = authorized_client.post(
"/graphql",
@ -273,7 +270,7 @@ def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file):
AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION = """
mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) {
authorizeWithNewDeviceApiKey(inupt: $input) {
authorizeWithNewDeviceApiKey(input: $input) {
success
message
code
@ -291,20 +288,21 @@ def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewDeviceApiToken"]["success"] is True
assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None
assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200
assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex()
assert response.json["data"]["getNewDeviceApiKey"]["success"] is True
assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200
mnemonic_key = response.json["data"]["getNewDeviceApiKey"]["key"]
assert mnemonic_key.split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(mnemonic_key).hex()
assert read_json(tokens_file)["new_device"]["token"] == key
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"key": key,
"deviceName": "test_token",
"input": {
"key": mnemonic_key,
"deviceName": "new_device",
}
},
},
@ -324,7 +322,7 @@ def test_graphql_authorize_new_device_with_invalid_key(client, tokens_file):
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"input": {
"key": "invalid_token",
"deviceName": "test_token",
}
@ -347,20 +345,21 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewDeviceApiToken"]["success"] is True
assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None
assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200
assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex()
assert response.json["data"]["getNewDeviceApiKey"]["success"] is True
assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200
mnemonic_key = response.json["data"]["getNewDeviceApiKey"]["key"]
assert mnemonic_key.split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(mnemonic_key).hex()
assert read_json(tokens_file)["new_device"]["token"] == key
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"key": key,
"deviceName": "test_token",
"input": {
"key": mnemonic_key,
"deviceName": "new_token",
}
},
},
@ -370,14 +369,16 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is True
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 200
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert read_json(tokens_file)["tokens"][2]["token"] == response.json["data"]["authorizeWithNewDeviceApiKey"]["token"]
assert read_json(tokens_file)["tokens"][2]["name"] == "new_token"
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"key": key,
"input": {
"key": mnemonic_key,
"deviceName": "test_token2",
}
},
@ -388,7 +389,7 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is False
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
assert read_json(tokens_file)["tokens"].__len__() == 3
def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_client, tokens_file):
response = authorized_client.post(
@ -399,11 +400,11 @@ def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_clien
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewDeviceApiToken"]["success"] is True
assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None
assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200
assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex()
assert response.json["data"]["getNewDeviceApiKey"]["success"] is True
assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200
assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == key
file_data = read_json(tokens_file)
@ -417,7 +418,7 @@ def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_clien
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"input": {
"key": key,
"deviceName": "test_token",
}
@ -436,7 +437,7 @@ def test_graphql_authorize_without_token(client, tokens_file):
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"input": {
"deviceName": "test_token",
}
},

View File

@ -96,12 +96,12 @@ def mock_subprocess_check_output(mocker):
@pytest.fixture
def mock_get_ip4(mocker):
mock = mocker.patch("selfprivacy_api.utils.get_ip4", autospec=True, return_value="157.90.247.192")
mock = mocker.patch("selfprivacy_api.utils.network.get_ip4", autospec=True, return_value="157.90.247.192")
return mock
@pytest.fixture
def mock_get_ip6(mocker):
mock = mocker.patch("selfprivacy_api.utils.get_ip6", autospec=True, return_value="fe80::9400:ff:fef1:34ae")
mock = mocker.patch("selfprivacy_api.utils.network.get_ip6", autospec=True, return_value="fe80::9400:ff:fef1:34ae")
return mock
@pytest.fixture
@ -197,9 +197,9 @@ settings {
}
"""
def test_graphql_get_timezone_unauthorized(unauthorized_client, turned_on):
def test_graphql_get_timezone_unauthorized(client, turned_on):
"""Test get timezone"""
response = unauthorized_client.get(
response = client.get(
"/graphql",
json={
"query": generate_system_query([API_GET_TIMEZONE]),