#!/usr/bin/env python3
"""GraphQL mutations that add and remove SSH public keys for user accounts."""
# pylint: disable=too-few-public-methods

import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.user import (
    UserMutationReturn,
    get_user_by_username,
)
from selfprivacy_api.utils import (
    WriteUserData,
    validate_ssh_public_key,
)


@strawberry.input
class SshMutationInput:
    """Arguments shared by the SSH key mutations."""

    # Account whose stored key list is changed.
    username: str
    # Public key string to add or remove; compared verbatim against stored keys.
    ssh_key: str


def _account_key_list(data, username):
    """Return the stored key list of the primary user or of a listed user.

    The primary account (``data["username"]``) is checked first, then the
    entries of ``data["users"]`` in order; the first match wins. Missing
    ``sshKeys`` / ``users`` entries are created as empty lists along the way.
    Returns ``None`` when no account carries *username*.
    """
    if username == data["username"]:
        return data.setdefault("sshKeys", [])
    for account in data.setdefault("users", []):
        if account["username"] == username:
            return account.setdefault("sshKeys", [])
    return None


def _reply(success, message, code, username):
    """Build a mutation result that carries the record looked up for *username*."""
    return UserMutationReturn(
        success=success,
        message=message,
        code=code,
        user=get_user_by_username(username),
    )


def _user_not_found():
    """Build the 404 result returned when no account matches the request."""
    return UserMutationReturn(
        success=False,
        message="User not found",
        code=404,
        user=None,
    )


@strawberry.type
class SshMutations:
    """SSH key mutations.

    Both mutations are gated by ``IsAuthenticated`` only, and the target
    account comes from the request input. Nothing in this module ties the
    target account to the caller.
    NOTE(review): confirm that every authenticated caller is meant to manage
    keys for every account.
    """

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def create_ssh(self, ssh_input: SshMutationInput) -> UserMutationReturn:
        """Append a public key to an account's stored key list.

        Result codes: 400 when ``validate_ssh_public_key`` rejects the key,
        409 when the identical string is already stored, 201 on success and
        404 when no account matches.

        Unlike ``delete_ssh`` there is no dedicated "root" branch here, so
        ``data["ssh"]["rootKeys"]`` is never written by this mutation.
        """
        username = ssh_input.username
        new_key = ssh_input.ssh_key

        # The key is validated before any user data is opened for writing.
        if not validate_ssh_public_key(new_key):
            return _reply(
                False,
                "Invalid key type. Only ssh-ed25519 and ssh-rsa are supported",
                400,
                username,
            )

        with WriteUserData() as data:
            stored = _account_key_list(data, username)
            if stored is not None:
                # Exact string equality: the same key material with a different
                # comment or spacing is treated as a different key.
                if new_key in stored:
                    return _reply(False, "Key already exists", 409, username)
                stored.append(new_key)
                # NOTE(review): the user record is looked up before the
                # with-block exits; confirm it already reflects the new key.
                return _reply(
                    True, "New SSH key successfully written", 201, username
                )

        return _user_not_found()

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def delete_ssh(self, ssh_input: SshMutationInput) -> UserMutationReturn:
        """Remove a public key from an account's stored key list.

        The username "root" addresses ``data["ssh"]["rootKeys"]``; any other
        name is resolved against the primary user and then ``data["users"]``.
        Result codes: 200 when the key was removed, 404 when the key is not
        stored for that account or no account matches.
        """
        username = ssh_input.username
        target_key = ssh_input.ssh_key

        with WriteUserData() as data:
            if username == "root":
                # Missing containers are created so the lookup below is uniform.
                stored = data.setdefault("ssh", {}).setdefault("rootKeys", [])
            else:
                stored = _account_key_list(data, username)

            if stored is not None:
                # Only an entry identical to the submitted string is removed,
                # and only its first occurrence.
                if target_key in stored:
                    stored.remove(target_key)
                    return _reply(True, "SSH key deleted", 200, username)
                return _reply(False, "Key not found", 404, username)

        return _user_not_found()