Browse Source

More unit tests and bugfixes (#7)

Reviewed-on: #7
Co-authored-by: Inex Code <inex.code@selfprivacy.org>
Co-committed-by: Inex Code <inex.code@selfprivacy.org>
backups-fix
Inex Code 7 months ago
parent
commit
13cef67204
  1. 2
      .coveragerc
  2. 10
      .drone.yml
  3. 3
      selfprivacy_api/app.py
  4. 52
      selfprivacy_api/resources/common.py
  5. 2
      selfprivacy_api/resources/services/restic.py
  6. 4
      selfprivacy_api/resources/services/ssh.py
  7. 15
      selfprivacy_api/resources/users.py
  8. 9
      selfprivacy_api/restic_controller/__init__.py
  9. 46
      selfprivacy_api/utils.py
  10. 18
      tests/conftest.py
  11. 6
      tests/services/test_mailserver.py
  12. 503
      tests/services/test_restic.py
  13. 68
      tests/services/test_restic/no_values.json
  14. 71
      tests/services/test_restic/some_values.json
  15. 66
      tests/services/test_restic/undefined.json
  16. 4
      tests/services/test_services.py
  17. 286
      tests/services/test_ssh.py
  18. 71
      tests/services/test_ssh/some_users.json
  19. 5
      tests/services/test_ssh/undefined.json
  20. 46
      tests/services/test_ssh/undefined_values.json
  21. 16
      tests/test_common.py
  22. 397
      tests/test_system.py
  23. 1
      tests/test_system/domain
  24. 50
      tests/test_system/no_values.json
  25. 52
      tests/test_system/turned_off.json
  26. 52
      tests/test_system/turned_on.json
  27. 47
      tests/test_system/undefined.json
  28. 270
      tests/test_users.py
  29. 54
      tests/test_users/no_users.json
  30. 61
      tests/test_users/one_user.json
  31. 71
      tests/test_users/some_users.json
  32. 52
      tests/test_users/undefined.json

2
.coveragerc

@ -0,0 +1,2 @@
[run]
source = selfprivacy_api

10
.drone.yml

@ -9,5 +9,11 @@ platform:
steps:
- name: test
commands:
- pytest
- coverage run -m pytest -q
- coverage xml
- name: bandit
commands:
- bandit -ll -r selfprivacy_api
- name: formatting
commands:
- black --check .

3
selfprivacy_api/app.py

@ -10,7 +10,7 @@ from flask_swagger import swagger
from flask_swagger_ui import get_swaggerui_blueprint
from selfprivacy_api.resources.users import User, Users
from selfprivacy_api.resources.common import ApiVersion, DecryptDisk
from selfprivacy_api.resources.common import ApiVersion
from selfprivacy_api.resources.system import api_system
from selfprivacy_api.resources.services import services as api_services
@ -51,7 +51,6 @@ def create_app(test_config=None):
api.add_resource(ApiVersion, "/api/version")
api.add_resource(Users, "/users")
api.add_resource(User, "/users/<string:username>")
api.add_resource(DecryptDisk, "/decryptDisk")
app.register_blueprint(api_system)
app.register_blueprint(api_services)

52
selfprivacy_api/resources/common.py

@ -25,55 +25,3 @@ class ApiVersion(Resource):
description: Unauthorized
"""
return {"version": "1.1.0"}
class DecryptDisk(Resource):
"""Decrypt disk"""
def post(self):
"""
Decrypt /dev/sdb using cryptsetup luksOpen
---
consumes:
- application/json
tags:
- System
security:
- bearerAuth: []
parameters:
- in: body
name: body
required: true
description: Provide a password for decryption
schema:
type: object
required:
- password
properties:
password:
type: string
description: Decryption password.
responses:
201:
description: OK
400:
description: Bad request
401:
description: Unauthorized
"""
parser = reqparse.RequestParser(bundle_errors=True)
parser.add_argument("password", type=str, required=True)
args = parser.parse_args()
decryption_command = ["cryptsetup", "luksOpen", "/dev/sdb", "decryptedVar"]
# TODO: Check if this works at all
decryption_service = subprocess.Popen(
decryption_command,
shell=False,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
decryption_service.communicate(input=args["password"])
return {"status": decryption_service.returncode}, 201

2
selfprivacy_api/resources/services/restic.py

@ -222,6 +222,8 @@ class BackblazeConfig(Resource):
args = parser.parse_args()
with WriteUserData() as data:
if "backblaze" not in data:
data["backblaze"] = {}
data["backblaze"]["accountId"] = args["accountId"]
data["backblaze"]["accountKey"] = args["accountKey"]
data["backblaze"]["bucket"] = args["bucket"]

4
selfprivacy_api/resources/services/ssh.py

@ -216,10 +216,10 @@ class SSHKeys(Resource):
if "users" not in data:
data["users"] = []
for user in data["users"]:
if user["name"] == username:
if user["username"] == username:
if "sshKeys" not in user:
user["sshKeys"] = []
return user["ssh"]["sshKeys"]
return user["sshKeys"]
return {
"error": "User not found",
}, 404

15
selfprivacy_api/resources/users.py

@ -4,7 +4,7 @@ import subprocess
import re
from flask_restful import Resource, reqparse
from selfprivacy_api.utils import WriteUserData, ReadUserData
from selfprivacy_api.utils import WriteUserData, ReadUserData, is_username_forbidden
class Users(Resource):
@ -26,8 +26,9 @@ class Users(Resource):
"""
with ReadUserData() as data:
users = []
for user in data["users"]:
users.append(user["username"])
if "users" in data:
for user in data["users"]:
users.append(user["username"])
return users
def post(self):
@ -71,7 +72,6 @@ class Users(Resource):
parser.add_argument("username", type=str, required=True)
parser.add_argument("password", type=str, required=True)
args = parser.parse_args()
hashing_command = ["mkpasswd", "-m", "sha-512", args["password"]]
password_hash_process_descriptor = subprocess.Popen(
hashing_command,
@ -82,7 +82,9 @@ class Users(Resource):
hashed_password = password_hash_process_descriptor.communicate()[0]
hashed_password = hashed_password.decode("ascii")
hashed_password = hashed_password.rstrip()
# Check if username is forbidden
if is_username_forbidden(args["username"]):
return {"message": "Username is forbidden"}, 409
# Check is username passes regex
if not re.match(r"^[a-z_][a-z0-9_]+$", args["username"]):
return {"error": "username must be alphanumeric"}, 400
@ -137,9 +139,6 @@ class User(Resource):
description: User not found
"""
with WriteUserData() as data:
# Return 400 if username is not provided
if username is None:
return {"error": "username is required"}, 400
if username == data["username"]:
return {"error": "Cannot delete root user"}, 400
# Return 400 if user does not exist

9
selfprivacy_api/restic_controller/__init__.py

@ -40,7 +40,6 @@ class ResticController:
_initialized = False
def __new__(cls):
print("new is called!")
if not cls._instance:
with cls._lock:
cls._instance = super(ResticController, cls).__new__(cls)
@ -58,7 +57,6 @@ class ResticController:
self.snapshot_list = []
self.error_message = None
self._initialized = True
print("init is called!")
self.load_configuration()
self.write_rclone_config()
self.load_snapshots()
@ -112,7 +110,6 @@ class ResticController:
or self.state == ResticStates.RESTORING
):
return
print("preparing to read snapshots")
with subprocess.Popen(
backup_listing_command,
shell=False,
@ -181,7 +178,7 @@ class ResticController:
"backup",
"/var",
]
with open("/tmp/backup.log", "w", encoding="utf-8") as log_file:
with open("/var/backup.log", "w", encoding="utf-8") as log_file:
subprocess.Popen(
backup_command,
shell=False,
@ -196,7 +193,7 @@ class ResticController:
"""
Check progress of ongoing backup operation
"""
backup_status_check_command = ["tail", "-1", "/tmp/backup.log"]
backup_status_check_command = ["tail", "-1", "/var/backup.log"]
if (
self.state == ResticStates.NO_KEY
@ -205,7 +202,7 @@ class ResticController:
return
# If the log file does not exists
if os.path.exists("/tmp/backup.log") is False:
if os.path.exists("/var/backup.log") is False:
self.state = ResticStates.INITIALIZED
with subprocess.Popen(

46
selfprivacy_api/utils.py

@ -5,11 +5,12 @@ import portalocker
USERDATA_FILE = "/etc/nixos/userdata/userdata.json"
DOMAIN_FILE = "/var/domain"
def get_domain():
"""Get domain from /var/domain without trailing new line"""
with open("/var/domain", "r", encoding="utf-8") as domain_file:
with open(DOMAIN_FILE, "r", encoding="utf-8") as domain_file:
domain = domain_file.readline().rstrip()
return domain
@ -56,3 +57,46 @@ def validate_ssh_public_key(key):
if not key.startswith("ssh-rsa"):
return False
return True
def is_username_forbidden(username):
forbidden_prefixes = ["systemd", "nixbld"]
forbidden_usernames = [
"root",
"messagebus",
"postfix",
"polkituser",
"dovecot2",
"dovenull",
"nginx",
"postgres",
"prosody",
"opendkim",
"rspamd",
"sshd",
"selfprivacy-api",
"restic",
"redis",
"pleroma",
"ocserv",
"nextcloud",
"memcached",
"knot-resolver",
"gitea",
"bitwarden_rs",
"vaultwarden",
"acme",
"virtualMail",
"nobody",
]
for prefix in forbidden_prefixes:
if username.startswith(prefix):
return True
for forbidden_username in forbidden_usernames:
if username == forbidden_username:
return True
return False

18
tests/conftest.py

@ -32,12 +32,30 @@ class AuthorizedClient(testing.FlaskClient):
return super().open(*args, **kwargs)
class WrongAuthClient(testing.FlaskClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.token = "WRONG_TOKEN"
def open(self, *args, **kwargs):
if "headers" not in kwargs:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Bearer {self.token}"
return super().open(*args, **kwargs)
@pytest.fixture
def authorized_client(app):
app.test_client_class = AuthorizedClient
return app.test_client()
@pytest.fixture
def wrong_auth_client(app):
app.test_client_class = WrongAuthClient
return app.test_client()
@pytest.fixture
def runner(app):
return app.test_cli_runner()

6
tests/services/test_mailserver.py

@ -2,12 +2,6 @@ import base64
import json
import pytest
def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
###############################################################################

503
tests/services/test_restic.py

@ -0,0 +1,503 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
import json
import pytest
from selfprivacy_api.restic_controller import ResticStates
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
MOCKED_SNAPSHOTS = [
{
"time": "2021-12-06T09:05:04.224685677+03:00",
"tree": "b76152d1e716d86d420407ead05d9911f2b6d971fe1589c12b63e4de65b14d4e",
"paths": ["/var"],
"hostname": "test-host",
"username": "root",
"id": "f96b428f1ca1252089ea3e25cd8ee33e63fb24615f1cc07559ba907d990d81c5",
"short_id": "f96b428f",
},
{
"time": "2021-12-08T07:42:06.998894055+03:00",
"parent": "f96b428f1ca1252089ea3e25cd8ee33e63fb24615f1cc07559ba907d990d81c5",
"tree": "8379b4fdc9ee3e9bb7c322f632a7bed9fc334b0258abbf4e7134f8fe5b3d61b0",
"paths": ["/var"],
"hostname": "test-host",
"username": "root",
"id": "db96b36efec97e5ba385099b43f9062d214c7312c20138aee7b8bd2c6cd8995a",
"short_id": "db96b36e",
},
]
class ResticControllerMock:
snapshot_list = MOCKED_SNAPSHOTS
state = ResticStates.INITIALIZED
progress = 0
error_message = None
@pytest.fixture
def mock_restic_controller(mocker):
mock = mocker.patch(
"selfprivacy_api.resources.services.restic.ResticController",
autospec=True,
return_value=ResticControllerMock,
)
return mock
class ResticControllerMockNoKey:
snapshot_list = []
state = ResticStates.NO_KEY
progress = 0
error_message = None
@pytest.fixture
def mock_restic_controller_no_key(mocker):
mock = mocker.patch(
"selfprivacy_api.resources.services.restic.ResticController",
autospec=True,
return_value=ResticControllerMockNoKey,
)
return mock
class ResticControllerNotInitialized:
snapshot_list = []
state = ResticStates.NOT_INITIALIZED
progress = 0
error_message = None
@pytest.fixture
def mock_restic_controller_not_initialized(mocker):
mock = mocker.patch(
"selfprivacy_api.resources.services.restic.ResticController",
autospec=True,
return_value=ResticControllerNotInitialized,
)
return mock
class ResticControllerInitializing:
snapshot_list = []
state = ResticStates.INITIALIZING
progress = 0
error_message = None
@pytest.fixture
def mock_restic_controller_initializing(mocker):
mock = mocker.patch(
"selfprivacy_api.resources.services.restic.ResticController",
autospec=True,
return_value=ResticControllerInitializing,
)
return mock
class ResticControllerBackingUp:
snapshot_list = MOCKED_SNAPSHOTS
state = ResticStates.BACKING_UP
progress = 0.42
error_message = None
@pytest.fixture
def mock_restic_controller_backing_up(mocker):
mock = mocker.patch(
"selfprivacy_api.resources.services.restic.ResticController",
autospec=True,
return_value=ResticControllerBackingUp,
)
return mock
class ResticControllerError:
snapshot_list = MOCKED_SNAPSHOTS
state = ResticStates.ERROR
progress = 0
error_message = "Error message"
@pytest.fixture
def mock_restic_controller_error(mocker):
mock = mocker.patch(
"selfprivacy_api.resources.services.restic.ResticController",
autospec=True,
return_value=ResticControllerError,
)
return mock
class ResticControllerRestoring:
snapshot_list = MOCKED_SNAPSHOTS
state = ResticStates.RESTORING
progress = 0
error_message = None
@pytest.fixture
def mock_restic_controller_restoring(mocker):
mock = mocker.patch(
"selfprivacy_api.resources.services.restic.ResticController",
autospec=True,
return_value=ResticControllerRestoring,
)
return mock
@pytest.fixture
def mock_restic_tasks(mocker):
mock = mocker.patch(
"selfprivacy_api.resources.services.restic.restic_tasks", autospec=True
)
return mock
@pytest.fixture
def undefined_settings(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "backblaze" not in read_json(datadir / "undefined.json")
return datadir
@pytest.fixture
def some_settings(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE", new=datadir / "some_values.json"
)
assert "backblaze" in read_json(datadir / "some_values.json")
assert read_json(datadir / "some_values.json")["backblaze"]["accountId"] == "ID"
assert read_json(datadir / "some_values.json")["backblaze"]["accountKey"] == "KEY"
assert read_json(datadir / "some_values.json")["backblaze"]["bucket"] == "BUCKET"
return datadir
@pytest.fixture
def no_values(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "no_values.json")
assert "backblaze" in read_json(datadir / "no_values.json")
assert "accountId" not in read_json(datadir / "no_values.json")["backblaze"]
assert "accountKey" not in read_json(datadir / "no_values.json")["backblaze"]
assert "bucket" not in read_json(datadir / "no_values.json")["backblaze"]
return datadir
def test_get_snapshots_unauthorized(client, mock_restic_controller, mock_restic_tasks):
response = client.get("/services/restic/backup/list")
assert response.status_code == 401
def test_get_snapshots(authorized_client, mock_restic_controller, mock_restic_tasks):
response = authorized_client.get("/services/restic/backup/list")
assert response.status_code == 200
assert response.get_json() == MOCKED_SNAPSHOTS
def test_create_backup_unauthorized(client, mock_restic_controller, mock_restic_tasks):
response = client.put("/services/restic/backup/create")
assert response.status_code == 401
def test_create_backup(authorized_client, mock_restic_controller, mock_restic_tasks):
response = authorized_client.put("/services/restic/backup/create")
assert response.status_code == 200
assert mock_restic_tasks.start_backup.call_count == 1
def test_create_backup_without_key(
authorized_client, mock_restic_controller_no_key, mock_restic_tasks
):
response = authorized_client.put("/services/restic/backup/create")
assert response.status_code == 400
assert mock_restic_tasks.start_backup.call_count == 0
def test_create_backup_initializing(
authorized_client, mock_restic_controller_initializing, mock_restic_tasks
):
response = authorized_client.put("/services/restic/backup/create")
assert response.status_code == 400
assert mock_restic_tasks.start_backup.call_count == 0
def test_create_backup_backing_up(
authorized_client, mock_restic_controller_backing_up, mock_restic_tasks
):
response = authorized_client.put("/services/restic/backup/create")
assert response.status_code == 409
assert mock_restic_tasks.start_backup.call_count == 0
def test_check_backup_status_unauthorized(
client, mock_restic_controller, mock_restic_tasks
):
response = client.get("/services/restic/backup/status")
assert response.status_code == 401
def test_check_backup_status(
authorized_client, mock_restic_controller, mock_restic_tasks
):
response = authorized_client.get("/services/restic/backup/status")
assert response.status_code == 200
assert response.get_json() == {
"status": "INITIALIZED",
"progress": 0,
"error_message": None,
}
def test_check_backup_status_no_key(
authorized_client, mock_restic_controller_no_key, mock_restic_tasks
):
response = authorized_client.get("/services/restic/backup/status")
assert response.status_code == 200
assert response.get_json() == {
"status": "NO_KEY",
"progress": 0,
"error_message": None,
}
def test_check_backup_status_not_initialized(
authorized_client, mock_restic_controller_not_initialized, mock_restic_tasks
):
response = authorized_client.get("/services/restic/backup/status")
assert response.status_code == 200
assert response.get_json() == {
"status": "NOT_INITIALIZED",
"progress": 0,
"error_message": None,
}
def test_check_backup_status_initializing(
authorized_client, mock_restic_controller_initializing, mock_restic_tasks
):
response = authorized_client.get("/services/restic/backup/status")
assert response.status_code == 200
assert response.get_json() == {
"status": "INITIALIZING",
"progress": 0,
"error_message": None,
}
def test_check_backup_status_backing_up(
authorized_client, mock_restic_controller_backing_up
):
response = authorized_client.get("/services/restic/backup/status")
assert response.status_code == 200
assert response.get_json() == {
"status": "BACKING_UP",
"progress": 0.42,
"error_message": None,
}
def test_check_backup_status_error(
authorized_client, mock_restic_controller_error, mock_restic_tasks
):
response = authorized_client.get("/services/restic/backup/status")
assert response.status_code == 200
assert response.get_json() == {
"status": "ERROR",
"progress": 0,
"error_message": "Error message",
}
def test_check_backup_status_restoring(
authorized_client, mock_restic_controller_restoring, mock_restic_tasks
):
response = authorized_client.get("/services/restic/backup/status")
assert response.status_code == 200
assert response.get_json() == {
"status": "RESTORING",
"progress": 0,
"error_message": None,
}
def test_reload_unauthenticated(client, mock_restic_controller, mock_restic_tasks):
response = client.get("/services/restic/backup/reload")
assert response.status_code == 401
def test_backup_reload(authorized_client, mock_restic_controller, mock_restic_tasks):
response = authorized_client.get("/services/restic/backup/reload")
assert response.status_code == 200
assert mock_restic_tasks.load_snapshots.call_count == 1
def test_backup_restore_unauthorized(client, mock_restic_controller, mock_restic_tasks):
response = client.put("/services/restic/backup/restore")
assert response.status_code == 401
def test_backup_restore_without_backup_id(
authorized_client, mock_restic_controller, mock_restic_tasks
):
response = authorized_client.put("/services/restic/backup/restore", json={})
assert response.status_code == 400
assert mock_restic_tasks.restore_from_backup.call_count == 0
def test_backup_restore_with_nonexistent_backup_id(
authorized_client, mock_restic_controller, mock_restic_tasks
):
response = authorized_client.put(
"/services/restic/backup/restore", json={"backupId": "nonexistent"}
)
assert response.status_code == 404
assert mock_restic_tasks.restore_from_backup.call_count == 0
def test_backup_restore_when_no_key(
authorized_client, mock_restic_controller_no_key, mock_restic_tasks
):
response = authorized_client.put(
"/services/restic/backup/restore", json={"backupId": "f96b428f"}
)
assert response.status_code == 400
assert mock_restic_tasks.restore_from_backup.call_count == 0
def test_backup_restore_when_not_initialized(
authorized_client, mock_restic_controller_not_initialized, mock_restic_tasks
):
response = authorized_client.put(
"/services/restic/backup/restore", json={"backupId": "f96b428f"}
)
assert response.status_code == 400
assert mock_restic_tasks.restore_from_backup.call_count == 0
def test_backup_restore_when_initializing(
authorized_client, mock_restic_controller_initializing, mock_restic_tasks
):
response = authorized_client.put(
"/services/restic/backup/restore", json={"backupId": "f96b428f"}
)
assert response.status_code == 400
assert mock_restic_tasks.restore_from_backup.call_count == 0
def test_backup_restore_when_backing_up(
authorized_client, mock_restic_controller_backing_up, mock_restic_tasks
):
response = authorized_client.put(
"/services/restic/backup/restore", json={"backupId": "f96b428f"}
)
assert response.status_code == 409
assert mock_restic_tasks.restore_from_backup.call_count == 0
def test_backup_restore_when_restoring(
authorized_client, mock_restic_controller_restoring, mock_restic_tasks
):
response = authorized_client.put(
"/services/restic/backup/restore", json={"backupId": "f96b428f"}
)
assert response.status_code == 409
assert mock_restic_tasks.restore_from_backup.call_count == 0
def test_backup_restore_when_error(
authorized_client, mock_restic_controller_error, mock_restic_tasks
):
response = authorized_client.put(
"/services/restic/backup/restore", json={"backupId": "f96b428f"}
)
assert response.status_code == 200
assert mock_restic_tasks.restore_from_backup.call_count == 1
def test_backup_restore(authorized_client, mock_restic_controller, mock_restic_tasks):
response = authorized_client.put(
"/services/restic/backup/restore", json={"backupId": "f96b428f"}
)
assert response.status_code == 200
assert mock_restic_tasks.restore_from_backup.call_count == 1
def test_set_backblaze_config_unauthorized(
client, mock_restic_controller, mock_restic_tasks, some_settings
):
response = client.put("/services/restic/backblaze/config")
assert response.status_code == 401
assert mock_restic_tasks.update_keys_from_userdata.call_count == 0
def test_set_backblaze_config_without_arguments(
authorized_client, mock_restic_controller, mock_restic_tasks, some_settings
):
response = authorized_client.put("/services/restic/backblaze/config")
assert response.status_code == 400
assert mock_restic_tasks.update_keys_from_userdata.call_count == 0
def test_set_backblaze_config_without_all_values(
authorized_client, mock_restic_controller, mock_restic_tasks, some_settings
):
response = authorized_client.put(
"/services/restic/backblaze/config",
json={"accountId": "123", "applicationKey": "456"},
)
assert response.status_code == 400
assert mock_restic_tasks.update_keys_from_userdata.call_count == 0
def test_set_backblaze_config(
authorized_client, mock_restic_controller, mock_restic_tasks, some_settings
):
response = authorized_client.put(
"/services/restic/backblaze/config",
json={"accountId": "123", "accountKey": "456", "bucket": "789"},
)
assert response.status_code == 200
assert mock_restic_tasks.update_keys_from_userdata.call_count == 1
assert read_json(some_settings / "some_values.json")["backblaze"] == {
"accountId": "123",
"accountKey": "456",
"bucket": "789",
}
def test_set_backblaze_config_on_undefined(
authorized_client, mock_restic_controller, mock_restic_tasks, undefined_settings
):
response = authorized_client.put(
"/services/restic/backblaze/config",
json={"accountId": "123", "accountKey": "456", "bucket": "789"},
)
assert response.status_code == 200
assert mock_restic_tasks.update_keys_from_userdata.call_count == 1
assert read_json(undefined_settings / "undefined.json")["backblaze"] == {
"accountId": "123",
"accountKey": "456",
"bucket": "789",
}
def test_set_backblaze_config_on_no_values(
authorized_client, mock_restic_controller, mock_restic_tasks, no_values
):
response = authorized_client.put(
"/services/restic/backblaze/config",
json={"accountId": "123", "accountKey": "456", "bucket": "789"},
)
assert response.status_code == 200
assert mock_restic_tasks.update_keys_from_userdata.call_count == 1
assert read_json(no_values / "no_values.json")["backblaze"] == {
"accountId": "123",
"accountKey": "456",
"bucket": "789",
}

68
tests/services/test_restic/no_values.json

@ -0,0 +1,68 @@
{
"backblaze": {
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [
{
"username": "user1",
"hashedPassword": "HASHED_PASSWORD_1",
"sshKeys": [
"ssh-rsa KEY user1@pc"
]
},
{
"username": "user2",
"hashedPassword": "HASHED_PASSWORD_2",
"sshKeys": [
]
},
{
"username": "user3",
"hashedPassword": "HASHED_PASSWORD_3"
}
]
}

71
tests/services/test_restic/some_values.json

@ -0,0 +1,71 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "BUCKET"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [
{
"username": "user1",
"hashedPassword": "HASHED_PASSWORD_1",
"sshKeys": [
"ssh-rsa KEY user1@pc"
]
},
{
"username": "user2",
"hashedPassword": "HASHED_PASSWORD_2",
"sshKeys": [
]
},
{
"username": "user3",
"hashedPassword": "HASHED_PASSWORD_3"
}
]
}

66
tests/services/test_restic/undefined.json

@ -0,0 +1,66 @@
{
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": false
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"users": [
{
"username": "user1",
"hashedPassword": "HASHED_PASSWORD_1",
"sshKeys": [
"ssh-rsa KEY user1@pc"
]
},
{
"username": "user2",
"hashedPassword": "HASHED_PASSWORD_2",
"sshKeys": [
]
},
{
"username": "user3",
"hashedPassword": "HASHED_PASSWORD_3"
}
]
}

4
tests/services/test_services.py

@ -4,8 +4,8 @@ import pytest
def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
with open(file_path, "r", encoding="utf-8") as file:
return json.load(file)
def call_args_asserts(mocked_object):

286
tests/services/test_ssh.py

@ -1,41 +1,38 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
import json
from os import read
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
with open(file_path, "r", encoding="utf-8") as file:
return json.load(file)
###############################################################################
## FIXTURES ###################################################
@pytest.fixture
def ssh_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["ssh"]["enable"] == False
assert (
read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True
)
assert not read_json(datadir / "turned_off.json")["ssh"]["enable"]
assert read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"]
return datadir
@pytest.fixture
def ssh_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert (
read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"] == True
)
assert read_json(datadir / "turned_on.json")["ssh"]["enable"] == True
assert read_json(datadir / "turned_off.json")["ssh"]["passwordAuthentication"]
assert read_json(datadir / "turned_on.json")["ssh"]["enable"]
return datadir
@pytest.fixture
def all_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "all_off.json")
assert read_json(datadir / "all_off.json")["ssh"]["passwordAuthentication"] == False
assert read_json(datadir / "all_off.json")["ssh"]["enable"] == False
assert not read_json(datadir / "all_off.json")["ssh"]["passwordAuthentication"]
assert not read_json(datadir / "all_off.json")["ssh"]["enable"]
return datadir
@ -46,19 +43,30 @@ def undefined_settings(mocker, datadir):
return datadir
@pytest.fixture
def undefined_values(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined_values.json"
)
assert "ssh" in read_json(datadir / "undefined_values.json")
assert "enable" not in read_json(datadir / "undefined_values.json")["ssh"]
assert (
"passwordAuthentication"
not in read_json(datadir / "undefined_values.json")["ssh"]
)
return datadir
@pytest.fixture
def root_and_admin_have_keys(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE",
new=datadir / "root_and_admin_have_keys.json",
)
assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["enable"] == True
assert (
read_json(datadir / "root_and_admin_have_keys.json")["ssh"][
"passwordAuthentication"
]
== True
)
assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["enable"]
assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"][
"passwordAuthentication"
]
assert read_json(datadir / "root_and_admin_have_keys.json")["ssh"]["rootKeys"] == [
"ssh-ed25519 KEY test@pc"
]
@ -68,7 +76,23 @@ def root_and_admin_have_keys(mocker, datadir):
return datadir
###############################################################################
@pytest.fixture
def some_users(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "some_users.json")
assert "users" in read_json(datadir / "some_users.json")
assert read_json(datadir / "some_users.json")["users"] == [
{
"username": "user1",
"hashedPassword": "HASHED_PASSWORD_1",
"sshKeys": ["ssh-rsa KEY user1@pc"],
},
{"username": "user2", "hashedPassword": "HASHED_PASSWORD_2", "sshKeys": []},
{"username": "user3", "hashedPassword": "HASHED_PASSWORD_3"},
]
return datadir
## TEST 401 ######################################################
@pytest.mark.parametrize(
@ -79,20 +103,33 @@ def test_unauthorized(client, ssh_off, endpoint):
assert response.status_code == 401
## TEST ENABLE ######################################################
def test_legacy_enable(authorized_client, ssh_off):
response = authorized_client.post(f"/services/ssh/enable")
response = authorized_client.post("/services/ssh/enable")
assert response.status_code == 200
assert read_json(ssh_off / "turned_off.json") == read_json(
ssh_off / "turned_on.json"
)
def test_legacy_on_undefined(authorized_client, undefined_settings):
response = authorized_client.post("/services/ssh/enable")
assert response.status_code == 200
data = read_json(undefined_settings / "undefined.json")
assert data["ssh"]["enable"] == True
def test_legacy_enable_when_enabled(authorized_client, ssh_on):
response = authorized_client.post(f"/services/ssh/enable")
response = authorized_client.post("/services/ssh/enable")
assert response.status_code == 200
assert read_json(ssh_on / "turned_on.json") == read_json(ssh_on / "turned_on.json")
## GET ON /ssh ######################################################
def test_get_current_settings_ssh_off(authorized_client, ssh_off):
response = authorized_client.get("/services/ssh")
assert response.status_code == 200
@ -117,6 +154,14 @@ def test_get_current_settings_undefined(authorized_client, undefined_settings):
assert response.json == {"enable": True, "passwordAuthentication": True}
def test_get_current_settings_mostly_undefined(authorized_client, undefined_values):
response = authorized_client.get("/services/ssh")
assert response.status_code == 200
assert response.json == {"enable": True, "passwordAuthentication": True}
## PUT ON /ssh ######################################################
available_settings = [
{"enable": True, "passwordAuthentication": True},
{"enable": True, "passwordAuthentication": False},
@ -131,7 +176,7 @@ available_settings = [
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_ssh_off(authorized_client, ssh_off, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
response = authorized_client.put("/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(ssh_off / "turned_off.json")["ssh"]
if "enable" in settings:
@ -142,7 +187,7 @@ def test_set_settings_ssh_off(authorized_client, ssh_off, settings):
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_ssh_on(authorized_client, ssh_on, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
response = authorized_client.put("/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(ssh_on / "turned_on.json")["ssh"]
if "enable" in settings:
@ -153,7 +198,7 @@ def test_set_settings_ssh_on(authorized_client, ssh_on, settings):
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_all_off(authorized_client, all_off, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
response = authorized_client.put("/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(all_off / "all_off.json")["ssh"]
if "enable" in settings:
@ -164,7 +209,7 @@ def test_set_settings_all_off(authorized_client, all_off, settings):
@pytest.mark.parametrize("settings", available_settings)
def test_set_settings_undefined(authorized_client, undefined_settings, settings):
response = authorized_client.put(f"/services/ssh", json=settings)
response = authorized_client.put("/services/ssh", json=settings)
assert response.status_code == 200
data = read_json(undefined_settings / "undefined.json")["ssh"]
if "enable" in settings:
@ -173,9 +218,12 @@ def test_set_settings_undefined(authorized_client, undefined_settings, settings)
assert data["passwordAuthentication"] == settings["passwordAuthentication"]
## PUT ON /ssh/key/send ######################################################
def test_add_root_key(authorized_client, ssh_on):
response = authorized_client.put(
f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 201
assert read_json(ssh_on / "turned_on.json")["ssh"]["rootKeys"] == [
@ -183,9 +231,18 @@ def test_add_root_key(authorized_client, ssh_on):
]
def test_add_root_key_on_undefined(authorized_client, undefined_settings):
response = authorized_client.put(
"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 201
data = read_json(undefined_settings / "undefined.json")
assert data["ssh"]["rootKeys"] == ["ssh-rsa KEY test@pc"]
def test_add_root_key_one_more(authorized_client, root_and_admin_have_keys):
response = authorized_client.put(
f"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
"/services/ssh/key/send", json={"public_key": "ssh-rsa KEY test@pc"}
)
assert response.status_code == 201
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][
@ -198,7 +255,7 @@ def test_add_root_key_one_more(authorized_client, root_and_admin_have_keys):
def test_add_existing_root_key(authorized_client, root_and_admin_have_keys):
response = authorized_client.put(
f"/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"}
"/services/ssh/key/send", json={"public_key": "ssh-ed25519 KEY test@pc"}
)
assert response.status_code == 409
assert read_json(root_and_admin_have_keys / "root_and_admin_have_keys.json")["ssh"][