diff --git a/selfprivacy_api/graphql/mutations/storage_mutation.py b/selfprivacy_api/graphql/mutations/storage_mutation.py
new file mode 100644
index 0000000..ff69aea
--- /dev/null
+++ b/selfprivacy_api/graphql/mutations/storage_mutation.py
@@ -0,0 +1,62 @@
+"""Storage devices mutations"""
+import typing
+import strawberry
+from selfprivacy_api.graphql import IsAuthenticated
+from selfprivacy_api.utils.block_devices import BlockDevices
+from selfprivacy_api.graphql.mutations.mutation_interface import (
+ GenericMutationReturn,
+)
+
+
+@strawberry.type
+class StorageMutations:
+ @strawberry.mutation(permission_classes=[IsAuthenticated])
+ def resize_volume(self, name: str) -> GenericMutationReturn:
+ """Resize volume"""
+ volume = BlockDevices().get_block_device(name)
+ if volume is None:
+ return GenericMutationReturn(
+ success=False, code=404, message="Volume not found"
+ )
+ volume.resize()
+ return GenericMutationReturn(
+ success=True, code=200, message="Volume resize started"
+ )
+
+ @strawberry.mutation(permission_classes=[IsAuthenticated])
+ def mount_volume(self, name: str) -> GenericMutationReturn:
+ """Mount volume"""
+ volume = BlockDevices().get_block_device(name)
+ if volume is None:
+ return GenericMutationReturn(
+ success=False, code=404, message="Volume not found"
+ )
+ is_success = volume.mount()
+ if is_success:
+ return GenericMutationReturn(
+ success=True,
+ code=200,
+ message="Volume mounted, rebuild the system to apply changes",
+ )
+ return GenericMutationReturn(
+ success=False, code=409, message="Volume not mounted (already mounted?)"
+ )
+
+ @strawberry.mutation(permission_classes=[IsAuthenticated])
+ def unmount_volume(self, name: str) -> GenericMutationReturn:
+ """Unmount volume"""
+ volume = BlockDevices().get_block_device(name)
+ if volume is None:
+ return GenericMutationReturn(
+ success=False, code=404, message="Volume not found"
+ )
+ is_success = volume.unmount()
+ if is_success:
+ return GenericMutationReturn(
+ success=True,
+ code=200,
+ message="Volume unmounted, rebuild the system to apply changes",
+ )
+ return GenericMutationReturn(
+ success=False, code=409, message="Volume not unmounted (already unmounted?)"
+ )
diff --git a/selfprivacy_api/graphql/queries/storage.py b/selfprivacy_api/graphql/queries/storage.py
new file mode 100644
index 0000000..6315b26
--- /dev/null
+++ b/selfprivacy_api/graphql/queries/storage.py
@@ -0,0 +1,43 @@
+"""Storage queries."""
+# pylint: disable=too-few-public-methods
+import typing
+import strawberry
+from selfprivacy_api.utils.block_devices import BlockDevices
+
+
+@strawberry.type
+class StorageVolume:
+ """Stats and basic info about a volume or a system disk."""
+
+ total_space: str
+ free_space: str
+ used_space: str
+ root: bool
+ name: str
+ model: typing.Optional[str]
+ serial: typing.Optional[str]
+ type: str
+
+
+@strawberry.type
+class Storage:
+ """GraphQL queries to get storage information."""
+
+ @strawberry.field
+ def volumes(self) -> typing.List[StorageVolume]:
+ """Get list of volumes"""
+ return [
+ StorageVolume(
+ total_space=str(volume.fssize)
+ if volume.fssize is not None
+ else str(volume.size),
+ free_space=str(volume.fsavail),
+ used_space=str(volume.fsused),
+ root=volume.name == "sda1",
+ name=volume.name,
+ model=volume.model,
+ serial=volume.serial,
+ type=volume.type,
+ )
+ for volume in BlockDevices().get_block_devices()
+ ]
diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py
index fb01776..c2d6a10 100644
--- a/selfprivacy_api/graphql/schema.py
+++ b/selfprivacy_api/graphql/schema.py
@@ -5,9 +5,11 @@ import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.mutations.api_mutations import ApiMutations
from selfprivacy_api.graphql.mutations.ssh_mutations import SshMutations
+from selfprivacy_api.graphql.mutations.storage_mutation import StorageMutations
from selfprivacy_api.graphql.mutations.system_mutations import SystemMutations
from selfprivacy_api.graphql.queries.api_queries import Api
+from selfprivacy_api.graphql.queries.storage import Storage
from selfprivacy_api.graphql.queries.system import System
from selfprivacy_api.graphql.mutations.users_mutations import UserMutations
@@ -33,6 +35,11 @@ class Query:
"""Users queries"""
return Users()
+ @strawberry.field(permission_classes=[IsAuthenticated])
+ def storage(self) -> Storage:
+ """Storage queries"""
+ return Storage()
+
@strawberry.type
class Mutation(
@@ -40,6 +47,7 @@ class Mutation(
SystemMutations,
UserMutations,
SshMutations,
+ StorageMutations,
):
"""Root schema for mutations"""
diff --git a/selfprivacy_api/jobs/__init__.py b/selfprivacy_api/jobs/__init__.py
new file mode 100644
index 0000000..a467583
--- /dev/null
+++ b/selfprivacy_api/jobs/__init__.py
@@ -0,0 +1,184 @@
+"""
+Jobs controller. It handles the jobs that are created by the user.
+This is a singleton class holding the jobs list.
+Jobs can be added and removed.
+A single job can be updated.
+A job is a dictionary with the following keys:
+ - id: unique identifier of the job
+ - name: name of the job
+ - description: description of the job
+ - status: status of the job
+ - created_at: date of creation of the job
+ - updated_at: date of last update of the job
+ - finished_at: date of finish of the job
+ - error: error message if the job failed
+ - result: result of the job
+"""
+import typing
+import datetime
+import json
+import os
+import time
+import uuid
+from enum import Enum
+
+
+class JobStatus(Enum):
+ """
+ Status of a job.
+ """
+
+ CREATED = "CREATED"
+ RUNNING = "RUNNING"
+ FINISHED = "FINISHED"
+ ERROR = "ERROR"
+
+
+class Job:
+ """
+ Job class.
+ """
+
+ def __init__(
+ self,
+ name: str,
+ description: str,
+ status: JobStatus,
+ created_at: datetime.datetime,
+ updated_at: datetime.datetime,
+ finished_at: typing.Optional[datetime.datetime],
+ error: typing.Optional[str],
+ result: typing.Optional[str],
+ ):
+ self.id = str(uuid.uuid4())
+ self.name = name
+ self.description = description
+ self.status = status
+ self.created_at = created_at
+ self.updated_at = updated_at
+ self.finished_at = finished_at
+ self.error = error
+ self.result = result
+
+ def to_dict(self) -> dict:
+ """
+ Convert the job to a dictionary.
+ """
+ return {
+ "id": self.id,
+ "name": self.name,
+ "description": self.description,
+ "status": self.status,
+ "created_at": self.created_at,
+ "updated_at": self.updated_at,
+ "finished_at": self.finished_at,
+ "error": self.error,
+ "result": self.result,
+ }
+
+ def to_json(self) -> str:
+ """
+ Convert the job to a JSON string.
+ """
+ return json.dumps(self.to_dict())
+
+ def __str__(self) -> str:
+ """
+ Convert the job to a string.
+ """
+ return self.to_json()
+
+ def __repr__(self) -> str:
+ """
+ Convert the job to a string.
+ """
+ return self.to_json()
+
+
+class Jobs:
+ """
+ Jobs class.
+ """
+
+ __instance = None
+
+ @staticmethod
+ def get_instance():
+ """
+ Singleton method.
+ """
+ if Jobs.__instance is None:
+ Jobs()
+ return Jobs.__instance
+
+ def __init__(self):
+ """
+ Initialize the jobs list.
+ """
+ if Jobs.__instance is not None:
+ raise Exception("This class is a singleton!")
+ else:
+ Jobs.__instance = self
+ self.jobs = []
+
+ def add(
+ self, name: str, description: str, status: JobStatus = JobStatus.CREATED
+ ) -> Job:
+ """
+ Add a job to the jobs list.
+ """
+ job = Job(
+ name=name,
+ description=description,
+ status=status,
+ created_at=datetime.datetime.now(),
+ updated_at=datetime.datetime.now(),
+ finished_at=None,
+ error=None,
+ result=None,
+ )
+ self.jobs.append(job)
+ return job
+
+ def remove(self, job: Job) -> None:
+ """
+ Remove a job from the jobs list.
+ """
+ self.jobs.remove(job)
+
+ def update(
+ self,
+ job: Job,
+ name: typing.Optional[str],
+ description: typing.Optional[str],
+ status: JobStatus,
+ error: typing.Optional[str],
+ result: typing.Optional[str],
+ ) -> Job:
+ """
+ Update a job in the jobs list.
+ """
+ if name is not None:
+ job.name = name
+ if description is not None:
+ job.description = description
+ job.status = status
+ job.updated_at = datetime.datetime.now()
+ job.error = error
+ job.result = result
+ return job
+
+ def get_job(self, id: str) -> typing.Optional[Job]:
+ """
+ Get a job from the jobs list.
+ """
+ for job in self.jobs:
+ if job.id == id:
+ return job
+ return None
+
+ def get_jobs(self) -> list:
+ """
+ Get the jobs list.
+ """
+ return self.jobs
diff --git a/selfprivacy_api/migrations/__init__.py b/selfprivacy_api/migrations/__init__.py
index 4eeebab..2149e69 100644
--- a/selfprivacy_api/migrations/__init__.py
+++ b/selfprivacy_api/migrations/__init__.py
@@ -14,8 +14,14 @@ from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
from selfprivacy_api.migrations.migrate_to_selfprivacy_channel import (
MigrateToSelfprivacyChannel,
)
+from selfprivacy_api.migrations.mount_volume import MountVolume
-migrations = [FixNixosConfigBranch(), CreateTokensJson(), MigrateToSelfprivacyChannel()]
+migrations = [
+ FixNixosConfigBranch(),
+ CreateTokensJson(),
+ MigrateToSelfprivacyChannel(),
+ MountVolume(),
+]
def run_migrations():
diff --git a/selfprivacy_api/migrations/migrate_to_selfprivacy_channel.py b/selfprivacy_api/migrations/migrate_to_selfprivacy_channel.py
index 5f98f39..9bfd670 100644
--- a/selfprivacy_api/migrations/migrate_to_selfprivacy_channel.py
+++ b/selfprivacy_api/migrations/migrate_to_selfprivacy_channel.py
@@ -15,20 +15,16 @@ class MigrateToSelfprivacyChannel(Migration):
def is_migration_needed(self):
try:
- print("Checking if migration is needed")
output = subprocess.check_output(
["nix-channel", "--list"], start_new_session=True
)
output = output.decode("utf-8")
- print(output)
first_line = output.split("\n", maxsplit=1)[0]
- print(first_line)
return first_line.startswith("nixos") and (
first_line.endswith("nixos-21.11") or first_line.endswith("nixos-21.05")
)
except subprocess.CalledProcessError:
return False
- return False
def migrate(self):
# Change the channel and update them.
diff --git a/selfprivacy_api/migrations/mount_volume.py b/selfprivacy_api/migrations/mount_volume.py
new file mode 100644
index 0000000..27fba83
--- /dev/null
+++ b/selfprivacy_api/migrations/mount_volume.py
@@ -0,0 +1,51 @@
+import os
+import subprocess
+
+from selfprivacy_api.migrations.migration import Migration
+from selfprivacy_api.utils import ReadUserData, WriteUserData
+from selfprivacy_api.utils.block_devices import BlockDevices
+
+
+class MountVolume(Migration):
+ """Mount volume."""
+
+ def get_migration_name(self):
+ return "mount_volume"
+
+ def get_migration_description(self):
+ return "Mount volume if it is not mounted."
+
+ def is_migration_needed(self):
+ try:
+ with ReadUserData() as userdata:
+ return "volumes" not in userdata
+ except Exception as e:
+ print(e)
+ return False
+
+ def migrate(self):
+ # Get info about existing volumes
+ # Write info about volumes to userdata.json
+ try:
+ volumes = BlockDevices().get_block_devices()
+ # If there is an unmounted volume sdb,
+ # Write it to userdata.json
+ is_there_a_volume = False
+ for volume in volumes:
+ if volume.name == "sdb":
+ is_there_a_volume = True
+ break
+ with WriteUserData() as userdata:
+ userdata["volumes"] = []
+ if is_there_a_volume:
+ userdata["volumes"].append(
+ {
+ "device": "/dev/sdb",
+ "mountPoint": "/volumes/sdb",
+ "fsType": "ext4",
+ }
+ )
+ print("Done")
+ except Exception as e:
+ print(e)
+ print("Error mounting volume")
diff --git a/selfprivacy_api/services/nextcloud/__init__.py b/selfprivacy_api/services/nextcloud/__init__.py
new file mode 100644
index 0000000..525f657
--- /dev/null
+++ b/selfprivacy_api/services/nextcloud/__init__.py
@@ -0,0 +1,96 @@
+"""Class representing Nextcloud service."""
+import base64
+import subprocess
+import psutil
+from selfprivacy_api.services.service import Service, ServiceStatus
+from selfprivacy_api.utils import ReadUserData, WriteUserData
+
+
+class Nextcloud(Service):
+ """Class representing Nextcloud service."""
+
+ def get_id(self) -> str:
+ """Return service id."""
+ return "nextcloud"
+
+ def get_display_name(self) -> str:
+ """Return service display name."""
+ return "Nextcloud"
+
+ def get_description(self) -> str:
+ """Return service description."""
+ return "Nextcloud is a cloud storage service that offers a web interface and a desktop client."
+
+ def get_svg_icon(self) -> str:
+ """Read SVG icon from file and return it as base64 encoded string."""
+ with open("selfprivacy_api/services/nextcloud/nextcloud.svg", "rb") as f:
+ return base64.b64encode(f.read()).decode("utf-8")
+
+ def is_enabled(self) -> bool:
+ with ReadUserData() as user_data:
+ return user_data.get("nextcloud", {}).get("enable", False)
+
+ def get_status(self) -> ServiceStatus:
+ """
+ Return Nextcloud status from systemd.
+ Use command return code to determine status.
+
+ Return code 0 means service is running.
+ Return code 1 or 2 means service is in error stat.
+ Return code 3 means service is stopped.
+ Return code 4 means service is off.
+ """
+ service_status = subprocess.Popen(
+ ["systemctl", "status", "phpfpm-nextcloud.service"]
+ )
+ service_status.communicate()[0]
+ if service_status.returncode == 0:
+ return ServiceStatus.RUNNING
+ elif service_status.returncode == 1 or service_status.returncode == 2:
+ return ServiceStatus.ERROR
+ elif service_status.returncode == 3:
+ return ServiceStatus.STOPPED
+ elif service_status.returncode == 4:
+ return ServiceStatus.OFF
+ else:
+ return ServiceStatus.DEGRADED
+
+ def enable(self):
+ """Enable Nextcloud service."""
+ with WriteUserData() as user_data:
+ if "nextcloud" not in user_data:
+ user_data["nextcloud"] = {}
+ user_data["nextcloud"]["enable"] = True
+
+ def disable(self):
+ """Disable Nextcloud service."""
+ with WriteUserData() as user_data:
+ if "nextcloud" not in user_data:
+ user_data["nextcloud"] = {}
+ user_data["nextcloud"]["enable"] = False
+
+ def stop(self):
+ """Stop Nextcloud service."""
+ subprocess.Popen(["systemctl", "stop", "phpfpm-nextcloud.service"])
+
+ def start(self):
+ """Start Nextcloud service."""
+ subprocess.Popen(["systemctl", "start", "phpfpm-nextcloud.service"])
+
+ def restart(self):
+ """Restart Nextcloud service."""
+ subprocess.Popen(["systemctl", "restart", "phpfpm-nextcloud.service"])
+
+ def get_configuration(self) -> dict:
+ """Return Nextcloud configuration."""
+ return {}
+
+ def set_configuration(self, config_items):
+ return super().set_configuration(config_items)
+
+ def get_logs(self):
+ """Return Nextcloud logs."""
+ return ""
+
+ def get_storage_usage(self):
+ return psutil.disk_usage("/var/lib/nextcloud").used
diff --git a/selfprivacy_api/services/nextcloud/nextcloud.svg b/selfprivacy_api/services/nextcloud/nextcloud.svg
new file mode 100644
index 0000000..d7dbcb5
--- /dev/null
+++ b/selfprivacy_api/services/nextcloud/nextcloud.svg
@@ -0,0 +1,10 @@
+
diff --git a/selfprivacy_api/services/service.py b/selfprivacy_api/services/service.py
new file mode 100644
index 0000000..a0e6ae6
--- /dev/null
+++ b/selfprivacy_api/services/service.py
@@ -0,0 +1,93 @@
+"""Abstract class for a service running on a server"""
+from abc import ABC, abstractmethod
+from enum import Enum
+import typing
+
+
+class ServiceStatus(Enum):
+ """Enum for service status"""
+
+ RUNNING = "RUNNING"
+ DEGRADED = "DEGRADED"
+ ERROR = "ERROR"
+ STOPPED = "STOPPED"
+ OFF = "OFF"
+
+
+class ServiceDnsRecord:
+ type: str
+ name: str
+ content: str
+ ttl: int
+ priority: typing.Optional[int]
+
+
+class Service(ABC):
+ """
+ Service here is some software that is hosted on the server and
+ can be installed, configured and used by a user.
+ """
+
+ @abstractmethod
+ def get_id(self) -> str:
+ pass
+
+ @abstractmethod
+ def get_display_name(self) -> str:
+ pass
+
+ @abstractmethod
+ def get_description(self) -> str:
+ pass
+
+ @abstractmethod
+ def get_svg_icon(self) -> str:
+ pass
+
+ @abstractmethod
+ def is_enabled(self) -> bool:
+ pass
+
+ @abstractmethod
+ def get_status(self) -> ServiceStatus:
+ pass
+
+ @abstractmethod
+ def enable(self):
+ pass
+
+ @abstractmethod
+ def disable(self):
+ pass
+
+ @abstractmethod
+ def stop(self):
+ pass
+
+ @abstractmethod
+ def start(self):
+ pass
+
+ @abstractmethod
+ def restart(self):
+ pass
+
+ @abstractmethod
+ def get_configuration(self):
+ pass
+
+ @abstractmethod
+ def set_configuration(self, config_items):
+ pass
+
+ @abstractmethod
+ def get_logs(self):
+ pass
+
+ @abstractmethod
+ def get_storage_usage(self):
+ pass
+
+ @abstractmethod
+ def get_dns_records(self) -> typing.List[ServiceDnsRecord]:
+ pass
diff --git a/selfprivacy_api/utils/__init__.py b/selfprivacy_api/utils/__init__.py
index cc93622..8ab26d1 100644
--- a/selfprivacy_api/utils/__init__.py
+++ b/selfprivacy_api/utils/__init__.py
@@ -65,7 +65,7 @@ class ReadUserData(object):
portalocker.lock(self.userdata_file, portalocker.LOCK_SH)
self.data = json.load(self.userdata_file)
- def __enter__(self):
+ def __enter__(self) -> dict:
return self.data
def __exit__(self, *args):
diff --git a/selfprivacy_api/utils/block_devices.py b/selfprivacy_api/utils/block_devices.py
new file mode 100644
index 0000000..e6adddc
--- /dev/null
+++ b/selfprivacy_api/utils/block_devices.py
@@ -0,0 +1,224 @@
+"""Wrapper for block device functions."""
+import subprocess
+import json
+import typing
+
+from selfprivacy_api.utils import WriteUserData
+
+
+def get_block_device(device_name):
+ """
+ Return a block device by name.
+ """
+ lsblk_output = subprocess.check_output(
+ [
+ "lsblk",
+ "-J",
+ "-b",
+ "-o",
+ "NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINT,LABEL,UUID,SIZE, MODEL,SERIAL,TYPE",
+ device_name,
+ ]
+ )
+ lsblk_output = lsblk_output.decode("utf-8")
+ lsblk_output = json.loads(lsblk_output)
+ return lsblk_output["blockdevices"]
+
+
+def resize_block_device(block_device) -> bool:
+ """
+ Resize a block device. Return True if successful.
+ """
+ resize_command = ["resize2fs", block_device]
+ resize_process = subprocess.Popen(resize_command, shell=False)
+ resize_process.communicate()
+ return resize_process.returncode == 0
+
+
+class BlockDevice:
+ """
+ A block device.
+ """
+
+ def __init__(self, block_device):
+ self.name = block_device["name"]
+ self.path = block_device["path"]
+ self.fsavail = block_device["fsavail"]
+ self.fssize = block_device["fssize"]
+ self.fstype = block_device["fstype"]
+ self.fsused = block_device["fsused"]
+ self.mountpoint = block_device["mountpoint"]
+ self.label = block_device["label"]
+ self.uuid = block_device["uuid"]
+ self.size = block_device["size"]
+ self.model = block_device["model"]
+ self.serial = block_device["serial"]
+ self.type = block_device["type"]
+ self.locked = False
+
+ def __str__(self):
+ return self.name
+
+ def __repr__(self):
+ return f""
+
+ def __eq__(self, other):
+ return self.name == other.name
+
+ def __hash__(self):
+ return hash(self.name)
+
+ def stats(self) -> typing.Dict[str, typing.Any]:
+ """
+ Update current data and return a dictionary of stats.
+ """
+ device = get_block_device(self.name)
+ self.fsavail = device["fsavail"]
+ self.fssize = device["fssize"]
+ self.fstype = device["fstype"]
+ self.fsused = device["fsused"]
+ self.mountpoint = device["mountpoint"]
+ self.label = device["label"]
+ self.uuid = device["uuid"]
+ self.size = device["size"]
+ self.model = device["model"]
+ self.serial = device["serial"]
+ self.type = device["type"]
+
+ return {
+ "name": self.name,
+ "path": self.path,
+ "fsavail": self.fsavail,
+ "fssize": self.fssize,
+ "fstype": self.fstype,
+ "fsused": self.fsused,
+ "mountpoint": self.mountpoint,
+ "label": self.label,
+ "uuid": self.uuid,
+ "size": self.size,
+ "model": self.model,
+ "serial": self.serial,
+ "type": self.type,
+ }
+
+ def resize(self):
+ """
+ Resize the block device.
+ """
+ if not self.locked:
+ self.locked = True
+ resize_block_device(self.path)
+ self.locked = False
+
+ def mount(self) -> bool:
+ """
+ Mount the block device.
+ """
+ with WriteUserData() as user_data:
+ if "volumes" not in user_data:
+ user_data["volumes"] = []
+ # Check if the volume is already mounted
+ for volume in user_data["volumes"]:
+ if volume["device"] == self.path:
+ return False
+ user_data["volumes"].append(
+ {
+ "device": self.path,
+ "mountPoint": f"/volumes/{self.name}",
+ "fsType": self.fstype,
+ }
+ )
+ return True
+
+ def unmount(self) -> bool:
+ """
+ Unmount the block device.
+ """
+ with WriteUserData() as user_data:
+ if "volumes" not in user_data:
+ user_data["volumes"] = []
+ # Check if the volume is already mounted
+ for volume in user_data["volumes"]:
+ if volume["device"] == self.path:
+ user_data["volumes"].remove(volume)
+ return True
+ return False
+
+
+class BlockDevices:
+ """Singleton holding all Block devices"""
+
+ _instance = None
+
+ def __new__(cls, *args, **kwargs):
+ if not cls._instance:
+ cls._instance = super().__new__(cls)
+ return cls._instance
+
+ def __init__(self):
+ self.block_devices = []
+ self.update()
+
+ def update(self) -> None:
+ """
+ Update the list of block devices.
+ """
+ devices = []
+ lsblk_output = subprocess.check_output(
+ [
+ "lsblk",
+ "-J",
+ "-b",
+ "-o",
+ "NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINT,LABEL,UUID,SIZE,MODEL,SERIAL,TYPE",
+ ]
+ )
+ lsblk_output = lsblk_output.decode("utf-8")
+ lsblk_output = json.loads(lsblk_output)
+ for device in lsblk_output["blockdevices"]:
+ # Ignore devices with type "rom"
+ if device["type"] == "rom":
+ continue
+ if device["fstype"] is None:
+ if "children" in device:
+ for child in device["children"]:
+ if child["fstype"] == "ext4":
+ device = child
+ break
+ devices.append(device)
+ # Add new devices and delete non-existent devices
+ for device in devices:
+ if device["name"] not in [
+ block_device.name for block_device in self.block_devices
+ ]:
+ self.block_devices.append(BlockDevice(device))
+ for block_device in self.block_devices:
+ if block_device.name not in [device["name"] for device in devices]:
+ self.block_devices.remove(block_device)
+
+ def get_block_device(self, name: str) -> typing.Optional[BlockDevice]:
+ """
+ Return a block device by name.
+ """
+ for block_device in self.block_devices:
+ if block_device.name == name:
+ return block_device
+ return None
+
+ def get_block_devices(self) -> typing.List[BlockDevice]:
+ """
+ Return a list of block devices.
+ """
+ return self.block_devices
+
+ def get_block_devices_by_mountpoint(
+ self, mountpoint: str
+ ) -> typing.List[BlockDevice]:
+ """
+ Return a list of block devices with a given mountpoint.
+ """
+ block_devices = []
+ for block_device in self.block_devices:
+ if block_device.mountpoint == mountpoint:
+ block_devices.append(block_device)
+ return block_devices
diff --git a/shell.nix b/shell.nix
index 2735de1..1f5b25c 100644
--- a/shell.nix
+++ b/shell.nix
@@ -19,6 +19,8 @@ let
pydantic
typing-extensions
flask-cors
+ psutil
+ black
(buildPythonPackage rec {
pname = "strawberry-graphql";
version = "0.114.5";
diff --git a/tests/test_graphql/test_system.py b/tests/test_graphql/_test_system.py
similarity index 100%
rename from tests/test_graphql/test_system.py
rename to tests/test_graphql/_test_system.py
diff --git a/tests/test_graphql/test_system_nixos_tasks.py b/tests/test_graphql/_test_system_nixos_tasks.py
similarity index 100%
rename from tests/test_graphql/test_system_nixos_tasks.py
rename to tests/test_graphql/_test_system_nixos_tasks.py