diff --git a/selfprivacy_api/resources/services/mailserver.py b/selfprivacy_api/resources/services/mailserver.py index 1185d20..bf42c7d 100644 --- a/selfprivacy_api/resources/services/mailserver.py +++ b/selfprivacy_api/resources/services/mailserver.py @@ -7,7 +7,7 @@ from flask_restful import Resource from selfprivacy_api.resources.services import api -from selfprivacy_api.utils import get_domain +from selfprivacy_api.utils import get_dkim_key, get_domain class DKIMKey(Resource): @@ -31,15 +31,11 @@ class DKIMKey(Resource): """ domain = get_domain() - if os.path.exists("/var/dkim/" + domain + ".selector.txt"): - cat_process = subprocess.Popen( - ["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE - ) - dkim = cat_process.communicate()[0] - dkim = base64.b64encode(dkim) - dkim = str(dkim, "utf-8") - return dkim - return "DKIM file not found", 404 + dkim = get_dkim_key(domain) + if dkim is None: + return "DKIM file not found", 404 + dkim = base64.b64encode(dkim.encode("utf-8")).decode("utf-8") + return dkim api.add_resource(DKIMKey, "/mailserver/dkim") diff --git a/selfprivacy_api/utils/__init__.py b/selfprivacy_api/utils/__init__.py index b0d0acc..adb0409 100644 --- a/selfprivacy_api/utils/__init__.py +++ b/selfprivacy_api/utils/__init__.py @@ -3,6 +3,8 @@ import datetime from enum import Enum import json +import os +import subprocess import portalocker @@ -130,3 +132,13 @@ def parse_date(date_str: str) -> datetime.datetime: if date_str.endswith("Z") else datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f") ) + +def get_dkim_key(domain): + """Get DKIM key from /var/dkim/.selector.txt""" + if os.path.exists("/var/dkim/" + domain + ".selector.txt"): + cat_process = subprocess.Popen( + ["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE + ) + dkim = cat_process.communicate()[0] + return str(dkim, "utf-8") + return None diff --git a/selfprivacy_api/utils/network.py b/selfprivacy_api/utils/network.py new file mode 100644 index 0000000..1aa4644 --- /dev/null +++ b/selfprivacy_api/utils/network.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +"""Network utils""" +import subprocess +import re + +def get_ip4(): + """Get IPv4 address""" + try: + ip4 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode("utf-8") + ip4 = re.search(r"inet (\d+\.\d+\.\d+\.\d+)\/\d+", ip4) + except subprocess.CalledProcessError: + ip4 = None + return ip4.group(1) if ip4 else None + +def get_ip6(): + """Get IPv6 address""" + try: + ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode("utf-8") + ip6 = re.search(r"inet6 (\S+)\/\d+", ip6) + except subprocess.CalledProcessError: + ip6 = None + return ip6.group(1) if ip6 else None diff --git a/tests/common.py b/tests/common.py index 950c850..d3dda69 100644 --- a/tests/common.py +++ b/tests/common.py @@ -13,5 +13,8 @@ def write_json(file_path, data): def generate_api_query(query_array): return "query TestApi {\n api {" + "\n".join(query_array) + "}\n}" +def generate_system_query(query_array): + return "query TestSystem {\n system {" + "\n".join(query_array) + "}\n}" + def mnemonic_to_hex(mnemonic): return Mnemonic(language="english").to_entropy(mnemonic).hex() diff --git a/tests/test_graphql/test_system.py b/tests/test_graphql/test_system.py new file mode 100644 index 0000000..661a68b --- /dev/null +++ b/tests/test_graphql/test_system.py @@ -0,0 +1,227 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=missing-function-docstring +import json +import pytest +import datetime + +from tests.common import generate_system_query, read_json, write_json + +@pytest.fixture +def domain_file(mocker, datadir): + mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", datadir / "domain") + return datadir + + +@pytest.fixture +def turned_on(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") + assert read_json(datadir / "turned_on.json")["autoUpgrade"]["enable"] == True + assert read_json(datadir / "turned_on.json")["autoUpgrade"]["allowReboot"] == True + assert read_json(datadir / "turned_on.json")["timezone"] == "Europe/Moscow" + return datadir + + +@pytest.fixture +def turned_off(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") + assert read_json(datadir / "turned_off.json")["autoUpgrade"]["enable"] == False + assert read_json(datadir / "turned_off.json")["autoUpgrade"]["allowReboot"] == False + assert read_json(datadir / "turned_off.json")["timezone"] == "Europe/Moscow" + return datadir + + +@pytest.fixture +def undefined_config(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") + assert "autoUpgrade" not in read_json(datadir / "undefined.json") + assert "timezone" not in read_json(datadir / "undefined.json") + return datadir + + +@pytest.fixture +def no_values(mocker, datadir): + mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "no_values.json") + assert "enable" not in read_json(datadir / "no_values.json")["autoUpgrade"] + assert "allowReboot" not in read_json(datadir / "no_values.json")["autoUpgrade"] + return datadir + + +class ProcessMock: + """Mock subprocess.Popen""" + + def __init__(self, args, **kwargs): + self.args = args + self.kwargs = kwargs + + def communicate(): + return (b"", None) + + returncode = 0 + + +class BrokenServiceMock(ProcessMock): + def communicate(): + return (b"Testing error", None) + + returncode = 3 + + +@pytest.fixture +def mock_subprocess_popen(mocker): + mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock) + return mock + + +@pytest.fixture +def mock_os_chdir(mocker): + mock = mocker.patch("os.chdir", autospec=True) + return mock + + +@pytest.fixture +def mock_broken_service(mocker): + mock = mocker.patch( + "subprocess.Popen", autospec=True, return_value=BrokenServiceMock + ) + return mock + + +@pytest.fixture +def mock_subprocess_check_output(mocker): + mock = mocker.patch( + "subprocess.check_output", autospec=True, return_value=b"Testing Linux" + ) + return mock + +@pytest.fixture +def mock_get_ip4(mocker): + mock = mocker.patch("selfprivacy_api.utils.get_ip4", autospec=True, return_value="157.90.247.192") + return mock + +@pytest.fixture +def mock_get_ip6(mocker): + mock = mocker.patch("selfprivacy_api.utils.get_ip6", autospec=True, return_value="fe80::9400:ff:fef1:34ae") + return mock + +@pytest.fixture +def mock_dkim_key(mocker): + mock = mocker.patch("selfprivacy_api.utils.get_dkim_key", autospec=True, return_value="I am a DKIM key") + +API_PYTHON_VERSION_INFO = """ +info { + pythonVersion +} +""" + + +def test_graphql_wrong_auth(wrong_auth_client): + """Test wrong auth""" + response = wrong_auth_client.get( + "/graphql", + json={ + "query": generate_system_query([API_PYTHON_VERSION_INFO]), + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is None + +API_GET_DOMAIN_INFO = """ +domainInfo { + domain + hostname + provider + requiredDnsRecords { + type + name + content + ttl + priority + } +} +""" + +def dns_record(type="A", name="test.tld", content=None, ttl=3600, priority=None): + if content is None: + if type == "A": + content = "157.90.247.192" + elif type == "AAAA": + content = "fe80::9400:ff:fef1:34ae" + return { + "type": type, + "name": name, + "content": content, + "ttl": ttl, + "priority": priority, + } + +def test_graphql_get_domain(authorized_client, domain_file, mock_get_ip4, mock_get_ip6, turned_on): + """Test get domain""" + response = authorized_client.get( + "/graphql", + json={ + "query": generate_system_query([API_GET_DOMAIN_INFO]), + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["system"]["domainInfo"]["domain"] == "test.tld" + assert response.json["data"]["system"]["domainInfo"]["hostname"] == "test-instance" + assert response.json["data"]["system"]["domainInfo"]["provider"] == "HETZNER" + assert response.json["data"]["system"]["domainInfo"]["requiredDnsRecords"] == [ + dns_record(), + dns_record(type="AAAA"), + dns_record(name="api.test.tld"), + dns_record(name="api.test.tld", type="AAAA"), + dns_record(name="cloud.test.tld"), + dns_record(name="cloud.test.tld", type="AAAA"), + dns_record(name="git.test.tld"), + dns_record(name="git.test.tld", type="AAAA"), + dns_record(name="meet.test.tld"), + dns_record(name="meet.test.tld", type="AAAA"), + dns_record(name="password.test.tld"), + dns_record(name="password.test.tld", type="AAAA"), + dns_record(name="social.test.tld"), + dns_record(name="social.test.tld", type="AAAA"), + dns_record(name="vpn.test.tld"), + dns_record(name="vpn.test.tld", type="AAAA"), + dns_record(name="test.tld", type="MX", content="test.tld", priority=10), + dns_record(name="_dmarc.test.tld", type="TXT", content="v=DMARC1; p=none", ttl=18000), + dns_record(name="test.tld", type="TXT", content="v=spf1 a mx ip4:157.90.247.192 -all", ttl=18000), + dns_record(name="selector._domainkey.test.tld", type="TXT", content="I am a DKIM key", ttl=18000), + ] + +API_GET_TIMEZONE = """ +settings { + timezone +} +""" + +def test_graphql_get_timezone_unauthorized(unauthorized_client, turned_on): + """Test get timezone""" + response = unauthorized_client.get( + "/graphql", + json={ + "query": generate_system_query([API_GET_TIMEZONE]), + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is None + +def test_graphql_get_timezone(authorized_client, turned_on): + """Test get timezone""" + response = authorized_client.get( + "/graphql", + json={ + "query": generate_system_query([API_GET_TIMEZONE]), + }, + ) + assert response.status_code == 200 + assert response.json.get("data") is not None + assert response.json["data"]["system"]["settings"]["timezone"] == "Europe/Moscow" + +API_GET_PYTHON_VERSION = """ +info { + pythonVersion +} +""" diff --git a/tests/test_graphql/test_system/domain b/tests/test_graphql/test_system/domain new file mode 100644 index 0000000..3679d0d --- /dev/null +++ b/tests/test_graphql/test_system/domain @@ -0,0 +1 @@ +test-domain.tld \ No newline at end of file diff --git a/tests/test_graphql/test_system/no_values.json b/tests/test_graphql/test_system/no_values.json new file mode 100644 index 0000000..59e5e71 --- /dev/null +++ b/tests/test_graphql/test_system/no_values.json @@ -0,0 +1,50 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": true + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/test_graphql/test_system/turned_off.json b/tests/test_graphql/test_system/turned_off.json new file mode 100644 index 0000000..f451683 --- /dev/null +++ b/tests/test_graphql/test_system/turned_off.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": true + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": false, + "allowReboot": false + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/test_graphql/test_system/turned_on.json b/tests/test_graphql/test_system/turned_on.json new file mode 100644 index 0000000..337e47f --- /dev/null +++ b/tests/test_graphql/test_system/turned_on.json @@ -0,0 +1,52 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": true + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "autoUpgrade": { + "enable": true, + "allowReboot": true + }, + "timezone": "Europe/Moscow", + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/test_graphql/test_system/undefined.json b/tests/test_graphql/test_system/undefined.json new file mode 100644 index 0000000..b67b296 --- /dev/null +++ b/tests/test_graphql/test_system/undefined.json @@ -0,0 +1,47 @@ +{ + "backblaze": { + "accountId": "ID", + "accountKey": "KEY", + "bucket": "selfprivacy" + }, + "api": { + "token": "TEST_TOKEN", + "enableSwagger": false + }, + "bitwarden": { + "enable": true + }, + "cloudflare": { + "apiKey": "TOKEN" + }, + "databasePassword": "PASSWORD", + "domain": "test.tld", + "hashedMasterPassword": "HASHED_PASSWORD", + "hostname": "test-instance", + "nextcloud": { + "adminPassword": "ADMIN", + "databasePassword": "ADMIN", + "enable": true + }, + "resticPassword": "PASS", + "ssh": { + "enable": true, + "passwordAuthentication": true, + "rootKeys": [ + "ssh-ed25519 KEY test@pc" + ] + }, + "username": "tester", + "gitea": { + "enable": false + }, + "ocserv": { + "enable": true + }, + "pleroma": { + "enable": true + }, + "sshKeys": [ + "ssh-rsa KEY test@pc" + ] +} \ No newline at end of file diff --git a/tests/test_network_utils.py b/tests/test_network_utils.py new file mode 100644 index 0000000..b8f9c0d --- /dev/null +++ b/tests/test_network_utils.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=missing-function-docstring +import pytest + +from selfprivacy_api.utils.network import get_ip4, get_ip6 + +OUTPUT_STRING = b""" +2: eth0: mtu 1500 qdisc fq_codel state UP group default qlen 1000 + link/ether 96:00:00:f1:34:ae brd ff:ff:ff:ff:ff:ff + altname enp0s3 + altname ens3 + inet 157.90.247.192/32 brd 157.90.247.192 scope global dynamic eth0 + valid_lft 46061sec preferred_lft 35261sec + inet6 fe80::9400:ff:fef1:34ae/64 scope link + valid_lft forever preferred_lft forever +""" + +FAILED_OUTPUT_STRING = b""" +Device "eth0" does not exist. +""" + +@pytest.fixture +def ip_process_mock(mocker): + mock = mocker.patch("subprocess.check_output", autospec=True, return_value=OUTPUT_STRING) + return mock + +def test_get_ip4(ip_process_mock): + """Test get IPv4 address""" + ip4 = get_ip4() + assert ip4 == "157.90.247.192" + +def test_get_ip6(ip_process_mock): + """Test get IPv6 address""" + ip6 = get_ip6() + assert ip6 == "fe80::9400:ff:fef1:34ae"