Browse Source

First wave of unit tests, and bugfixes caused by them

pull/6/head
Inex Code 9 months ago
parent
commit
245964c998
  1. 6
      requirements.txt
  2. 13
      selfprivacy_api/app.py
  3. 20
      selfprivacy_api/resources/services/mailserver.py
  4. 6
      selfprivacy_api/resources/services/ssh.py
  5. 7
      selfprivacy_api/utils.py
  6. 0
      tests/__init__.py
  7. 38
      tests/conftest.py
  8. 80
      tests/services/test_bitwarden.py
  9. 51
      tests/services/test_bitwarden/enable_undefined.json
  10. 52
      tests/services/test_bitwarden/turned_off.json
  11. 52
      tests/services/test_bitwarden/turned_on.json
  12. 49
      tests/services/test_bitwarden/undefined.json
  13. 80
      tests/services/test_gitea.py
  14. 51
      tests/services/test_gitea/enable_undefined.json
  15. 52
      tests/services/test_gitea/turned_off.json
  16. 52
      tests/services/test_gitea/turned_on.json
  17. 49
      tests/services/test_gitea/undefined.json
  18. 65
      tests/services/test_mailserver.py
  19. 80
      tests/services/test_nextcloud.py
  20. 51
      tests/services/test_nextcloud/enable_undefined.json
  21. 52
      tests/services/test_nextcloud/turned_off.json
  22. 52
      tests/services/test_nextcloud/turned_on.json
  23. 44
      tests/services/test_nextcloud/undefined.json
  24. 80
      tests/services/test_ocserv.py
  25. 51
      tests/services/test_ocserv/enable_undefined.json
  26. 52
      tests/services/test_ocserv/turned_off.json
  27. 52
      tests/services/test_ocserv/turned_on.json
  28. 49
      tests/services/test_ocserv/undefined.json
  29. 80
      tests/services/test_pleroma.py
  30. 51
      tests/services/test_pleroma/enable_undefined.json
  31. 52
      tests/services/test_pleroma/turned_off.json
  32. 52
      tests/services/test_pleroma/turned_on.json
  33. 49
      tests/services/test_pleroma/undefined.json
  34. 131
      tests/services/test_services.py
  35. 262
      tests/services/test_ssh.py
  36. 52
      tests/services/test_ssh/all_off.json
  37. 52
      tests/services/test_ssh/root_and_admin_have_keys.json
  38. 46
      tests/services/test_ssh/turned_off.json
  39. 46
      tests/services/test_ssh/turned_on.json
  40. 45
      tests/services/test_ssh/undefined.json

6
requirements.txt

@ -6,3 +6,9 @@ setuptools
portalocker
flask-swagger
flask-swagger-ui
pytz
pytest
coverage
pytest-mock
pytest-datadir

13
selfprivacy_api/app.py

@ -16,15 +16,18 @@ swagger_blueprint = get_swaggerui_blueprint(
)
def create_app():
def create_app(test_config=None):
"""Initiate Flask app and bind routes"""
app = Flask(__name__)
api = Api(app)
app.config["AUTH_TOKEN"] = os.environ.get("AUTH_TOKEN")
if app.config["AUTH_TOKEN"] is None:
raise ValueError("AUTH_TOKEN is not set")
app.config["ENABLE_SWAGGER"] = os.environ.get("ENABLE_SWAGGER", "0")
if test_config is None:
app.config["AUTH_TOKEN"] = os.environ.get("AUTH_TOKEN")
if app.config["AUTH_TOKEN"] is None:
raise ValueError("AUTH_TOKEN is not set")
app.config["ENABLE_SWAGGER"] = os.environ.get("ENABLE_SWAGGER", "0")
else:
app.config.update(test_config)
# Check bearer token
@app.before_request

20
selfprivacy_api/resources/services/mailserver.py

@ -2,6 +2,7 @@
"""Mail server management module"""
import base64
import subprocess
import os
from flask_restful import Resource
from selfprivacy_api.resources.services import api
@ -25,15 +26,20 @@ class DKIMKey(Resource):
description: DKIM key encoded in base64
401:
description: Unauthorized
404:
description: DKIM key not found
"""
domain = get_domain()
cat_process = subprocess.Popen(
["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE
)
dkim = cat_process.communicate()[0]
dkim = base64.b64encode(dkim)
dkim = str(dkim, "utf-8")
return dkim
if os.path.exists("/var/dkim/" + domain + ".selector.txt"):
cat_process = subprocess.Popen(
["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE
)
dkim = cat_process.communicate()[0]
dkim = base64.b64encode(dkim)
dkim = str(dkim, "utf-8")
return dkim
return "DKIM file not found", 404
api.add_resource(DKIMKey, "/mailserver/dkim")

6
selfprivacy_api/resources/services/ssh.py

@ -278,6 +278,12 @@ class SSHKeys(Resource):
if username == data["username"]:
if "sshKeys" not in data:
data["sshKeys"] = []
# Return 409 if key already in array
for key in data["sshKeys"]:
if key == args["public_key"]:
return {
"error": "Key already exists",
}, 409
data["sshKeys"].append(args["public_key"])
return {
"message": "New SSH key successfully written",

7
selfprivacy_api/utils.py

@ -2,8 +2,11 @@
"""Various utility functions"""
import json
import portalocker
from flask import current_app
USERDATA_FILE = "/etc/nixos/userdata/userdata.json"
def get_domain():
"""Get domain from /var/domain without trailing new line"""
with open("/var/domain", "r", encoding="utf-8") as domain_file:
@ -16,7 +19,7 @@ class WriteUserData(object):
def __init__(self):
self.userdata_file = open(
"/etc/nixos/userdata/userdata.json", "r+", encoding="utf-8"
USERDATA_FILE, "r+", encoding="utf-8"
)
portalocker.lock(self.userdata_file, portalocker.LOCK_EX)
self.data = json.load(self.userdata_file)
@ -38,7 +41,7 @@ class ReadUserData(object):
def __init__(self):
self.userdata_file = open(
"/etc/nixos/userdata/userdata.json", "r", encoding="utf-8"
USERDATA_FILE, "r", encoding="utf-8"
)
portalocker.lock(self.userdata_file, portalocker.LOCK_SH)
self.data = json.load(self.userdata_file)

0
tests/__init__.py

38
tests/conftest.py

@ -0,0 +1,38 @@
import pytest
from flask import testing
from selfprivacy_api.app import create_app
@pytest.fixture
def app():
app = create_app({
"AUTH_TOKEN": "TEST_TOKEN",
"ENABLE_SWAGGER": "0",
})
yield app
@pytest.fixture
def client(app):
return app.test_client()
class AuthorizedClient(testing.FlaskClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.token = "TEST_TOKEN"
def open(self, *args, **kwargs):
if "headers" not in kwargs:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Bearer {self.token}"
return super().open(*args, **kwargs)
@pytest.fixture
def authorized_client(app):
app.test_client_class = AuthorizedClient
return app.test_client()
@pytest.fixture
def runner(app):
return app.test_cli_runner()

80
tests/services/test_bitwarden.py

@ -0,0 +1,80 @@
import json
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
###############################################################################
@pytest.fixture
def bitwarden_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["bitwarden"]["enable"] == False
return datadir
@pytest.fixture
def bitwarden_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["bitwarden"]["enable"] == True
return datadir
@pytest.fixture
def bitwarden_enable_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json")
assert "enable" not in read_json(datadir / "enable_undefined.json")["bitwarden"]
return datadir
@pytest.fixture
def bitwarden_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "bitwarden" not in read_json(datadir / "undefined.json")
return datadir
###############################################################################
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_unauthorized(client, bitwarden_off, endpoint):
response = client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 401
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_illegal_methods(authorized_client, bitwarden_off, endpoint):
response = authorized_client.get(f"/services/bitwarden/{endpoint}")
assert response.status_code == 405
response = authorized_client.put(f"/services/bitwarden/{endpoint}")
assert response.status_code == 405
response = authorized_client.delete(f"/services/bitwarden/{endpoint}")
assert response.status_code == 405
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_from_off(authorized_client, bitwarden_off, endpoint, target_file):
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
assert read_json(bitwarden_off / "turned_off.json") == read_json(bitwarden_off / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_from_on(authorized_client, bitwarden_on, endpoint, target_file):
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
assert read_json(bitwarden_on / "turned_on.json") == read_json(bitwarden_on / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_twice(authorized_client, bitwarden_off, endpoint, target_file):
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
assert read_json(bitwarden_off / "turned_off.json") == read_json(bitwarden_off / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_on_attribute_deleted(authorized_client, bitwarden_enable_undefined, endpoint, target_file):
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
assert read_json(bitwarden_enable_undefined / "enable_undefined.json") == read_json(bitwarden_enable_undefined / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_on_bitwarden_undefined(authorized_client, bitwarden_undefined, endpoint, target_file):
response = authorized_client.post(f"/services/bitwarden/{endpoint}")
assert response.status_code == 200
assert read_json(bitwarden_undefined / "undefined.json") == read_json(bitwarden_undefined / target_file)

51
tests/services/test_bitwarden/enable_undefined.json

@ -0,0 +1,51 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

52
tests/services/test_bitwarden/turned_off.json

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

52
tests/services/test_bitwarden/turned_on.json

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": true
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

49
tests/services/test_bitwarden/undefined.json

@ -0,0 +1,49 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

80
tests/services/test_gitea.py

@ -0,0 +1,80 @@
import json
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
###############################################################################
@pytest.fixture
def gitea_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["gitea"]["enable"] == False
return datadir
@pytest.fixture
def gitea_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["gitea"]["enable"] == True
return datadir
@pytest.fixture
def gitea_enable_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json")
assert "enable" not in read_json(datadir / "enable_undefined.json")["gitea"]
return datadir
@pytest.fixture
def gitea_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "gitea" not in read_json(datadir / "undefined.json")
return datadir
###############################################################################
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_unauthorized(client, gitea_off, endpoint):
response = client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 401
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_illegal_methods(authorized_client, gitea_off, endpoint):
response = authorized_client.get(f"/services/gitea/{endpoint}")
assert response.status_code == 405
response = authorized_client.put(f"/services/gitea/{endpoint}")
assert response.status_code == 405
response = authorized_client.delete(f"/services/gitea/{endpoint}")
assert response.status_code == 405
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_from_off(authorized_client, gitea_off, endpoint, target_file):
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
assert read_json(gitea_off / "turned_off.json") == read_json(gitea_off / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_from_on(authorized_client, gitea_on, endpoint, target_file):
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
assert read_json(gitea_on / "turned_on.json") == read_json(gitea_on / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_twice(authorized_client, gitea_off, endpoint, target_file):
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
assert read_json(gitea_off / "turned_off.json") == read_json(gitea_off / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_on_attribute_deleted(authorized_client, gitea_enable_undefined, endpoint, target_file):
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
assert read_json(gitea_enable_undefined / "enable_undefined.json") == read_json(gitea_enable_undefined / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_on_gitea_undefined(authorized_client, gitea_undefined, endpoint, target_file):
response = authorized_client.post(f"/services/gitea/{endpoint}")
assert response.status_code == 200
assert read_json(gitea_undefined / "undefined.json") == read_json(gitea_undefined / target_file)

51
tests/services/test_gitea/enable_undefined.json

@ -0,0 +1,51 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

52
tests/services/test_gitea/turned_off.json

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

52
tests/services/test_gitea/turned_on.json

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": true
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

49
tests/services/test_gitea/undefined.json

@ -0,0 +1,49 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

65
tests/services/test_mailserver.py

@ -0,0 +1,65 @@
import base64
import json
import pytest
def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
###############################################################################
class ProcessMock():
"""Mock subprocess.Popen"""
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def communicate():
return (b"I am a DKIM key", None)
class NoFileMock(ProcessMock):
def communicate():
return (b"", None)
@pytest.fixture
def mock_subproccess_popen(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
mocker.patch("selfprivacy_api.resources.services.mailserver.get_domain", autospec=True, return_value="example.com")
mocker.patch("os.path.exists", autospec=True, return_value=True)
return mock
@pytest.fixture
def mock_no_file(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=NoFileMock)
mocker.patch("selfprivacy_api.resources.services.mailserver.get_domain", autospec=True, return_value="example.com")
mocker.patch("os.path.exists", autospec=True, return_value=False)
return mock
###############################################################################
def test_unauthorized(client, mock_subproccess_popen):
"""Test unauthorized"""
response = client.get("/services/mailserver/dkim")
assert response.status_code == 401
def test_illegal_methods(authorized_client, mock_subproccess_popen):
response = authorized_client.post("/services/mailserver/dkim")
assert response.status_code == 405
response = authorized_client.put("/services/mailserver/dkim")
assert response.status_code == 405
response = authorized_client.delete("/services/mailserver/dkim")
assert response.status_code == 405
def test_dkim_key(authorized_client, mock_subproccess_popen):
"""Test DKIM key"""
response = authorized_client.get("/services/mailserver/dkim")
assert response.status_code == 200
assert base64.b64decode(response.data) == b"I am a DKIM key"
assert mock_subproccess_popen.call_args[0][0] == ["cat", "/var/dkim/example.com.selector.txt"]
def test_no_dkim_key(authorized_client, mock_no_file):
"""Test no DKIM key"""
response = authorized_client.get("/services/mailserver/dkim")
assert response.status_code == 404
assert mock_no_file.called == False

80
tests/services/test_nextcloud.py

@ -0,0 +1,80 @@
import json
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
###############################################################################
@pytest.fixture
def nextcloud_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["nextcloud"]["enable"] == False
return datadir
@pytest.fixture
def nextcloud_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["nextcloud"]["enable"] == True
return datadir
@pytest.fixture
def nextcloud_enable_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json")
assert "enable" not in read_json(datadir / "enable_undefined.json")["nextcloud"]
return datadir
@pytest.fixture
def nextcloud_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "nextcloud" not in read_json(datadir / "undefined.json")
return datadir
###############################################################################
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_unauthorized(client, nextcloud_off, endpoint):
response = client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 401
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_illegal_methods(authorized_client, nextcloud_off, endpoint):
response = authorized_client.get(f"/services/nextcloud/{endpoint}")
assert response.status_code == 405
response = authorized_client.put(f"/services/nextcloud/{endpoint}")
assert response.status_code == 405
response = authorized_client.delete(f"/services/nextcloud/{endpoint}")
assert response.status_code == 405
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_from_off(authorized_client, nextcloud_off, endpoint, target_file):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_off / "turned_off.json") == read_json(nextcloud_off / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_from_on(authorized_client, nextcloud_on, endpoint, target_file):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_on / "turned_on.json") == read_json(nextcloud_on / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_twice(authorized_client, nextcloud_off, endpoint, target_file):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_off / "turned_off.json") == read_json(nextcloud_off / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_on_attribute_deleted(authorized_client, nextcloud_enable_undefined, endpoint, target_file):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_enable_undefined / "enable_undefined.json") == read_json(nextcloud_enable_undefined / target_file)
@pytest.mark.parametrize("endpoint,target", [("enable", True), ("disable", False)])
def test_on_nextcloud_undefined(authorized_client, nextcloud_undefined, endpoint, target):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_undefined / "undefined.json")["nextcloud"]["enable"] == target

51
tests/services/test_nextcloud/enable_undefined.json

@ -0,0 +1,51 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN"
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

52
tests/services/test_nextcloud/turned_off.json

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

52
tests/services/test_nextcloud/turned_on.json

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

44
tests/services/test_nextcloud/undefined.json

@ -0,0 +1,44 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

80
tests/services/test_ocserv.py

@ -0,0 +1,80 @@
import json
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
###############################################################################
@pytest.fixture
def ocserv_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["ocserv"]["enable"] == False
return datadir
@pytest.fixture
def ocserv_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["ocserv"]["enable"] == True
return datadir
@pytest.fixture
def ocserv_enable_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json")
assert "enable" not in read_json(datadir / "enable_undefined.json")["ocserv"]
return datadir
@pytest.fixture
def ocserv_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "ocserv" not in read_json(datadir / "undefined.json")
return datadir
###############################################################################
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_unauthorized(client, ocserv_off, endpoint):
response = client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 401
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_illegal_methods(authorized_client, ocserv_off, endpoint):
response = authorized_client.get(f"/services/ocserv/{endpoint}")
assert response.status_code == 405
response = authorized_client.put(f"/services/ocserv/{endpoint}")
assert response.status_code == 405
response = authorized_client.delete(f"/services/ocserv/{endpoint}")
assert response.status_code == 405
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_from_off(authorized_client, ocserv_off, endpoint, target_file):
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
assert read_json(ocserv_off / "turned_off.json") == read_json(ocserv_off / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_from_on(authorized_client, ocserv_on, endpoint, target_file):
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
assert read_json(ocserv_on / "turned_on.json") == read_json(ocserv_on / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_switch_twice(authorized_client, ocserv_off, endpoint, target_file):
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
assert read_json(ocserv_off / "turned_off.json") == read_json(ocserv_off / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_on_attribute_deleted(authorized_client, ocserv_enable_undefined, endpoint, target_file):
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
assert read_json(ocserv_enable_undefined / "enable_undefined.json") == read_json(ocserv_enable_undefined / target_file)
@pytest.mark.parametrize("endpoint,target_file", [("enable", "turned_on.json"), ("disable", "turned_off.json")])
def test_on_ocserv_undefined(authorized_client, ocserv_undefined, endpoint, target_file):
response = authorized_client.post(f"/services/ocserv/{endpoint}")
assert response.status_code == 200
assert read_json(ocserv_undefined / "undefined.json") == read_json(ocserv_undefined / target_file)

51
tests/services/test_ocserv/enable_undefined.json

@ -0,0 +1,51 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

52
tests/services/test_ocserv/turned_off.json

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": false
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

52
tests/services/test_ocserv/turned_on.json

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": false
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {