From 4de85acf60586df83c6ee70ec88864618884dbb6 Mon Sep 17 00:00:00 2001 From: def Date: Sat, 30 Jul 2022 23:02:19 +0200 Subject: [PATCH] big refactoring --- selfprivacy_api/graphql/common_types/user.py | 33 ++-- .../graphql/mutations/ssh_mutations.py | 154 ++---------------- .../graphql/mutations/ssh_utils.py | 74 +++++++++ .../graphql/mutations/users_mutations.py | 139 ++-------------- .../graphql/mutations/users_utils.py | 93 +++++++++++ selfprivacy_api/utils/__init__.py | 14 ++ tests/test_graphql/test_ssh.py | 16 +- tests/test_graphql/test_users.py | 8 +- 8 files changed, 235 insertions(+), 296 deletions(-) create mode 100644 selfprivacy_api/graphql/mutations/ssh_utils.py create mode 100644 selfprivacy_api/graphql/mutations/users_utils.py diff --git a/selfprivacy_api/graphql/common_types/user.py b/selfprivacy_api/graphql/common_types/user.py index f401c82..0e8d9cb 100644 --- a/selfprivacy_api/graphql/common_types/user.py +++ b/selfprivacy_api/graphql/common_types/user.py @@ -5,7 +5,7 @@ from selfprivacy_api.graphql.mutations.mutation_interface import ( ) from enum import Enum -from selfprivacy_api.utils import ReadUserData +from selfprivacy_api.utils import ReadUserData, ensure_ssh_and_users_fields_exist @strawberry.enum @@ -34,40 +34,29 @@ class UserMutationReturn(MutationReturnInterface): def get_user_by_username(username: str) -> typing.Optional[User]: with ReadUserData() as data: + ensure_ssh_and_users_fields_exist(data) if username == "root": - if "ssh" not in data: - data["ssh"] = [] - - elif data["ssh"].get("rootKeys") is None: - data["ssh"]["rootKeys"] = [] - return User( user_type=UserType.ROOT, username="root", ssh_keys=data["ssh"]["rootKeys"], ) - elif username == data["username"]: - if "sshKeys" not in data: - data["sshKeys"] = [] + if username == data["username"]: return User( user_type=UserType.PRIMARY, username=username, ssh_keys=data["sshKeys"], ) - else: - if "users" not in data: - data["users"] = [] - for user in data["users"]: - if user["username"] == username: - if "sshKeys" not in user: - user["sshKeys"] = [] + for user in data["users"]: + if user["username"] == username: + + return User( + user_type=UserType.NORMAL, + username=username, + ssh_keys=user["sshKeys"], + ) - return User( - user_type=UserType.NORMAL, - username=username, - ssh_keys=user["sshKeys"], - ) return None diff --git a/selfprivacy_api/graphql/mutations/ssh_mutations.py b/selfprivacy_api/graphql/mutations/ssh_mutations.py index d76fc2b..ec4bcf3 100644 --- a/selfprivacy_api/graphql/mutations/ssh_mutations.py +++ b/selfprivacy_api/graphql/mutations/ssh_mutations.py @@ -4,17 +4,16 @@ import strawberry +from selfprivacy_api.graphql.mutations.ssh_utils import ( + create_ssh_key, + delete_ssh_key, +) from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql.common_types.user import ( UserMutationReturn, get_user_by_username, ) -from selfprivacy_api.utils import ( - WriteUserData, - validate_ssh_public_key, -) - @strawberry.input class SshMutationInput: @@ -32,145 +31,24 @@ class SshMutations: def create_ssh(self, ssh_input: SshMutationInput) -> UserMutationReturn: """Create a new ssh""" - if not validate_ssh_public_key(ssh_input.ssh_key): - return UserMutationReturn( - success=False, - message="Invalid key type. Only ssh-ed25519 and ssh-rsa are supported", - code=400, - user=get_user_by_username(ssh_input.username), - ) + success, message, code = create_ssh_key(ssh_input.username, ssh_input.ssh_key) - with WriteUserData() as data: - - if ssh_input.username == data["username"]: - if "sshKeys" not in data: - data["sshKeys"] = [] - # Return 409 if key already in array - for key in data["sshKeys"]: - if key == ssh_input.ssh_key: - - return UserMutationReturn( - success=False, - message="Key already exists", - code=409, - user=get_user_by_username(ssh_input.username), - ) - data["sshKeys"].append(ssh_input.ssh_key) - - return UserMutationReturn( - success=True, - message="New SSH key successfully written", - code=201, - user=get_user_by_username(ssh_input.username), - ) - - if "users" not in data: - data["users"] = [] - for user in data["users"]: - if user["username"] == ssh_input.username: - if "sshKeys" not in user: - user["sshKeys"] = [] - # Return 409 if key already in array - for key in user["sshKeys"]: - if key == ssh_input.ssh_key: - - return UserMutationReturn( - success=False, - message="Key already exists", - code=409, - user=get_user_by_username(ssh_input.username), - ) - - user["sshKeys"].append(ssh_input.ssh_key) - - return UserMutationReturn( - success=True, - message="New SSH key successfully written", - code=201, - user=get_user_by_username(ssh_input.username), - ) - - return UserMutationReturn( - success=False, - message="User not found", - code=404, - user=None, - ) + return UserMutationReturn( + success=success, + message=message, + code=code, + user=get_user_by_username(ssh_input.username), + ) @strawberry.mutation(permission_classes=[IsAuthenticated]) def delete_ssh(self, ssh_input: SshMutationInput) -> UserMutationReturn: """Delete ssh key from user""" - with WriteUserData() as data: - if ssh_input.username == "root": - if "ssh" not in data: - data["ssh"] = {} - if "rootKeys" not in data["ssh"]: - data["ssh"]["rootKeys"] = [] - # Return 404 if key not in array - for key in data["ssh"]["rootKeys"]: - if key == ssh_input.ssh_key: - data["ssh"]["rootKeys"].remove(key) - - return UserMutationReturn( - success=True, - message="SSH key deleted", - code=200, - user=get_user_by_username(ssh_input.username), - ) - return UserMutationReturn( - success=False, - message="Key not found", - code=404, - user=get_user_by_username(ssh_input.username), - ) - if ssh_input.username == data["username"]: - if "sshKeys" not in data: - data["sshKeys"] = [] - # Return 404 if key not in array - for key in data["sshKeys"]: - if key == ssh_input.ssh_key: - data["sshKeys"].remove(key) - return UserMutationReturn( - success=True, - message="SSH key deleted", - code=200, - user=get_user_by_username(ssh_input.username), - ) - - return UserMutationReturn( - success=False, - message="Key not found", - code=404, - user=get_user_by_username(ssh_input.username), - ) - if "users" not in data: - data["users"] = [] - for user in data["users"]: - if user["username"] == ssh_input.username: - if "sshKeys" not in user: - user["sshKeys"] = [] - # Return 404 if key not in array - for key in user["sshKeys"]: - if key == ssh_input.ssh_key: - user["sshKeys"].remove(key) - return UserMutationReturn( - success=True, - message="SSH key deleted", - code=200, - user=get_user_by_username(ssh_input.username), - ) - - return UserMutationReturn( - success=False, - message="Key not found", - code=404, - user=get_user_by_username(ssh_input.username), - ) + success, message, code = delete_ssh_key(ssh_input.username, ssh_input.ssh_key) return UserMutationReturn( - success=False, - message="User not found", - code=404, - user=None, + success=success, + message=message, + code=code, + user=get_user_by_username(ssh_input.username), ) diff --git a/selfprivacy_api/graphql/mutations/ssh_utils.py b/selfprivacy_api/graphql/mutations/ssh_utils.py new file mode 100644 index 0000000..fb346ed --- /dev/null +++ b/selfprivacy_api/graphql/mutations/ssh_utils.py @@ -0,0 +1,74 @@ +from selfprivacy_api.utils import ( + WriteUserData, + ensure_ssh_and_users_fields_exist, + validate_ssh_public_key, +) + + +def create_ssh_key(username, ssh_key): + """Create a new ssh key""" + + if not validate_ssh_public_key(ssh_key): + return ( + False, + "Invalid key type. Only ssh-ed25519 and ssh-rsa are supported", + 400, + ) + + with WriteUserData() as data: + ensure_ssh_and_users_fields_exist(data) + + if username == data["username"]: + if ssh_key in data["sshKeys"]: + return False, "Key already exists", 409 + + data["sshKeys"].append(ssh_key) + return True, "New SSH key successfully written", 201 + + if username == "root": + if ssh_key in data["ssh"]["rootKeys"]: + return False, "Key already exists", 409 + + data["ssh"]["rootKeys"].append(ssh_key) + return True, "New SSH key successfully written", 201 + + for user in data["users"]: + if user["username"] == username: + if ssh_key in user["sshKeys"]: + return False, "Key already exists", 409 + + user["sshKeys"].append(ssh_key) + return True, "New SSH key successfully written", 201 + + return False, "User not found", 404 + + +def delete_ssh_key(username, ssh_key): + """Delete a ssh key""" + + with WriteUserData() as data: + ensure_ssh_and_users_fields_exist(data) + + if username == "root": + if ssh_key in data["ssh"]["rootKeys"]: + data["ssh"]["rootKeys"].remove(ssh_key) + return True, "SSH key deleted", 200 + + return False, "Key not found", 404 + + if username == data["username"]: + if ssh_key in data["sshKeys"]: + data["sshKeys"].remove(ssh_key) + return True, "SSH key deleted", 200 + + return False, "Key not found", 404 + + for user in data["users"]: + if user["username"] == username: + if ssh_key in user["sshKeys"]: + user["sshKeys"].remove(ssh_key) + return True, "SSH key deleted", 200 + + return False, "Key not found", 404 + + return False, "User not found", 404 diff --git a/selfprivacy_api/graphql/mutations/users_mutations.py b/selfprivacy_api/graphql/mutations/users_mutations.py index 75ed0c3..6f7e55f 100644 --- a/selfprivacy_api/graphql/mutations/users_mutations.py +++ b/selfprivacy_api/graphql/mutations/users_mutations.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 """Users management module""" # pylint: disable=too-few-public-methods -import re import strawberry from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql.common_types.user import ( @@ -11,12 +10,11 @@ from selfprivacy_api.graphql.common_types.user import ( from selfprivacy_api.graphql.mutations.mutation_interface import ( GenericMutationReturn, ) -from selfprivacy_api.utils import ( - WriteUserData, - ReadUserData, - is_username_forbidden, +from selfprivacy_api.graphql.mutations.users_utils import ( + create_user_util, + delete_user_util, + update_user_util, ) -from selfprivacy_api.utils import hash_password @strawberry.input @@ -33,140 +31,35 @@ class UserMutations: @strawberry.mutation(permission_classes=[IsAuthenticated]) def create_user(self, user: UserMutationInput) -> UserMutationReturn: - """Create a new user""" - # Check if password is null or none - if user.password is None or user.password == "": - return UserMutationReturn( - success=False, - message="Password is none or null", - code=400, - user=None, - ) + success, message, code = create_user_util(user.username, user.password) - hashed_password = hash_password(user.password) - - # Check if username is forbidden - if is_username_forbidden(user.username): - return UserMutationReturn( - success=False, - message="Username is forbidden", - code=409, - user=None, - ) - - # Check is username passes regex - if not re.match(r"^[a-z_][a-z0-9_]+$", user.username): - return UserMutationReturn( - success=False, - message="Username must be alphanumeric", - code=400, - user=None, - ) - - # Check if username less than 32 characters - if len(user.username) >= 32: - return UserMutationReturn( - success=False, - message="Username must be less than 32 characters", - code=400, - user=None, - ) - - with ReadUserData() as data: - if "users" not in data: - data["users"] = [] - - # Return 409 if user already exists - if data["username"] == user.username: - return UserMutationReturn( - success=False, - message="User already exists", - code=409, - user=None, - ) - - for data_user in data["users"]: - if data_user["username"] == user.username: - return UserMutationReturn( - success=False, - message="User already exists", - code=409, - user=None, - ) - - with WriteUserData() as data: - if "users" not in data: - data["users"] = [] - - data["users"].append( - { - "username": user.username, - "hashedPassword": hashed_password, - } - ) return UserMutationReturn( - success=True, - message="User was successfully created!", - code=201, + success=success, + message=message, + code=code, user=get_user_by_username(user.username), ) @strawberry.mutation(permission_classes=[IsAuthenticated]) def delete_user(self, username: str) -> GenericMutationReturn: - with WriteUserData() as data: - - if username == data["username"] or username == "root": - return GenericMutationReturn( - success=False, - message="Cannot delete main or root user", - code=400, - ) - - # Return 404 if user does not exist - for data_user in data["users"]: - if data_user["username"] == username: - data["users"].remove(data_user) - break - else: - return GenericMutationReturn( - success=False, - message="User does not exist", - code=404, - ) + success, message, code = delete_user_util(username) return GenericMutationReturn( - success=True, - message="User was deleted", - code=200, + success=success, + message=message, + code=code, ) @strawberry.mutation(permission_classes=[IsAuthenticated]) def update_user(self, user: UserMutationInput) -> UserMutationReturn: """Update user mutation""" - hashed_password = hash_password(user.password) - with WriteUserData() as data: - if user.username == data["username"]: - data["hashedMasterPassword"] = hashed_password - - # Return 404 if user does not exist - else: - for data_user in data["users"]: - if data_user["username"] == user.username: - data_user["hashedPassword"] = hashed_password - break - else: - return UserMutationReturn( - success=False, - message="User does not exist", - code=404, - user=None, - ) + success, message, code = update_user_util(user.username, user.password) return UserMutationReturn( - success=True, - message="User was successfully updated", - code=200, + success=success, + message=message, + code=code, user=get_user_by_username(user.username), ) diff --git a/selfprivacy_api/graphql/mutations/users_utils.py b/selfprivacy_api/graphql/mutations/users_utils.py new file mode 100644 index 0000000..4709990 --- /dev/null +++ b/selfprivacy_api/graphql/mutations/users_utils.py @@ -0,0 +1,93 @@ +import re +from selfprivacy_api.utils import ( + WriteUserData, + ReadUserData, + ensure_ssh_and_users_fields_exist, + is_username_forbidden, +) +from selfprivacy_api.utils import hash_password + + +def create_user_util(username, password): + """Create a new user""" + + # Check if password is null or none + if password == "": + return False, "Password is null", 400 + + # Check if username is forbidden + if is_username_forbidden(username): + return False, "Username is forbidden", 409 + + # Check is username passes regex + if not re.match(r"^[a-z_][a-z0-9_]+$", username): + return False, "Username must be alphanumeric", 400 + + # Check if username less than 32 characters + if len(username) >= 32: + return False, "Username must be less than 32 characters", 400 + + with ReadUserData() as data: + ensure_ssh_and_users_fields_exist(data) + + # Return 409 if user already exists + if data["username"] == username: + return False, "User already exists", 409 + + for data_user in data["users"]: + if data_user["username"] == username: + return False, "User already exists", 409 + + hashed_password = hash_password(password) + + with WriteUserData() as data: + ensure_ssh_and_users_fields_exist(data) + + data["users"].append( + { + "username": username, + "hashedPassword": hashed_password, + } + ) + + return True, "User was successfully created!", 201 + + +def delete_user_util(username): + with WriteUserData() as data: + + if username == data["username"] or username == "root": + return False, "Cannot delete main or root user", 400 + + # Return 404 if user does not exist + for data_user in data["users"]: + if data_user["username"] == username: + data["users"].remove(data_user) + break + else: + return False, "User does not exist", 404 + + return True, "User was deleted", 200 + + +def update_user_util(username, password): + # Check if password is null or none + if password == "": + return False, "Password is null", 400 + + hashed_password = hash_password(password) + + with WriteUserData() as data: + if username == data["username"]: + data["hashedMasterPassword"] = hashed_password + + # Return 404 if user does not exist + else: + for data_user in data["users"]: + if data_user["username"] == username: + data_user["hashedPassword"] = hashed_password + break + else: + return False, "User does not exist", 404 + + return True, "User was successfully updated", 200 diff --git a/selfprivacy_api/utils/__init__.py b/selfprivacy_api/utils/__init__.py index cc93622..266400c 100644 --- a/selfprivacy_api/utils/__init__.py +++ b/selfprivacy_api/utils/__init__.py @@ -173,3 +173,17 @@ def hash_password(password): hashed_password = hashed_password.decode("ascii") hashed_password = hashed_password.rstrip() return hashed_password + + +def ensure_ssh_and_users_fields_exist(data): + if "ssh" not in data: + data["ssh"] = [] + + elif data["ssh"].get("rootKeys") is None: + data["ssh"]["rootKeys"] = [] + + if "sshKeys" not in data: + data["sshKeys"] = [] + + if "users" not in data: + data["users"] = [] diff --git a/tests/test_graphql/test_ssh.py b/tests/test_graphql/test_ssh.py index 238b58d..0498c62 100644 --- a/tests/test_graphql/test_ssh.py +++ b/tests/test_graphql/test_ssh.py @@ -92,13 +92,13 @@ def test_graphql_add_ssh(authorized_client, some_users, mock_subprocess_popen): assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["createSsh"]["code"] == 200 + assert response.json["data"]["createSsh"]["code"] == 201 assert response.json["data"]["createSsh"]["message"] is not None assert response.json["data"]["createSsh"]["success"] is True assert response.json["data"]["createSsh"]["user"]["username"] == "user1" assert response.json["data"]["createSsh"]["user"]["sshKeys"] == [ - "ssh-rsa KEY tester@pc", + "ssh-rsa KEY user1@pc", "ssh-rsa KEY test_key@pc", ] @@ -119,7 +119,7 @@ def test_graphql_add_root_ssh(authorized_client, some_users, mock_subprocess_pop assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["createSsh"]["code"] == 200 + assert response.json["data"]["createSsh"]["code"] == 201 assert response.json["data"]["createSsh"]["message"] is not None assert response.json["data"]["createSsh"]["success"] is True @@ -146,13 +146,13 @@ def test_graphql_add_main_ssh(authorized_client, some_users, mock_subprocess_pop assert response.status_code == 200 assert response.json.get("data") is not None - assert response.json["data"]["createSsh"]["code"] == 200 + assert response.json["data"]["createSsh"]["code"] == 201 assert response.json["data"]["createSsh"]["message"] is not None assert response.json["data"]["createSsh"]["success"] is True assert response.json["data"]["createSsh"]["user"]["username"] == "tester" assert response.json["data"]["createSsh"]["user"]["sshKeys"] == [ - "ssh-rsa KEY tester@pc", + "ssh-rsa KEY test@pc", "ssh-rsa KEY test_key@pc", ] @@ -252,9 +252,7 @@ def test_graphql_dell_ssh(authorized_client, some_users, mock_subprocess_popen): assert response.json["data"]["deleteSsh"]["success"] is True assert response.json["data"]["deleteSsh"]["user"]["username"] == "user1" - assert response.json["data"]["deleteSsh"]["user"]["sshKeys"] == [ - "ssh-rsa KEY user1@pc" - ] + assert response.json["data"]["deleteSsh"]["user"]["sshKeys"] == [] def test_graphql_dell_root_ssh(authorized_client, some_users, mock_subprocess_popen): @@ -289,7 +287,7 @@ def test_graphql_dell_main_ssh(authorized_client, some_users, mock_subprocess_po "variables": { "sshInput": { "username": "tester", - "sshKey": "ssh-rsa KEY tester@pc", + "sshKey": "ssh-rsa KEY test@pc", }, }, }, diff --git a/tests/test_graphql/test_users.py b/tests/test_graphql/test_users.py index 0c2ad94..18998db 100644 --- a/tests/test_graphql/test_users.py +++ b/tests/test_graphql/test_users.py @@ -8,7 +8,6 @@ from tests.common import ( ) invalid_usernames = [ - "root", "messagebus", "postfix", "polkituser", @@ -468,7 +467,8 @@ def test_graphql_add_existing_user(authorized_client, one_user, mock_subprocess_ assert response.json["data"]["createUser"]["code"] == 409 assert response.json["data"]["createUser"]["success"] is False - assert response.json["data"]["createUser"]["user"] is None + assert response.json["data"]["createUser"]["user"]["username"] == "user1" + assert response.json["data"]["createUser"]["user"]["sshKeys"][0] == "ssh-rsa KEY user1@pc" def test_graphql_add_main_user(authorized_client, one_user, mock_subprocess_popen): @@ -491,8 +491,8 @@ def test_graphql_add_main_user(authorized_client, one_user, mock_subprocess_pope assert response.json["data"]["createUser"]["code"] == 409 assert response.json["data"]["createUser"]["success"] is False - assert response.json["data"]["createUser"]["user"] is None - + assert response.json["data"]["createUser"]["user"]["username"] == "tester" + assert response.json["data"]["createUser"]["user"]["sshKeys"][0] == "ssh-rsa KEY test@pc" def test_graphql_add_long_username(authorized_client, one_user, mock_subprocess_popen): response = authorized_client.post(