Coverage for users object
continuous-integration/drone/push Build is failing
Details
continuous-integration/drone/push Build is failing
Details
parent
2896f8d34d
commit
749ea3afe8
|
@ -4,7 +4,7 @@ import subprocess
|
|||
import re
|
||||
from flask_restful import Resource, reqparse
|
||||
|
||||
from selfprivacy_api.utils import WriteUserData, ReadUserData
|
||||
from selfprivacy_api.utils import WriteUserData, ReadUserData, is_username_forbidden
|
||||
|
||||
|
||||
class Users(Resource):
|
||||
|
@ -26,8 +26,9 @@ class Users(Resource):
|
|||
"""
|
||||
with ReadUserData() as data:
|
||||
users = []
|
||||
for user in data["users"]:
|
||||
users.append(user["username"])
|
||||
if "users" in data:
|
||||
for user in data["users"]:
|
||||
users.append(user["username"])
|
||||
return users
|
||||
|
||||
def post(self):
|
||||
|
@ -81,6 +82,9 @@ class Users(Resource):
|
|||
hashed_password = password_hash_process_descriptor.communicate()[0]
|
||||
hashed_password = hashed_password.decode("ascii")
|
||||
hashed_password = hashed_password.rstrip()
|
||||
# Check if username is forbidden
|
||||
if is_username_forbidden(args["username"]):
|
||||
return {"message": "Username is forbidden"}, 409
|
||||
# Check is username passes regex
|
||||
if not re.match(r"^[a-z_][a-z0-9_]+$", args["username"]):
|
||||
return {"error": "username must be alphanumeric"}, 400
|
||||
|
@ -135,9 +139,6 @@ class User(Resource):
|
|||
description: User not found
|
||||
"""
|
||||
with WriteUserData() as data:
|
||||
# Return 400 if username is not provided
|
||||
if username is None:
|
||||
return {"error": "username is required"}, 400
|
||||
if username == data["username"]:
|
||||
return {"error": "Cannot delete root user"}, 400
|
||||
# Return 400 if user does not exist
|
||||
|
|
|
@ -57,3 +57,46 @@ def validate_ssh_public_key(key):
|
|||
if not key.startswith("ssh-rsa"):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_username_forbidden(username):
|
||||
forbidden_prefixes = ["systemd", "nixbld"]
|
||||
|
||||
forbidden_usernames = [
|
||||
"root",
|
||||
"messagebus",
|
||||
"postfix",
|
||||
"polkituser",
|
||||
"dovecot2",
|
||||
"dovenull",
|
||||
"nginx",
|
||||
"postgres",
|
||||
"prosody",
|
||||
"opendkim",
|
||||
"rspamd",
|
||||
"sshd",
|
||||
"selfprivacy-api",
|
||||
"restic",
|
||||
"redis",
|
||||
"pleroma",
|
||||
"ocserv",
|
||||
"nextcloud",
|
||||
"memcached",
|
||||
"knot-resolver",
|
||||
"gitea",
|
||||
"bitwarden_rs",
|
||||
"vaultwarden",
|
||||
"acme",
|
||||
"virtualMail",
|
||||
"nobody",
|
||||
]
|
||||
|
||||
for prefix in forbidden_prefixes:
|
||||
if username.startswith(prefix):
|
||||
return True
|
||||
|
||||
for forbidden_username in forbidden_usernames:
|
||||
if username == forbidden_username:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
|
|
@ -0,0 +1,232 @@
|
|||
# pylint: disable=redefined-outer-name
|
||||
# pylint: disable=unused-argument
|
||||
import json
|
||||
import pytest
|
||||
|
||||
|
||||
def read_json(file_path):
|
||||
with open(file_path, "r", encoding="utf-8") as file:
|
||||
return json.load(file)
|
||||
|
||||
|
||||
invalid_usernames = [
|
||||
"root",
|
||||
"messagebus",
|
||||
"postfix",
|
||||
"polkituser",
|
||||
"dovecot2",
|
||||
"dovenull",
|
||||
"nginx",
|
||||
"postgres",
|
||||
"systemd-journal-gateway",
|
||||
"prosody",
|
||||
"systemd-network",
|
||||
"systemd-resolve",
|
||||
"systemd-timesync",
|
||||
"opendkim",
|
||||
"rspamd",
|
||||
"sshd",
|
||||
"selfprivacy-api",
|
||||
"restic",
|
||||
"redis",
|
||||
"pleroma",
|
||||
"ocserv",
|
||||
"nextcloud",
|
||||
"memcached",
|
||||
"knot-resolver",
|
||||
"gitea",
|
||||
"bitwarden_rs",
|
||||
"vaultwarden",
|
||||
"acme",
|
||||
"virtualMail",
|
||||
"nixbld1",
|
||||
"nixbld2",
|
||||
"nixbld29",
|
||||
"nobody",
|
||||
]
|
||||
|
||||
|
||||
## FIXTURES ###################################################
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def no_users(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "no_users.json")
|
||||
assert read_json(datadir / "no_users.json")["users"] == []
|
||||
return datadir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def one_user(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "one_user.json")
|
||||
assert read_json(datadir / "one_user.json")["users"] == [
|
||||
{
|
||||
"username": "user1",
|
||||
"hashedPassword": "HASHED_PASSWORD_1",
|
||||
"sshKeys": ["ssh-rsa KEY user1@pc"],
|
||||
}
|
||||
]
|
||||
return datadir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def some_users(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "some_users.json")
|
||||
assert read_json(datadir / "some_users.json")["users"] == [
|
||||
{
|
||||
"username": "user1",
|
||||
"hashedPassword": "HASHED_PASSWORD_1",
|
||||
"sshKeys": ["ssh-rsa KEY user1@pc"],
|
||||
},
|
||||
{"username": "user2", "hashedPassword": "HASHED_PASSWORD_2", "sshKeys": []},
|
||||
{"username": "user3", "hashedPassword": "HASHED_PASSWORD_3"},
|
||||
]
|
||||
return datadir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def undefined_settings(mocker, datadir):
|
||||
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
|
||||
assert "users" not in read_json(datadir / "undefined.json")
|
||||
return datadir
|
||||
|
||||
|
||||
## TESTS ######################################################
|
||||
|
||||
|
||||
def test_get_users_unauthorized(client, some_users):
|
||||
response = client.get("/users")
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_get_some_users(authorized_client, some_users):
|
||||
response = authorized_client.get("/users")
|
||||
assert response.status_code == 200
|
||||
assert response.json == ["user1", "user2", "user3"]
|
||||
|
||||
|
||||
def test_get_one_user(authorized_client, one_user):
|
||||
response = authorized_client.get("/users")
|
||||
assert response.status_code == 200
|
||||
assert response.json == ["user1"]
|
||||
|
||||
|
||||
def test_get_no_users(authorized_client, no_users):
|
||||
response = authorized_client.get("/users")
|
||||
assert response.status_code == 200
|
||||
assert response.json == []
|
||||
|
||||
|
||||
def test_get_undefined_users(authorized_client, undefined_settings):
|
||||
response = authorized_client.get("/users")
|
||||
assert response.status_code == 200
|
||||
assert response.json == []
|
||||
|
||||
|
||||
def test_post_users_unauthorized(client, some_users):
|
||||
response = client.post("/users")
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_post_one_user(authorized_client, one_user):
|
||||
response = authorized_client.post(
|
||||
"/users", json={"username": "user4", "password": "password"}
|
||||
)
|
||||
assert response.status_code == 201
|
||||
assert read_json(one_user / "one_user.json")["users"][1]["username"] == "user4"
|
||||
|
||||
|
||||
def test_post_without_username(authorized_client, one_user):
|
||||
response = authorized_client.post("/users", json={"password": "password"})
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
def test_post_without_password(authorized_client, one_user):
|
||||
response = authorized_client.post("/users", json={"username": "user4"})
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
def test_post_without_username_and_password(authorized_client, one_user):
|
||||
response = authorized_client.post("/users", json={})
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
@pytest.mark.parametrize("username", invalid_usernames)
|
||||
def test_post_system_user(authorized_client, one_user, username):
|
||||
response = authorized_client.post(
|
||||
"/users", json={"username": username, "password": "password"}
|
||||
)
|
||||
assert response.status_code == 409
|
||||
|
||||
|
||||
def test_post_existing_user(authorized_client, one_user):
|
||||
response = authorized_client.post(
|
||||
"/users", json={"username": "user1", "password": "password"}
|
||||
)
|
||||
assert response.status_code == 409
|
||||
|
||||
|
||||
def test_post_user_to_undefined_users(authorized_client, undefined_settings):
|
||||
response = authorized_client.post(
|
||||
"/users", json={"username": "user4", "password": "password"}
|
||||
)
|
||||
assert response.status_code == 201
|
||||
assert (
|
||||
read_json(undefined_settings / "undefined.json")["users"][0]["username"]
|
||||
== "user4"
|
||||
)
|
||||
|
||||
|
||||
def test_post_very_long_username(authorized_client, one_user):
|
||||
response = authorized_client.post(
|
||||
"/users", json={"username": "a" * 100, "password": "password"}
|
||||
)
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
@pytest.mark.parametrize("username", ["", "1", "фыр", "user1@", "№:%##$^&@$&^()_"])
|
||||
def test_post_invalid_username(authorized_client, one_user, username):
|
||||
response = authorized_client.post(
|
||||
"/users", json={"username": username, "password": "password"}
|
||||
)
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
def test_delete_user_unauthorized(client, some_users):
|
||||
response = client.delete("/users/user1")
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
def test_delete_user_not_found(authorized_client, some_users):
|
||||
response = authorized_client.delete("/users/user4")
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_delete_user(authorized_client, some_users):
|
||||
response = authorized_client.delete("/users/user1")
|
||||
assert response.status_code == 200
|
||||
assert read_json(some_users / "some_users.json")["users"] == [
|
||||
{"username": "user2", "hashedPassword": "HASHED_PASSWORD_2", "sshKeys": []},
|
||||
{"username": "user3", "hashedPassword": "HASHED_PASSWORD_3"},
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("username", invalid_usernames)
|
||||
def test_delete_system_user(authorized_client, some_users, username):
|
||||
response = authorized_client.delete("/users/" + username)
|
||||
assert response.status_code == 400 or response.status_code == 404
|
||||
|
||||
|
||||
def test_delete_main_user(authorized_client, some_users):
|
||||
response = authorized_client.delete("/users/tester")
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
def test_delete_without_argument(authorized_client, some_users):
|
||||
response = authorized_client.delete("/users/")
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_delete_just_delete(authorized_client, some_users):
|
||||
response = authorized_client.delete("/users")
|
||||
assert response.status_code == 405
|
|
@ -0,0 +1,54 @@
|
|||
{
|
||||
"backblaze": {
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
},
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": false
|
||||
},
|
||||
"cloudflare": {
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [
|
||||
]
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
{
|
||||
"backblaze": {
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
},
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": false
|
||||
},
|
||||
"cloudflare": {
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [
|
||||
{
|
||||
"username": "user1",
|
||||
"hashedPassword": "HASHED_PASSWORD_1",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY user1@pc"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
{
|
||||
"backblaze": {
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
},
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": false
|
||||
},
|
||||
"cloudflare": {
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
],
|
||||
"users": [
|
||||
{
|
||||
"username": "user1",
|
||||
"hashedPassword": "HASHED_PASSWORD_1",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY user1@pc"
|
||||
]
|
||||
},
|
||||
{
|
||||
"username": "user2",
|
||||
"hashedPassword": "HASHED_PASSWORD_2",
|
||||
"sshKeys": [
|
||||
]
|
||||
},
|
||||
{
|
||||
"username": "user3",
|
||||
"hashedPassword": "HASHED_PASSWORD_3"
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
{
|
||||
"backblaze": {
|
||||
"accountId": "ID",
|
||||
"accountKey": "KEY",
|
||||
"bucket": "selfprivacy"
|
||||
},
|
||||
"api": {
|
||||
"token": "TEST_TOKEN",
|
||||
"enableSwagger": false
|
||||
},
|
||||
"bitwarden": {
|
||||
"enable": false
|
||||
},
|
||||
"cloudflare": {
|
||||
"apiKey": "TOKEN"
|
||||
},
|
||||
"databasePassword": "PASSWORD",
|
||||
"domain": "test.tld",
|
||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||
"hostname": "test-instance",
|
||||
"nextcloud": {
|
||||
"adminPassword": "ADMIN",
|
||||
"databasePassword": "ADMIN",
|
||||
"enable": true
|
||||
},
|
||||
"resticPassword": "PASS",
|
||||
"ssh": {
|
||||
"enable": true,
|
||||
"passwordAuthentication": true,
|
||||
"rootKeys": [
|
||||
"ssh-ed25519 KEY test@pc"
|
||||
]
|
||||
},
|
||||
"username": "tester",
|
||||
"gitea": {
|
||||
"enable": false
|
||||
},
|
||||
"ocserv": {
|
||||
"enable": true
|
||||
},
|
||||
"pleroma": {
|
||||
"enable": true
|
||||
},
|
||||
"autoUpgrade": {
|
||||
"enable": true,
|
||||
"allowReboot": true
|
||||
},
|
||||
"timezone": "Europe/Moscow",
|
||||
"sshKeys": [
|
||||
"ssh-rsa KEY test@pc"
|
||||
]
|
||||
}
|
Loading…
Reference in New Issue