diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..6a584c2 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,2 @@ +[MASTER] +init-hook='import sys; sys.path.append("/path/to/root")' diff --git a/selfprivacy_api/graphql/mutations/__init__.py b/selfprivacy_api/graphql/mutations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/selfprivacy_api/graphql/mutations/api_mutations.py b/selfprivacy_api/graphql/mutations/api_mutations.py new file mode 100644 index 0000000..6604f7e --- /dev/null +++ b/selfprivacy_api/graphql/mutations/api_mutations.py @@ -0,0 +1,52 @@ +"""API access mutations""" +# pylint: disable=too-few-public-methods +import datetime +import typing +from flask import request +import strawberry +from selfprivacy_api.graphql import IsAuthenticated +from selfprivacy_api.graphql.mutations.mutation_interface import MutationReturnInterface +from selfprivacy_api.utils import parse_date + +from selfprivacy_api.utils.auth import ( + generate_recovery_token +) + +@strawberry.type +class ApiKeyMutationReturn(MutationReturnInterface): + key: typing.Optional[str] + +@strawberry.input +class RecoveryKeyLimitsInput: + """Recovery key limits input""" + expiration_date: typing.Optional[datetime.datetime] + uses: typing.Optional[int] + +@strawberry.type +class ApiMutations: + @strawberry.mutation(permission_classes=[IsAuthenticated]) + def getNewRecoveryApiKey(self, limits: RecoveryKeyLimitsInput) -> ApiKeyMutationReturn: + """Generate recovery key""" + if limits.expiration_date is not None: + if limits.expiration_date < datetime.datetime.now(): + return ApiKeyMutationReturn( + success=False, + message="Expiration date must be in the future", + code=400, + key=None, + ) + if limits.uses is not None: + if limits.uses < 1: + return ApiKeyMutationReturn( + success=False, + message="Uses must be greater than 0", + code=400, + key=None, + ) + key = generate_recovery_token(limits.expiration_date, limits.uses) + return ApiKeyMutationReturn( + success=True, + message="Recovery key generated", + code=200, + key=key, + ) diff --git a/selfprivacy_api/graphql/mutations/mutation_interface.py b/selfprivacy_api/graphql/mutations/mutation_interface.py new file mode 100644 index 0000000..f5c212b --- /dev/null +++ b/selfprivacy_api/graphql/mutations/mutation_interface.py @@ -0,0 +1,7 @@ +import strawberry + +@strawberry.interface +class MutationReturnInterface: + success: bool + message: str + code: int diff --git a/selfprivacy_api/graphql/queries/api.py b/selfprivacy_api/graphql/queries/api_queries.py similarity index 100% rename from selfprivacy_api/graphql/queries/api.py rename to selfprivacy_api/graphql/queries/api_queries.py diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index 3e3fac7..71c9c18 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -2,8 +2,10 @@ # pylint: disable=too-few-public-methods import typing import strawberry +from selfprivacy_api.graphql import IsAuthenticated +from selfprivacy_api.graphql.mutations.api_mutations import ApiMutations -from selfprivacy_api.graphql.queries.api import Api +from selfprivacy_api.graphql.queries.api_queries import Api from selfprivacy_api.graphql.queries.system import System @@ -11,7 +13,7 @@ from selfprivacy_api.graphql.queries.system import System class Query: """Root schema for queries""" - @strawberry.field + @strawberry.field(permission_classes=[IsAuthenticated]) def system(self) -> System: """System queries""" return System() @@ -21,5 +23,9 @@ class Query: """API access status""" return Api() +@strawberry.type +class Mutation(ApiMutations): + """Root schema for mutations""" + pass -schema = strawberry.Schema(query=Query) +schema = strawberry.Schema(query=Query, mutation=Mutation) diff --git a/selfprivacy_api/resources/common.py b/selfprivacy_api/resources/common.py index 9ec060a..f78aad6 100644 --- a/selfprivacy_api/resources/common.py +++ b/selfprivacy_api/resources/common.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """Unassigned views""" from flask_restful import Resource -from selfprivacy_api.graphql.queries.api import get_api_version +from selfprivacy_api.graphql.queries.api_queries import get_api_version class ApiVersion(Resource): diff --git a/selfprivacy_api/utils/auth.py b/selfprivacy_api/utils/auth.py index 8e5a5b2..8810149 100644 --- a/selfprivacy_api/utils/auth.py +++ b/selfprivacy_api/utils/auth.py @@ -3,6 +3,7 @@ import secrets from datetime import datetime, timedelta import re +import typing from mnemonic import Mnemonic @@ -190,7 +191,7 @@ def _get_recovery_token(): return tokens["recovery_token"]["token"] -def generate_recovery_token(expiration=None, uses_left=None): +def generate_recovery_token(expiration: typing.Optional[datetime], uses_left: typing.Optional[int]) -> str: """Generate a 24 bytes recovery token and return a mneomnic word list. Write a string representation of the recovery token to the tokens.json file. """ diff --git a/tests/common.py b/tests/common.py index c1b33fe..950c850 100644 --- a/tests/common.py +++ b/tests/common.py @@ -1,5 +1,5 @@ import json - +from mnemonic import Mnemonic def read_json(file_path): with open(file_path, "r", encoding="utf-8") as file: @@ -9,3 +9,9 @@ def read_json(file_path): def write_json(file_path, data): with open(file_path, "w", encoding="utf-8") as file: json.dump(data, file, indent=4) + +def generate_api_query(query_array): + return "query TestApi {\n api {" + "\n".join(query_array) + "}\n}" + +def mnemonic_to_hex(mnemonic): + return Mnemonic(language="english").to_entropy(mnemonic).hex() diff --git a/tests/test_auth.py b/tests/test_auth.py index 10720ca..4d78f62 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,5 +1,6 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument +# pylint: disable=missing-function-docstring import datetime import json import re @@ -383,7 +384,7 @@ def test_generate_recovery_token_with_expiration_date( def test_generate_recovery_token_with_expiration_in_the_past( - authorized_client, client, tokens_file + authorized_client, tokens_file ): # Server must return 400 if expiration date is in the past expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5) @@ -397,7 +398,7 @@ def test_generate_recovery_token_with_expiration_in_the_past( def test_generate_recovery_token_with_invalid_time_format( - authorized_client, client, tokens_file + authorized_client, tokens_file ): # Server must return 400 if expiration date is in the past expiration_date = "invalid_time_format" diff --git a/tests/test_graphql/__init__.py b/tests/test_graphql/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_graphql/test_api.py b/tests/test_graphql/test_api.py index fb0aec8..81b6175 100644 --- a/tests/test_graphql/test_api.py +++ b/tests/test_graphql/test_api.py @@ -1,8 +1,13 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument -import json +# pylint: disable=missing-function-docstring import pytest +from tests.common import generate_api_query +from tests.test_graphql.test_api_devices import API_DEVICES_QUERY +from tests.test_graphql.test_api_recovery import API_RECOVERY_QUERY +from tests.test_graphql.test_api_version import API_VERSION_QUERY + TOKENS_FILE_CONTETS = { "tokens": [ { @@ -18,95 +23,27 @@ TOKENS_FILE_CONTETS = { ] } - -def test_graphql_get_api_version(authorized_client): +def test_graphql_get_entire_api_data(authorized_client, tokens_file): response = authorized_client.get( "/graphql", json={ - "query": """ - query { - api { - version - } - } - """ + "query": generate_api_query([API_VERSION_QUERY, API_DEVICES_QUERY, API_RECOVERY_QUERY]) }, ) assert response.status_code == 200 + assert response.json.get("data") is not None assert "version" in response.get_json()["data"]["api"] - - -def test_graphql_api_version_unauthorized(client): - response = client.get( - "/graphql", - json={ - "query": """ - query { - api { - version - } - } - """ - }, - ) - assert response.status_code == 200 - assert "version" in response.get_json()["data"]["api"] - - -def test_graphql_tokens_info(authorized_client, tokens_file): - response = authorized_client.get( - "/graphql", - json={ - "query": """ - query { - api { - devices { - creationDate - isCaller - name - } - } - } - """ - }, - ) - assert response.status_code == 200 - assert response.json == { - "data": { - "api": { - "devices": [ - { - "creationDate": "2022-01-14T08:31:10.789314", - "isCaller": True, - "name": "test_token", - }, - { - "creationDate": "2022-01-14T08:31:10.789314", - "isCaller": False, - "name": "test_token2", - }, - ] - } - } - } - - -def test_graphql_tokens_info_unauthorized(client, tokens_file): - response = client.get( - "/graphql", - json={ - "query": """ - query { - api { - devices { - creationDate - isCaller - name - } - } - } - """ - }, - ) - assert response.status_code == 200 - assert response.json["data"] is None + assert response.json["data"]["api"]["devices"] is not None + assert len(response.json["data"]["api"]["devices"]) == 2 + assert response.json["data"]["api"]["devices"][0]["creationDate"] == "2022-01-14T08:31:10.789314" + assert response.json["data"]["api"]["devices"][0]["isCaller"] is True + assert response.json["data"]["api"]["devices"][0]["name"] == "test_token" + assert response.json["data"]["api"]["devices"][1]["creationDate"] == "2022-01-14T08:31:10.789314" + assert response.json["data"]["api"]["devices"][1]["isCaller"] is False + assert response.json["data"]["api"]["devices"][1]["name"] == "test_token2" + assert response.json["data"]["api"]["recoveryKey"] is not None + assert response.json["data"]["api"]["recoveryKey"]["exists"] is False + assert response.json["data"]["api"]["recoveryKey"]["valid"] is False + assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is None + assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None + assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None diff --git a/tests/test_graphql/test_api_devices.py b/tests/test_graphql/test_api_devices.py new file mode 100644 index 0000000..0406371 --- /dev/null +++ b/tests/test_graphql/test_api_devices.py @@ -0,0 +1,446 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=missing-function-docstring +import datetime +import json +import pytest +from mnemonic import Mnemonic + +from tests.common import generate_api_query, read_json, write_json + +TOKENS_FILE_CONTETS = { + "tokens": [ + { + "token": "TEST_TOKEN", + "name": "test_token", + "date": "2022-01-14 08:31:10.789314", + }, + { + "token": "TEST_TOKEN2", + "name": "test_token2", + "date": "2022-01-14 08:31:10.789314", + }, + ] +} + +API_DEVICES_QUERY = """ +devices { + creationDate + isCaller + name +} +""" + +def test_graphql_tokens_info(authorized_client, tokens_file): + response = authorized_client.get( + "/graphql", + json={ + "query": generate_api_query([API_DEVICES_QUERY]) + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["api"]["devices"] is not None + assert len(response.json["data"]["api"]["devices"]) == 2 + assert response.json["data"]["api"]["devices"][0]["creationDate"] == "2022-01-14T08:31:10.789314" + assert response.json["data"]["api"]["devices"][0]["isCaller"] is True + assert response.json["data"]["api"]["devices"][0]["name"] == "test_token" + assert response.json["data"]["api"]["devices"][1]["creationDate"] == "2022-01-14T08:31:10.789314" + assert response.json["data"]["api"]["devices"][1]["isCaller"] is False + assert response.json["data"]["api"]["devices"][1]["name"] == "test_token2" + +def test_graphql_tokens_info_unauthorized(client, tokens_file): + response = client.get( + "/graphql", + json={ + "query": generate_api_query([API_DEVICES_QUERY]) + }, + ) + assert response.status_code == 200 + assert response.json["data"] is None + +DELETE_TOKEN_MUTATION = """ +mutation DeleteToken($device: String!) { + deleteDeviceApiToken(device: $device) { + success + message + code + } +} +""" + +def test_graphql_delete_token_unauthorized(client, tokens_file): + response = client.post( + "/graphql", + json={ + "query": DELETE_TOKEN_MUTATION, + "variables": { + "device": "test_token", + }, + }, + ) + assert response.status_code == 200 + assert response.json["data"] is None + +def test_graphql_delete_token(authorized_client, tokens_file): + response = authorized_client.post( + "/graphql", + json={ + "query": DELETE_TOKEN_MUTATION, + "variables": { + "device": "test_token", + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["deleteDeviceApiToken"]["success"] is True + assert response.json["data"]["deleteDeviceApiToken"]["message"] is not None + assert response.json["data"]["deleteDeviceApiToken"]["code"] == 200 + assert read_json(tokens_file) == { + "tokens": [ + { + "token": "TEST_TOKEN", + "name": "test_token2", + "date": "2022-01-14 08:31:10.789314", + } + ] + } + +def test_graphql_delete_self_token(authorized_client, tokens_file): + response = authorized_client.post( + "/graphql", + json={ + "query": DELETE_TOKEN_MUTATION, + "variables": { + "device": "test_token", + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["deleteDeviceApiToken"]["success"] is False + assert response.json["data"]["deleteDeviceApiToken"]["message"] is not None + assert response.json["data"]["deleteDeviceApiToken"]["code"] == 400 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + +def test_graphql_delete_nonexistent_token(authorized_client, tokens_file): + response = authorized_client.post( + "/graphql", + json={ + "query": DELETE_TOKEN_MUTATION, + "variables": { + "device": "test_token3", + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["deleteDeviceApiToken"]["success"] is False + assert response.json["data"]["deleteDeviceApiToken"]["message"] is not None + assert response.json["data"]["deleteDeviceApiToken"]["code"] == 404 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + +REFRESH_TOKEN_MUTATION = """ +mutation RefreshToken { + refreshDeviceApiToken { + success + message + code + } +} +""" + +def test_graphql_refresh_token_unauthorized(client, tokens_file): + response = client.post( + "/graphql", + json={ + "query": REFRESH_TOKEN_MUTATION + }, + ) + assert response.status_code == 200 + assert response.json["data"] is None + +def test_graphql_refresh_token(authorized_client, tokens_file): + response = authorized_client.post( + "/graphql", + json={ + "query": REFRESH_TOKEN_MUTATION + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["refreshDeviceApiToken"]["success"] is True + assert response.json["data"]["refreshDeviceApiToken"]["message"] is not None + assert response.json["data"]["refreshDeviceApiToken"]["code"] == 200 + assert read_json(tokens_file) == { + "tokens": [ + { + "token": "TEST_TOKEN", + "name": "test_token2", + "date": "2022-01-14 08:31:10.789314", + } + ] + } + +NEW_DEVICE_KEY_MUTATION = """ +mutation NewDeviceKey { + getNewDeviceApiKey { + success + message + code + key + } +} +""" + +def test_graphql_get_new_device_auth_key_unauthorized(client, tokens_file): + response = client.post( + "/graphql", + json={ + "query": NEW_DEVICE_KEY_MUTATION + }, + ) + assert response.status_code == 200 + assert response.json["data"] is None + +def test_graphql_get_new_device_auth_key(authorized_client, tokens_file): + response = authorized_client.post( + "/graphql", + json={ + "query": NEW_DEVICE_KEY_MUTATION + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewDeviceApiToken"]["success"] is True + assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None + assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200 + assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12 + token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == token + +INVALIDATE_NEW_DEVICE_KEY_MUTATION = """ +mutation InvalidateNewDeviceKey { + invalidateNewDeviceApiKey { + success + message + code + } +} +""" + +def test_graphql_invalidate_new_device_token_unauthorized(client, tokens_file): + response = client.post( + "/graphql", + json={ + "query": DELETE_TOKEN_MUTATION, + "variables": { + "device": "test_token", + }, + }, + ) + assert response.status_code == 200 + assert response.json["data"] is None + +def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file): + response = authorized_client.post( + "/graphql", + json={ + "query": NEW_DEVICE_KEY_MUTATION + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewDeviceApiToken"]["success"] is True + assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None + assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200 + assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12 + token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == token + response = authorized_client.post( + "/graphql", + json={ + "query": INVALIDATE_NEW_DEVICE_KEY_MUTATION + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["invalidateNewDeviceApiKey"]["success"] is True + assert response.json["data"]["invalidateNewDeviceApiKey"]["message"] is not None + assert response.json["data"]["invalidateNewDeviceApiKey"]["code"] == 200 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + +AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION = """ +mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) { + authorizeWithNewDeviceApiKey(inupt: $input) { + success + message + code + token + } +} +""" + +def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_file): + response = authorized_client.post( + "/graphql", + json={ + "query": NEW_DEVICE_KEY_MUTATION + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewDeviceApiToken"]["success"] is True + assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None + assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200 + assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12 + key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == key + response = client.post( + "/graphql", + json={ + "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, + "variables": { + "inupt": { + "key": key, + "deviceName": "test_token", + } + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is True + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 200 + token = response.json["data"]["authorizeWithNewDeviceApiKey"]["token"] + assert read_json(tokens_file)["tokens"][2]["token"] == token + assert read_json(tokens_file)["tokens"][2]["name"] == "new_device" + +def test_graphql_authorize_new_device_with_invalid_key(client, tokens_file): + response = client.post( + "/graphql", + json={ + "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, + "variables": { + "inupt": { + "key": "invalid_token", + "deviceName": "test_token", + } + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is False + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + +def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_file): + response = authorized_client.post( + "/graphql", + json={ + "query": NEW_DEVICE_KEY_MUTATION + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewDeviceApiToken"]["success"] is True + assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None + assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200 + assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12 + key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == key + response = client.post( + "/graphql", + json={ + "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, + "variables": { + "inupt": { + "key": key, + "deviceName": "test_token", + } + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is True + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 200 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + response = client.post( + "/graphql", + json={ + "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, + "variables": { + "inupt": { + "key": key, + "deviceName": "test_token2", + } + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is False + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404 + assert read_json(tokens_file) == TOKENS_FILE_CONTETS + +def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_client, tokens_file): + response = authorized_client.post( + "/graphql", + json={ + "query": NEW_DEVICE_KEY_MUTATION + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewDeviceApiToken"]["success"] is True + assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None + assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200 + assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12 + key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex() + assert read_json(tokens_file)["new_device"]["token"] == key + + file_data = read_json(tokens_file) + file_data["new_device"]["expiration"] = str( + datetime.datetime.now() - datetime.timedelta(minutes=13) + ) + write_json(tokens_file, file_data) + + response = client.post( + "/graphql", + json={ + "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, + "variables": { + "inupt": { + "key": key, + "deviceName": "test_token", + } + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is False + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None + assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404 + +def test_graphql_authorize_without_token(client, tokens_file): + response = client.post( + "/graphql", + json={ + "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, + "variables": { + "inupt": { + "deviceName": "test_token", + } + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is None diff --git a/tests/test_graphql/test_api_recovery.py b/tests/test_graphql/test_api_recovery.py new file mode 100644 index 0000000..8ac8560 --- /dev/null +++ b/tests/test_graphql/test_api_recovery.py @@ -0,0 +1,534 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=missing-function-docstring +import json +import pytest +import datetime + +from tests.common import generate_api_query, mnemonic_to_hex, read_json, write_json + +TOKENS_FILE_CONTETS = { + "tokens": [ + { + "token": "TEST_TOKEN", + "name": "test_token", + "date": "2022-01-14 08:31:10.789314", + }, + { + "token": "TEST_TOKEN2", + "name": "test_token2", + "date": "2022-01-14 08:31:10.789314", + }, + ] +} + +API_RECOVERY_QUERY = """ +recoveryKey { + exists + valid + creationDate + expirationDate + usesLeft +} +""" + +def test_graphql_recovery_key_status_unauthorized(client, tokens_file): + response = client.post( + "/graphql", + json={ + "query": generate_api_query([API_RECOVERY_QUERY]) + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is None + +def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_file): + response = authorized_client.get( + "/graphql", + json={ + "query": generate_api_query([API_RECOVERY_QUERY]) + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["api"]["recoveryKey"] is not None + assert response.json["data"]["api"]["recoveryKey"]["exists"] is False + assert response.json["data"]["api"]["recoveryKey"]["valid"] is False + assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is None + assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None + assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None + +API_RECOVERY_KEY_GENERATE_MUTATION = """ +mutation TestGenerateRecoveryKey($limits: RecoveryKeyLimitsInput!) { + getNewRecoveryApiKey(limits: $limits) { + success + message + code + key + } +} +""" + +API_RECOVERY_KEY_USE_MUTATION = """ +mutation TestUseRecoveryKey($input: UseRecoveryKeyInput!) { + useRecoveryApiKey(input: $input) { + success + message + code + token + } +} +""" +def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_GENERATE_MUTATION, + "variables": { + "limits": { + "uses": None, + "expirationDate": None, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewRecoveryApiKey"]["success"] is True + assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None + assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 200 + assert response.json["data"]["getNewRecoveryApiKey"]["key"] is not None + assert response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18 + assert read_json(tokens_file)["recovery_token"] is not None + time_generated = read_json(tokens_file)["recovery_token"]["date"] + assert time_generated is not None + key = response.json["data"]["getNewRecoveryApiKey"]["key"] + assert ( + datetime.datetime.strptime( + time_generated, "%Y-%m-%dT%H:%M:%S.%fZ" + ) - datetime.timedelta(seconds=5) < datetime.datetime.now() + ) + + # Try to get token status + response = authorized_client.get( + "/graphql", + json={ + "query": generate_api_query([API_RECOVERY_QUERY]) + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["api"]["recoveryKey"] is not None + assert response.json["data"]["api"]["recoveryKey"]["exists"] is True + assert response.json["data"]["api"]["recoveryKey"]["valid"] is True + assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated + assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None + assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None + + # Try to use token + response = client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_USE_MUTATION, + "variables": { + "input": { + "token": key, + "tokenName": "test_token", + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["useRecoveryKey"]["success"] is True + assert response.json["data"]["useRecoveryKey"]["message"] is not None + assert response.json["data"]["useRecoveryKey"]["code"] == 200 + assert response.json["data"]["useRecoveryKey"]["token"] is not None + assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][2]["token"] + assert read_json(tokens_file)["tokens"][2]["name"] == "test_token" + + # Try to use token again + response = client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_USE_MUTATION, + "variables": { + "input": { + "token": key, + "tokenName": "test_token2", + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["useRecoveryKey"]["success"] is True + assert response.json["data"]["useRecoveryKey"]["message"] is not None + assert response.json["data"]["useRecoveryKey"]["code"] == 200 + assert response.json["data"]["useRecoveryKey"]["token"] is not None + assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][3]["token"] + assert read_json(tokens_file)["tokens"][3]["name"] == "test_token2" + +def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_client, tokens_file): + expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5) + expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_GENERATE_MUTATION, + "variables": { + "limits": { + "uses": None, + "expirationDate": expiration_date_str, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewRecoveryApiKey"]["success"] is True + assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None + assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 200 + assert response.json["data"]["getNewRecoveryApiKey"]["key"] is not None + assert response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18 + assert read_json(tokens_file)["recovery_token"] is not None + + key = response.json["data"]["getNewRecoveryApiKey"]["key"] + assert read_json(tokens_file)["recovery_token"]["expirationDate"] == expiration_date_str + assert read_json(tokens_file)["recovery_token"]["token"] == mnemonic_to_hex(key) + + time_generated = read_json(tokens_file)["recovery_token"]["date"] + assert time_generated is not None + assert ( + datetime.datetime.strptime( + time_generated, "%Y-%m-%dT%H:%M:%S.%fZ" + ) - datetime.timedelta(seconds=5) < datetime.datetime.now() + ) + + # Try to get token status + response = authorized_client.get( + "/graphql", + json={ + "query": generate_api_query([API_RECOVERY_QUERY]) + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["api"]["recoveryKey"] is not None + assert response.json["data"]["api"]["recoveryKey"]["exists"] is True + assert response.json["data"]["api"]["recoveryKey"]["valid"] is True + assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated + assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] == expiration_date_str + assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None + + # Try to use token + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_USE_MUTATION, + "variables": { + "input": { + "token": key, + "tokenName": "test_token", + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["useRecoveryKey"]["success"] is True + assert response.json["data"]["useRecoveryKey"]["message"] is not None + assert response.json["data"]["useRecoveryKey"]["code"] == 200 + assert response.json["data"]["useRecoveryKey"]["token"] is not None + assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][2]["token"] + + # Try to use token again + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_USE_MUTATION, + "variables": { + "input": { + "token": key, + "tokenName": "test_token2", + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["useRecoveryKey"]["success"] is True + assert response.json["data"]["useRecoveryKey"]["message"] is not None + assert response.json["data"]["useRecoveryKey"]["code"] == 200 + assert response.json["data"]["useRecoveryKey"]["token"] is not None + assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][3]["token"] + + # Try to use token after expiration date + new_data = read_json(tokens_file) + new_data["recovery_token"]["expirationDate"] = datetime.datetime.now() - datetime.timedelta(minutes=5) + write_json(tokens_file, new_data) + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_USE_MUTATION, + "variables": { + "input": { + "token": key, + "tokenName": "test_token3", + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["useRecoveryKey"]["success"] is False + assert response.json["data"]["useRecoveryKey"]["message"] is not None + assert response.json["data"]["useRecoveryKey"]["code"] == 404 + assert response.json["data"]["useRecoveryKey"]["token"] is None + + assert read_json(tokens_file)["tokens"] == new_data["tokens"] + + # Try to get token status + response = authorized_client.get( + "/graphql", + json={ + "query": generate_api_query([API_RECOVERY_QUERY]) + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["api"]["recoveryKey"] is not None + assert response.json["data"]["api"]["recoveryKey"]["exists"] is True + assert response.json["data"]["api"]["recoveryKey"]["valid"] is False + assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated + assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] == new_data["recovery_token"]["expiration"] + assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None + +def test_graphql_generate_recoevry_key_with_expiration_in_the_past(authorized_client, tokens_file): + expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5) + expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_GENERATE_MUTATION, + "variables": { + "limits": { + "uses": None, + "expirationDate": expiration_date_str, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewRecoveryApiKey"]["success"] is False + assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None + assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400 + assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None + + assert read_json(tokens_file)["tokens"] == [] + assert "recovery_token" not in read_json(tokens_file) + +def test_graphql_generate_recovery_key_with_invalid_time_format(authorized_client, tokens_file): + expiration_date = "invalid_time_format" + expiration_date_str = expiration_date + + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_GENERATE_MUTATION, + "variables": { + "limits": { + "uses": None, + "expirationDate": expiration_date_str, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewRecoveryApiKey"]["success"] is False + assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None + assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400 + assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None + + assert read_json(tokens_file)["tokens"] == [] + assert "recovery_token" not in read_json(tokens_file) + +def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, tokens_file): + + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_GENERATE_MUTATION, + "variables": { + "limits": { + "expirationDate": None, + "uses": 2, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewRecoveryApiKey"]["success"] is True + assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None + assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 200 + assert response.json["data"]["getNewRecoveryApiKey"]["key"] is not None + + mnemonic_key = response.json["data"]["getNewRecoveryApiKey"]["key"] + key = mnemonic_to_hex(mnemonic_key) + + assert read_json(tokens_file)["recovery_token"]["token"] == key + assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2 + + # Try to get token status + response = authorized_client.get( + "/graphql", + json={ + "query": generate_api_query([API_RECOVERY_QUERY]) + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["api"]["recoveryKey"] is not None + assert response.json["data"]["api"]["recoveryKey"]["exists"] is True + assert response.json["data"]["api"]["recoveryKey"]["valid"] is True + assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is not None + assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None + assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] == 2 + + # Try to use token + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_USE_MUTATION, + "variables": { + "input": { + "token": mnemonic_key, + "tokenName": "test_token1", + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["useRecoveryKey"]["success"] is True + assert response.json["data"]["useRecoveryKey"]["message"] is not None + assert response.json["data"]["useRecoveryKey"]["code"] == 200 + assert response.json["data"]["useRecoveryKey"]["token"] is not None + + # Try to get token status + response = authorized_client.get( + "/graphql", + json={ + "query": generate_api_query([API_RECOVERY_QUERY]) + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["api"]["recoveryKey"] is not None + assert response.json["data"]["api"]["recoveryKey"]["exists"] is True + assert response.json["data"]["api"]["recoveryKey"]["valid"] is True + assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is not None + assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None + assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] == 1 + + # Try to use token + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_USE_MUTATION, + "variables": { + "input": { + "token": mnemonic_key, + "tokenName": "test_token2", + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["useRecoveryKey"]["success"] is True + assert response.json["data"]["useRecoveryKey"]["message"] is not None + assert response.json["data"]["useRecoveryKey"]["code"] == 200 + assert response.json["data"]["useRecoveryKey"]["token"] is not None + + # Try to get token status + response = authorized_client.get( + "/graphql", + json={ + "query": generate_api_query([API_RECOVERY_QUERY]) + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["api"]["recoveryKey"] is not None + assert response.json["data"]["api"]["recoveryKey"]["exists"] is True + assert response.json["data"]["api"]["recoveryKey"]["valid"] is False + assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is not None + assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is not None + assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] == 0 + + # Try to use token + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_USE_MUTATION, + "variables": { + "input": { + "token": mnemonic_key, + "tokenName": "test_token3", + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["useRecoveryKey"]["success"] is False + assert response.json["data"]["useRecoveryKey"]["message"] is not None + assert response.json["data"]["useRecoveryKey"]["code"] == 404 + assert response.json["data"]["useRecoveryKey"]["token"] is None + +def test_graphql_generate_recovery_key_with_negative_uses(authorized_client, tokens_file): + # Try to get token status + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_GENERATE_MUTATION, + "variables": { + "limits": { + "uses": -1, + "expirationDate": None, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewRecoveryApiKey"]["success"] is False + assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None + assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400 + assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None + +def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_file): + # Try to get token status + response = authorized_client.post( + "/graphql", + json={ + "query": API_RECOVERY_KEY_GENERATE_MUTATION, + "variables": { + "limits": { + "uses": 0, + "expirationDate": None, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["getNewRecoveryApiKey"]["success"] is False + assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None + assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400 + assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None diff --git a/tests/test_graphql/test_api_version.py b/tests/test_graphql/test_api_version.py new file mode 100644 index 0000000..a45aa3a --- /dev/null +++ b/tests/test_graphql/test_api_version.py @@ -0,0 +1,28 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=missing-function-docstring + +from tests.common import generate_api_query + +API_VERSION_QUERY = "version" + +def test_graphql_get_api_version(authorized_client): + response = authorized_client.get( + "/graphql", + json={ + "query": generate_api_query([API_VERSION_QUERY]) + }, + ) + assert response.status_code == 200 + assert "version" in response.get_json()["data"]["api"] + + +def test_graphql_api_version_unauthorized(client): + response = client.get( + "/graphql", + json={ + "query": generate_api_query([API_VERSION_QUERY]) + }, + ) + assert response.status_code == 200 + assert "version" in response.get_json()["data"]["api"]