From 501c4ce43c4e61b1daa0d541cb535ff7453cf29b Mon Sep 17 00:00:00 2001 From: def Date: Wed, 20 Jul 2022 23:23:59 +0200 Subject: [PATCH] add user and ssh management functions --- selfprivacy_api/graphql/common_types/user.py | 21 ++ .../graphql/mutations/ssh_mutations.py | 186 ++++++++++++++++++ .../graphql/mutations/users_mutations.py | 173 ++++++++++++++++ selfprivacy_api/graphql/queries/users.py | 42 ++++ 4 files changed, 422 insertions(+) create mode 100644 selfprivacy_api/graphql/common_types/user.py create mode 100644 selfprivacy_api/graphql/mutations/ssh_mutations.py create mode 100644 selfprivacy_api/graphql/mutations/users_mutations.py create mode 100644 selfprivacy_api/graphql/queries/users.py diff --git a/selfprivacy_api/graphql/common_types/user.py b/selfprivacy_api/graphql/common_types/user.py new file mode 100644 index 0000000..0fcce46 --- /dev/null +++ b/selfprivacy_api/graphql/common_types/user.py @@ -0,0 +1,21 @@ +import typing +import strawberry +from selfprivacy_api.graphql.mutations.mutation_interface import ( + MutationReturnInterface, +) + + +@strawberry.type +class User: + """Users management""" + + username: str + # userHomeFolderspace: UserHomeFolderUsage + sshKeys: typing.Optional[typing.List[str]] = None + + +@strawberry.type +class UserMutationReturn(MutationReturnInterface): + """Return type for user mutation""" + + user: typing.Optional[User] diff --git a/selfprivacy_api/graphql/mutations/ssh_mutations.py b/selfprivacy_api/graphql/mutations/ssh_mutations.py new file mode 100644 index 0000000..72b02ef --- /dev/null +++ b/selfprivacy_api/graphql/mutations/ssh_mutations.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +"""Users management module""" +# pylint: disable=too-few-public-methods + +import typing +import strawberry +from selfprivacy_api.graphql import IsAuthenticated +from selfprivacy_api.graphql.common_types.user import User, UserMutationReturn +from selfprivacy_api.graphql.mutations.mutation_interface import ( + MutationReturnInterface, +) +from selfprivacy_api.utils import ( + WriteUserData, + ReadUserData, + is_username_forbidden, + validate_ssh_public_key, +) + + +@strawberry.input +class SshMutationsInput: + """Input type for ssh mutation""" + + username: str + sshKey: str + + +@strawberry.type +class UserMutations: + """Mutations ssh""" + + @strawberry.mutation(permission_classes=[IsAuthenticated]) + def create_ssh(self, settings: SshMutationsInput) -> UserMutationReturn: + """Create a new ssh""" + with ReadUserData() as data: + if settings.username == "root": + return UserMutationReturn( + success=False, + message="Error: Use /ssh/key/send to add root keys", + code=400, + user=User("root", data["ssh"]["rootKeys"]), + ) + + if not validate_ssh_public_key(settings.sshKey): + return UserMutationReturn( + success=False, + message="Error: Invalid key type. Only ssh-ed25519 and ssh-rsa are supported", + code=400, + user=User("root", data["ssh"]["rootKeys"]), + ) + + with WriteUserData() as data: + if settings.username == data["username"]: + if "sshKeys" not in data: + data["sshKeys"] = [] + # Return 409 if key already in array + for key in data["sshKeys"]: + if key == settings.sshKey: + + return UserMutationReturn( + success=False, + message="Error: Key already exists", + code=409, + user=User(data["username"], data["sshKeys"]), + ) + data["sshKeys"].append(settings.sshKey) + + return UserMutationReturn( + success=True, + message="New SSH key successfully written", + code=201, + user=User(data["username"], data["sshKeys"]), + ) + + if "users" not in data: + data["users"] = [] + for user in data["users"]: + if user["username"] == settings.username: + if "sshKeys" not in user: + user["sshKeys"] = [] + # Return 409 if key already in array + for key in user["sshKeys"]: + if key == settings.sshKey: + + return UserMutationReturn( + success=False, + message="Error: Key already exists", + code=409, + user=User(user["username"], user["sshKeys"]), + ) + + user["sshKeys"].append(settings.sshKey) + + return UserMutationReturn( + success=True, + message="New SSH key successfully written", + code=201, + user=User(user["username"], user["sshKeys"]), + ) + + return UserMutationReturn( + success=False, + message="Error: User not found", + code=404, + user=None, + ) + + @strawberry.mutation(permission_classes=[IsAuthenticated]) + def delete_ssh(self, settings: SshMutationsInput) -> UserMutationReturn: + """Delete ssh""" + + with WriteUserData() as data: + if settings.username == "root": + if "ssh" not in data: + data["ssh"] = {} + if "rootKeys" not in data["ssh"]: + data["ssh"]["rootKeys"] = [] + # Return 404 if key not in array + for key in data["ssh"]["rootKeys"]: + if key == settings.sshKey: + data["ssh"]["rootKeys"].remove(key) + # If rootKeys became zero length, delete it + if len(data["ssh"]["rootKeys"]) == 0: + del data["ssh"]["rootKeys"] + return UserMutationReturn( + success=True, + message="SSH key deleted", + code=200, + user=User("root", data["ssh"]["rootKeys"]), + ) + return UserMutationReturn( + success=False, + message="Error: Key not found", + code=404, + user=User("root", data["ssh"]["rootKeys"]), + ) + if settings.username == data["username"]: + if "sshKeys" not in data: + data["sshKeys"] = [] + # Return 404 if key not in array + for key in data["sshKeys"]: + if key == settings.sshKey: + data["sshKeys"].remove(key) + return UserMutationReturn( + success=True, + message="SSH key deleted", + code=200, + user=User("root", data["ssh"]["rootKeys"]), + ) + + return UserMutationReturn( + success=False, + message="Error: Key not found", + code=404, + user=User(data["username"], data["sshKeys"]), + ) + if "users" not in data: + data["users"] = [] + for user in data["users"]: + if user["username"] == settings.username: + if "sshKeys" not in user: + user["sshKeys"] = [] + # Return 404 if key not in array + for key in user["sshKeys"]: + if key == settings.sshKey: + user["sshKeys"].remove(key) + return UserMutationReturn( + success=True, + message="SSH key deleted", + code=200, + user=User(settings.username, user["sshKeys"]), + ) + + return UserMutationReturn( + success=False, + message="Error: Key not found", + code=404, + user=User(settings.username, user["sshKeys"]), + ) + + return UserMutationReturn( + success=False, + message="Error: User not found", + code=404, + user=None, + ) diff --git a/selfprivacy_api/graphql/mutations/users_mutations.py b/selfprivacy_api/graphql/mutations/users_mutations.py new file mode 100644 index 0000000..ab91a11 --- /dev/null +++ b/selfprivacy_api/graphql/mutations/users_mutations.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +"""Users management module""" +# pylint: disable=too-few-public-methods +import re +import subprocess +import typing +import strawberry +from selfprivacy_api.graphql import IsAuthenticated +from selfprivacy_api.graphql.common_types.user import User, UserMutationReturn +from selfprivacy_api.graphql.mutations.mutation_interface import ( + MutationReturnInterface, +) +from selfprivacy_api.utils import WriteUserData, ReadUserData, is_username_forbidden + + +@strawberry.input +class UserMutationsInput: + """Input type for user mutation""" + + username: str + password: str + + +@strawberry.type +class UserMutations: + """Mutations change user settings""" + + @strawberry.mutation(permission_classes=[IsAuthenticated]) + def create_user(self, settings: UserMutationsInput) -> UserMutationReturn: + """Create a new user""" + hashing_command = ["mkpasswd", "-m", "sha-512", settings.password] + password_hash_process_descriptor = subprocess.Popen( + hashing_command, + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + hashed_password = password_hash_process_descriptor.communicate()[0] + hashed_password = hashed_password.decode("ascii") + hashed_password = hashed_password.rstrip() + + # Check if username is forbidden + if is_username_forbidden(settings.username): + return UserMutationReturn( + success=False, + message="Error: Username is forbidden", + code=409, + user=None, + ) + + # Check is username passes regex + if not re.match(r"^[a-z_][a-z0-9_]+$", settings.username): + return UserMutationReturn( + success=False, + message="Error: username must be alphanumeric", + code=400, + user=None, + ) + + # Check if username less than 32 characters + if len(settings.username) >= 32: + return UserMutationReturn( + success=False, + message="Error: username must be less than 32 characters", + code=400, + user=None, + ) + + with ReadUserData() as data: + if "users" not in data: + data["users"] = [] + + # Return 409 if user already exists + if data["username"] == settings.username: + return UserMutationReturn( + success=False, + message="Error: User already exists", + code=409, + user=None, + ) + + for user in data["users"]: + if user["username"] == settings.username: + return UserMutationReturn( + success=False, + message="Error: User already exists", + code=409, + user=None, + ) + + with WriteUserData() as data: + data["users"].append( + { + "username": settings.username, + "hashedPassword": hashed_password, + } + ) + return UserMutationReturn( + success=False, + message="User was successfully created!", + code=201, + user=User(settings.username, None), + ) + + @strawberry.mutation(permission_classes=[IsAuthenticated]) + def delete_user(self, username: str) -> MutationReturnInterface: + with WriteUserData() as data: + + if username == data["username"] or username == "root": + return MutationReturnInterface( + success=False, + message="Error: Cannot delete main or root user", + code=400, + ) + + # Return 404 if user does not exist + for user in data["users"]: + if user["username"] == username: + data["users"].remove(user) + break + else: + return MutationReturnInterface( + success=False, + message="Error: User does not exist", + code=404, + ) + + return MutationReturnInterface( + success=True, + message="Nice frag - user deleted", + code=200, + ) + + @strawberry.mutation(permission_classes=[IsAuthenticated]) + def update_user(self, settings: UserMutationsInput) -> UserMutationReturn: + """Update user mutation""" + hashing_command = ["mkpasswd", "-m", "sha-512", settings.password] + password_hash_process_descriptor = subprocess.Popen( + hashing_command, + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + hashed_password = password_hash_process_descriptor.communicate()[0] + hashed_password = hashed_password.decode("ascii") + hashed_password = hashed_password.rstrip() + + with WriteUserData() as data: + ssh_keys = None + if settings.username == data["username"]: + data["hashedMasterPassword"] = hashed_password + + # Return 404 if user does not exist + else: + for user in data["users"]: + if user["username"] == settings.username: + user["hashedPassword"] = hashed_password + ssh_keys = user["sshKeys"] + break + else: + return UserMutationReturn( + success=False, + message="Error: User does not exist", + code=404, + user=None, + ) + + return UserMutationReturn( + success=True, + message="User was successfully updated", + code=200, + user=User(settings.username, ssh_keys), + ) diff --git a/selfprivacy_api/graphql/queries/users.py b/selfprivacy_api/graphql/queries/users.py new file mode 100644 index 0000000..8fab60f --- /dev/null +++ b/selfprivacy_api/graphql/queries/users.py @@ -0,0 +1,42 @@ +"""Users""" +# pylint: disable=too-few-public-methods +import typing +import strawberry + +from selfprivacy_api.graphql.common_types.user import User +from selfprivacy_api.utils import ReadUserData + + +def get_users() -> typing.List[User]: + """Get users""" + user_list = [] + with ReadUserData() as data: + for user in data["users"]: + user_list.append(User(**user)) + + user_list.append(User(data["username"], data["sshKeys"])) + + return user_list + + +@strawberry.type +class Users: + @strawberry.field + def get_user(self, username: str) -> typing.Optional[User]: + """Get user""" + user = None + with ReadUserData() as data: + if username == data["username"]: + user = User(data["username"], data["sshKeys"]) + + elif username == "root": + user = User("root", data["ssh"]["rootKeys"]) + + else: + for user in data["users"]: + if user["username"] == username: + user = User(data["username"], data["sshKeys"]) + + return user + + all_users: typing.List[User] = strawberry.field(resolver=get_users)