fix recovery tests

graphql
def 2022-07-07 15:53:19 +02:00
parent 63f3b2f4d1
commit 9bd2896db8
17 changed files with 372 additions and 193 deletions

View File

@ -5,7 +5,10 @@ import typing
from flask import request
import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.mutations.mutation_interface import GenericMutationReturn, MutationReturnInterface
from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericMutationReturn,
MutationReturnInterface,
)
from selfprivacy_api.utils.auth import (
delete_new_device_auth_token,
@ -16,39 +19,50 @@ from selfprivacy_api.utils.auth import (
is_token_name_pair_valid,
refresh_token,
use_mnemonic_recoverery_token,
use_new_device_auth_token
use_new_device_auth_token,
)
@strawberry.type
class ApiKeyMutationReturn(MutationReturnInterface):
key: typing.Optional[str]
@strawberry.type
class DeviceApiTokenMutationReturn(MutationReturnInterface):
token: typing.Optional[str]
@strawberry.input
class RecoveryKeyLimitsInput:
"""Recovery key limits input"""
expiration_date: typing.Optional[datetime.datetime]
uses: typing.Optional[int]
@strawberry.input
class UseRecoveryKeyInput:
"""Use recovery key input"""
key: str
deviceName: str
@strawberry.input
class UseNewDeviceKeyInput:
"""Use new device key input"""
key: str
deviceName: str
@strawberry.type
class ApiMutations:
@strawberry.mutation(permission_classes=[IsAuthenticated])
def get_new_recovery_api_key(self, limits: RecoveryKeyLimitsInput) -> ApiKeyMutationReturn:
def get_new_recovery_api_key(
self, limits: RecoveryKeyLimitsInput
) -> ApiKeyMutationReturn:
"""Generate recovery key"""
if limits.expiration_date is not None:
if limits.expiration_date < datetime.datetime.now():
@ -75,7 +89,9 @@ class ApiMutations:
)
@strawberry.mutation()
def use_recovery_api_key(self, input: UseRecoveryKeyInput) -> DeviceApiTokenMutationReturn:
def use_recovery_api_key(
self, input: UseRecoveryKeyInput
) -> DeviceApiTokenMutationReturn:
"""Use recovery key"""
token = use_mnemonic_recoverery_token(input.key, input.deviceName)
if token is None:
@ -89,13 +105,17 @@ class ApiMutations:
success=True,
message="Recovery key used",
code=200,
token=None,
token=token,
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def refresh_device_api_token(self) -> DeviceApiTokenMutationReturn:
"""Refresh device api token"""
token = request.headers.get("Authorization").split(" ")[1] if request.headers.get("Authorization") is not None else None
token = (
request.headers.get("Authorization").split(" ")[1]
if request.headers.get("Authorization") is not None
else None
)
if token is None:
return DeviceApiTokenMutationReturn(
success=False,
@ -121,7 +141,11 @@ class ApiMutations:
@strawberry.mutation(permission_classes=[IsAuthenticated])
def delete_device_api_token(self, device: str) -> GenericMutationReturn:
"""Delete device api token"""
self_token = request.headers.get("Authorization").split(" ")[1] if request.headers.get("Authorization") is not None else None
self_token = (
request.headers.get("Authorization").split(" ")[1]
if request.headers.get("Authorization") is not None
else None
)
if self_token is not None and is_token_name_pair_valid(device, self_token):
return GenericMutationReturn(
success=False,
@ -163,7 +187,9 @@ class ApiMutations:
)
@strawberry.mutation()
def authorize_with_new_device_api_key(self, input: UseNewDeviceKeyInput) -> DeviceApiTokenMutationReturn:
def authorize_with_new_device_api_key(
self, input: UseNewDeviceKeyInput
) -> DeviceApiTokenMutationReturn:
"""Authorize with new device api key"""
token = use_new_device_auth_token(input.key, input.deviceName)
if token is None:

View File

@ -1,11 +1,13 @@
import strawberry
@strawberry.interface
class MutationReturnInterface:
success: bool
message: str
code: int
@strawberry.type
class GenericMutationReturn(MutationReturnInterface):
pass

View File

@ -150,12 +150,14 @@ class System:
Base system type which represents common system status
"""
status: Alert = strawberry.field(resolver=lambda: Alert(
severity=Severity.INFO,
title="Test message",
message="Test message",
timestamp=None
))
status: Alert = strawberry.field(
resolver=lambda: Alert(
severity=Severity.INFO,
title="Test message",
message="Test message",
timestamp=None,
)
)
domain: SystemDomainInfo = strawberry.field(resolver=get_system_domain_info)
settings: SystemSettings = SystemSettings()
info: SystemInfo = SystemInfo()

View File

@ -23,9 +23,12 @@ class Query:
"""API access status"""
return Api()
@strawberry.type
class Mutation(ApiMutations):
"""Root schema for mutations"""
pass
schema = strawberry.Schema(query=Query, mutation=Mutation)

View File

@ -33,7 +33,7 @@ class DKIMKey(Resource):
dkim = get_dkim_key(domain)
if dkim is None:
return "DKIM file not found", 404
return "DKIM file not found", 404
dkim = base64.b64encode(dkim.encode("utf-8")).decode("utf-8")
return dkim

View File

@ -133,6 +133,7 @@ def parse_date(date_str: str) -> datetime.datetime:
else datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f")
)
def get_dkim_key(domain):
"""Get DKIM key from /var/dkim/<domain>.selector.txt"""
if os.path.exists("/var/dkim/" + domain + ".selector.txt"):

View File

@ -191,7 +191,9 @@ def _get_recovery_token():
return tokens["recovery_token"]["token"]
def generate_recovery_token(expiration: typing.Optional[datetime], uses_left: typing.Optional[int]) -> str:
def generate_recovery_token(
expiration: typing.Optional[datetime], uses_left: typing.Optional[int]
) -> str:
"""Generate a 24 bytes recovery token and return a mneomnic word list.
Write a string representation of the recovery token to the tokens.json file.
"""

View File

@ -3,19 +3,25 @@
import subprocess
import re
def get_ip4():
"""Get IPv4 address"""
try:
ip4 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode("utf-8")
ip4 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode(
"utf-8"
)
ip4 = re.search(r"inet (\d+\.\d+\.\d+\.\d+)\/\d+", ip4)
except subprocess.CalledProcessError:
ip4 = None
return ip4.group(1) if ip4 else None
def get_ip6():
"""Get IPv6 address"""
try:
ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode("utf-8")
ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode(
"utf-8"
)
ip6 = re.search(r"inet6 (\S+)\/\d+", ip6)
except subprocess.CalledProcessError:
ip6 = None

View File

@ -1,6 +1,7 @@
import json
from mnemonic import Mnemonic
def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return json.load(file)
@ -10,11 +11,14 @@ def write_json(file_path, data):
with open(file_path, "w", encoding="utf-8") as file:
json.dump(data, file, indent=4)
def generate_api_query(query_array):
return "query TestApi {\n api {" + "\n".join(query_array) + "}\n}"
def generate_system_query(query_array):
return "query TestSystem {\n system {" + "\n".join(query_array) + "}\n}"
def mnemonic_to_hex(mnemonic):
return Mnemonic(language="english").to_entropy(mnemonic).hex()

View File

@ -35,6 +35,7 @@ def client(app, tokens_file):
class AuthorizedClient(testing.FlaskClient):
"""Flask authorized test client."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.token = "TEST_TOKEN"
@ -48,6 +49,7 @@ class AuthorizedClient(testing.FlaskClient):
class WrongAuthClient(testing.FlaskClient):
"""Flask client with wrong token"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.token = "WRONG_TOKEN"

View File

@ -23,11 +23,14 @@ TOKENS_FILE_CONTETS = {
]
}
def test_graphql_get_entire_api_data(authorized_client, tokens_file):
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_VERSION_QUERY, API_DEVICES_QUERY, API_RECOVERY_QUERY])
"query": generate_api_query(
[API_VERSION_QUERY, API_DEVICES_QUERY, API_RECOVERY_QUERY]
)
},
)
assert response.status_code == 200
@ -35,10 +38,16 @@ def test_graphql_get_entire_api_data(authorized_client, tokens_file):
assert "version" in response.get_json()["data"]["api"]
assert response.json["data"]["api"]["devices"] is not None
assert len(response.json["data"]["api"]["devices"]) == 2
assert response.json["data"]["api"]["devices"][0]["creationDate"] == "2022-01-14T08:31:10.789314"
assert (
response.json["data"]["api"]["devices"][0]["creationDate"]
== "2022-01-14T08:31:10.789314"
)
assert response.json["data"]["api"]["devices"][0]["isCaller"] is True
assert response.json["data"]["api"]["devices"][0]["name"] == "test_token"
assert response.json["data"]["api"]["devices"][1]["creationDate"] == "2022-01-14T08:31:10.789314"
assert (
response.json["data"]["api"]["devices"][1]["creationDate"]
== "2022-01-14T08:31:10.789314"
)
assert response.json["data"]["api"]["devices"][1]["isCaller"] is False
assert response.json["data"]["api"]["devices"][1]["name"] == "test_token2"
assert response.json["data"]["api"]["recoveryKey"] is not None

View File

@ -29,34 +29,39 @@ devices {
}
"""
def test_graphql_tokens_info(authorized_client, tokens_file):
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_DEVICES_QUERY])
},
json={"query": generate_api_query([API_DEVICES_QUERY])},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["api"]["devices"] is not None
assert len(response.json["data"]["api"]["devices"]) == 2
assert response.json["data"]["api"]["devices"][0]["creationDate"] == "2022-01-14T08:31:10.789314"
assert (
response.json["data"]["api"]["devices"][0]["creationDate"]
== "2022-01-14T08:31:10.789314"
)
assert response.json["data"]["api"]["devices"][0]["isCaller"] is True
assert response.json["data"]["api"]["devices"][0]["name"] == "test_token"
assert response.json["data"]["api"]["devices"][1]["creationDate"] == "2022-01-14T08:31:10.789314"
assert (
response.json["data"]["api"]["devices"][1]["creationDate"]
== "2022-01-14T08:31:10.789314"
)
assert response.json["data"]["api"]["devices"][1]["isCaller"] is False
assert response.json["data"]["api"]["devices"][1]["name"] == "test_token2"
def test_graphql_tokens_info_unauthorized(client, tokens_file):
response = client.get(
"/graphql",
json={
"query": generate_api_query([API_DEVICES_QUERY])
},
json={"query": generate_api_query([API_DEVICES_QUERY])},
)
assert response.status_code == 200
assert response.json["data"] is None
DELETE_TOKEN_MUTATION = """
mutation DeleteToken($device: String!) {
deleteDeviceApiToken(device: $device) {
@ -67,6 +72,7 @@ mutation DeleteToken($device: String!) {
}
"""
def test_graphql_delete_token_unauthorized(client, tokens_file):
response = client.post(
"/graphql",
@ -80,6 +86,7 @@ def test_graphql_delete_token_unauthorized(client, tokens_file):
assert response.status_code == 200
assert response.json["data"] is None
def test_graphql_delete_token(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
@ -105,6 +112,7 @@ def test_graphql_delete_token(authorized_client, tokens_file):
]
}
def test_graphql_delete_self_token(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
@ -122,6 +130,7 @@ def test_graphql_delete_self_token(authorized_client, tokens_file):
assert response.json["data"]["deleteDeviceApiToken"]["code"] == 400
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_graphql_delete_nonexistent_token(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
@ -139,6 +148,7 @@ def test_graphql_delete_nonexistent_token(authorized_client, tokens_file):
assert response.json["data"]["deleteDeviceApiToken"]["code"] == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
REFRESH_TOKEN_MUTATION = """
mutation RefreshToken {
refreshDeviceApiToken {
@ -150,22 +160,20 @@ mutation RefreshToken {
}
"""
def test_graphql_refresh_token_unauthorized(client, tokens_file):
response = client.post(
"/graphql",
json={
"query": REFRESH_TOKEN_MUTATION
},
json={"query": REFRESH_TOKEN_MUTATION},
)
assert response.status_code == 200
assert response.json["data"] is None
def test_graphql_refresh_token(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": REFRESH_TOKEN_MUTATION
},
json={"query": REFRESH_TOKEN_MUTATION},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -178,6 +186,7 @@ def test_graphql_refresh_token(authorized_client, tokens_file):
"date": "2022-01-14 08:31:10.789314",
}
NEW_DEVICE_KEY_MUTATION = """
mutation NewDeviceKey {
getNewDeviceApiKey {
@ -189,22 +198,20 @@ mutation NewDeviceKey {
}
"""
def test_graphql_get_new_device_auth_key_unauthorized(client, tokens_file):
response = client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json["data"] is None
def test_graphql_get_new_device_auth_key(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -212,9 +219,14 @@ def test_graphql_get_new_device_auth_key(authorized_client, tokens_file):
assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200
assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12
token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex()
token = (
Mnemonic(language="english")
.to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"])
.hex()
)
assert read_json(tokens_file)["new_device"]["token"] == token
INVALIDATE_NEW_DEVICE_KEY_MUTATION = """
mutation InvalidateNewDeviceKey {
invalidateNewDeviceApiKey {
@ -225,6 +237,7 @@ mutation InvalidateNewDeviceKey {
}
"""
def test_graphql_invalidate_new_device_token_unauthorized(client, tokens_file):
response = client.post(
"/graphql",
@ -238,12 +251,11 @@ def test_graphql_invalidate_new_device_token_unauthorized(client, tokens_file):
assert response.status_code == 200
assert response.json["data"] is None
def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -251,13 +263,15 @@ def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file):
assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200
assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12
token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex()
token = (
Mnemonic(language="english")
.to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"])
.hex()
)
assert read_json(tokens_file)["new_device"]["token"] == token
response = authorized_client.post(
"/graphql",
json={
"query": INVALIDATE_NEW_DEVICE_KEY_MUTATION
},
json={"query": INVALIDATE_NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -266,6 +280,7 @@ def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file):
assert response.json["data"]["invalidateNewDeviceApiKey"]["code"] == 200
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION = """
mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) {
authorizeWithNewDeviceApiKey(input: $input) {
@ -277,12 +292,11 @@ mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) {
}
"""
def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -314,6 +328,7 @@ def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_
assert read_json(tokens_file)["tokens"][2]["token"] == token
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
def test_graphql_authorize_new_device_with_invalid_key(client, tokens_file):
response = client.post(
"/graphql",
@ -334,12 +349,11 @@ def test_graphql_authorize_new_device_with_invalid_key(client, tokens_file):
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -367,7 +381,10 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is True
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 200
assert read_json(tokens_file)["tokens"][2]["token"] == response.json["data"]["authorizeWithNewDeviceApiKey"]["token"]
assert (
read_json(tokens_file)["tokens"][2]["token"]
== response.json["data"]["authorizeWithNewDeviceApiKey"]["token"]
)
assert read_json(tokens_file)["tokens"][2]["name"] == "new_token"
response = client.post(
@ -389,12 +406,13 @@ def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_fi
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404
assert read_json(tokens_file)["tokens"].__len__() == 3
def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_client, tokens_file):
def test_graphql_get_and_authorize_key_after_12_minutes(
client, authorized_client, tokens_file
):
response = authorized_client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -402,7 +420,11 @@ def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_clien
assert response.json["data"]["getNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["getNewDeviceApiKey"]["code"] == 200
assert response.json["data"]["getNewDeviceApiKey"]["key"].split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"]).hex()
key = (
Mnemonic(language="english")
.to_entropy(response.json["data"]["getNewDeviceApiKey"]["key"])
.hex()
)
assert read_json(tokens_file)["new_device"]["token"] == key
file_data = read_json(tokens_file)
@ -429,6 +451,7 @@ def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_clien
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404
def test_graphql_authorize_without_token(client, tokens_file):
response = client.post(
"/graphql",

View File

@ -32,22 +32,20 @@ recoveryKey {
}
"""
def test_graphql_recovery_key_status_unauthorized(client, tokens_file):
response = client.post(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json.get("data") is None
def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_file):
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -58,6 +56,7 @@ def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None
API_RECOVERY_KEY_GENERATE_MUTATION = """
mutation TestGenerateRecoveryKey($limits: RecoveryKeyLimitsInput!) {
getNewRecoveryApiKey(limits: $limits) {
@ -79,6 +78,8 @@ mutation TestUseRecoveryKey($input: UseRecoveryKeyInput!) {
}
}
"""
def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
@ -98,23 +99,23 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 200
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18
assert (
response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18
)
assert read_json(tokens_file)["recovery_token"] is not None
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
key = response.json["data"]["getNewRecoveryApiKey"]["key"]
assert (
datetime.datetime.strptime(
time_generated, "%Y-%m-%dT%H:%M:%S.%fZ"
) - datetime.timedelta(seconds=5) < datetime.datetime.now()
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -132,19 +133,22 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": key,
"tokenName": "test_token",
"key": key,
"deviceName": "test_token",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][2]["token"]
assert response.json["data"]["useRecoveryApiKey"]["success"] is True
assert response.json["data"]["useRecoveryApiKey"]["message"] is not None
assert response.json["data"]["useRecoveryApiKey"]["code"] == 200
assert response.json["data"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json["data"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][2]["token"]
)
assert read_json(tokens_file)["tokens"][2]["name"] == "test_token"
# Try to use token again
@ -154,24 +158,30 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": key,
"tokenName": "test_token2",
"key": key,
"deviceName": "test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][3]["token"]
assert response.json["data"]["useRecoveryApiKey"]["success"] is True
assert response.json["data"]["useRecoveryApiKey"]["message"] is not None
assert response.json["data"]["useRecoveryApiKey"]["code"] == 200
assert response.json["data"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json["data"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][3]["token"]
)
assert read_json(tokens_file)["tokens"][3]["name"] == "test_token2"
def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_client, tokens_file):
def test_graphql_generate_recovery_key_with_expiration_date(
client, authorized_client, tokens_file
):
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f")
response = authorized_client.post(
"/graphql",
json={
@ -190,27 +200,27 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c
assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 200
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18
assert (
response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18
)
assert read_json(tokens_file)["recovery_token"] is not None
key = response.json["data"]["getNewRecoveryApiKey"]["key"]
assert read_json(tokens_file)["recovery_token"]["expirationDate"] == expiration_date_str
assert read_json(tokens_file)["recovery_token"]["expiration"] == expiration_date_str
assert read_json(tokens_file)["recovery_token"]["token"] == mnemonic_to_hex(key)
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
assert (
datetime.datetime.strptime(
time_generated, "%Y-%m-%dT%H:%M:%S.%fZ"
) - datetime.timedelta(seconds=5) < datetime.datetime.now()
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -218,7 +228,10 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c
assert response.json["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] == expiration_date_str
assert (
response.json["data"]["api"]["recoveryKey"]["expirationDate"]
== expiration_date_str
)
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None
# Try to use token
@ -228,19 +241,22 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": key,
"tokenName": "test_token",
"key": key,
"deviceName": "test_token",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][2]["token"]
assert response.json["data"]["useRecoveryApiKey"]["success"] is True
assert response.json["data"]["useRecoveryApiKey"]["message"] is not None
assert response.json["data"]["useRecoveryApiKey"]["code"] == 200
assert response.json["data"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json["data"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][2]["token"]
)
# Try to use token again
response = authorized_client.post(
@ -249,23 +265,28 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": key,
"tokenName": "test_token2",
"key": key,
"deviceName": "test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][3]["token"]
assert response.json["data"]["useRecoveryApiKey"]["success"] is True
assert response.json["data"]["useRecoveryApiKey"]["message"] is not None
assert response.json["data"]["useRecoveryApiKey"]["code"] == 200
assert response.json["data"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json["data"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][3]["token"]
)
# Try to use token after expiration date
new_data = read_json(tokens_file)
new_data["recovery_token"]["expirationDate"] = datetime.datetime.now() - datetime.timedelta(minutes=5)
new_data["recovery_token"][
"expirationDate"
] = datetime.datetime.now() - datetime.timedelta(minutes=5)
write_json(tokens_file, new_data)
response = authorized_client.post(
"/graphql",
@ -273,27 +294,25 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": key,
"tokenName": "test_token3",
"key": key,
"deviceName": "test_token3",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is False
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 404
assert response.json["data"]["useRecoveryKey"]["token"] is None
assert response.json["data"]["useRecoveryApiKey"]["success"] is False
assert response.json["data"]["useRecoveryApiKey"]["message"] is not None
assert response.json["data"]["useRecoveryApiKey"]["code"] == 404
assert response.json["data"]["useRecoveryApiKey"]["token"] is None
assert read_json(tokens_file)["tokens"] == new_data["tokens"]
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -301,12 +320,18 @@ def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_c
assert response.json["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json["data"]["api"]["recoveryKey"]["valid"] is False
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] == new_data["recovery_token"]["expiration"]
assert (
response.json["data"]["api"]["recoveryKey"]["expirationDate"]
== new_data["recovery_token"]["expiration"]
)
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None
def test_graphql_generate_recoevry_key_with_expiration_in_the_past(authorized_client, tokens_file):
def test_graphql_generate_recovery_key_with_expiration_in_the_past(
authorized_client, tokens_file
):
expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f")
response = authorized_client.post(
"/graphql",
@ -326,11 +351,12 @@ def test_graphql_generate_recoevry_key_with_expiration_in_the_past(authorized_cl
assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None
assert read_json(tokens_file)["tokens"] == []
assert "recovery_token" not in read_json(tokens_file)
def test_graphql_generate_recovery_key_with_invalid_time_format(authorized_client, tokens_file):
def test_graphql_generate_recovery_key_with_invalid_time_format(
authorized_client, tokens_file
):
expiration_date = "invalid_time_format"
expiration_date_str = expiration_date
@ -347,16 +373,14 @@ def test_graphql_generate_recovery_key_with_invalid_time_format(authorized_clien
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewRecoveryApiKey"]["success"] is False
assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None
assert response.json.get("data") is None
assert read_json(tokens_file)["tokens"] == []
assert "recovery_token" not in read_json(tokens_file)
def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, tokens_file):
def test_graphql_generate_recovery_key_with_limited_uses(
authorized_client, tokens_file
):
response = authorized_client.post(
"/graphql",
@ -386,9 +410,7 @@ def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, toke
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -406,25 +428,23 @@ def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, toke
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": mnemonic_key,
"tokenName": "test_token1",
"key": mnemonic_key,
"deviceName": "test_token1",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
assert response.json["data"]["useRecoveryApiKey"]["success"] is True
assert response.json["data"]["useRecoveryApiKey"]["message"] is not None
assert response.json["data"]["useRecoveryApiKey"]["code"] == 200
assert response.json["data"]["useRecoveryApiKey"]["token"] is not None
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -442,25 +462,23 @@ def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, toke
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": mnemonic_key,
"tokenName": "test_token2",
"key": mnemonic_key,
"deviceName": "test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
assert response.json["data"]["useRecoveryApiKey"]["success"] is True
assert response.json["data"]["useRecoveryApiKey"]["message"] is not None
assert response.json["data"]["useRecoveryApiKey"]["code"] == 200
assert response.json["data"]["useRecoveryApiKey"]["token"] is not None
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json.get("data") is not None
@ -468,7 +486,7 @@ def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, toke
assert response.json["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json["data"]["api"]["recoveryKey"]["valid"] is False
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is not None
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is not None
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] == 0
# Try to use token
@ -478,20 +496,23 @@ def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, toke
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": mnemonic_key,
"tokenName": "test_token3",
"key": mnemonic_key,
"deviceName": "test_token3",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is False
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 404
assert response.json["data"]["useRecoveryKey"]["token"] is None
assert response.json["data"]["useRecoveryApiKey"]["success"] is False
assert response.json["data"]["useRecoveryApiKey"]["message"] is not None
assert response.json["data"]["useRecoveryApiKey"]["code"] == 404
assert response.json["data"]["useRecoveryApiKey"]["token"] is None
def test_graphql_generate_recovery_key_with_negative_uses(authorized_client, tokens_file):
def test_graphql_generate_recovery_key_with_negative_uses(
authorized_client, tokens_file
):
# Try to get token status
response = authorized_client.post(
"/graphql",
@ -512,6 +533,7 @@ def test_graphql_generate_recovery_key_with_negative_uses(authorized_client, tok
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None
def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_file):
# Try to get token status
response = authorized_client.post(

View File

@ -6,12 +6,11 @@ from tests.common import generate_api_query
API_VERSION_QUERY = "version"
def test_graphql_get_api_version(authorized_client):
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_VERSION_QUERY])
},
json={"query": generate_api_query([API_VERSION_QUERY])},
)
assert response.status_code == 200
assert "version" in response.get_json()["data"]["api"]
@ -20,9 +19,7 @@ def test_graphql_get_api_version(authorized_client):
def test_graphql_api_version_unauthorized(client):
response = client.get(
"/graphql",
json={
"query": generate_api_query([API_VERSION_QUERY])
},
json={"query": generate_api_query([API_VERSION_QUERY])},
)
assert response.status_code == 200
assert "version" in response.get_json()["data"]["api"]

View File

@ -7,6 +7,7 @@ import datetime
from tests.common import generate_system_query, read_json, write_json
@pytest.fixture
def domain_file(mocker, datadir):
mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", datadir / "domain")
@ -54,7 +55,7 @@ class ProcessMock:
self.args = args
self.kwargs = kwargs
def communicate(self):
def communicate():
return (b"", None)
returncode = 0
@ -62,7 +63,8 @@ class ProcessMock:
class BrokenServiceMock(ProcessMock):
"""Mock subprocess.Popen for broken service"""
def communicate(self):
def communicate():
return (b"Testing error", None)
returncode = 3
@ -95,19 +97,35 @@ def mock_subprocess_check_output(mocker):
)
return mock
@pytest.fixture
def mock_get_ip4(mocker):
mock = mocker.patch("selfprivacy_api.utils.network.get_ip4", autospec=True, return_value="157.90.247.192")
mock = mocker.patch(
"selfprivacy_api.utils.network.get_ip4",
autospec=True,
return_value="157.90.247.192",
)
return mock
@pytest.fixture
def mock_get_ip6(mocker):
mock = mocker.patch("selfprivacy_api.utils.network.get_ip6", autospec=True, return_value="fe80::9400:ff:fef1:34ae")
mock = mocker.patch(
"selfprivacy_api.utils.network.get_ip6",
autospec=True,
return_value="fe80::9400:ff:fef1:34ae",
)
return mock
@pytest.fixture
def mock_dkim_key(mocker):
mock = mocker.patch("selfprivacy_api.utils.get_dkim_key", autospec=True, return_value="I am a DKIM key")
mock = mocker.patch(
"selfprivacy_api.utils.get_dkim_key",
autospec=True,
return_value="I am a DKIM key",
)
API_PYTHON_VERSION_INFO = """
info {
@ -115,6 +133,7 @@ info {
}
"""
def test_graphql_wrong_auth(wrong_auth_client):
"""Test wrong auth"""
response = wrong_auth_client.get(
@ -126,6 +145,7 @@ def test_graphql_wrong_auth(wrong_auth_client):
assert response.status_code == 200
assert response.json.get("data") is None
API_GET_DOMAIN_INFO = """
domainInfo {
domain
@ -141,6 +161,7 @@ domainInfo {
}
"""
def dns_record(type="A", name="test.tld", content=None, ttl=3600, priority=None):
if content is None:
if type == "A":
@ -155,13 +176,23 @@ def dns_record(type="A", name="test.tld", content=None, ttl=3600, priority=None)
"priority": priority,
}
def is_dns_record_in_array(records, dns_record) -> bool:
for record in records:
if record["type"] == dns_record["type"] and record["name"] == dns_record["name"] and record["content"] == dns_record["content"] and record["ttl"] == dns_record["ttl"] and record["priority"] == dns_record["priority"]:
if (
record["type"] == dns_record["type"]
and record["name"] == dns_record["name"]
and record["content"] == dns_record["content"]
and record["ttl"] == dns_record["ttl"]
and record["priority"] == dns_record["priority"]
):
return True
return False
def test_graphql_get_domain(authorized_client, domain_file, mock_get_ip4, mock_get_ip6, turned_on):
def test_graphql_get_domain(
authorized_client, domain_file, mock_get_ip4, mock_get_ip6, turned_on
):
"""Test get domain"""
response = authorized_client.get(
"/graphql",
@ -178,23 +209,62 @@ def test_graphql_get_domain(authorized_client, domain_file, mock_get_ip4, mock_g
assert is_dns_record_in_array(dns_records, dns_record())
assert is_dns_record_in_array(dns_records, dns_record(type="AAAA"))
assert is_dns_record_in_array(dns_records, dns_record(name="api.test.tld"))
assert is_dns_record_in_array(dns_records, dns_record(name="api.test.tld", type="AAAA"))
assert is_dns_record_in_array(
dns_records, dns_record(name="api.test.tld", type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="cloud.test.tld"))
assert is_dns_record_in_array(dns_records, dns_record(name="cloud.test.tld", type="AAAA"))
assert is_dns_record_in_array(
dns_records, dns_record(name="cloud.test.tld", type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="git.test.tld"))
assert is_dns_record_in_array(dns_records, dns_record(name="git.test.tld", type="AAAA"))
assert is_dns_record_in_array(
dns_records, dns_record(name="git.test.tld", type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="meet.test.tld"))
assert is_dns_record_in_array(dns_records, dns_record(name="meet.test.tld", type="AAAA"))
assert is_dns_record_in_array(
dns_records, dns_record(name="meet.test.tld", type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="password.test.tld"))
assert is_dns_record_in_array(dns_records, dns_record(name="password.test.tld", type="AAAA"))
assert is_dns_record_in_array(
dns_records, dns_record(name="password.test.tld", type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="social.test.tld"))
assert is_dns_record_in_array(dns_records, dns_record(name="social.test.tld", type="AAAA"))
assert is_dns_record_in_array(
dns_records, dns_record(name="social.test.tld", type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="vpn.test.tld"))
assert is_dns_record_in_array(dns_records, dns_record(name="vpn.test.tld", type="AAAA"))
assert is_dns_record_in_array(dns_records, dns_record(name="test.tld", type="MX", content="test.tld", priority=10))
assert is_dns_record_in_array(dns_records, dns_record(name="_dmarc.test.tld", type="TXT", content="v=DMARC1; p=none", ttl=18000))
assert is_dns_record_in_array(dns_records, dns_record(name="test.tld", type="TXT", content="v=spf1 a mx ip4:157.90.247.192 -all", ttl=18000))
assert is_dns_record_in_array(dns_records, dns_record(name="selector._domainkey.test.tld", type="TXT", content="I am a DKIM key", ttl=18000))
assert is_dns_record_in_array(
dns_records, dns_record(name="vpn.test.tld", type="AAAA")
)
assert is_dns_record_in_array(
dns_records,
dns_record(name="test.tld", type="MX", content="test.tld", priority=10),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="_dmarc.test.tld", type="TXT", content="v=DMARC1; p=none", ttl=18000
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="test.tld",
type="TXT",
content="v=spf1 a mx ip4:157.90.247.192 -all",
ttl=18000,
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="selector._domainkey.test.tld",
type="TXT",
content="I am a DKIM key",
ttl=18000,
),
)
API_GET_TIMEZONE = """
settings {
@ -202,6 +272,7 @@ settings {
}
"""
def test_graphql_get_timezone_unauthorized(client, turned_on):
"""Test get timezone without auth"""
response = client.get(
@ -213,6 +284,7 @@ def test_graphql_get_timezone_unauthorized(client, turned_on):
assert response.status_code == 200
assert response.json.get("data") is None
def test_graphql_get_timezone(authorized_client, turned_on):
"""Test get timezone"""
response = authorized_client.get(
@ -225,7 +297,8 @@ def test_graphql_get_timezone(authorized_client, turned_on):
assert response.json.get("data") is not None
assert response.json["data"]["system"]["settings"]["timezone"] == "Europe/Moscow"
def test_graphql_get_timezone_on_undefined(authorized_client, undefiened_config):
def test_graphql_get_timezone_on_undefined(authorized_client, undefined_config):
"""Test get timezone when none is defined in config"""
response = authorized_client.get(
"/graphql",
@ -249,6 +322,7 @@ mutation changeTimezone($timezone: String!) {
}
"""
def test_graphql_change_timezone_unauthorized(client, turned_on):
"""Test change timezone without auth"""
response = client.post(

View File

@ -21,16 +21,21 @@ FAILED_OUTPUT_STRING = b"""
Device "eth0" does not exist.
"""
@pytest.fixture
def ip_process_mock(mocker):
mock = mocker.patch("subprocess.check_output", autospec=True, return_value=OUTPUT_STRING)
mock = mocker.patch(
"subprocess.check_output", autospec=True, return_value=OUTPUT_STRING
)
return mock
def test_get_ip4(ip_process_mock):
"""Test get IPv4 address"""
ip4 = get_ip4()
assert ip4 == "157.90.247.192"
def test_get_ip6(ip_process_mock):
"""Test get IPv6 address"""
ip6 = get_ip6()

View File

@ -60,7 +60,7 @@ class ProcessMock:
self.args = args
self.kwargs = kwargs
def communicate(self):
def communicate():
return (b"", None)
returncode = 0
@ -68,7 +68,8 @@ class ProcessMock:
class BrokenServiceMock(ProcessMock):
"""Mock subprocess.Popen"""
def communicate(self):
def communicate():
return (b"Testing error", None)
returncode = 3