fix user managment

pull/12/head
def 2022-07-22 12:33:32 +02:00
parent 501c4ce43c
commit d3c829fd7f
5 changed files with 96 additions and 101 deletions

View File

@ -3,15 +3,22 @@ import strawberry
from selfprivacy_api.graphql.mutations.mutation_interface import ( from selfprivacy_api.graphql.mutations.mutation_interface import (
MutationReturnInterface, MutationReturnInterface,
) )
from enum import Enum
@strawberry.enum
class UserType(Enum):
NORMAL = "NORMAL"
PRIMARY = "PRIMARY"
ROOT = "ROOT"
@strawberry.type @strawberry.type
class User: class User:
"""Users management""" """Users management"""
user_type: UserType
username: str username: str
# userHomeFolderspace: UserHomeFolderUsage # userHomeFolderspace: UserHomeFolderUsage
sshKeys: typing.Optional[typing.List[str]] = None ssh_keys: typing.List[str] = []
@strawberry.type @strawberry.type

View File

@ -2,17 +2,14 @@
"""Users management module""" """Users management module"""
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
import typing
import strawberry import strawberry
from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.user import User, UserMutationReturn from selfprivacy_api.graphql.common_types.user import User, UserMutationReturn
from selfprivacy_api.graphql.mutations.mutation_interface import (
MutationReturnInterface,
)
from selfprivacy_api.utils import ( from selfprivacy_api.utils import (
WriteUserData, WriteUserData,
ReadUserData, ReadUserData,
is_username_forbidden,
validate_ssh_public_key, validate_ssh_public_key,
) )
@ -30,40 +27,32 @@ class UserMutations:
"""Mutations ssh""" """Mutations ssh"""
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def create_ssh(self, settings: SshMutationsInput) -> UserMutationReturn: def create_ssh(self, input: SshMutationsInput) -> UserMutationReturn:
"""Create a new ssh""" """Create a new ssh"""
with ReadUserData() as data:
if settings.username == "root":
return UserMutationReturn(
success=False,
message="Error: Use /ssh/key/send to add root keys",
code=400,
user=User("root", data["ssh"]["rootKeys"]),
)
if not validate_ssh_public_key(settings.sshKey):
return UserMutationReturn(
success=False,
message="Error: Invalid key type. Only ssh-ed25519 and ssh-rsa are supported",
code=400,
user=User("root", data["ssh"]["rootKeys"]),
)
with WriteUserData() as data: with WriteUserData() as data:
if settings.username == data["username"]: if not validate_ssh_public_key(input.sshKey):
return UserMutationReturn(
success=False,
message="Invalid key type. Only ssh-ed25519 and ssh-rsa are supported",
code=400,
user=User(input.username, data["users"][input.username]["sshKeys"]),
)
if input.username == data["username"]:
if "sshKeys" not in data: if "sshKeys" not in data:
data["sshKeys"] = [] data["sshKeys"] = []
# Return 409 if key already in array # Return 409 if key already in array
for key in data["sshKeys"]: for key in data["sshKeys"]:
if key == settings.sshKey: if key == input.sshKey:
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: Key already exists", message="Key already exists",
code=409, code=409,
user=User(data["username"], data["sshKeys"]), user=User(data["username"], data["sshKeys"]),
) )
data["sshKeys"].append(settings.sshKey) data["sshKeys"].append(input.sshKey)
return UserMutationReturn( return UserMutationReturn(
success=True, success=True,
@ -75,21 +64,21 @@ class UserMutations:
if "users" not in data: if "users" not in data:
data["users"] = [] data["users"] = []
for user in data["users"]: for user in data["users"]:
if user["username"] == settings.username: if user["username"] == input.username:
if "sshKeys" not in user: if "sshKeys" not in user:
user["sshKeys"] = [] user["sshKeys"] = []
# Return 409 if key already in array # Return 409 if key already in array
for key in user["sshKeys"]: for key in user["sshKeys"]:
if key == settings.sshKey: if key == input.sshKey:
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: Key already exists", message="Key already exists",
code=409, code=409,
user=User(user["username"], user["sshKeys"]), user=User(user["username"], user["sshKeys"]),
) )
user["sshKeys"].append(settings.sshKey) user["sshKeys"].append(input.sshKey)
return UserMutationReturn( return UserMutationReturn(
success=True, success=True,
@ -100,28 +89,26 @@ class UserMutations:
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: User not found", message="User not found",
code=404, code=404,
user=None, user=None,
) )
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def delete_ssh(self, settings: SshMutationsInput) -> UserMutationReturn: def delete_ssh(self, input: SshMutationsInput) -> UserMutationReturn:
"""Delete ssh""" """Delete ssh key from user"""
with WriteUserData() as data: with WriteUserData() as data:
if settings.username == "root": if input.username == "root":
if "ssh" not in data: if "ssh" not in data:
data["ssh"] = {} data["ssh"] = {}
if "rootKeys" not in data["ssh"]: if "rootKeys" not in data["ssh"]:
data["ssh"]["rootKeys"] = [] data["ssh"]["rootKeys"] = []
# Return 404 if key not in array # Return 404 if key not in array
for key in data["ssh"]["rootKeys"]: for key in data["ssh"]["rootKeys"]:
if key == settings.sshKey: if key == input.sshKey:
data["ssh"]["rootKeys"].remove(key) data["ssh"]["rootKeys"].remove(key)
# If rootKeys became zero length, delete it
if len(data["ssh"]["rootKeys"]) == 0:
del data["ssh"]["rootKeys"]
return UserMutationReturn( return UserMutationReturn(
success=True, success=True,
message="SSH key deleted", message="SSH key deleted",
@ -130,57 +117,57 @@ class UserMutations:
) )
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: Key not found", message="Key not found",
code=404, code=404,
user=User("root", data["ssh"]["rootKeys"]), user=User("root", data["ssh"]["rootKeys"]),
) )
if settings.username == data["username"]: if input.username == data["username"]:
if "sshKeys" not in data: if "sshKeys" not in data:
data["sshKeys"] = [] data["sshKeys"] = []
# Return 404 if key not in array # Return 404 if key not in array
for key in data["sshKeys"]: for key in data["sshKeys"]:
if key == settings.sshKey: if key == input.sshKey:
data["sshKeys"].remove(key) data["sshKeys"].remove(key)
return UserMutationReturn( return UserMutationReturn(
success=True, success=True,
message="SSH key deleted", message="SSH key deleted",
code=200, code=200,
user=User("root", data["ssh"]["rootKeys"]), user=User(data["username"], data["sshKeys"]),
) )
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: Key not found", message="Key not found",
code=404, code=404,
user=User(data["username"], data["sshKeys"]), user=User(data["username"], data["sshKeys"]),
) )
if "users" not in data: if "users" not in data:
data["users"] = [] data["users"] = []
for user in data["users"]: for user in data["users"]:
if user["username"] == settings.username: if user["username"] == input.username:
if "sshKeys" not in user: if "sshKeys" not in user:
user["sshKeys"] = [] user["sshKeys"] = []
# Return 404 if key not in array # Return 404 if key not in array
for key in user["sshKeys"]: for key in user["sshKeys"]:
if key == settings.sshKey: if key == input.sshKey:
user["sshKeys"].remove(key) user["sshKeys"].remove(key)
return UserMutationReturn( return UserMutationReturn(
success=True, success=True,
message="SSH key deleted", message="SSH key deleted",
code=200, code=200,
user=User(settings.username, user["sshKeys"]), user=User(input.username, user["sshKeys"]),
) )
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: Key not found", message="Key not found",
code=404, code=404,
user=User(settings.username, user["sshKeys"]), user=User(input.username, user["sshKeys"]),
) )
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: User not found", message="User not found",
code=404, code=404,
user=None, user=None,
) )

View File

@ -11,7 +11,7 @@ from selfprivacy_api.graphql.mutations.mutation_interface import (
MutationReturnInterface, MutationReturnInterface,
) )
from selfprivacy_api.utils import WriteUserData, ReadUserData, is_username_forbidden from selfprivacy_api.utils import WriteUserData, ReadUserData, is_username_forbidden
from selfprivacy_api.utils import hash_password
@strawberry.input @strawberry.input
class UserMutationsInput: class UserMutationsInput:
@ -26,42 +26,34 @@ class UserMutations:
"""Mutations change user settings""" """Mutations change user settings"""
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def create_user(self, settings: UserMutationsInput) -> UserMutationReturn: def create_user(self, user: UserMutationsInput) -> UserMutationReturn:
"""Create a new user""" """Create a new user"""
hashing_command = ["mkpasswd", "-m", "sha-512", settings.password]
password_hash_process_descriptor = subprocess.Popen( hashed_password = hash_password(user.password)
hashing_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
hashed_password = password_hash_process_descriptor.communicate()[0]
hashed_password = hashed_password.decode("ascii")
hashed_password = hashed_password.rstrip()
# Check if username is forbidden # Check if username is forbidden
if is_username_forbidden(settings.username): if is_username_forbidden(user.username):
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: Username is forbidden", message="Username is forbidden",
code=409, code=409,
user=None, user=None,
) )
# Check is username passes regex # Check is username passes regex
if not re.match(r"^[a-z_][a-z0-9_]+$", settings.username): if not re.match(r"^[a-z_][a-z0-9_]+$", user.username):
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: username must be alphanumeric", message="Username must be alphanumeric",
code=400, code=400,
user=None, user=None,
) )
# Check if username less than 32 characters # Check if username less than 32 characters
if len(settings.username) >= 32: if len(user.username) >= 32:
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: username must be less than 32 characters", message="Username must be less than 32 characters",
code=400, code=400,
user=None, user=None,
) )
@ -71,19 +63,19 @@ class UserMutations:
data["users"] = [] data["users"] = []
# Return 409 if user already exists # Return 409 if user already exists
if data["username"] == settings.username: if data["username"] == user.username:
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: User already exists", message="User already exists",
code=409, code=409,
user=None, user=None,
) )
for user in data["users"]: for data_user in data["users"]:
if user["username"] == settings.username: if data_user["username"] == user.username:
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: User already exists", message="User already exists",
code=409, code=409,
user=None, user=None,
) )
@ -91,15 +83,15 @@ class UserMutations:
with WriteUserData() as data: with WriteUserData() as data:
data["users"].append( data["users"].append(
{ {
"username": settings.username, "username": user.username,
"hashedPassword": hashed_password, "hashedPassword": hashed_password,
} }
) )
return UserMutationReturn( return UserMutationReturn(
success=False, success=True,
message="User was successfully created!", message="User was successfully created!",
code=201, code=201,
user=User(settings.username, None), user=User(user.username),
) )
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
@ -109,58 +101,50 @@ class UserMutations:
if username == data["username"] or username == "root": if username == data["username"] or username == "root":
return MutationReturnInterface( return MutationReturnInterface(
success=False, success=False,
message="Error: Cannot delete main or root user", message="Cannot delete main or root user",
code=400, code=400,
) )
# Return 404 if user does not exist # Return 404 if user does not exist
for user in data["users"]: for data_user in data["users"]:
if user["username"] == username: if data_user["username"] == username:
data["users"].remove(user) data["users"].remove(data_user)
break break
else: else:
return MutationReturnInterface( return MutationReturnInterface(
success=False, success=False,
message="Error: User does not exist", message="User does not exist",
code=404, code=404,
) )
return MutationReturnInterface( return MutationReturnInterface(
success=True, success=True,
message="Nice frag - user deleted", message="User was deleted",
code=200, code=200,
) )
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def update_user(self, settings: UserMutationsInput) -> UserMutationReturn: def update_user(self, user: UserMutationsInput) -> UserMutationReturn:
"""Update user mutation""" """Update user mutation"""
hashing_command = ["mkpasswd", "-m", "sha-512", settings.password] hashed_password = hash_password(user.password)
password_hash_process_descriptor = subprocess.Popen(
hashing_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
hashed_password = password_hash_process_descriptor.communicate()[0]
hashed_password = hashed_password.decode("ascii")
hashed_password = hashed_password.rstrip()
with WriteUserData() as data: with WriteUserData() as data:
ssh_keys = None ssh_keys = []
if settings.username == data["username"]: if user.username == data["username"]:
data["hashedMasterPassword"] = hashed_password data["hashedMasterPassword"] = hashed_password
# Return 404 if user does not exist # Return 404 if user does not exist
else: else:
for user in data["users"]: for data_user in data["users"]:
if user["username"] == settings.username: if data_user["username"] == user.username:
user["hashedPassword"] = hashed_password data_user["hashedPassword"] = hashed_password
ssh_keys = user["sshKeys"] ssh_keys = data_user["sshKeys"]
break break
else: else:
return UserMutationReturn( return UserMutationReturn(
success=False, success=False,
message="Error: User does not exist", message="User does not exist",
code=404, code=404,
user=None, user=None,
) )
@ -169,5 +153,5 @@ class UserMutations:
success=True, success=True,
message="User was successfully updated", message="User was successfully updated",
code=200, code=200,
user=User(settings.username, ssh_keys), user=User(user.username, ssh_keys),
) )

View File

@ -14,6 +14,8 @@ def get_users() -> typing.List[User]:
for user in data["users"]: for user in data["users"]:
user_list.append(User(**user)) user_list.append(User(**user))
if data["sshKeys"] not in data:
data["sshKeys"] = []
user_list.append(User(data["username"], data["sshKeys"])) user_list.append(User(data["username"], data["sshKeys"]))
return user_list return user_list

View File

@ -159,3 +159,18 @@ def get_dkim_key(domain):
dkim = cat_process.communicate()[0] dkim = cat_process.communicate()[0]
return str(dkim, "utf-8") return str(dkim, "utf-8")
return None return None
def hash_password(password):
hashing_command = ["mkpasswd", "-m", "sha-512", password]
password_hash_process_descriptor = subprocess.Popen(
hashing_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
hashed_password = password_hash_process_descriptor.communicate()[0]
hashed_password = hashed_password.decode("ascii")
hashed_password = hashed_password.rstrip()
return hashed_password