176 lines
6.2 KiB
Python
176 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Users management module"""
|
|
# pylint: disable=too-few-public-methods
|
|
|
|
|
|
import strawberry
|
|
from selfprivacy_api.graphql import IsAuthenticated
|
|
from selfprivacy_api.graphql.common_types.user import User, UserMutationReturn, UserType
|
|
|
|
from selfprivacy_api.utils import (
|
|
WriteUserData,
|
|
ReadUserData,
|
|
get_user_by_username,
|
|
validate_ssh_public_key,
|
|
)
|
|
|
|
|
|
@strawberry.input
|
|
class SshMutationsInput:
|
|
"""Input type for ssh mutation"""
|
|
|
|
username: str
|
|
ssh_key: str
|
|
|
|
|
|
@strawberry.type
|
|
class SshMutations:
|
|
"""Mutations ssh"""
|
|
|
|
@strawberry.mutation(permission_classes=[IsAuthenticated])
|
|
def create_ssh(self, input: SshMutationsInput) -> UserMutationReturn:
|
|
"""Create a new ssh"""
|
|
|
|
if not validate_ssh_public_key(input.ssh_key):
|
|
return UserMutationReturn(
|
|
success=False,
|
|
message="Invalid key type. Only ssh-ed25519 and ssh-rsa are supported",
|
|
code=400,
|
|
user=get_user_by_username(input.username),
|
|
)
|
|
|
|
with WriteUserData() as data:
|
|
|
|
if input.username == data["username"]:
|
|
if "sshKeys" not in data:
|
|
data["sshKeys"] = []
|
|
# Return 409 if key already in array
|
|
for key in data["sshKeys"]:
|
|
if key == input.ssh_key:
|
|
|
|
return UserMutationReturn(
|
|
success=False,
|
|
message="Key already exists",
|
|
code=409,
|
|
user=get_user_by_username(input.username),
|
|
)
|
|
data["sshKeys"].append(input.ssh_key)
|
|
|
|
return UserMutationReturn(
|
|
success=True,
|
|
message="New SSH key successfully written",
|
|
code=201,
|
|
user=get_user_by_username(input.username),
|
|
)
|
|
|
|
if "users" not in data:
|
|
data["users"] = []
|
|
for user in data["users"]:
|
|
if user["username"] == input.username:
|
|
if "sshKeys" not in user:
|
|
user["sshKeys"] = []
|
|
# Return 409 if key already in array
|
|
for key in user["sshKeys"]:
|
|
if key == input.ssh_key:
|
|
|
|
return UserMutationReturn(
|
|
success=False,
|
|
message="Key already exists",
|
|
code=409,
|
|
user=get_user_by_username(input.username),
|
|
)
|
|
|
|
user["sshKeys"].append(input.ssh_key)
|
|
|
|
return UserMutationReturn(
|
|
success=True,
|
|
message="New SSH key successfully written",
|
|
code=201,
|
|
user=get_user_by_username(input.username),
|
|
)
|
|
|
|
return UserMutationReturn(
|
|
success=False,
|
|
message="User not found",
|
|
code=404,
|
|
user=None,
|
|
)
|
|
|
|
@strawberry.mutation(permission_classes=[IsAuthenticated])
|
|
def delete_ssh(self, input: SshMutationsInput) -> UserMutationReturn:
|
|
"""Delete ssh key from user"""
|
|
|
|
with WriteUserData() as data:
|
|
if input.username == "root":
|
|
if "ssh" not in data:
|
|
data["ssh"] = {}
|
|
if "rootKeys" not in data["ssh"]:
|
|
data["ssh"]["rootKeys"] = []
|
|
# Return 404 if key not in array
|
|
for key in data["ssh"]["rootKeys"]:
|
|
if key == input.ssh_key:
|
|
data["ssh"]["rootKeys"].remove(key)
|
|
|
|
return UserMutationReturn(
|
|
success=True,
|
|
message="SSH key deleted",
|
|
code=200,
|
|
user=get_user_by_username(input.username),
|
|
)
|
|
return UserMutationReturn(
|
|
success=False,
|
|
message="Key not found",
|
|
code=404,
|
|
user=get_user_by_username(input.username),
|
|
)
|
|
if input.username == data["username"]:
|
|
if "sshKeys" not in data:
|
|
data["sshKeys"] = []
|
|
# Return 404 if key not in array
|
|
for key in data["sshKeys"]:
|
|
if key == input.ssh_key:
|
|
data["sshKeys"].remove(key)
|
|
return UserMutationReturn(
|
|
success=True,
|
|
message="SSH key deleted",
|
|
code=200,
|
|
user=get_user_by_username(input.username),
|
|
)
|
|
|
|
return UserMutationReturn(
|
|
success=False,
|
|
message="Key not found",
|
|
code=404,
|
|
user=get_user_by_username(input.username),
|
|
)
|
|
if "users" not in data:
|
|
data["users"] = []
|
|
for user in data["users"]:
|
|
if user["username"] == input.username:
|
|
if "sshKeys" not in user:
|
|
user["sshKeys"] = []
|
|
# Return 404 if key not in array
|
|
for key in user["sshKeys"]:
|
|
if key == input.ssh_key:
|
|
user["sshKeys"].remove(key)
|
|
return UserMutationReturn(
|
|
success=True,
|
|
message="SSH key deleted",
|
|
code=200,
|
|
user=get_user_by_username(input.username),
|
|
)
|
|
|
|
return UserMutationReturn(
|
|
success=False,
|
|
message="Key not found",
|
|
code=404,
|
|
user=get_user_by_username(input.username),
|
|
)
|
|
|
|
return UserMutationReturn(
|
|
success=False,
|
|
message="User not found",
|
|
code=404,
|
|
user=None,
|
|
)
|