diff --git a/selfprivacy_api/graphql/mutations/storage_mutation.py b/selfprivacy_api/graphql/mutations/storage_mutation.py
new file mode 100644
index 0000000..97f632e
--- /dev/null
+++ b/selfprivacy_api/graphql/mutations/storage_mutation.py
@@ -0,0 +1,24 @@
+"""Storage devices mutations"""
+import typing
+import strawberry
+from selfprivacy_api.graphql import IsAuthenticated
+from selfprivacy_api.utils.block_devices import BlockDevices
+from selfprivacy_api.graphql.mutations.mutation_interface import (
+ GenericMutationReturn,
+)
+
+
+@strawberry.type
+class StorageMutations:
+ @strawberry.mutation(permission_classes=[IsAuthenticated])
+ def resize_volume(self, name: str) -> GenericMutationReturn:
+ """Resize volume"""
+ volume = BlockDevices().get_block_device(name)
+ if volume is None:
+ return GenericMutationReturn(
+ success=False, code=404, message="Volume not found"
+ )
+ volume.resize()
+ return GenericMutationReturn(
+ success=True, code=200, message="Volume resize started"
+ )
diff --git a/selfprivacy_api/graphql/queries/storage.py b/selfprivacy_api/graphql/queries/storage.py
new file mode 100644
index 0000000..0058a20
--- /dev/null
+++ b/selfprivacy_api/graphql/queries/storage.py
@@ -0,0 +1,31 @@
+"""Storage queries."""
+# pylint: disable=too-few-public-methods
+import typing
+import strawberry
+from selfprivacy_api.utils.block_devices import BlockDevices
+
+
+@strawberry.type
+class StorageVolume:
+ total_space: int
+ free_space: int
+ used_space: int
+ root: bool
+ name: str
+
+
+@strawberry.type
+class Storage:
+ @strawberry.field
+ def volumes(self) -> typing.List[StorageVolume]:
+ """Get list of volumes"""
+ return [
+ StorageVolume(
+ total_space=volume.fssize if volume.fssize is not None else volume.size,
+ free_space=volume.fsavail,
+ used_space=volume.fsused,
+ root=volume.name == "sda1",
+ name=volume.name,
+ )
+ for volume in BlockDevices().get_block_devices()
+ ]
diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py
index 69735a2..c4daac3 100644
--- a/selfprivacy_api/graphql/schema.py
+++ b/selfprivacy_api/graphql/schema.py
@@ -4,9 +4,11 @@ import typing
import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.mutations.api_mutations import ApiMutations
+from selfprivacy_api.graphql.mutations.storage_mutation import StorageMutations
from selfprivacy_api.graphql.mutations.system_mutations import SystemMutations
from selfprivacy_api.graphql.queries.api_queries import Api
+from selfprivacy_api.graphql.queries.storage import Storage
from selfprivacy_api.graphql.queries.system import System
@@ -24,9 +26,14 @@ class Query:
"""API access status"""
return Api()
+ @strawberry.field(permission_classes=[IsAuthenticated])
+ def storage(self) -> Storage:
+ """Storage queries"""
+ return Storage()
+
@strawberry.type
-class Mutation(ApiMutations, SystemMutations):
+class Mutation(ApiMutations, SystemMutations, StorageMutations):
"""Root schema for mutations"""
pass
diff --git a/selfprivacy_api/services/nextcloud/__init__.py b/selfprivacy_api/services/nextcloud/__init__.py
new file mode 100644
index 0000000..525f657
--- /dev/null
+++ b/selfprivacy_api/services/nextcloud/__init__.py
@@ -0,0 +1,96 @@
+"""Class representing Nextcloud service."""
+import base64
+import subprocess
+import psutil
+from selfprivacy_api.services.service import Service, ServiceStatus
+from selfprivacy_api.utils import ReadUserData, WriteUserData
+
+
+class Nextcloud(Service):
+ """Class representing Nextcloud service."""
+
+ def get_id(self) -> str:
+ """Return service id."""
+ return "nextcloud"
+
+ def get_display_name(self) -> str:
+ """Return service display name."""
+ return "Nextcloud"
+
+ def get_description(self) -> str:
+ """Return service description."""
+ return "Nextcloud is a cloud storage service that offers a web interface and a desktop client."
+
+ def get_svg_icon(self) -> str:
+ """Read SVG icon from file and return it as base64 encoded string."""
+ with open("selfprivacy_api/services/nextcloud/nextcloud.svg", "rb") as f:
+ return base64.b64encode(f.read()).decode("utf-8")
+
+ def is_enabled(self) -> bool:
+ with ReadUserData() as user_data:
+ return user_data.get("nextcloud", {}).get("enable", False)
+
+ def get_status(self) -> ServiceStatus:
+ """
+ Return Nextcloud status from systemd.
+ Use command return code to determine status.
+
+ Return code 0 means service is running.
+ Return code 1 or 2 means service is in error stat.
+ Return code 3 means service is stopped.
+ Return code 4 means service is off.
+ """
+ service_status = subprocess.Popen(
+ ["systemctl", "status", "phpfpm-nextcloud.service"]
+ )
+ service_status.communicate()[0]
+ if service_status.returncode == 0:
+ return ServiceStatus.RUNNING
+ elif service_status.returncode == 1 or service_status.returncode == 2:
+ return ServiceStatus.ERROR
+ elif service_status.returncode == 3:
+ return ServiceStatus.STOPPED
+ elif service_status.returncode == 4:
+ return ServiceStatus.OFF
+ else:
+ return ServiceStatus.DEGRADED
+
+ def enable(self):
+ """Enable Nextcloud service."""
+ with WriteUserData() as user_data:
+ if "nextcloud" not in user_data:
+ user_data["nextcloud"] = {}
+ user_data["nextcloud"]["enable"] = True
+
+ def disable(self):
+ """Disable Nextcloud service."""
+ with WriteUserData() as user_data:
+ if "nextcloud" not in user_data:
+ user_data["nextcloud"] = {}
+ user_data["nextcloud"]["enable"] = False
+
+ def stop(self):
+ """Stop Nextcloud service."""
+ subprocess.Popen(["systemctl", "stop", "phpfpm-nextcloud.service"])
+
+ def start(self):
+ """Start Nextcloud service."""
+ subprocess.Popen(["systemctl", "start", "phpfpm-nextcloud.service"])
+
+ def restart(self):
+ """Restart Nextcloud service."""
+ subprocess.Popen(["systemctl", "restart", "phpfpm-nextcloud.service"])
+
+ def get_configuration(self) -> dict:
+ """Return Nextcloud configuration."""
+ return {}
+
+ def set_configuration(self, config_items):
+ return super().set_configuration(config_items)
+
+ def get_logs(self):
+ """Return Nextcloud logs."""
+ return ""
+
+ def get_storage_usage(self):
+ return psutil.disk_usage("/var/lib/nextcloud").used
diff --git a/selfprivacy_api/services/nextcloud/nextcloud.svg b/selfprivacy_api/services/nextcloud/nextcloud.svg
new file mode 100644
index 0000000..d7dbcb5
--- /dev/null
+++ b/selfprivacy_api/services/nextcloud/nextcloud.svg
@@ -0,0 +1,10 @@
+
diff --git a/selfprivacy_api/services/service.py b/selfprivacy_api/services/service.py
new file mode 100644
index 0000000..971358b
--- /dev/null
+++ b/selfprivacy_api/services/service.py
@@ -0,0 +1,80 @@
+"""Abstract class for a service running on a server"""
+from abc import ABC, abstractmethod
+from enum import Enum
+
+
+class ServiceStatus(Enum):
+ """Enum for service status"""
+
+ RUNNING = "RUNNING"
+ DEGRADED = "DEGRADED"
+ ERROR = "ERROR"
+ STOPPED = "STOPPED"
+ OFF = "OFF"
+
+
+class Service(ABC):
+ """
+ Service here is some software that is hosted on the server and
+ can be installed, configured and used by a user.
+ """
+
+ @abstractmethod
+ def get_id(self) -> str:
+ pass
+
+ @abstractmethod
+ def get_display_name(self) -> str:
+ pass
+
+ @abstractmethod
+ def get_description(self) -> str:
+ pass
+
+ @abstractmethod
+ def get_svg_icon(self) -> str:
+ pass
+
+ @abstractmethod
+ def is_enabled(self) -> bool:
+ pass
+
+ @abstractmethod
+ def get_status(self) -> ServiceStatus:
+ pass
+
+ @abstractmethod
+ def enable(self):
+ pass
+
+ @abstractmethod
+ def disable(self):
+ pass
+
+ @abstractmethod
+ def stop(self):
+ pass
+
+ @abstractmethod
+ def start(self):
+ pass
+
+ @abstractmethod
+ def restart(self):
+ pass
+
+ @abstractmethod
+ def get_configuration(self):
+ pass
+
+ @abstractmethod
+ def set_configuration(self, config_items):
+ pass
+
+ @abstractmethod
+ def get_logs(self):
+ pass
+
+ @abstractmethod
+ def get_storage_usage(self):
+ pass
diff --git a/selfprivacy_api/utils/__init__.py b/selfprivacy_api/utils/__init__.py
index c80dd99..1adb189 100644
--- a/selfprivacy_api/utils/__init__.py
+++ b/selfprivacy_api/utils/__init__.py
@@ -65,7 +65,7 @@ class ReadUserData(object):
portalocker.lock(self.userdata_file, portalocker.LOCK_SH)
self.data = json.load(self.userdata_file)
- def __enter__(self):
+ def __enter__(self) -> dict:
return self.data
def __exit__(self, *args):
diff --git a/selfprivacy_api/utils/block_devices.py b/selfprivacy_api/utils/block_devices.py
new file mode 100644
index 0000000..83937fd
--- /dev/null
+++ b/selfprivacy_api/utils/block_devices.py
@@ -0,0 +1,176 @@
+"""Wrapper for block device functions."""
+import subprocess
+import json
+import typing
+
+
+def get_block_device(device_name):
+ """
+ Return a block device by name.
+ """
+ lsblk_output = subprocess.check_output(
+ [
+ "lsblk",
+ "-J",
+ "-b",
+ "-o",
+ "NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINT,LABEL,UUID,SIZE",
+ device_name,
+ ]
+ )
+ lsblk_output = lsblk_output.decode("utf-8")
+ lsblk_output = json.loads(lsblk_output)
+ return lsblk_output["blockdevices"]
+
+
+def resize_block_device(block_device) -> bool:
+ """
+ Resize a block device. Return True if successful.
+ """
+ resize_command = ["resize2fs", block_device]
+ resize_process = subprocess.Popen(resize_command, shell=False)
+ resize_process.communicate()
+ return resize_process.returncode == 0
+
+
+class BlockDevice:
+ """
+ A block device.
+ """
+
+ def __init__(self, block_device):
+ self.name = block_device["name"]
+ self.path = block_device["path"]
+ self.fsavail = block_device["fsavail"]
+ self.fssize = block_device["fssize"]
+ self.fstype = block_device["fstype"]
+ self.fsused = block_device["fsused"]
+ self.mountpoint = block_device["mountpoint"]
+ self.label = block_device["label"]
+ self.uuid = block_device["uuid"]
+ self.size = block_device["size"]
+ self.locked = False
+
+ def __str__(self):
+ return self.name
+
+ def __repr__(self):
+ return f""
+
+ def __eq__(self, other):
+ return self.name == other.name
+
+ def __hash__(self):
+ return hash(self.name)
+
+ def stats(self) -> typing.Dict[str, typing.Any]:
+ """
+ Update current data and return a dictionary of stats.
+ """
+ device = get_block_device(self.name)
+ self.fsavail = device["fsavail"]
+ self.fssize = device["fssize"]
+ self.fstype = device["fstype"]
+ self.fsused = device["fsused"]
+ self.mountpoint = device["mountpoint"]
+ self.label = device["label"]
+ self.uuid = device["uuid"]
+ self.size = device["size"]
+
+ return {
+ "name": self.name,
+ "path": self.path,
+ "fsavail": self.fsavail,
+ "fssize": self.fssize,
+ "fstype": self.fstype,
+ "fsused": self.fsused,
+ "mountpoint": self.mountpoint,
+ "label": self.label,
+ "uuid": self.uuid,
+ "size": self.size,
+ }
+
+ def resize(self):
+ """
+ Resize the block device.
+ """
+ if not self.locked:
+ self.locked = True
+ resize_block_device(self.path)
+ self.locked = False
+
+
+class BlockDevices:
+ """Singleton holding all Block devices"""
+
+ _instance = None
+
+ def __new__(cls, *args, **kwargs):
+ if not cls._instance:
+ cls._instance = super().__new__(cls)
+ return cls._instance
+
+ def __init__(self):
+ self.block_devices = []
+ self.update()
+
+ def update(self) -> None:
+ """
+ Update the list of block devices.
+ """
+ devices = []
+ lsblk_output = subprocess.check_output(
+ [
+ "lsblk",
+ "-J",
+ "-b",
+ "-o",
+ "NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINT,LABEL,UUID,SIZE",
+ ]
+ )
+ lsblk_output = lsblk_output.decode("utf-8")
+ lsblk_output = json.loads(lsblk_output)
+ for device in lsblk_output["blockdevices"]:
+ if device["fstype"] is None:
+ if "children" in device:
+ for child in device["children"]:
+ if child["fstype"] == "ext4":
+ device = child
+ break
+ devices.append(device)
+ # Add new devices and delete non-existent devices
+ for device in devices:
+ if device["name"] not in [
+ block_device.name for block_device in self.block_devices
+ ]:
+ self.block_devices.append(BlockDevice(device))
+ for block_device in self.block_devices:
+ if block_device.name not in [device["name"] for device in devices]:
+ self.block_devices.remove(block_device)
+
+ def get_block_device(self, name: str) -> typing.Optional[BlockDevice]:
+ """
+ Return a block device by name.
+ """
+ for block_device in self.block_devices:
+ if block_device.name == name:
+ return block_device
+ return None
+
+ def get_block_devices(self) -> typing.List[BlockDevice]:
+ """
+ Return a list of block devices.
+ """
+ return self.block_devices
+
+ def get_block_devices_by_mountpoint(
+ self, mountpoint: str
+ ) -> typing.List[BlockDevice]:
+ """
+ Return a list of block devices with a given mountpoint.
+ """
+ block_devices = []
+ for block_device in self.block_devices:
+ if block_device.mountpoint == mountpoint:
+ block_devices.append(block_device)
+ return block_devices
diff --git a/shell.nix b/shell.nix
index 2735de1..1f5b25c 100644
--- a/shell.nix
+++ b/shell.nix
@@ -19,6 +19,8 @@ let
pydantic
typing-extensions
flask-cors
+ psutil
+ black
(buildPythonPackage rec {
pname = "strawberry-graphql";
version = "0.114.5";
diff --git a/tests/test_graphql/test_system.py b/tests/test_graphql/_test_system.py
similarity index 100%
rename from tests/test_graphql/test_system.py
rename to tests/test_graphql/_test_system.py
diff --git a/tests/test_graphql/test_system_nixos_tasks.py b/tests/test_graphql/_test_system_nixos_tasks.py
similarity index 100%
rename from tests/test_graphql/test_system_nixos_tasks.py
rename to tests/test_graphql/_test_system_nixos_tasks.py