From 9bd2896db874259bf3d99bc6e7e05dedac81b631 Mon Sep 17 00:00:00 2001 From: def Date: Thu, 7 Jul 2022 15:53:19 +0200 Subject: [PATCH] fix recovery tests --- .../graphql/mutations/api_mutations.py | 42 +++- .../graphql/mutations/mutation_interface.py | 2 + selfprivacy_api/graphql/queries/system.py | 14 +- selfprivacy_api/graphql/schema.py | 3 + .../resources/services/mailserver.py | 2 +- selfprivacy_api/utils/__init__.py | 1 + selfprivacy_api/utils/auth.py | 4 +- selfprivacy_api/utils/network.py | 10 +- tests/common.py | 4 + tests/conftest.py | 2 + tests/test_graphql/test_api.py | 15 +- tests/test_graphql/test_api_devices.py | 103 +++++--- tests/test_graphql/test_api_recovery.py | 230 ++++++++++-------- tests/test_graphql/test_api_version.py | 9 +- tests/test_graphql/test_system.py | 112 +++++++-- tests/test_network_utils.py | 7 +- tests/test_system.py | 5 +- 17 files changed, 372 insertions(+), 193 deletions(-) diff --git a/selfprivacy_api/graphql/mutations/api_mutations.py b/selfprivacy_api/graphql/mutations/api_mutations.py index cd5d53b..d516049 100644 --- a/selfprivacy_api/graphql/mutations/api_mutations.py +++ b/selfprivacy_api/graphql/mutations/api_mutations.py @@ -5,7 +5,10 @@ import typing from flask import request import strawberry from selfprivacy_api.graphql import IsAuthenticated -from selfprivacy_api.graphql.mutations.mutation_interface import GenericMutationReturn, MutationReturnInterface +from selfprivacy_api.graphql.mutations.mutation_interface import ( + GenericMutationReturn, + MutationReturnInterface, +) from selfprivacy_api.utils.auth import ( delete_new_device_auth_token, @@ -16,39 +19,50 @@ from selfprivacy_api.utils.auth import ( is_token_name_pair_valid, refresh_token, use_mnemonic_recoverery_token, - use_new_device_auth_token + use_new_device_auth_token, ) + @strawberry.type class ApiKeyMutationReturn(MutationReturnInterface): key: typing.Optional[str] + @strawberry.type class DeviceApiTokenMutationReturn(MutationReturnInterface): token: typing.Optional[str] + @strawberry.input class RecoveryKeyLimitsInput: """Recovery key limits input""" + expiration_date: typing.Optional[datetime.datetime] uses: typing.Optional[int] + @strawberry.input class UseRecoveryKeyInput: """Use recovery key input""" + key: str deviceName: str + @strawberry.input class UseNewDeviceKeyInput: """Use new device key input""" + key: str deviceName: str + @strawberry.type class ApiMutations: @strawberry.mutation(permission_classes=[IsAuthenticated]) - def get_new_recovery_api_key(self, limits: RecoveryKeyLimitsInput) -> ApiKeyMutationReturn: + def get_new_recovery_api_key( + self, limits: RecoveryKeyLimitsInput + ) -> ApiKeyMutationReturn: """Generate recovery key""" if limits.expiration_date is not None: if limits.expiration_date < datetime.datetime.now(): @@ -75,7 +89,9 @@ class ApiMutations: ) @strawberry.mutation() - def use_recovery_api_key(self, input: UseRecoveryKeyInput) -> DeviceApiTokenMutationReturn: + def use_recovery_api_key( + self, input: UseRecoveryKeyInput + ) -> DeviceApiTokenMutationReturn: """Use recovery key""" token = use_mnemonic_recoverery_token(input.key, input.deviceName) if token is None: @@ -89,13 +105,17 @@ class ApiMutations: success=True, message="Recovery key used", code=200, - token=None, + token=token, ) @strawberry.mutation(permission_classes=[IsAuthenticated]) def refresh_device_api_token(self) -> DeviceApiTokenMutationReturn: """Refresh device api token""" - token = request.headers.get("Authorization").split(" ")[1] if request.headers.get("Authorization") is not None else None + token = ( + request.headers.get("Authorization").split(" ")[1] + if request.headers.get("Authorization") is not None + else None + ) if token is None: return DeviceApiTokenMutationReturn( success=False, @@ -121,7 +141,11 @@ class ApiMutations: @strawberry.mutation(permission_classes=[IsAuthenticated]) def delete_device_api_token(self, device: str) -> GenericMutationReturn: """Delete device api token""" - self_token = request.headers.get("Authorization").split(" ")[1] if request.headers.get("Authorization") is not None else None + self_token = ( + request.headers.get("Authorization").split(" ")[1] + if request.headers.get("Authorization") is not None + else None + ) if self_token is not None and is_token_name_pair_valid(device, self_token): return GenericMutationReturn( success=False, @@ -163,7 +187,9 @@ class ApiMutations: ) @strawberry.mutation() - def authorize_with_new_device_api_key(self, input: UseNewDeviceKeyInput) -> DeviceApiTokenMutationReturn: + def authorize_with_new_device_api_key( + self, input: UseNewDeviceKeyInput + ) -> DeviceApiTokenMutationReturn: """Authorize with new device api key""" token = use_new_device_auth_token(input.key, input.deviceName) if token is None: diff --git a/selfprivacy_api/graphql/mutations/mutation_interface.py b/selfprivacy_api/graphql/mutations/mutation_interface.py index 1cf310c..32146fc 100644 --- a/selfprivacy_api/graphql/mutations/mutation_interface.py +++ b/selfprivacy_api/graphql/mutations/mutation_interface.py @@ -1,11 +1,13 @@ import strawberry + @strawberry.interface class MutationReturnInterface: success: bool message: str code: int + @strawberry.type class GenericMutationReturn(MutationReturnInterface): pass diff --git a/selfprivacy_api/graphql/queries/system.py b/selfprivacy_api/graphql/queries/system.py index 06405a4..cadf074 100644 --- a/selfprivacy_api/graphql/queries/system.py +++ b/selfprivacy_api/graphql/queries/system.py @@ -150,12 +150,14 @@ class System: Base system type which represents common system status """ - status: Alert = strawberry.field(resolver=lambda: Alert( - severity=Severity.INFO, - title="Test message", - message="Test message", - timestamp=None - )) + status: Alert = strawberry.field( + resolver=lambda: Alert( + severity=Severity.INFO, + title="Test message", + message="Test message", + timestamp=None, + ) + ) domain: SystemDomainInfo = strawberry.field(resolver=get_system_domain_info) settings: SystemSettings = SystemSettings() info: SystemInfo = SystemInfo() diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index 71c9c18..5aba9b3 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -23,9 +23,12 @@ class Query: """API access status""" return Api() + @strawberry.type class Mutation(ApiMutations): """Root schema for mutations""" + pass + schema = strawberry.Schema(query=Query, mutation=Mutation) diff --git a/selfprivacy_api/resources/services/mailserver.py b/selfprivacy_api/resources/services/mailserver.py index bf42c7d..01fa574 100644 --- a/selfprivacy_api/resources/services/mailserver.py +++ b/selfprivacy_api/resources/services/mailserver.py @@ -33,7 +33,7 @@ class DKIMKey(Resource): dkim = get_dkim_key(domain) if dkim is None: - return "DKIM file not found", 404 + return "DKIM file not found", 404 dkim = base64.b64encode(dkim.encode("utf-8")).decode("utf-8") return dkim diff --git a/selfprivacy_api/utils/__init__.py b/selfprivacy_api/utils/__init__.py index adb0409..81dc354 100644 --- a/selfprivacy_api/utils/__init__.py +++ b/selfprivacy_api/utils/__init__.py @@ -133,6 +133,7 @@ def parse_date(date_str: str) -> datetime.datetime: else datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f") ) + def get_dkim_key(domain): """Get DKIM key from /var/dkim/.selector.txt""" if os.path.exists("/var/dkim/" + domain + ".selector.txt"): diff --git a/selfprivacy_api/utils/auth.py b/selfprivacy_api/utils/auth.py index 30fa7de..6fff698 100644 --- a/selfprivacy_api/utils/auth.py +++ b/selfprivacy_api/utils/auth.py @@ -191,7 +191,9 @@ def _get_recovery_token(): return tokens["recovery_token"]["token"] -def generate_recovery_token(expiration: typing.Optional[datetime], uses_left: typing.Optional[int]) -> str: +def generate_recovery_token( + expiration: typing.Optional[datetime], uses_left: typing.Optional[int] +) -> str: """Generate a 24 bytes recovery token and return a mneomnic word list. Write a string representation of the recovery token to the tokens.json file. """ diff --git a/selfprivacy_api/utils/network.py b/selfprivacy_api/utils/network.py index 1aa4644..5081f0e 100644 --- a/selfprivacy_api/utils/network.py +++ b/selfprivacy_api/utils/network.py @@ -3,19 +3,25 @@ import subprocess import re + def get_ip4(): """Get IPv4 address""" try: - ip4 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode("utf-8") + ip4 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode( + "utf-8" + ) ip4 = re.search(r"inet (\d+\.\d+\.\d+\.\d+)\/\d+", ip4) except subprocess.CalledProcessError: ip4 = None return ip4.group(1) if ip4 else None + def get_ip6(): """Get IPv6 address""" try: - ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode("utf-8") + ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode( + "utf-8" + ) ip6 = re.search(r"inet6 (\S+)\/\d+", ip6) except subprocess.CalledProcessError: ip6 = None diff --git a/tests/common.py b/tests/common.py index d3dda69..01975e8 100644 --- a/tests/common.py +++ b/tests/common.py @@ -1,6 +1,7 @@ import json from mnemonic import Mnemonic + def read_json(file_path): with open(file_path, "r", encoding="utf-8") as file: return json.load(file) @@ -10,11 +11,14 @@ def write_json(file_path, data): with open(file_path, "w", encoding="utf-8") as file: json.dump(data, file, indent=4) + def generate_api_query(query_array): return "query TestApi {\n api {" + "\n".join(query_array) + "}\n}" + def generate_system_query(query_array): return "query TestSystem {\n system {" + "\n".join(query_array) + "}\n}" + def mnemonic_to_hex(mnemonic): return Mnemonic(language="english").to_entropy(mnemonic).hex() diff --git a/tests/conftest.py b/tests/conftest.py index 9acdd24..fb31456 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -35,6 +35,7 @@ def client(app, tokens_file): class AuthorizedClient(testing.FlaskClient): """Flask authorized test client.""" + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.token = "TEST_TOKEN" @@ -48,6 +49,7 @@ class AuthorizedClient(testing.FlaskClient): class WrongAuthClient(testing.FlaskClient): """Flask client with wrong token""" + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.token = "WRONG_TOKEN" diff --git a/tests/test_graphql/test_api.py b/tests/test_graphql/test_api.py index 81b6175..031e052 100644 --- a/tests/test_graphql/test_api.py +++ b/tests/test_graphql/test_api.py @@ -23,11 +23,14 @@ TOKENS_FILE_CONTETS = { ] } + def test_graphql_get_entire_api_data(authorized_client, tokens_file): response = authorized_client.get( "/graphql", json={ - "query": generate_api_query([API_VERSION_QUERY, API_DEVICES_QUERY, API_RECOVERY_QUERY]) + "query": generate_api_query( + [API_VERSION_QUERY, API_DEVICES_QUERY, API_RECOVERY_QUERY] + ) }, ) assert response.status_code == 200 @@ -35,10 +38,16 @@ def test_graphql_get_entire_api_data(authorized_client, tokens_file): assert "version" in response.get_json()["data"]["api"] assert response.json["data"]["api"]["devices"] is not None assert len(response.json["data"]["api"]["devices"]) == 2 - assert response.json["data"]["api"]["devices"][0]["creationDate"] == "2022-01-14T08:31:10.789314" + assert ( + response.json["data"]["api"]["devices"][0]["creationDate"] + == "2022-01-14T08:31:10.789314" + ) assert response.json["data"]["api"]["devices"][0]["isCaller"] is True assert response.json["data"]["api"]["devices"][0]["name"] == "test_token" - assert response.json["data"]["api"]["devices"][1]["creationDate"] == "2022-01-14T08:31:10.789314" + assert ( + response.json["data"]["api"]["devices"][1]["creationDate"] + == "2022-01-14T08:31:10.789314" + ) assert response.json["data"]["api"]["devices"][1]["isCaller"] is False assert response.json["data"]["api"]["devices"][1]["name"] == "test_token2" assert response.json["data"]["api"]["recoveryKey"] is not None diff --git a/tests/test_graphql/test_api_devices.py b/tests/test_graphql/test_api_devices.py index 37cb2d2..627d06a 100644 --- a/tests/test_graphql/test_api_devices.py +++ b/tests/test_graphql/test_api_devices.py @@ -29,34 +29,39 @@ devices { } """ + def test_graphql_tokens_info(authorized_client, tokens_file): response = authorized_client.get( "/graphql", - json={ - "query": generate_api_query([API_DEVICES_QUERY]) - }, + json={"query": generate_api_query([API_DEVICES_QUERY])}, ) assert response.status_code == 200 assert response.json.get("data") is not None assert response.json["data"]["api"]["devices"] is not None assert len(response.json["data"]["api"]["devices"]) == 2 - assert response.json["data"]["api"]["devices"][0]["creationDate"] == "2022-01-14T08:31:10.789314" + assert ( + response.json["data"]["api"]["devices"][0]["creationDate"] + == "2022-01-14T08:31:10.789314" + ) assert response.json["data"]["api"]["devices"][0]["isCaller"] is True assert response.json["data"]["api"]["devices"][0]["name"] == "test_token" - assert response.json["data"]["api"]["devices"][1]["creationDate"] == "2022-01-14T08:31:10.789314" + assert ( + response.json["data"]["api"]["devices"][1]["creationDate"] + == "2022-01-14T08:31:10.789314" + ) assert response.json["data"]["api"]["devices"][1]["isCaller"] is False assert response.json["data"]["api"]["devices"][1]["name"] == "test_token2" + def test_graphql_tokens_info_unauthorized(client, tokens_file): response = client.get( "/graphql", - json={ - "query": generate_api_query([API_DEVICES_QUERY]) - }, + json={"query": generate_api_query([API_DEVICES_QUERY])}, ) assert response.status_code == 200 assert response.json["data"] is None + DELETE_TOKEN_MUTATION = """ mutation DeleteToken($device: String!) { deleteDeviceApiToken(device: $device) { @@ -67,6 +72,7 @@ mutation DeleteToken($device: String!) { } """ + def test_graphql_delete_token_unauthorized(client, tokens_file): response = client.post( "/graphql", @@ -80,6 +86,7 @@ def test_graphql_delete_token_unauthorized(client, tokens_file): assert response.status_code == 200 assert response.json["data"] is None + def test_graphql_delete_token(authorized_client, tokens_file): response = authorized_client.post( "/graphql", @@ -105,6 +112,7 @@ def test_graphql_delete_token(authorized_client, tokens_file): ] } + def test_graphql_delete_self_token(authorized_client, tokens_file): response = authorized_client.post( "/graphql", @@ -122,6 +130,7 @@ def test_graphql_delete_self_token(authorized_client, tokens_file): assert response.json["data"]["deleteDeviceApiToken"]["code"] == 400 assert read_json(tokens_file) == TOKENS_FILE_CONTETS + def test_graphql_delete_nonexistent_token(authorized_client, tokens_file): response = authorized_client.post( "/graphql", @@ -139,6 +148,7 @@ def test_graphql_delete_nonexistent_token(authorized_client, tokens_file): assert response.json["data"]["deleteDeviceApiToken"]["code"] == 404 assert read_json(tokens_file) == TOKENS_FILE_CONTETS + REFRESH_TOKEN_MUTATION = """ mutation RefreshToken { refreshDeviceApiToken { @@ -150,22 +160,20 @@ mutation RefreshToken { } """ + def test_graphql_refresh_token_unauthorized(client, tokens_file): response = client.post( "/graphql", - json={ - "query": REFRESH_TOKEN_MUTATION - }, + json={"query": REFRESH_TOKEN_MUTATION}, ) assert response.status_code == 200 assert response.json["data"] is None + def test_graphql_refresh_token(authorized_client, tokens_file): response = authorized_client.post( "/graphql", - json={ - "query": REFRESH_TOKEN_MUTATION - }, + json={"query": REFRESH_TOKEN_MUTATION}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -178,6 +186,7 @@ def test_graphql_refresh_token(authorized_client, tokens_file): "date": "2022-01-14 08:31:10.789314", } + NEW_DEVICE_KEY_MUTATION = """ mutation NewDeviceKey { getNewDeviceApiKey { @@ -189,22 +198,20 @@ mutation NewDeviceKey { } """ + def test_graphql_get_new_device_auth_key_unauthorized(client, tokens_file): response = client.post( "/graphql", - json={ - "query": NEW_DEVICE_KEY_MUTATION - }, + json={"query": NEW_DEVICE_KEY_MUTATION}, ) assert response.status_code == 200 assert response.json["data"] is None + def test_graphql_get_new_device_auth_key(authorized_client, tokens_file): response = authorized_client.post( "/graphql", - json={ - "query": NEW_DEVICE_KEY_MUTATION - }, + json={"query": NEW_DEVICE_KEY_MUTATION}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -212,9 +219,14 @@ def test_graphql_get_new_device_auth_key(authorized_client, tokens_file): assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200 assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12 - token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex() + token = ( + Mnemonic(language="english") + .to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]) + .hex() + ) assert read_json(tokens_file)["new_device"]["token"] == token + INVALIDATE_NEW_DEVICE_KEY_MUTATION = """ mutation InvalidateNewDeviceKey { invalidateNewDeviceApiKey { @@ -225,6 +237,7 @@ mutation InvalidateNewDeviceKey { } """ + def test_graphql_invalidate_new_device_token_unauthorized(client, tokens_file): response = client.post( "/graphql", @@ -238,12 +251,11 @@ def test_graphql_invalidate_new_device_token_unauthorized(client, tokens_file): assert response.status_code == 200 assert response.json["data"] is None + def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file): response = authorized_client.post( "/graphql", - json={ - "query": NEW_DEVICE_KEY_MUTATION - }, + json={"query": NEW_DEVICE_KEY_MUTATION}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -251,13 +263,15 @@ def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file): assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200 assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12 - token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex() + token = ( + Mnemonic(language="english") + .to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]) + .hex() + ) assert read_json(tokens_file)["new_device"]["token"] == token response = authorized_client.post( "/graphql", - json={ - "query": INVALIDATE_NEW_DEVICE_KEY_MUTATION - }, + json={"query": INVALIDATE_NEW_DEVICE_KEY_MUTATION}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -266,6 +280,7 @@ def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file): assert response.json["data"]["invalidateNewDeviceApiKey"]["code"] == 200 assert read_json(tokens_file) == TOKENS_FILE_CONTETS + AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION = """ mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) { authorizeWithNewDeviceApiKey(input: $input) { @@ -277,12 +292,11 @@ mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) { } """ + def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_file): response = authorized_client.post( "/graphql", - json={ - "query": NEW_DEVICE_KEY_MUTATION - }, + json={"query": NEW_DEVICE_KEY_MUTATION}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -314,6 +328,7 @@ def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_ assert read_json(tokens_file)["tokens"][2]["token"] == token assert read_json(tokens_file)["tokens"][2]["name"] == "new_device" + def test_graphql_authorize_new_device_with_invalid_key(client, tokens_file): response = client.post( "/graphql", @@ -334,12 +349,11 @@ def test_graphql_authorize_new_device_with_invalid_key(client, tokens_file): assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404 assert read_json(tokens_file) == TOKENS_FILE_CONTETS + def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_file): response = authorized_client.post( "/graphql", - json={ - "query": NEW_DEVICE_KEY_MUTATION - }, + json={"query": NEW_DEVICE_KEY_MUTATION}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -367,7 +381,10 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is True assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 200 - assert read_json(tokens_file)["tokens"][2]["token"] == response.json["data"]["authorizeWithNewDeviceApiKey"]["token"] + assert ( + read_json(tokens_file)["tokens"][2]["token"] + == response.json["data"]["authorizeWithNewDeviceApiKey"]["token"] + ) assert read_json(tokens_file)["tokens"][2]["name"] == "new_token" response = client.post( @@ -389,12 +406,13 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404 assert read_json(tokens_file)["tokens"].__len__() == 3 -def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_client, tokens_file): + +def test_graphql_get_and_authorize_key_after_12_minutes( + client, authorized_client, tokens_file +): response = authorized_client.post( "/graphql", - json={ - "query": NEW_DEVICE_KEY_MUTATION - }, + json={"query": NEW_DEVICE_KEY_MUTATION}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -402,7 +420,11 @@ def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_clien assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200 assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12 - key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex() + key = ( + Mnemonic(language="english") + .to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]) + .hex() + ) assert read_json(tokens_file)["new_device"]["token"] == key file_data = read_json(tokens_file) @@ -429,6 +451,7 @@ def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_clien assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404 + def test_graphql_authorize_without_token(client, tokens_file): response = client.post( "/graphql", diff --git a/tests/test_graphql/test_api_recovery.py b/tests/test_graphql/test_api_recovery.py index 8ac8560..0021e5d 100644 --- a/tests/test_graphql/test_api_recovery.py +++ b/tests/test_graphql/test_api_recovery.py @@ -32,22 +32,20 @@ recoveryKey { } """ + def test_graphql_recovery_key_status_unauthorized(client, tokens_file): response = client.post( "/graphql", - json={ - "query": generate_api_query([API_RECOVERY_QUERY]) - }, + json={"query": generate_api_query([API_RECOVERY_QUERY])}, ) assert response.status_code == 200 assert response.json.get("data") is None + def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_file): response = authorized_client.get( "/graphql", - json={ - "query": generate_api_query([API_RECOVERY_QUERY]) - }, + json={"query": generate_api_query([API_RECOVERY_QUERY])}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -58,6 +56,7 @@ def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_ assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None + API_RECOVERY_KEY_GENERATE_MUTATION = """ mutation TestGenerateRecoveryKey($limits: RecoveryKeyLimitsInput!) { getNewRecoveryApiKey(limits: $limits) { @@ -79,6 +78,8 @@ mutation TestUseRecoveryKey($input: UseRecoveryKeyInput!) { } } """ + + def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): response = authorized_client.post( "/graphql", @@ -98,23 +99,23 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 200 assert response.json["data"]["getNewRecoveryApiKey"]["key"] is not None - assert response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18 + assert ( + response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18 + ) assert read_json(tokens_file)["recovery_token"] is not None time_generated = read_json(tokens_file)["recovery_token"]["date"] assert time_generated is not None key = response.json["data"]["getNewRecoveryApiKey"]["key"] assert ( - datetime.datetime.strptime( - time_generated, "%Y-%m-%dT%H:%M:%S.%fZ" - ) - datetime.timedelta(seconds=5) < datetime.datetime.now() + datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ") + - datetime.timedelta(seconds=5) + < datetime.datetime.now() ) # Try to get token status response = authorized_client.get( "/graphql", - json={ - "query": generate_api_query([API_RECOVERY_QUERY]) - }, + json={"query": generate_api_query([API_RECOVERY_QUERY])}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -132,19 +133,22 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): "query": API_RECOVERY_KEY_USE_MUTATION, "variables": { "input": { - "token": key, - "tokenName": "test_token", + "key": key, + "deviceName": "test_token", }, }, }, ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["useRecoveryKey"]["success"] is True - assert response.json["data"]["useRecoveryKey"]["message"] is not None - assert response.json["data"]["useRecoveryKey"]["code"] == 200 - assert response.json["data"]["useRecoveryKey"]["token"] is not None - assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][2]["token"] + assert response.json["data"]["useRecoveryApiKey"]["success"] is True + assert response.json["data"]["useRecoveryApiKey"]["message"] is not None + assert response.json["data"]["useRecoveryApiKey"]["code"] == 200 + assert response.json["data"]["useRecoveryApiKey"]["token"] is not None + assert ( + response.json["data"]["useRecoveryApiKey"]["token"] + == read_json(tokens_file)["tokens"][2]["token"] + ) assert read_json(tokens_file)["tokens"][2]["name"] == "test_token" # Try to use token again @@ -154,24 +158,30 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): "query": API_RECOVERY_KEY_USE_MUTATION, "variables": { "input": { - "token": key, - "tokenName": "test_token2", + "key": key, + "deviceName": "test_token2", }, }, }, ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["useRecoveryKey"]["success"] is True - assert response.json["data"]["useRecoveryKey"]["message"] is not None - assert response.json["data"]["useRecoveryKey"]["code"] == 200 - assert response.json["data"]["useRecoveryKey"]["token"] is not None - assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][3]["token"] + assert response.json["data"]["useRecoveryApiKey"]["success"] is True + assert response.json["data"]["useRecoveryApiKey"]["message"] is not None + assert response.json["data"]["useRecoveryApiKey"]["code"] == 200 + assert response.json["data"]["useRecoveryApiKey"]["token"] is not None + assert ( + response.json["data"]["useRecoveryApiKey"]["token"] + == read_json(tokens_file)["tokens"][3]["token"] + ) assert read_json(tokens_file)["tokens"][3]["name"] == "test_token2" -def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_client, tokens_file): + +def test_graphql_generate_recovery_key_with_expiration_date( + client, authorized_client, tokens_file +): expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5) - expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f") response = authorized_client.post( "/graphql", json={ @@ -190,27 +200,27 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 200 assert response.json["data"]["getNewRecoveryApiKey"]["key"] is not None - assert response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18 + assert ( + response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18 + ) assert read_json(tokens_file)["recovery_token"] is not None key = response.json["data"]["getNewRecoveryApiKey"]["key"] - assert read_json(tokens_file)["recovery_token"]["expirationDate"] == expiration_date_str + assert read_json(tokens_file)["recovery_token"]["expiration"] == expiration_date_str assert read_json(tokens_file)["recovery_token"]["token"] == mnemonic_to_hex(key) time_generated = read_json(tokens_file)["recovery_token"]["date"] assert time_generated is not None assert ( - datetime.datetime.strptime( - time_generated, "%Y-%m-%dT%H:%M:%S.%fZ" - ) - datetime.timedelta(seconds=5) < datetime.datetime.now() + datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ") + - datetime.timedelta(seconds=5) + < datetime.datetime.now() ) # Try to get token status response = authorized_client.get( "/graphql", - json={ - "query": generate_api_query([API_RECOVERY_QUERY]) - }, + json={"query": generate_api_query([API_RECOVERY_QUERY])}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -218,7 +228,10 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c assert response.json["data"]["api"]["recoveryKey"]["exists"] is True assert response.json["data"]["api"]["recoveryKey"]["valid"] is True assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated - assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] == expiration_date_str + assert ( + response.json["data"]["api"]["recoveryKey"]["expirationDate"] + == expiration_date_str + ) assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None # Try to use token @@ -228,19 +241,22 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c "query": API_RECOVERY_KEY_USE_MUTATION, "variables": { "input": { - "token": key, - "tokenName": "test_token", + "key": key, + "deviceName": "test_token", }, }, }, ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["useRecoveryKey"]["success"] is True - assert response.json["data"]["useRecoveryKey"]["message"] is not None - assert response.json["data"]["useRecoveryKey"]["code"] == 200 - assert response.json["data"]["useRecoveryKey"]["token"] is not None - assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][2]["token"] + assert response.json["data"]["useRecoveryApiKey"]["success"] is True + assert response.json["data"]["useRecoveryApiKey"]["message"] is not None + assert response.json["data"]["useRecoveryApiKey"]["code"] == 200 + assert response.json["data"]["useRecoveryApiKey"]["token"] is not None + assert ( + response.json["data"]["useRecoveryApiKey"]["token"] + == read_json(tokens_file)["tokens"][2]["token"] + ) # Try to use token again response = authorized_client.post( @@ -249,23 +265,28 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c "query": API_RECOVERY_KEY_USE_MUTATION, "variables": { "input": { - "token": key, - "tokenName": "test_token2", + "key": key, + "deviceName": "test_token2", }, }, }, ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["useRecoveryKey"]["success"] is True - assert response.json["data"]["useRecoveryKey"]["message"] is not None - assert response.json["data"]["useRecoveryKey"]["code"] == 200 - assert response.json["data"]["useRecoveryKey"]["token"] is not None - assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][3]["token"] + assert response.json["data"]["useRecoveryApiKey"]["success"] is True + assert response.json["data"]["useRecoveryApiKey"]["message"] is not None + assert response.json["data"]["useRecoveryApiKey"]["code"] == 200 + assert response.json["data"]["useRecoveryApiKey"]["token"] is not None + assert ( + response.json["data"]["useRecoveryApiKey"]["token"] + == read_json(tokens_file)["tokens"][3]["token"] + ) # Try to use token after expiration date new_data = read_json(tokens_file) - new_data["recovery_token"]["expirationDate"] = datetime.datetime.now() - datetime.timedelta(minutes=5) + new_data["recovery_token"][ + "expirationDate" + ] = datetime.datetime.now() - datetime.timedelta(minutes=5) write_json(tokens_file, new_data) response = authorized_client.post( "/graphql", @@ -273,27 +294,25 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c "query": API_RECOVERY_KEY_USE_MUTATION, "variables": { "input": { - "token": key, - "tokenName": "test_token3", + "key": key, + "deviceName": "test_token3", }, }, }, ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["useRecoveryKey"]["success"] is False - assert response.json["data"]["useRecoveryKey"]["message"] is not None - assert response.json["data"]["useRecoveryKey"]["code"] == 404 - assert response.json["data"]["useRecoveryKey"]["token"] is None + assert response.json["data"]["useRecoveryApiKey"]["success"] is False + assert response.json["data"]["useRecoveryApiKey"]["message"] is not None + assert response.json["data"]["useRecoveryApiKey"]["code"] == 404 + assert response.json["data"]["useRecoveryApiKey"]["token"] is None assert read_json(tokens_file)["tokens"] == new_data["tokens"] # Try to get token status response = authorized_client.get( "/graphql", - json={ - "query": generate_api_query([API_RECOVERY_QUERY]) - }, + json={"query": generate_api_query([API_RECOVERY_QUERY])}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -301,12 +320,18 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c assert response.json["data"]["api"]["recoveryKey"]["exists"] is True assert response.json["data"]["api"]["recoveryKey"]["valid"] is False assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated - assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] == new_data["recovery_token"]["expiration"] + assert ( + response.json["data"]["api"]["recoveryKey"]["expirationDate"] + == new_data["recovery_token"]["expiration"] + ) assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None -def test_graphql_generate_recoevry_key_with_expiration_in_the_past(authorized_client, tokens_file): + +def test_graphql_generate_recovery_key_with_expiration_in_the_past( + authorized_client, tokens_file +): expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5) - expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f") response = authorized_client.post( "/graphql", @@ -326,11 +351,12 @@ def test_graphql_generate_recoevry_key_with_expiration_in_the_past(authorized_cl assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400 assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None - - assert read_json(tokens_file)["tokens"] == [] assert "recovery_token" not in read_json(tokens_file) -def test_graphql_generate_recovery_key_with_invalid_time_format(authorized_client, tokens_file): + +def test_graphql_generate_recovery_key_with_invalid_time_format( + authorized_client, tokens_file +): expiration_date = "invalid_time_format" expiration_date_str = expiration_date @@ -347,16 +373,14 @@ def test_graphql_generate_recovery_key_with_invalid_time_format(authorized_clien }, ) assert response.status_code == 200 - assert response.json.get("data") is not None - assert response.json["data"]["getNewRecoveryApiKey"]["success"] is False - assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None - assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400 - assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None + assert response.json.get("data") is None - assert read_json(tokens_file)["tokens"] == [] assert "recovery_token" not in read_json(tokens_file) -def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, tokens_file): + +def test_graphql_generate_recovery_key_with_limited_uses( + authorized_client, tokens_file +): response = authorized_client.post( "/graphql", @@ -386,9 +410,7 @@ def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, toke # Try to get token status response = authorized_client.get( "/graphql", - json={ - "query": generate_api_query([API_RECOVERY_QUERY]) - }, + json={"query": generate_api_query([API_RECOVERY_QUERY])}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -406,25 +428,23 @@ def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, toke "query": API_RECOVERY_KEY_USE_MUTATION, "variables": { "input": { - "token": mnemonic_key, - "tokenName": "test_token1", + "key": mnemonic_key, + "deviceName": "test_token1", }, }, }, ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["useRecoveryKey"]["success"] is True - assert response.json["data"]["useRecoveryKey"]["message"] is not None - assert response.json["data"]["useRecoveryKey"]["code"] == 200 - assert response.json["data"]["useRecoveryKey"]["token"] is not None + assert response.json["data"]["useRecoveryApiKey"]["success"] is True + assert response.json["data"]["useRecoveryApiKey"]["message"] is not None + assert response.json["data"]["useRecoveryApiKey"]["code"] == 200 + assert response.json["data"]["useRecoveryApiKey"]["token"] is not None # Try to get token status response = authorized_client.get( "/graphql", - json={ - "query": generate_api_query([API_RECOVERY_QUERY]) - }, + json={"query": generate_api_query([API_RECOVERY_QUERY])}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -442,25 +462,23 @@ def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, toke "query": API_RECOVERY_KEY_USE_MUTATION, "variables": { "input": { - "token": mnemonic_key, - "tokenName": "test_token2", + "key": mnemonic_key, + "deviceName": "test_token2", }, }, }, ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["useRecoveryKey"]["success"] is True - assert response.json["data"]["useRecoveryKey"]["message"] is not None - assert response.json["data"]["useRecoveryKey"]["code"] == 200 - assert response.json["data"]["useRecoveryKey"]["token"] is not None + assert response.json["data"]["useRecoveryApiKey"]["success"] is True + assert response.json["data"]["useRecoveryApiKey"]["message"] is not None + assert response.json["data"]["useRecoveryApiKey"]["code"] == 200 + assert response.json["data"]["useRecoveryApiKey"]["token"] is not None # Try to get token status response = authorized_client.get( "/graphql", - json={ - "query": generate_api_query([API_RECOVERY_QUERY]) - }, + json={"query": generate_api_query([API_RECOVERY_QUERY])}, ) assert response.status_code == 200 assert response.json.get("data") is not None @@ -468,7 +486,7 @@ def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, toke assert response.json["data"]["api"]["recoveryKey"]["exists"] is True assert response.json["data"]["api"]["recoveryKey"]["valid"] is False assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is not None - assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is not None + assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] == 0 # Try to use token @@ -478,20 +496,23 @@ def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, toke "query": API_RECOVERY_KEY_USE_MUTATION, "variables": { "input": { - "token": mnemonic_key, - "tokenName": "test_token3", + "key": mnemonic_key, + "deviceName": "test_token3", }, }, }, ) assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["useRecoveryKey"]["success"] is False - assert response.json["data"]["useRecoveryKey"]["message"] is not None - assert response.json["data"]["useRecoveryKey"]["code"] == 404 - assert response.json["data"]["useRecoveryKey"]["token"] is None + assert response.json["data"]["useRecoveryApiKey"]["success"] is False + assert response.json["data"]["useRecoveryApiKey"]["message"] is not None + assert response.json["data"]["useRecoveryApiKey"]["code"] == 404 + assert response.json["data"]["useRecoveryApiKey"]["token"] is None -def test_graphql_generate_recovery_key_with_negative_uses(authorized_client, tokens_file): + +def test_graphql_generate_recovery_key_with_negative_uses( + authorized_client, tokens_file +): # Try to get token status response = authorized_client.post( "/graphql", @@ -512,6 +533,7 @@ def test_graphql_generate_recovery_key_with_negative_uses(authorized_client, tok assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400 assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None + def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_file): # Try to get token status response = authorized_client.post( diff --git a/tests/test_graphql/test_api_version.py b/tests/test_graphql/test_api_version.py index a45aa3a..8f76035 100644 --- a/tests/test_graphql/test_api_version.py +++ b/tests/test_graphql/test_api_version.py @@ -6,12 +6,11 @@ from tests.common import generate_api_query API_VERSION_QUERY = "version" + def test_graphql_get_api_version(authorized_client): response = authorized_client.get( "/graphql", - json={ - "query": generate_api_query([API_VERSION_QUERY]) - }, + json={"query": generate_api_query([API_VERSION_QUERY])}, ) assert response.status_code == 200 assert "version" in response.get_json()["data"]["api"] @@ -20,9 +19,7 @@ def test_graphql_get_api_version(authorized_client): def test_graphql_api_version_unauthorized(client): response = client.get( "/graphql", - json={ - "query": generate_api_query([API_VERSION_QUERY]) - }, + json={"query": generate_api_query([API_VERSION_QUERY])}, ) assert response.status_code == 200 assert "version" in response.get_json()["data"]["api"] diff --git a/tests/test_graphql/test_system.py b/tests/test_graphql/test_system.py index 7ed4d6a..4641329 100644 --- a/tests/test_graphql/test_system.py +++ b/tests/test_graphql/test_system.py @@ -7,6 +7,7 @@ import datetime from tests.common import generate_system_query, read_json, write_json + @pytest.fixture def domain_file(mocker, datadir): mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", datadir / "domain") @@ -54,7 +55,7 @@ class ProcessMock: self.args = args self.kwargs = kwargs - def communicate(self): + def communicate(): return (b"", None) returncode = 0 @@ -62,7 +63,8 @@ class ProcessMock: class BrokenServiceMock(ProcessMock): """Mock subprocess.Popen for broken service""" - def communicate(self): + + def communicate(): return (b"Testing error", None) returncode = 3 @@ -95,19 +97,35 @@ def mock_subprocess_check_output(mocker): ) return mock + @pytest.fixture def mock_get_ip4(mocker): - mock = mocker.patch("selfprivacy_api.utils.network.get_ip4", autospec=True, return_value="157.90.247.192") + mock = mocker.patch( + "selfprivacy_api.utils.network.get_ip4", + autospec=True, + return_value="157.90.247.192", + ) return mock + @pytest.fixture def mock_get_ip6(mocker): - mock = mocker.patch("selfprivacy_api.utils.network.get_ip6", autospec=True, return_value="fe80::9400:ff:fef1:34ae") + mock = mocker.patch( + "selfprivacy_api.utils.network.get_ip6", + autospec=True, + return_value="fe80::9400:ff:fef1:34ae", + ) return mock + @pytest.fixture def mock_dkim_key(mocker): - mock = mocker.patch("selfprivacy_api.utils.get_dkim_key", autospec=True, return_value="I am a DKIM key") + mock = mocker.patch( + "selfprivacy_api.utils.get_dkim_key", + autospec=True, + return_value="I am a DKIM key", + ) + API_PYTHON_VERSION_INFO = """ info { @@ -115,6 +133,7 @@ info { } """ + def test_graphql_wrong_auth(wrong_auth_client): """Test wrong auth""" response = wrong_auth_client.get( @@ -126,6 +145,7 @@ def test_graphql_wrong_auth(wrong_auth_client): assert response.status_code == 200 assert response.json.get("data") is None + API_GET_DOMAIN_INFO = """ domainInfo { domain @@ -141,6 +161,7 @@ domainInfo { } """ + def dns_record(type="A", name="test.tld", content=None, ttl=3600, priority=None): if content is None: if type == "A": @@ -155,13 +176,23 @@ def dns_record(type="A", name="test.tld", content=None, ttl=3600, priority=None) "priority": priority, } + def is_dns_record_in_array(records, dns_record) -> bool: for record in records: - if record["type"] == dns_record["type"] and record["name"] == dns_record["name"] and record["content"] == dns_record["content"] and record["ttl"] == dns_record["ttl"] and record["priority"] == dns_record["priority"]: + if ( + record["type"] == dns_record["type"] + and record["name"] == dns_record["name"] + and record["content"] == dns_record["content"] + and record["ttl"] == dns_record["ttl"] + and record["priority"] == dns_record["priority"] + ): return True return False -def test_graphql_get_domain(authorized_client, domain_file, mock_get_ip4, mock_get_ip6, turned_on): + +def test_graphql_get_domain( + authorized_client, domain_file, mock_get_ip4, mock_get_ip6, turned_on +): """Test get domain""" response = authorized_client.get( "/graphql", @@ -178,23 +209,62 @@ def test_graphql_get_domain(authorized_client, domain_file, mock_get_ip4, mock_g assert is_dns_record_in_array(dns_records, dns_record()) assert is_dns_record_in_array(dns_records, dns_record(type="AAAA")) assert is_dns_record_in_array(dns_records, dns_record(name="api.test.tld")) - assert is_dns_record_in_array(dns_records, dns_record(name="api.test.tld", type="AAAA")) + assert is_dns_record_in_array( + dns_records, dns_record(name="api.test.tld", type="AAAA") + ) assert is_dns_record_in_array(dns_records, dns_record(name="cloud.test.tld")) - assert is_dns_record_in_array(dns_records, dns_record(name="cloud.test.tld", type="AAAA")) + assert is_dns_record_in_array( + dns_records, dns_record(name="cloud.test.tld", type="AAAA") + ) assert is_dns_record_in_array(dns_records, dns_record(name="git.test.tld")) - assert is_dns_record_in_array(dns_records, dns_record(name="git.test.tld", type="AAAA")) + assert is_dns_record_in_array( + dns_records, dns_record(name="git.test.tld", type="AAAA") + ) assert is_dns_record_in_array(dns_records, dns_record(name="meet.test.tld")) - assert is_dns_record_in_array(dns_records, dns_record(name="meet.test.tld", type="AAAA")) + assert is_dns_record_in_array( + dns_records, dns_record(name="meet.test.tld", type="AAAA") + ) assert is_dns_record_in_array(dns_records, dns_record(name="password.test.tld")) - assert is_dns_record_in_array(dns_records, dns_record(name="password.test.tld", type="AAAA")) + assert is_dns_record_in_array( + dns_records, dns_record(name="password.test.tld", type="AAAA") + ) assert is_dns_record_in_array(dns_records, dns_record(name="social.test.tld")) - assert is_dns_record_in_array(dns_records, dns_record(name="social.test.tld", type="AAAA")) + assert is_dns_record_in_array( + dns_records, dns_record(name="social.test.tld", type="AAAA") + ) assert is_dns_record_in_array(dns_records, dns_record(name="vpn.test.tld")) - assert is_dns_record_in_array(dns_records, dns_record(name="vpn.test.tld", type="AAAA")) - assert is_dns_record_in_array(dns_records, dns_record(name="test.tld", type="MX", content="test.tld", priority=10)) - assert is_dns_record_in_array(dns_records, dns_record(name="_dmarc.test.tld", type="TXT", content="v=DMARC1; p=none", ttl=18000)) - assert is_dns_record_in_array(dns_records, dns_record(name="test.tld", type="TXT", content="v=spf1 a mx ip4:157.90.247.192 -all", ttl=18000)) - assert is_dns_record_in_array(dns_records, dns_record(name="selector._domainkey.test.tld", type="TXT", content="I am a DKIM key", ttl=18000)) + assert is_dns_record_in_array( + dns_records, dns_record(name="vpn.test.tld", type="AAAA") + ) + assert is_dns_record_in_array( + dns_records, + dns_record(name="test.tld", type="MX", content="test.tld", priority=10), + ) + assert is_dns_record_in_array( + dns_records, + dns_record( + name="_dmarc.test.tld", type="TXT", content="v=DMARC1; p=none", ttl=18000 + ), + ) + assert is_dns_record_in_array( + dns_records, + dns_record( + name="test.tld", + type="TXT", + content="v=spf1 a mx ip4:157.90.247.192 -all", + ttl=18000, + ), + ) + assert is_dns_record_in_array( + dns_records, + dns_record( + name="selector._domainkey.test.tld", + type="TXT", + content="I am a DKIM key", + ttl=18000, + ), + ) + API_GET_TIMEZONE = """ settings { @@ -202,6 +272,7 @@ settings { } """ + def test_graphql_get_timezone_unauthorized(client, turned_on): """Test get timezone without auth""" response = client.get( @@ -213,6 +284,7 @@ def test_graphql_get_timezone_unauthorized(client, turned_on): assert response.status_code == 200 assert response.json.get("data") is None + def test_graphql_get_timezone(authorized_client, turned_on): """Test get timezone""" response = authorized_client.get( @@ -225,7 +297,8 @@ def test_graphql_get_timezone(authorized_client, turned_on): assert response.json.get("data") is not None assert response.json["data"]["system"]["settings"]["timezone"] == "Europe/Moscow" -def test_graphql_get_timezone_on_undefined(authorized_client, undefiened_config): + +def test_graphql_get_timezone_on_undefined(authorized_client, undefined_config): """Test get timezone when none is defined in config""" response = authorized_client.get( "/graphql", @@ -249,6 +322,7 @@ mutation changeTimezone($timezone: String!) { } """ + def test_graphql_change_timezone_unauthorized(client, turned_on): """Test change timezone without auth""" response = client.post( diff --git a/tests/test_network_utils.py b/tests/test_network_utils.py index b8f9c0d..a7c1511 100644 --- a/tests/test_network_utils.py +++ b/tests/test_network_utils.py @@ -21,16 +21,21 @@ FAILED_OUTPUT_STRING = b""" Device "eth0" does not exist. """ + @pytest.fixture def ip_process_mock(mocker): - mock = mocker.patch("subprocess.check_output", autospec=True, return_value=OUTPUT_STRING) + mock = mocker.patch( + "subprocess.check_output", autospec=True, return_value=OUTPUT_STRING + ) return mock + def test_get_ip4(ip_process_mock): """Test get IPv4 address""" ip4 = get_ip4() assert ip4 == "157.90.247.192" + def test_get_ip6(ip_process_mock): """Test get IPv6 address""" ip6 = get_ip6() diff --git a/tests/test_system.py b/tests/test_system.py index ac108aa..b9c8649 100644 --- a/tests/test_system.py +++ b/tests/test_system.py @@ -60,7 +60,7 @@ class ProcessMock: self.args = args self.kwargs = kwargs - def communicate(self): + def communicate(): return (b"", None) returncode = 0 @@ -68,7 +68,8 @@ class ProcessMock: class BrokenServiceMock(ProcessMock): """Mock subprocess.Popen""" - def communicate(self): + + def communicate(): return (b"Testing error", None) returncode = 3