#!/usr/bin/env python3
"""Users management module"""
# pylint: disable=too-few-public-methods
import re
import subprocess
import typing

import strawberry

from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.user import User, UserMutationReturn
from selfprivacy_api.graphql.mutations.mutation_interface import (
    MutationReturnInterface,
)
from selfprivacy_api.utils import WriteUserData, ReadUserData, is_username_forbidden
from selfprivacy_api.utils import hash_password

# Same pattern the username check has always used; applied with
# ``re.fullmatch`` so a trailing newline can no longer slip past ``$``.
_USERNAME_PATTERN = r"[a-z_][a-z0-9_]+"

# Usernames must be strictly shorter than this many characters.
_USERNAME_LENGTH_LIMIT = 32


def _secondary_users(data):
    """Return the list stored under ``"users"``, or an empty list if absent.

    ``create_user`` treats the ``"users"`` key as optional, so every reader
    of that key goes through this helper instead of indexing directly.
    """
    return data["users"] if "users" in data else []


@strawberry.input
class UserMutationsInput:
    """Input type for user mutation"""

    # Login name of the user being created or updated.
    username: str
    # Plain-text password; it is passed to ``hash_password`` before storage.
    password: str


@strawberry.type
class UserMutations:
    """Mutations change user settings"""

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def create_user(self, user: UserMutationsInput) -> UserMutationReturn:
        """Create a new user.

        Returns a ``UserMutationReturn`` with:
        * code 409 if the username is forbidden or already taken,
        * code 400 if the username fails the pattern or length check,
        * code 201 and the new ``User`` on success.
        """
        # Check if username is forbidden
        if is_username_forbidden(user.username):
            return UserMutationReturn(
                success=False,
                message="Username is forbidden",
                code=409,
                user=None,
            )

        # Check if username passes regex (whole string must match)
        if not re.fullmatch(_USERNAME_PATTERN, user.username):
            return UserMutationReturn(
                success=False,
                message="Username must be alphanumeric",
                code=400,
                user=None,
            )

        # Check if username is less than 32 characters
        if len(user.username) >= _USERNAME_LENGTH_LIMIT:
            return UserMutationReturn(
                success=False,
                message="Username must be less than 32 characters",
                code=400,
                user=None,
            )

        # Return 409 if user already exists (either as the main user or as
        # one of the entries under "users").
        with ReadUserData() as data:
            already_exists = data["username"] == user.username or any(
                data_user["username"] == user.username
                for data_user in _secondary_users(data)
            )
        if already_exists:
            return UserMutationReturn(
                success=False,
                message="User already exists",
                code=409,
                user=None,
            )

        # Hash only once the request is known to be valid.
        hashed_password = hash_password(user.password)

        # NOTE(review): the existence check above and this write happen in
        # separate contexts; confirm whether ReadUserData/WriteUserData
        # serialize concurrent requests.
        with WriteUserData() as data:
            # The "users" key may be missing in the data opened for writing,
            # so create it here rather than in the read-only context.
            if "users" not in data:
                data["users"] = []
            data["users"].append(
                {
                    "username": user.username,
                    "hashedPassword": hashed_password,
                }
            )

        return UserMutationReturn(
            success=True,
            message="User was successfully created!",
            code=201,
            user=User(user.username),
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def delete_user(self, username: str) -> MutationReturnInterface:
        """Delete a user.

        Returns a ``MutationReturnInterface`` with:
        * code 400 if ``username`` is the main user or ``"root"``,
        * code 404 if no entry under ``"users"`` has that username,
        * code 200 once the entry has been removed.
        """
        with WriteUserData() as data:
            if username == data["username"] or username == "root":
                return MutationReturnInterface(
                    success=False,
                    message="Cannot delete main or root user",
                    code=400,
                )

            users = _secondary_users(data)
            # Return 404 if user does not exist
            for data_user in users:
                if data_user["username"] == username:
                    users.remove(data_user)
                    break
            else:
                return MutationReturnInterface(
                    success=False,
                    message="User does not exist",
                    code=404,
                )

        return MutationReturnInterface(
            success=True,
            message="User was deleted",
            code=200,
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def update_user(self, user: UserMutationsInput) -> UserMutationReturn:
        """Update user mutation.

        Replaces the stored password hash of the main user
        (``"hashedMasterPassword"``) or of the matching entry under
        ``"users"`` (``"hashedPassword"``).

        Returns a ``UserMutationReturn`` with code 404 if the user does not
        exist, otherwise code 200 and the updated ``User``.
        """
        hashed_password = hash_password(user.password)

        with WriteUserData() as data:
            ssh_keys = []
            if user.username == data["username"]:
                data["hashedMasterPassword"] = hashed_password
            else:
                # Return 404 if user does not exist
                for data_user in _secondary_users(data):
                    if data_user["username"] == user.username:
                        data_user["hashedPassword"] = hashed_password
                        # Entries written by create_user carry no "sshKeys"
                        # key, so fall back to an empty list.
                        ssh_keys = data_user.get("sshKeys", [])
                        break
                else:
                    return UserMutationReturn(
                        success=False,
                        message="User does not exist",
                        code=404,
                        user=None,
                    )

        return UserMutationReturn(
            success=True,
            message="User was successfully updated",
            code=200,
            user=User(user.username, ssh_keys),
        )