From e3354c73ef08e71408edaa4fdaf886e35ec61986 Mon Sep 17 00:00:00 2001 From: Inex Code Date: Fri, 8 Jul 2022 18:28:08 +0300 Subject: [PATCH] Change datetime formats, more tests --- .../graphql/mutations/api_mutations.py | 44 +- selfprivacy_api/graphql/queries/system.py | 2 +- .../resources/api_auth/recovery_token.py | 9 +- selfprivacy_api/utils/__init__.py | 30 +- selfprivacy_api/utils/auth.py | 21 +- tests/test_auth.py | 32 +- tests/test_graphql/test_api_recovery.py | 46 +- tests/test_graphql/test_system.py | 518 +++++++++++++++++- 8 files changed, 615 insertions(+), 87 deletions(-) diff --git a/selfprivacy_api/graphql/mutations/api_mutations.py b/selfprivacy_api/graphql/mutations/api_mutations.py index d516049..e0d1057 100644 --- a/selfprivacy_api/graphql/mutations/api_mutations.py +++ b/selfprivacy_api/graphql/mutations/api_mutations.py @@ -37,8 +37,8 @@ class DeviceApiTokenMutationReturn(MutationReturnInterface): class RecoveryKeyLimitsInput: """Recovery key limits input""" - expiration_date: typing.Optional[datetime.datetime] - uses: typing.Optional[int] + expiration_date: typing.Optional[datetime.datetime] = None + uses: typing.Optional[int] = None @strawberry.input @@ -61,26 +61,30 @@ class UseNewDeviceKeyInput: class ApiMutations: @strawberry.mutation(permission_classes=[IsAuthenticated]) def get_new_recovery_api_key( - self, limits: RecoveryKeyLimitsInput + self, limits: typing.Optional[RecoveryKeyLimitsInput] = None ) -> ApiKeyMutationReturn: """Generate recovery key""" - if limits.expiration_date is not None: - if limits.expiration_date < datetime.datetime.now(): - return ApiKeyMutationReturn( - success=False, - message="Expiration date must be in the future", - code=400, - key=None, - ) - if limits.uses is not None: - if limits.uses < 1: - return ApiKeyMutationReturn( - success=False, - message="Uses must be greater than 0", - code=400, - key=None, - ) - key = generate_recovery_token(limits.expiration_date, limits.uses) + if limits is not None: + if limits.expiration_date is not None: + if limits.expiration_date < datetime.datetime.now(): + return ApiKeyMutationReturn( + success=False, + message="Expiration date must be in the future", + code=400, + key=None, + ) + if limits.uses is not None: + if limits.uses < 1: + return ApiKeyMutationReturn( + success=False, + message="Uses must be greater than 0", + code=400, + key=None, + ) + if limits is not None: + key = generate_recovery_token(limits.expiration_date, limits.uses) + else: + key = generate_recovery_token(None, None) return ApiKeyMutationReturn( success=True, message="Recovery key generated", diff --git a/selfprivacy_api/graphql/queries/system.py b/selfprivacy_api/graphql/queries/system.py index cadf074..a235e4d 100644 --- a/selfprivacy_api/graphql/queries/system.py +++ b/selfprivacy_api/graphql/queries/system.py @@ -158,7 +158,7 @@ class System: timestamp=None, ) ) - domain: SystemDomainInfo = strawberry.field(resolver=get_system_domain_info) + domain_info: SystemDomainInfo = strawberry.field(resolver=get_system_domain_info) settings: SystemSettings = SystemSettings() info: SystemInfo = SystemInfo() provider: SystemProviderInfo = strawberry.field(resolver=get_system_provider_info) diff --git a/selfprivacy_api/resources/api_auth/recovery_token.py b/selfprivacy_api/resources/api_auth/recovery_token.py index e97c87a..912a50b 100644 --- a/selfprivacy_api/resources/api_auth/recovery_token.py +++ b/selfprivacy_api/resources/api_auth/recovery_token.py @@ -4,6 +4,7 @@ from datetime import datetime from flask_restful import Resource, reqparse from selfprivacy_api.resources.api_auth import api +from selfprivacy_api.utils import parse_date from selfprivacy_api.utils.auth import ( is_recovery_token_exists, is_recovery_token_valid, @@ -129,19 +130,17 @@ class RecoveryToken(Resource): # Convert expiration date to datetime and return 400 if it is not valid if args["expiration"]: try: - expiration = datetime.strptime( - args["expiration"], "%Y-%m-%dT%H:%M:%S.%fZ" - ) + expiration = parse_date(args["expiration"]) # Retrun 400 if expiration date is in the past if expiration < datetime.now(): return {"message": "Expiration date cannot be in the past"}, 400 except ValueError: return { - "error": "Invalid expiration date. Use YYYY-MM-DDTHH:MM:SS.SSSZ" + "error": "Invalid expiration date. Use YYYY-MM-DDTHH:MM:SS.SSS" }, 400 else: expiration = None - if args["uses"] != None and args["uses"] < 1: + if args["uses"] is not None and args["uses"] < 1: return {"message": "Uses must be greater than 0"}, 400 # Generate recovery token token = generate_recovery_token(expiration, args["uses"]) diff --git a/selfprivacy_api/utils/__init__.py b/selfprivacy_api/utils/__init__.py index 81dc354..c80dd99 100644 --- a/selfprivacy_api/utils/__init__.py +++ b/selfprivacy_api/utils/__init__.py @@ -125,13 +125,29 @@ def is_username_forbidden(username): def parse_date(date_str: str) -> datetime.datetime: - """Parse date string which can be in - %Y-%m-%dT%H:%M:%S.%fZ or %Y-%m-%d %H:%M:%S.%f format""" - return ( - datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ") - if date_str.endswith("Z") - else datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f") - ) + """Parse date string which can be in one of these formats: + - %Y-%m-%dT%H:%M:%S.%fZ + - %Y-%m-%dT%H:%M:%S.%f + - %Y-%m-%d %H:%M:%S.%fZ + - %Y-%m-%d %H:%M:%S.%f + """ + try: + return datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%fZ") + except ValueError: + pass + try: + return datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f") + except ValueError: + pass + try: + return datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + pass + try: + return datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%f") + except ValueError: + pass + raise ValueError("Invalid date string") def get_dkim_key(domain): diff --git a/selfprivacy_api/utils/auth.py b/selfprivacy_api/utils/auth.py index 6fff698..f512948 100644 --- a/selfprivacy_api/utils/auth.py +++ b/selfprivacy_api/utils/auth.py @@ -7,7 +7,7 @@ import typing from mnemonic import Mnemonic -from . import ReadUserData, UserDataFiles, WriteUserData +from . import ReadUserData, UserDataFiles, WriteUserData, parse_date """ Token are stored in the tokens.json file. @@ -121,7 +121,7 @@ def create_token(name): { "token": token, "name": name, - "date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")), + "date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")), } ) return token @@ -161,9 +161,7 @@ def is_recovery_token_valid(): return False if "expiration" not in recovery_token or recovery_token["expiration"] is None: return True - return datetime.now() < datetime.strptime( - recovery_token["expiration"], "%Y-%m-%dT%H:%M:%S.%fZ" - ) + return datetime.now() < parse_date(recovery_token["expiration"]) def get_recovery_token_status(): @@ -213,8 +211,8 @@ def generate_recovery_token( with WriteUserData(UserDataFiles.TOKENS) as tokens: tokens["recovery_token"] = { "token": recovery_token_str, - "date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")), - "expiration": expiration.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + "date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")), + "expiration": expiration.strftime("%Y-%m-%dT%H:%M:%S.%f") if expiration is not None else None, "uses_left": uses_left if uses_left is not None else None, @@ -285,14 +283,7 @@ def _get_new_device_auth_token(): new_device = tokens["new_device"] if "expiration" not in new_device: return None - if new_device["expiration"].endswith("Z"): - expiration = datetime.strptime( - new_device["expiration"], "%Y-%m-%dT%H:%M:%S.%fZ" - ) - else: - expiration = datetime.strptime( - new_device["expiration"], "%Y-%m-%d %H:%M:%S.%f" - ) + expiration = parse_date(new_device["expiration"]) if datetime.now() > expiration: return None return new_device["token"] diff --git a/tests/test_auth.py b/tests/test_auth.py index 4d78f62..d209c9c 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -25,6 +25,13 @@ TOKENS_FILE_CONTETS = { ] } +DATE_FORMATS = [ + "%Y-%m-%dT%H:%M:%S.%fZ", + "%Y-%m-%dT%H:%M:%S.%f", + "%Y-%m-%d %H:%M:%S.%fZ", + "%Y-%m-%d %H:%M:%S.%f", +] + def test_get_tokens_info(authorized_client, tokens_file): response = authorized_client.get("/auth/tokens") @@ -261,7 +268,7 @@ def test_generate_recovery_token(authorized_client, client, tokens_file): assert time_generated is not None # Assert that the token was generated near the current time assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ") + datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - datetime.timedelta(seconds=5) < datetime.datetime.now() ) @@ -298,14 +305,14 @@ def test_generate_recovery_token(authorized_client, client, tokens_file): assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2" +@pytest.mark.parametrize("timeformat", DATE_FORMATS) def test_generate_recovery_token_with_expiration_date( - authorized_client, client, tokens_file + authorized_client, client, tokens_file, timeformat ): # Generate token with expiration date # Generate expiration date in the future - # Expiration date format is YYYY-MM-DDTHH:MM:SS.SSSZ expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5) - expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + expiration_date_str = expiration_date.strftime(timeformat) response = authorized_client.post( "/auth/recovery_token", json={"expiration": expiration_date_str}, @@ -315,13 +322,15 @@ def test_generate_recovery_token_with_expiration_date( mnemonic_token = response.json["token"] token = Mnemonic(language="english").to_entropy(mnemonic_token).hex() assert read_json(tokens_file)["recovery_token"]["token"] == token - assert read_json(tokens_file)["recovery_token"]["expiration"] == expiration_date_str + assert datetime.datetime.strptime( + read_json(tokens_file)["recovery_token"]["expiration"], "%Y-%m-%dT%H:%M:%S.%f" + ) == datetime.datetime.strptime(expiration_date_str, timeformat) time_generated = read_json(tokens_file)["recovery_token"]["date"] assert time_generated is not None # Assert that the token was generated near the current time assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ") + datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - datetime.timedelta(seconds=5) < datetime.datetime.now() ) @@ -333,7 +342,7 @@ def test_generate_recovery_token_with_expiration_date( "exists": True, "valid": True, "date": time_generated, - "expiration": expiration_date_str, + "expiration": expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f"), "uses_left": None, } @@ -360,7 +369,7 @@ def test_generate_recovery_token_with_expiration_date( # Try to use token after expiration date new_data = read_json(tokens_file) new_data["recovery_token"]["expiration"] = datetime.datetime.now().strftime( - "%Y-%m-%dT%H:%M:%S.%fZ" + "%Y-%m-%dT%H:%M:%S.%f" ) write_json(tokens_file, new_data) recovery_response = client.post( @@ -383,12 +392,13 @@ def test_generate_recovery_token_with_expiration_date( } +@pytest.mark.parametrize("timeformat", DATE_FORMATS) def test_generate_recovery_token_with_expiration_in_the_past( - authorized_client, tokens_file + authorized_client, tokens_file, timeformat ): # Server must return 400 if expiration date is in the past expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5) - expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + expiration_date_str = expiration_date.strftime(timeformat) response = authorized_client.post( "/auth/recovery_token", json={"expiration": expiration_date_str}, @@ -429,7 +439,7 @@ def test_generate_recovery_token_with_limited_uses( time_generated = read_json(tokens_file)["recovery_token"]["date"] assert time_generated is not None assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ") + datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - datetime.timedelta(seconds=5) < datetime.datetime.now() ) diff --git a/tests/test_graphql/test_api_recovery.py b/tests/test_graphql/test_api_recovery.py index 0021e5d..2d1e16a 100644 --- a/tests/test_graphql/test_api_recovery.py +++ b/tests/test_graphql/test_api_recovery.py @@ -2,6 +2,7 @@ # pylint: disable=unused-argument # pylint: disable=missing-function-docstring import json +from time import strftime import pytest import datetime @@ -58,7 +59,7 @@ def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_ API_RECOVERY_KEY_GENERATE_MUTATION = """ -mutation TestGenerateRecoveryKey($limits: RecoveryKeyLimitsInput!) { +mutation TestGenerateRecoveryKey($limits: RecoveryKeyLimitsInput) { getNewRecoveryApiKey(limits: $limits) { success message @@ -85,12 +86,6 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): "/graphql", json={ "query": API_RECOVERY_KEY_GENERATE_MUTATION, - "variables": { - "limits": { - "uses": None, - "expirationDate": None, - }, - }, }, ) assert response.status_code == 200 @@ -107,7 +102,7 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): assert time_generated is not None key = response.json["data"]["getNewRecoveryApiKey"]["key"] assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ") + datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - datetime.timedelta(seconds=5) < datetime.datetime.now() ) @@ -122,7 +117,9 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): assert response.json["data"]["api"]["recoveryKey"] is not None assert response.json["data"]["api"]["recoveryKey"]["exists"] is True assert response.json["data"]["api"]["recoveryKey"]["valid"] is True - assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated + assert response.json["data"]["api"]["recoveryKey"][ + "creationDate" + ] == time_generated.replace("Z", "") assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None @@ -134,7 +131,7 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): "variables": { "input": { "key": key, - "deviceName": "test_token", + "deviceName": "new_test_token", }, }, }, @@ -149,7 +146,7 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): response.json["data"]["useRecoveryApiKey"]["token"] == read_json(tokens_file)["tokens"][2]["token"] ) - assert read_json(tokens_file)["tokens"][2]["name"] == "test_token" + assert read_json(tokens_file)["tokens"][2]["name"] == "new_test_token" # Try to use token again response = client.post( @@ -159,7 +156,7 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): "variables": { "input": { "key": key, - "deviceName": "test_token2", + "deviceName": "new_test_token2", }, }, }, @@ -174,7 +171,7 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): response.json["data"]["useRecoveryApiKey"]["token"] == read_json(tokens_file)["tokens"][3]["token"] ) - assert read_json(tokens_file)["tokens"][3]["name"] == "test_token2" + assert read_json(tokens_file)["tokens"][3]["name"] == "new_test_token2" def test_graphql_generate_recovery_key_with_expiration_date( @@ -188,7 +185,6 @@ def test_graphql_generate_recovery_key_with_expiration_date( "query": API_RECOVERY_KEY_GENERATE_MUTATION, "variables": { "limits": { - "uses": None, "expirationDate": expiration_date_str, }, }, @@ -212,7 +208,7 @@ def test_graphql_generate_recovery_key_with_expiration_date( time_generated = read_json(tokens_file)["recovery_token"]["date"] assert time_generated is not None assert ( - datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ") + datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f") - datetime.timedelta(seconds=5) < datetime.datetime.now() ) @@ -227,7 +223,9 @@ def test_graphql_generate_recovery_key_with_expiration_date( assert response.json["data"]["api"]["recoveryKey"] is not None assert response.json["data"]["api"]["recoveryKey"]["exists"] is True assert response.json["data"]["api"]["recoveryKey"]["valid"] is True - assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated + assert response.json["data"]["api"]["recoveryKey"][ + "creationDate" + ] == time_generated.replace("Z", "") assert ( response.json["data"]["api"]["recoveryKey"]["expirationDate"] == expiration_date_str @@ -242,7 +240,7 @@ def test_graphql_generate_recovery_key_with_expiration_date( "variables": { "input": { "key": key, - "deviceName": "test_token", + "deviceName": "new_test_token", }, }, }, @@ -266,7 +264,7 @@ def test_graphql_generate_recovery_key_with_expiration_date( "variables": { "input": { "key": key, - "deviceName": "test_token2", + "deviceName": "new_test_token2", }, }, }, @@ -284,9 +282,9 @@ def test_graphql_generate_recovery_key_with_expiration_date( # Try to use token after expiration date new_data = read_json(tokens_file) - new_data["recovery_token"][ - "expirationDate" - ] = datetime.datetime.now() - datetime.timedelta(minutes=5) + new_data["recovery_token"]["expiration"] = ( + datetime.datetime.now() - datetime.timedelta(minutes=5) + ).strftime("%Y-%m-%dT%H:%M:%S.%f") write_json(tokens_file, new_data) response = authorized_client.post( "/graphql", @@ -295,7 +293,7 @@ def test_graphql_generate_recovery_key_with_expiration_date( "variables": { "input": { "key": key, - "deviceName": "test_token3", + "deviceName": "new_test_token3", }, }, }, @@ -339,7 +337,6 @@ def test_graphql_generate_recovery_key_with_expiration_in_the_past( "query": API_RECOVERY_KEY_GENERATE_MUTATION, "variables": { "limits": { - "uses": None, "expirationDate": expiration_date_str, }, }, @@ -366,7 +363,6 @@ def test_graphql_generate_recovery_key_with_invalid_time_format( "query": API_RECOVERY_KEY_GENERATE_MUTATION, "variables": { "limits": { - "uses": None, "expirationDate": expiration_date_str, }, }, @@ -521,7 +517,6 @@ def test_graphql_generate_recovery_key_with_negative_uses( "variables": { "limits": { "uses": -1, - "expirationDate": None, }, }, }, @@ -543,7 +538,6 @@ def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_ "variables": { "limits": { "uses": 0, - "expirationDate": None, }, }, }, diff --git a/tests/test_graphql/test_system.py b/tests/test_graphql/test_system.py index 4641329..75b2c28 100644 --- a/tests/test_graphql/test_system.py +++ b/tests/test_graphql/test_system.py @@ -338,8 +338,170 @@ def test_graphql_change_timezone_unauthorized(client, turned_on): assert response.json.get("data") is None -API_CHANGE_SERVER_SETTINGS = """ -mutation changeServerSettings($settings: SystemSettingsInput!) { +def test_graphql_change_timezone(authorized_client, turned_on): + """Test change timezone""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_CHANGE_TIMEZONE_MUTATION, + "variables": { + "timezone": "Europe/Helsinki", + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["changeTimezone"]["success"] is True + assert response.json["data"]["changeTimezone"]["message"] is not None + assert response.json["data"]["changeTimezone"]["code"] == 200 + assert response.json["data"]["changeTimezone"]["timezone"] == "Europe/Helsinki" + assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Helsinki" + + +def test_graphql_change_timezone_on_undefined(authorized_client, undefined_config): + """Test change timezone when none is defined in config""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_CHANGE_TIMEZONE_MUTATION, + "variables": { + "timezone": "Europe/Helsinki", + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["changeTimezone"]["success"] is True + assert response.json["data"]["changeTimezone"]["message"] is not None + assert response.json["data"]["changeTimezone"]["code"] == 200 + assert response.json["data"]["changeTimezone"]["timezone"] == "Europe/Helsinki" + assert ( + read_json(undefined_config / "undefined.json")["timezone"] == "Europe/Helsinki" + ) + + +def test_graphql_change_timezone_without_timezone(authorized_client, turned_on): + """Test change timezone without timezone""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_CHANGE_TIMEZONE_MUTATION, + "variables": { + "timezone": "", + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["changeTimezone"]["success"] is False + assert response.json["data"]["changeTimezone"]["message"] is not None + assert response.json["data"]["changeTimezone"]["code"] == 400 + assert response.json["data"]["changeTimezone"]["timezone"] is None + assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Moscow" + + +def test_graphql_change_timezone_with_invalid_timezone(authorized_client, turned_on): + """Test change timezone with invalid timezone""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_CHANGE_TIMEZONE_MUTATION, + "variables": { + "timezone": "Invlaid/Timezone", + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["changeTimezone"]["success"] is False + assert response.json["data"]["changeTimezone"]["message"] is not None + assert response.json["data"]["changeTimezone"]["code"] == 400 + assert response.json["data"]["changeTimezone"]["timezone"] is None + assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Moscow" + + +API_GET_AUTO_UPGRADE_SETTINGS_QUERY = """ +settings { + autoUpgrade { + enableAutoUpgrade + allowReboot + } +} +""" + + +def test_graphql_get_auto_upgrade_unauthorized(client, turned_on): + """Test get auto upgrade settings without auth""" + response = client.get( + "/graphql", + json={ + "query": API_GET_AUTO_UPGRADE_SETTINGS_QUERY, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is None + + +def test_graphql_get_auto_upgrade(authorized_client, turned_on): + """Test get auto upgrade settings""" + response = authorized_client.get( + "/graphql", + json={ + "query": API_GET_AUTO_UPGRADE_SETTINGS_QUERY, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["settings"]["autoUpgrade"]["enableAutoUpgrade"] is True + assert response.json["data"]["settings"]["autoUpgrade"]["allowReboot"] is True + + +def test_graphql_get_auto_upgrade_on_undefined(authorized_client, undefined_config): + """Test get auto upgrade settings when none is defined in config""" + response = authorized_client.get( + "/graphql", + json={ + "query": API_GET_AUTO_UPGRADE_SETTINGS_QUERY, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["settings"]["autoUpgrade"]["enableAutoUpgrade"] is True + assert response.json["data"]["settings"]["autoUpgrade"]["allowReboot"] is False + + +def test_graphql_get_auto_upgrade_without_vlaues(authorized_client, no_values): + """Test get auto upgrade settings without values""" + response = authorized_client.get( + "/graphql", + json={ + "query": API_GET_AUTO_UPGRADE_SETTINGS_QUERY, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["settings"]["autoUpgrade"]["enableAutoUpgrade"] is True + assert response.json["data"]["settings"]["autoUpgrade"]["allowReboot"] is False + + +def test_graphql_get_auto_upgrade_turned_off(authorized_client, turned_off): + """Test get auto upgrade settings when turned off""" + response = authorized_client.get( + "/graphql", + json={ + "query": API_GET_AUTO_UPGRADE_SETTINGS_QUERY, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert ( + response.json["data"]["settings"]["autoUpgrade"]["enableAutoUpgrade"] is False + ) + assert response.json["data"]["settings"]["autoUpgrade"]["allowReboot"] is False + + +API_CHANGE_AUTO_UPGRADE_SETTINGS = """ +mutation changeServerSettings($settings: AutoUpgradeSettingsInput!) { changeAutoUpgradeSettings(settings: $settings) { success message @@ -349,3 +511,355 @@ mutation changeServerSettings($settings: SystemSettingsInput!) { } } """ + + +def test_graphql_change_auto_upgrade_unauthorized(client, turned_on): + """Test change auto upgrade settings without auth""" + response = client.post( + "/graphql", + json={ + "query": API_CHANGE_AUTO_UPGRADE_SETTINGS, + "variables": { + "settings": { + "enableAutoUpgrade": True, + "allowReboot": True, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is None + + +def test_graphql_change_auto_upgrade(authorized_client, turned_on): + """Test change auto upgrade settings""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_CHANGE_AUTO_UPGRADE_SETTINGS, + "variables": { + "settings": { + "enableAutoUpgrade": False, + "allowReboot": True, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True + assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200 + assert ( + response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is False + ) + assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is True + assert read_json(turned_on / "turned_on.json")["autoUpgrade"]["enable"] is False + assert read_json(turned_on / "turned_on.json")["autoUpgrade"]["allowReboot"] is True + + +def test_graphql_change_auto_upgrade_on_undefined(authorized_client, undefined_config): + """Test change auto upgrade settings when none is defined in config""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_CHANGE_AUTO_UPGRADE_SETTINGS, + "variables": { + "settings": { + "enableAutoUpgrade": False, + "allowReboot": True, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True + assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200 + assert ( + response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is False + ) + assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is True + assert ( + read_json(undefined_config / "undefined.json")["autoUpgrade"]["enable"] is False + ) + assert ( + read_json(undefined_config / "undefined.json")["autoUpgrade"]["allowReboot"] + is True + ) + + +def test_graphql_change_auto_upgrade_without_vlaues(authorized_client, no_values): + """Test change auto upgrade settings without values""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_CHANGE_AUTO_UPGRADE_SETTINGS, + "variables": { + "settings": { + "enableAutoUpgrade": True, + "allowReboot": True, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True + assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200 + assert ( + response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is True + ) + assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is True + assert read_json(no_values / "no_values.json")["autoUpgrade"]["enable"] is True + assert read_json(no_values / "no_values.json")["autoUpgrade"]["allowReboot"] is True + + +def test_graphql_change_auto_upgrade_turned_off(authorized_client, turned_off): + """Test change auto upgrade settings when turned off""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_CHANGE_AUTO_UPGRADE_SETTINGS, + "variables": { + "settings": { + "enableAutoUpgrade": True, + "allowReboot": True, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True + assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200 + assert ( + response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is True + ) + assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is True + assert read_json(turned_off / "turned_off.json")["autoUpgrade"]["enable"] is True + assert ( + read_json(turned_off / "turned_off.json")["autoUpgrade"]["allowReboot"] is True + ) + + +def test_grphql_change_auto_upgrade_without_enable(authorized_client, turned_off): + """Test change auto upgrade settings without enable""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_CHANGE_AUTO_UPGRADE_SETTINGS, + "variables": { + "settings": { + "allowReboot": True, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True + assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200 + assert ( + response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is False + ) + assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is True + assert read_json(turned_off / "turned_off.json")["autoUpgrade"]["enable"] is False + assert ( + read_json(turned_off / "turned_off.json")["autoUpgrade"]["allowReboot"] is True + ) + + +def test_graphql_change_auto_upgrade_without_allow_reboot( + authorized_client, turned_off +): + """Test change auto upgrade settings without allow reboot""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_CHANGE_AUTO_UPGRADE_SETTINGS, + "variables": { + "settings": { + "enableAutoUpgrade": True, + }, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True + assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200 + assert ( + response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is True + ) + assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is False + assert read_json(turned_off / "turned_off.json")["autoUpgrade"]["enable"] is True + assert ( + read_json(turned_off / "turned_off.json")["autoUpgrade"]["allowReboot"] is False + ) + + +def test_graphql_change_auto_upgrade_with_empty_input(authorized_client, turned_off): + """Test change auto upgrade settings with empty input""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_CHANGE_AUTO_UPGRADE_SETTINGS, + "variables": { + "settings": {}, + }, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is None + assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True + assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None + assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200 + assert ( + response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is False + ) + assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is False + assert read_json(turned_off / "turned_off.json")["autoUpgrade"]["enable"] is False + assert ( + read_json(turned_off / "turned_off.json")["autoUpgrade"]["allowReboot"] is False + ) + + +API_REBUILD_SYSTEM_MUTATION = """ +mutation rebuildSystem() { + runSystemRebuild { + success + message + code + } +} +""" + + +def test_graphql_system_rebuild_unauthorized(client, mock_subprocess_popen): + """Test system rebuild without authorization""" + response = client.post( + "/graphql", + json={ + "query": API_REBUILD_SYSTEM_MUTATION, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is None + assert mock_subprocess_popen.call_count == 0 + + +def test_graphql_system_rebuild(authorized_client, mock_subprocess_popen): + """Test system rebuild""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_REBUILD_SYSTEM_MUTATION, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["runSystemRebuild"]["success"] is True + assert response.json["data"]["runSystemRebuild"]["message"] is not None + assert response.json["data"]["runSystemRebuild"]["code"] == 200 + assert mock_subprocess_popen.call_count == 1 + assert mock_subprocess_popen.call_args[0][0] == [ + "systemctl", + "start", + "sp-nixos-rebuild.service", + ] + + +API_UPGRADE_SYSTEM_MUTATION = """ +mutation upgradeSystem() { + runSystemUpgrade { + success + message + code + } +} +""" + + +def test_graphql_system_upgrade_unauthorized(client, mock_subprocess_popen): + """Test system upgrade without authorization""" + response = client.post( + "/graphql", + json={ + "query": API_UPGRADE_SYSTEM_MUTATION, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is None + assert mock_subprocess_popen.call_count == 0 + + +def test_graphql_system_upgrade(authorized_client, mock_subprocess_popen): + """Test system upgrade""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_UPGRADE_SYSTEM_MUTATION, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["runSystemUpgrade"]["success"] is True + assert response.json["data"]["runSystemUpgrade"]["message"] is not None + assert response.json["data"]["runSystemUpgrade"]["code"] == 200 + assert mock_subprocess_popen.call_count == 1 + assert mock_subprocess_popen.call_args[0][0] == [ + "systemctl", + "start", + "sp-nixos-upgrade.service", + ] + + +API_ROLLBACK_SYSTEM_MUTATION = """ +mutation rollbackSystem() { + runSystemRollback { + success + message + code + } +} +""" + + +def test_graphql_system_rollback_unauthorized(client, mock_subprocess_popen): + """Test system rollback without authorization""" + response = client.post( + "/graphql", + json={ + "query": API_ROLLBACK_SYSTEM_MUTATION, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is None + assert mock_subprocess_popen.call_count == 0 + + +def test_graphql_system_rollback(authorized_client, mock_subprocess_popen): + """Test system rollback""" + response = authorized_client.post( + "/graphql", + json={ + "query": API_ROLLBACK_SYSTEM_MUTATION, + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["runSystemRollback"]["success"] is True + assert response.json["data"]["runSystemRollback"]["message"] is not None + assert response.json["data"]["runSystemRollback"]["code"] == 200 + assert mock_subprocess_popen.call_count == 1 + assert mock_subprocess_popen.call_args[0][0] == [ + "systemctl", + "start", + "sp-nixos-rollback.service", + ]