174 lines
5.9 KiB
Python
174 lines
5.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Users management module"""
|
|
# pylint: disable=too-few-public-methods
|
|
import re
|
|
import subprocess
|
|
import typing
|
|
import strawberry
|
|
from selfprivacy_api.graphql import IsAuthenticated
|
|
from selfprivacy_api.graphql.common_types.user import User, UserMutationReturn
|
|
from selfprivacy_api.graphql.mutations.mutation_interface import (
|
|
MutationReturnInterface,
|
|
)
|
|
from selfprivacy_api.utils import WriteUserData, ReadUserData, is_username_forbidden
|
|
|
|
|
|
@strawberry.input
class UserMutationsInput:
    """Credentials payload accepted by the user mutations."""

    # Login name of the account the mutation applies to.
    username: str
    # Plain-text password; the mutations in this module hash it with
    # ``mkpasswd`` before storing it.
    password: str
|
|
|
|
|
|
# Usernames must consist of a lowercase letter or underscore followed by at
# least one lowercase letter, digit or underscore. The pattern is applied with
# ``fullmatch`` so that a trailing newline cannot slip past the check.
_USERNAME_PATTERN = re.compile(r"[a-z_][a-z0-9_]+")


def _hash_password(password: str) -> str:
    """Hash *password* with ``mkpasswd -m sha-512`` and return the hash.

    Raises:
        RuntimeError: if ``mkpasswd`` exits with a non-zero status, so that
            its error output is never mistaken for a password hash.
    """
    # NOTE(review): the plain-text password is passed as a command-line
    # argument and may be visible to other local users through the process
    # list; consider feeding it through stdin — confirm mkpasswd support.
    hashing_process = subprocess.Popen(
        ["mkpasswd", "-m", "sha-512", password],
        shell=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    output = hashing_process.communicate()[0]
    if hashing_process.returncode != 0:
        # stderr is merged into stdout, so on failure ``output`` holds an
        # error message rather than a hash.
        raise RuntimeError("mkpasswd failed to hash the password")
    return output.decode("ascii").rstrip()


@strawberry.type
class UserMutations:
    """Mutations change user settings"""

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def create_user(self, settings: UserMutationsInput) -> UserMutationReturn:
        """Create a new user.

        The username is validated first (not forbidden, matches the allowed
        pattern, shorter than 32 characters, not already taken); only then is
        the password hashed and the user appended to the stored user list.

        Returns:
            A ``UserMutationReturn`` with code 201 on success, 400 for an
            invalid username, 409 for a forbidden or already existing
            username, or 500 if the password could not be hashed.
        """
        # Check if username is forbidden
        if is_username_forbidden(settings.username):
            return UserMutationReturn(
                success=False,
                message="Error: Username is forbidden",
                code=409,
                user=None,
            )

        # Check if username passes regex
        if not _USERNAME_PATTERN.fullmatch(settings.username):
            return UserMutationReturn(
                success=False,
                message="Error: username must be alphanumeric",
                code=400,
                user=None,
            )

        # Check if username less than 32 characters
        if len(settings.username) >= 32:
            return UserMutationReturn(
                success=False,
                message="Error: username must be less than 32 characters",
                code=400,
                user=None,
            )

        with ReadUserData() as data:
            # A missing "users" key is treated as an empty user list.
            existing_users = data["users"] if "users" in data else []

            # Return 409 if user already exists, either as the main user or
            # as one of the additional users.
            already_exists = data["username"] == settings.username or any(
                user["username"] == settings.username for user in existing_users
            )
            if already_exists:
                return UserMutationReturn(
                    success=False,
                    message="Error: User already exists",
                    code=409,
                    user=None,
                )

        # Hash only after validation so that no process is spawned for
        # requests that are going to be rejected anyway.
        try:
            hashed_password = _hash_password(settings.password)
        except RuntimeError:
            return UserMutationReturn(
                success=False,
                message="Error: Failed to hash the password",
                code=500,
                user=None,
            )

        with WriteUserData() as data:
            # Create the list here, inside the write context, so that the
            # append below cannot fail when no users were stored before.
            if "users" not in data:
                data["users"] = []
            data["users"].append(
                {
                    "username": settings.username,
                    "hashedPassword": hashed_password,
                }
            )

        return UserMutationReturn(
            success=True,
            message="User was successfully created!",
            code=201,
            user=User(settings.username, None),
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def delete_user(self, username: str) -> MutationReturnInterface:
        """Delete the user named *username* from the stored user list.

        Returns:
            A ``MutationReturnInterface`` with code 200 on success, 400 when
            asked to delete the main user or ``root``, or 404 when no such
            user exists.
        """
        with WriteUserData() as data:
            if username == data["username"] or username == "root":
                return MutationReturnInterface(
                    success=False,
                    message="Error: Cannot delete main or root user",
                    code=400,
                )

            # A missing "users" key is treated as an empty user list.
            users = data["users"] if "users" in data else []

            # Return 404 if user does not exist
            for user in users:
                if user["username"] == username:
                    # Safe to mutate the list here: the loop is left
                    # immediately after the removal.
                    users.remove(user)
                    break
            else:
                return MutationReturnInterface(
                    success=False,
                    message="Error: User does not exist",
                    code=404,
                )

        return MutationReturnInterface(
            success=True,
            message="Nice frag - user deleted",
            code=200,
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def update_user(self, settings: UserMutationsInput) -> UserMutationReturn:
        """Update the password of an existing user.

        For the main user the hash is stored under ``hashedMasterPassword``;
        for any other user it is stored in that user's ``hashedPassword``.

        Returns:
            A ``UserMutationReturn`` with code 200 on success, 404 when no
            such user exists, or 500 if the password could not be hashed.
        """
        try:
            hashed_password = _hash_password(settings.password)
        except RuntimeError:
            return UserMutationReturn(
                success=False,
                message="Error: Failed to hash the password",
                code=500,
                user=None,
            )

        with WriteUserData() as data:
            ssh_keys = None
            if settings.username == data["username"]:
                data["hashedMasterPassword"] = hashed_password
            else:
                # A missing "users" key is treated as an empty user list.
                users = data["users"] if "users" in data else []

                # Return 404 if user does not exist
                for user in users:
                    if user["username"] == settings.username:
                        user["hashedPassword"] = hashed_password
                        # Users created by create_user carry no "sshKeys"
                        # entry, so fall back to None instead of failing.
                        ssh_keys = user.get("sshKeys")
                        break
                else:
                    return UserMutationReturn(
                        success=False,
                        message="Error: User does not exist",
                        code=404,
                        user=None,
                    )

        return UserMutationReturn(
            success=True,
            message="User was successfully updated",
            code=200,
            user=User(settings.username, ssh_keys),
        )
|