diff --git a/selfprivacy_api/graphql/mutations/api_mutations.py b/selfprivacy_api/graphql/mutations/api_mutations.py index 6604f7e..cd5d53b 100644 --- a/selfprivacy_api/graphql/mutations/api_mutations.py +++ b/selfprivacy_api/graphql/mutations/api_mutations.py @@ -5,27 +5,50 @@ import typing from flask import request import strawberry from selfprivacy_api.graphql import IsAuthenticated -from selfprivacy_api.graphql.mutations.mutation_interface import MutationReturnInterface -from selfprivacy_api.utils import parse_date +from selfprivacy_api.graphql.mutations.mutation_interface import GenericMutationReturn, MutationReturnInterface from selfprivacy_api.utils.auth import ( - generate_recovery_token + delete_new_device_auth_token, + delete_token, + generate_recovery_token, + get_new_device_auth_token, + is_token_name_exists, + is_token_name_pair_valid, + refresh_token, + use_mnemonic_recoverery_token, + use_new_device_auth_token ) @strawberry.type class ApiKeyMutationReturn(MutationReturnInterface): key: typing.Optional[str] +@strawberry.type +class DeviceApiTokenMutationReturn(MutationReturnInterface): + token: typing.Optional[str] + @strawberry.input class RecoveryKeyLimitsInput: """Recovery key limits input""" expiration_date: typing.Optional[datetime.datetime] uses: typing.Optional[int] +@strawberry.input +class UseRecoveryKeyInput: + """Use recovery key input""" + key: str + deviceName: str + +@strawberry.input +class UseNewDeviceKeyInput: + """Use new device key input""" + key: str + deviceName: str + @strawberry.type class ApiMutations: @strawberry.mutation(permission_classes=[IsAuthenticated]) - def getNewRecoveryApiKey(self, limits: RecoveryKeyLimitsInput) -> ApiKeyMutationReturn: + def get_new_recovery_api_key(self, limits: RecoveryKeyLimitsInput) -> ApiKeyMutationReturn: """Generate recovery key""" if limits.expiration_date is not None: if limits.expiration_date < datetime.datetime.now(): @@ -50,3 +73,109 @@ class ApiMutations: code=200, key=key, ) + + @strawberry.mutation() + def use_recovery_api_key(self, input: UseRecoveryKeyInput) -> DeviceApiTokenMutationReturn: + """Use recovery key""" + token = use_mnemonic_recoverery_token(input.key, input.deviceName) + if token is None: + return DeviceApiTokenMutationReturn( + success=False, + message="Recovery key not found", + code=404, + token=None, + ) + return DeviceApiTokenMutationReturn( + success=True, + message="Recovery key used", + code=200, + token=None, + ) + + @strawberry.mutation(permission_classes=[IsAuthenticated]) + def refresh_device_api_token(self) -> DeviceApiTokenMutationReturn: + """Refresh device api token""" + token = request.headers.get("Authorization").split(" ")[1] if request.headers.get("Authorization") is not None else None + if token is None: + return DeviceApiTokenMutationReturn( + success=False, + message="Token not found", + code=404, + token=None, + ) + new_token = refresh_token(token) + if new_token is None: + return DeviceApiTokenMutationReturn( + success=False, + message="Token not found", + code=404, + token=None, + ) + return DeviceApiTokenMutationReturn( + success=True, + message="Token refreshed", + code=200, + token=new_token, + ) + + @strawberry.mutation(permission_classes=[IsAuthenticated]) + def delete_device_api_token(self, device: str) -> GenericMutationReturn: + """Delete device api token""" + self_token = request.headers.get("Authorization").split(" ")[1] if request.headers.get("Authorization") is not None else None + if self_token is not None and is_token_name_pair_valid(device, self_token): + return GenericMutationReturn( + success=False, + message="Cannot delete caller's token", + code=400, + ) + if not is_token_name_exists(device): + return GenericMutationReturn( + success=False, + message="Token not found", + code=404, + ) + delete_token(device) + return GenericMutationReturn( + success=True, + message="Token deleted", + code=200, + ) + + @strawberry.mutation(permission_classes=[IsAuthenticated]) + def get_new_device_api_key(self) -> ApiKeyMutationReturn: + """Generate device api key""" + key = get_new_device_auth_token() + return ApiKeyMutationReturn( + success=True, + message="Device api key generated", + code=200, + key=key, + ) + + @strawberry.mutation(permission_classes=[IsAuthenticated]) + def invalidate_new_device_api_key(self) -> GenericMutationReturn: + """Invalidate new device api key""" + delete_new_device_auth_token() + return GenericMutationReturn( + success=True, + message="New device key deleted", + code=200, + ) + + @strawberry.mutation() + def authorize_with_new_device_api_key(self, input: UseNewDeviceKeyInput) -> DeviceApiTokenMutationReturn: + """Authorize with new device api key""" + token = use_new_device_auth_token(input.key, input.deviceName) + if token is None: + return DeviceApiTokenMutationReturn( + success=False, + message="Token not found", + code=404, + token=None, + ) + return DeviceApiTokenMutationReturn( + success=True, + message="Token used", + code=200, + token=token, + ) diff --git a/selfprivacy_api/graphql/mutations/mutation_interface.py b/selfprivacy_api/graphql/mutations/mutation_interface.py index f5c212b..1cf310c 100644 --- a/selfprivacy_api/graphql/mutations/mutation_interface.py +++ b/selfprivacy_api/graphql/mutations/mutation_interface.py @@ -5,3 +5,7 @@ class MutationReturnInterface: success: bool message: str code: int + +@strawberry.type +class GenericMutationReturn(MutationReturnInterface): + pass diff --git a/selfprivacy_api/services/__init__.py b/selfprivacy_api/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/selfprivacy_api/utils/auth.py b/selfprivacy_api/utils/auth.py index 8810149..30fa7de 100644 --- a/selfprivacy_api/utils/auth.py +++ b/selfprivacy_api/utils/auth.py @@ -133,7 +133,7 @@ def delete_token(token_name): tokens["tokens"] = [t for t in tokens["tokens"] if t["name"] != token_name] -def refresh_token(token): +def refresh_token(token: str) -> typing.Optional[str]: """Change the token field of the existing token""" new_token = _generate_token() with WriteUserData(UserDataFiles.TOKENS) as tokens: @@ -259,7 +259,7 @@ def use_mnemonic_recoverery_token(mnemonic_phrase, name): return token -def get_new_device_auth_token(): +def get_new_device_auth_token() -> str: """Generate a new device auth token which is valid for 10 minutes and return a mnemonic phrase representation Write token to the new_device of the tokens.json file. diff --git a/tests/test_graphql/test_api_devices.py b/tests/test_graphql/test_api_devices.py index 0406371..e5bf7ad 100644 --- a/tests/test_graphql/test_api_devices.py +++ b/tests/test_graphql/test_api_devices.py @@ -88,7 +88,7 @@ def test_graphql_delete_token(authorized_client, tokens_file): json={ "query": DELETE_TOKEN_MUTATION, "variables": { - "device": "test_token", + "device": "test_token2", }, }, ) @@ -101,7 +101,7 @@ def test_graphql_delete_token(authorized_client, tokens_file): "tokens": [ { "token": "TEST_TOKEN", - "name": "test_token2", + "name": "test_token", "date": "2022-01-14 08:31:10.789314", } ] @@ -147,6 +147,7 @@ mutation RefreshToken { success message code + token } } """ @@ -173,14 +174,10 @@ def test_graphql_refresh_token(authorized_client, tokens_file): assert response.json["data"]["refreshDeviceApiToken"]["success"] is True assert response.json["data"]["refreshDeviceApiToken"]["message"] is not None assert response.json["data"]["refreshDeviceApiToken"]["code"] == 200 - assert read_json(tokens_file) == { - "tokens": [ - { - "token": "TEST_TOKEN", - "name": "test_token2", - "date": "2022-01-14 08:31:10.789314", - } - ] + assert read_json(tokens_file)["tokens"][0] == { + "token": response.json["data"]["refreshDeviceApiToken"]["token"], + "name": "test_token", + "date": "2022-01-14 08:31:10.789314", } NEW_DEVICE_KEY_MUTATION = """ @@ -213,11 +210,11 @@ def test_graphql_get_new_device_auth_key(authorized_client, tokens_file): ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["getNewDeviceApiToken"]["success"] is True - assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None - assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200 - assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12 - token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex() + assert response.json["data"]["getNewDeviceApiKey"]["success"] is True + assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None + assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200 + assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12 + token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex() assert read_json(tokens_file)["new_device"]["token"] == token INVALIDATE_NEW_DEVICE_KEY_MUTATION = """ @@ -252,11 +249,11 @@ def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file): ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["getNewDeviceApiToken"]["success"] is True - assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None - assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200 - assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12 - token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex() + assert response.json["data"]["getNewDeviceApiKey"]["success"] is True + assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None + assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200 + assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12 + token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex() assert read_json(tokens_file)["new_device"]["token"] == token response = authorized_client.post( "/graphql", @@ -273,7 +270,7 @@ def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file): AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION = """ mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) { - authorizeWithNewDeviceApiKey(inupt: $input) { + authorizeWithNewDeviceApiKey(input: $input) { success message code @@ -291,20 +288,21 @@ def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_ ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["getNewDeviceApiToken"]["success"] is True - assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None - assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200 - assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12 - key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex() + assert response.json["data"]["getNewDeviceApiKey"]["success"] is True + assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None + assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200 + mnemonic_key = response.json["data"]["getNewDeviceApiKey"]["key"] + assert mnemonic_key.split(" ").__len__() == 12 + key = Mnemonic(language="english").to_entropy(mnemonic_key).hex() assert read_json(tokens_file)["new_device"]["token"] == key response = client.post( "/graphql", json={ "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, "variables": { - "inupt": { - "key": key, - "deviceName": "test_token", + "input": { + "key": mnemonic_key, + "deviceName": "new_device", } }, }, @@ -324,7 +322,7 @@ def test_graphql_authorize_new_device_with_invalid_key(client, tokens_file): json={ "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, "variables": { - "inupt": { + "input": { "key": "invalid_token", "deviceName": "test_token", } @@ -347,20 +345,21 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["getNewDeviceApiToken"]["success"] is True - assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None - assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200 - assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12 - key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex() + assert response.json["data"]["getNewDeviceApiKey"]["success"] is True + assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None + assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200 + mnemonic_key = response.json["data"]["getNewDeviceApiKey"]["key"] + assert mnemonic_key.split(" ").__len__() == 12 + key = Mnemonic(language="english").to_entropy(mnemonic_key).hex() assert read_json(tokens_file)["new_device"]["token"] == key response = client.post( "/graphql", json={ "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, "variables": { - "inupt": { - "key": key, - "deviceName": "test_token", + "input": { + "key": mnemonic_key, + "deviceName": "new_token", } }, }, @@ -370,14 +369,16 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is True assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 200 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert read_json(tokens_file)["tokens"][2]["token"] == response.json["data"]["authorizeWithNewDeviceApiKey"]["token"] + assert read_json(tokens_file)["tokens"][2]["name"] == "new_token" + response = client.post( "/graphql", json={ "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, "variables": { - "inupt": { - "key": key, + "input": { + "key": mnemonic_key, "deviceName": "test_token2", } }, @@ -388,7 +389,7 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is False assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404 - assert read_json(tokens_file) == TOKENS_FILE_CONTETS + assert read_json(tokens_file)["tokens"].__len__() == 3 def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_client, tokens_file): response = authorized_client.post( @@ -399,11 +400,11 @@ def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_clien ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["getNewDeviceApiToken"]["success"] is True - assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None - assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200 - assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12 - key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex() + assert response.json["data"]["getNewDeviceApiKey"]["success"] is True + assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None + assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200 + assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12 + key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex() assert read_json(tokens_file)["new_device"]["token"] == key file_data = read_json(tokens_file) @@ -417,7 +418,7 @@ def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_clien json={ "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, "variables": { - "inupt": { + "input": { "key": key, "deviceName": "test_token", } @@ -436,7 +437,7 @@ def test_graphql_authorize_without_token(client, tokens_file): json={ "query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, "variables": { - "inupt": { + "input": { "deviceName": "test_token", } }, diff --git a/tests/test_graphql/test_system.py b/tests/test_graphql/test_system.py index 661a68b..308c981 100644 --- a/tests/test_graphql/test_system.py +++ b/tests/test_graphql/test_system.py @@ -96,12 +96,12 @@ def mock_subprocess_check_output(mocker): @pytest.fixture def mock_get_ip4(mocker): - mock = mocker.patch("selfprivacy_api.utils.get_ip4", autospec=True, return_value="157.90.247.192") + mock = mocker.patch("selfprivacy_api.utils.network.get_ip4", autospec=True, return_value="157.90.247.192") return mock @pytest.fixture def mock_get_ip6(mocker): - mock = mocker.patch("selfprivacy_api.utils.get_ip6", autospec=True, return_value="fe80::9400:ff:fef1:34ae") + mock = mocker.patch("selfprivacy_api.utils.network.get_ip6", autospec=True, return_value="fe80::9400:ff:fef1:34ae") return mock @pytest.fixture @@ -197,9 +197,9 @@ settings { } """ -def test_graphql_get_timezone_unauthorized(unauthorized_client, turned_on): +def test_graphql_get_timezone_unauthorized(client, turned_on): """Test get timezone""" - response = unauthorized_client.get( + response = client.get( "/graphql", json={ "query": generate_system_query([API_GET_TIMEZONE]),