selfprivacy-rest-api/selfprivacy_api/actions/ssh.py

150 lines
3.9 KiB
Python
Raw Permalink Normal View History

"""Actions to manage the SSH."""
from typing import Optional
from pydantic import BaseModel
from selfprivacy_api.actions.users import (
UserNotFound,
ensure_ssh_and_users_fields_exist,
)
from selfprivacy_api.utils import WriteUserData, ReadUserData, validate_ssh_public_key
def enable_ssh():
with WriteUserData() as data:
if "ssh" not in data:
data["ssh"] = {}
data["ssh"]["enable"] = True
class UserdataSshSettings(BaseModel):
"""Settings for the SSH."""
enable: bool = True
passwordAuthentication: bool = True
rootKeys: list[str] = []
def get_ssh_settings() -> UserdataSshSettings:
with ReadUserData() as data:
if "ssh" not in data:
return UserdataSshSettings()
if "enable" not in data["ssh"]:
data["ssh"]["enable"] = True
if "passwordAuthentication" not in data["ssh"]:
data["ssh"]["passwordAuthentication"] = True
if "rootKeys" not in data["ssh"]:
data["ssh"]["rootKeys"] = []
return UserdataSshSettings(**data["ssh"])
def set_ssh_settings(
enable: Optional[bool] = None, password_authentication: Optional[bool] = None
) -> None:
with WriteUserData() as data:
if "ssh" not in data:
data["ssh"] = {}
if enable is not None:
data["ssh"]["enable"] = enable
if password_authentication is not None:
data["ssh"]["passwordAuthentication"] = password_authentication
def add_root_ssh_key(public_key: str):
with WriteUserData() as data:
if "ssh" not in data:
data["ssh"] = {}
if "rootKeys" not in data["ssh"]:
data["ssh"]["rootKeys"] = []
# Return 409 if key already in array
for key in data["ssh"]["rootKeys"]:
if key == public_key:
raise KeyAlreadyExists()
data["ssh"]["rootKeys"].append(public_key)
class KeyAlreadyExists(Exception):
"""Key already exists"""
pass
class InvalidPublicKey(Exception):
"""Invalid public key"""
pass
def create_ssh_key(username: str, ssh_key: str):
"""Create a new ssh key"""
if not validate_ssh_public_key(ssh_key):
raise InvalidPublicKey()
with WriteUserData() as data:
ensure_ssh_and_users_fields_exist(data)
if username == data["username"]:
if ssh_key in data["sshKeys"]:
raise KeyAlreadyExists()
data["sshKeys"].append(ssh_key)
return
if username == "root":
if ssh_key in data["ssh"]["rootKeys"]:
raise KeyAlreadyExists()
data["ssh"]["rootKeys"].append(ssh_key)
return
for user in data["users"]:
if user["username"] == username:
if "sshKeys" not in user:
user["sshKeys"] = []
if ssh_key in user["sshKeys"]:
raise KeyAlreadyExists()
user["sshKeys"].append(ssh_key)
return
raise UserNotFound()
class KeyNotFound(Exception):
"""Key not found"""
pass
def remove_ssh_key(username: str, ssh_key: str):
"""Delete a ssh key"""
with WriteUserData() as data:
ensure_ssh_and_users_fields_exist(data)
if username == "root":
if ssh_key in data["ssh"]["rootKeys"]:
data["ssh"]["rootKeys"].remove(ssh_key)
return
raise KeyNotFound()
if username == data["username"]:
if ssh_key in data["sshKeys"]:
data["sshKeys"].remove(ssh_key)
return
raise KeyNotFound()
for user in data["users"]:
if user["username"] == username:
if "sshKeys" not in user:
user["sshKeys"] = []
if ssh_key in user["sshKeys"]:
user["sshKeys"].remove(ssh_key)
return
raise KeyNotFound()
raise UserNotFound()