refactor(service_mover): decompose the giant move_service

pull/88/head
Houkime 2024-01-29 16:51:35 +00:00
parent 0309e6b76e
commit 2863dd9763
1 changed files with 115 additions and 148 deletions

View File

@ -2,18 +2,24 @@
from __future__ import annotations from __future__ import annotations
import subprocess import subprocess
import time
import pathlib import pathlib
import shutil import shutil
from typing import List
from pydantic import BaseModel from pydantic import BaseModel
from selfprivacy_api.jobs import Job, JobStatus, Jobs from selfprivacy_api.jobs import Job, JobStatus, Jobs
from selfprivacy_api.utils.huey import huey from selfprivacy_api.utils.huey import huey
from selfprivacy_api.utils.block_devices import BlockDevice from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.utils import ReadUserData, WriteUserData from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.service import Service
from selfprivacy_api.services.owned_path import OwnedPath from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.service import StoppedService
class MoveError(Exception):
"""Move failed"""
class FolderMoveNames(BaseModel): class FolderMoveNames(BaseModel):
name: str name: str
@ -45,110 +51,94 @@ class FolderMoveNames(BaseModel):
@huey.task() @huey.task()
def move_service( def move_service(
service: Service, service: Service,
volume: BlockDevice, new_volume: BlockDevice,
job: Job, job: Job,
folder_names: list[FolderMoveNames], folder_names: List[FolderMoveNames],
userdata_location: str, userdata_location: str = None, # deprecated, not used
): ):
"""Move a service to another volume.""" """
job = Jobs.update( Move a service to another volume.
job=job, Is not allowed to raise errors because it is a task.
status_text="Performing pre-move checks...", """
status=JobStatus.RUNNING,
)
service_name = service.get_display_name() service_name = service.get_display_name()
with ReadUserData() as user_data: old_volume = service.get_drive()
if not user_data.get("useBinds", False): report_progress(0, job, "Performing pre-move checks...")
try:
with ReadUserData() as user_data:
if not user_data.get("useBinds", False):
raise MoveError("Server is not using binds.")
check_volume(new_volume, service)
check_folders(old_volume, folder_names)
report_progress(5, job, f"Stopping {service_name}...")
with StoppedService(service):
report_progress(10, job, "Unmounting folders from old volume...")
unmount_old_volume(folder_names)
report_progress(20, job, "Moving data to new volume...")
move_folders_to_volume(folder_names, old_volume, new_volume, job)
report_progress(70, job, f"Making sure {service_name} owns its files...")
chown_folders(folder_names, new_volume, job, service)
report_progress(90, job, f"Mounting {service_name} data...")
mount_folders(folder_names, new_volume)
report_progress(95, job, f"Finishing moving {service_name}...")
update_volume_in_userdata(service, new_volume)
Jobs.update( Jobs.update(
job=job, job=job,
status=JobStatus.ERROR, status=JobStatus.FINISHED,
error="Server is not using binds.", result=f"{service_name} moved successfully.",
status_text=f"Starting {service_name}...",
progress=100,
) )
return except Exception as e:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=type(e).__name__ + " " + str(e),
)
def check_volume(new_volume: BlockDevice, service: Service) -> bool:
service_name = service.get_display_name()
old_volume_name: str = service.get_drive()
# Check if we are on the same volume # Check if we are on the same volume
old_volume = service.get_drive() if old_volume_name == new_volume.name:
if old_volume == volume.name: raise MoveError(f"{service_name} is already on volume {new_volume}")
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} is already on this volume.",
)
return
# Check if there is enough space on the new volume # Check if there is enough space on the new volume
if int(volume.fsavail) < service.get_storage_usage(): if int(new_volume.fsavail) < service.get_storage_usage():
Jobs.update( raise MoveError("Not enough space on the new volume.")
job=job,
status=JobStatus.ERROR,
error="Not enough space on the new volume.",
)
return
# Make sure the volume is mounted # Make sure the volume is mounted
if not volume.is_root() and f"/volumes/{volume.name}" not in volume.mountpoints: if (
Jobs.update( not new_volume.is_root()
job=job, and f"/volumes/{new_volume.name}" not in new_volume.mountpoints
status=JobStatus.ERROR, ):
error="Volume is not mounted.", raise MoveError("Volume is not mounted.")
)
return
def check_folders(old_volume: BlockDevice, folder_names: List[FolderMoveNames]) -> None:
# Make sure current actual directory exists and if its user and group are correct # Make sure current actual directory exists and if its user and group are correct
for folder in folder_names: for folder in folder_names:
if not pathlib.Path(f"/volumes/{old_volume}/{folder.name}").exists(): path = pathlib.Path(f"/volumes/{old_volume}/{folder.name}")
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} is not found.",
)
return
if not pathlib.Path(f"/volumes/{old_volume}/{folder.name}").is_dir():
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} is not a directory.",
)
return
if (
not pathlib.Path(f"/volumes/{old_volume}/{folder.name}").owner()
== folder.owner
):
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} owner is not {folder.owner}.",
)
return
# Stop service if not path.exists():
Jobs.update( raise MoveError(f"{path} is not found.")
job=job, if not path.is_dir():
status=JobStatus.RUNNING, raise MoveError(f"{path} is not a directory.")
status_text=f"Stopping {service_name}...", if path.owner() != folder.owner:
progress=5, raise MoveError(f"{path} owner is not {folder.owner}.")
)
service.stop()
# Wait for the service to stop, check every second
# If it does not stop in 30 seconds, abort
for _ in range(30):
if service.get_status() not in (
ServiceStatus.ACTIVATING,
ServiceStatus.DEACTIVATING,
):
break
time.sleep(1)
else:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} did not stop in 30 seconds.",
)
return
# Unmount old volume
Jobs.update( def unmount_old_volume(folder_names: List[FolderMoveNames]) -> None:
job=job,
status_text="Unmounting old folder...",
status=JobStatus.RUNNING,
progress=10,
)
for folder in folder_names: for folder in folder_names:
try: try:
subprocess.run( subprocess.run(
@ -156,39 +146,31 @@ def move_service(
check=True, check=True,
) )
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
Jobs.update( raise MoveError("Unable to unmount old volume.")
job=job,
status=JobStatus.ERROR,
error="Unable to unmount old volume.", def move_folders_to_volume(
) folder_names: List[FolderMoveNames],
return old_volume: BlockDevice,
new_volume: BlockDevice,
job: Job,
) -> None:
# Move data to new volume and set correct permissions # Move data to new volume and set correct permissions
Jobs.update( current_progress = job.progress
job=job,
status_text="Moving data to new volume...",
status=JobStatus.RUNNING,
progress=20,
)
current_progress = 20
folder_percentage = 50 // len(folder_names) folder_percentage = 50 // len(folder_names)
for folder in folder_names: for folder in folder_names:
shutil.move( shutil.move(
f"/volumes/{old_volume}/{folder.name}", f"/volumes/{old_volume}/{folder.name}",
f"/volumes/{volume.name}/{folder.name}", f"/volumes/{new_volume.name}/{folder.name}",
)
Jobs.update(
job=job,
status_text="Moving data to new volume...",
status=JobStatus.RUNNING,
progress=current_progress + folder_percentage,
) )
progress = current_progress + folder_percentage
report_progress(progress, job, "Moving data to new volume...")
Jobs.update(
job=job, def chown_folders(
status_text=f"Making sure {service_name} owns its files...", folder_names: List[FolderMoveNames], volume: BlockDevice, job: Job, service: Service
status=JobStatus.RUNNING, ) -> None:
progress=70, service_name = service.get_display_name()
)
for folder in folder_names: for folder in folder_names:
try: try:
subprocess.run( subprocess.run(
@ -208,14 +190,8 @@ def move_service(
error=f"Unable to set ownership of new volume. {service_name} may not be able to access its files. Continuing anyway.", error=f"Unable to set ownership of new volume. {service_name} may not be able to access its files. Continuing anyway.",
) )
# Mount new volume
Jobs.update(
job=job,
status_text=f"Mounting {service_name} data...",
status=JobStatus.RUNNING,
progress=90,
)
def mount_folders(folder_names: List[FolderMoveNames], volume: BlockDevice) -> None:
for folder in folder_names: for folder in folder_names:
try: try:
subprocess.run( subprocess.run(
@ -229,32 +205,23 @@ def move_service(
) )
except subprocess.CalledProcessError as error: except subprocess.CalledProcessError as error:
print(error.output) print(error.output)
Jobs.update( raise MoveError(f"Unable to mount new volume:{error.output}")
job=job,
status=JobStatus.ERROR,
error="Unable to mount new volume.",
)
return
# Update userdata
Jobs.update( def update_volume_in_userdata(service: Service, volume: BlockDevice):
job=job,
status_text="Finishing move...",
status=JobStatus.RUNNING,
progress=95,
)
with WriteUserData() as user_data: with WriteUserData() as user_data:
service_id = service.get_id()
if "modules" not in user_data: if "modules" not in user_data:
user_data["modules"] = {} user_data["modules"] = {}
if userdata_location not in user_data["modules"]: if service_id not in user_data["modules"]:
user_data["modules"][userdata_location] = {} user_data["modules"][service_id] = {}
user_data["modules"][userdata_location]["location"] = volume.name user_data["modules"][service_id]["location"] = volume.name
# Start service
service.start()
def report_progress(progress: int, job: Job, status_text: str) -> None:
Jobs.update( Jobs.update(
job=job, job=job,
status=JobStatus.FINISHED, status=JobStatus.RUNNING,
result=f"{service_name} moved successfully.", status_text=status_text,
status_text=f"Starting {service_name}...", progress=progress,
progress=100,
) )