selfprivacy-rest-api/selfprivacy_api/services/moving.py

117 lines
3.7 KiB
Python

"""Generic handler for moving services"""
from __future__ import annotations
import subprocess
import pathlib
import shutil
from typing import List
from selfprivacy_api.jobs import Job, report_progress
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.owned_path import OwnedPath
class MoveError(Exception):
"""Move failed"""
def get_foldername(p: OwnedPath) -> str:
return p.path.split("/")[-1]
def location_at_volume(binding_path: OwnedPath, volume_name: str):
return f"/volumes/{volume_name}/{get_foldername(binding_path)}"
def check_volume(volume: BlockDevice, space_needed: int) -> None:
# Check if there is enough space on the new volume
if int(volume.fsavail) < space_needed:
raise MoveError("Not enough space on the new volume.")
# Make sure the volume is mounted
if not volume.is_root() and f"/volumes/{volume.name}" not in volume.mountpoints:
raise MoveError("Volume is not mounted.")
def check_folders(volume_name: str, folders: List[OwnedPath]) -> None:
# Make sure current actual directory exists and if its user and group are correct
for folder in folders:
path = pathlib.Path(location_at_volume(folder, volume_name))
if not path.exists():
raise MoveError(f"directory {path} is not found.")
if not path.is_dir():
raise MoveError(f"{path} is not a directory.")
if path.owner() != folder.owner:
raise MoveError(f"{path} is not owned by {folder.owner}.")
def unbind_folders(owned_folders: List[OwnedPath]) -> None:
for folder in owned_folders:
try:
subprocess.run(
["umount", folder.path],
check=True,
)
except subprocess.CalledProcessError:
raise MoveError(f"Unable to unmount folder {folder.path}.")
def move_folders_to_volume(
folders: List[OwnedPath],
old_volume_name: str, # TODO: pass an actual validated block device
new_volume: BlockDevice,
job: Job,
) -> None:
current_progress = job.progress
if current_progress is None:
current_progress = 0
progress_per_folder = 50 // len(folders)
for folder in folders:
shutil.move(
location_at_volume(folder, old_volume_name),
location_at_volume(folder, new_volume.name),
)
progress = current_progress + progress_per_folder
report_progress(progress, job, "Moving data to new volume...")
def ensure_folder_ownership(folders: List[OwnedPath], volume: BlockDevice) -> None:
for folder in folders:
true_location = location_at_volume(folder, volume.name)
try:
subprocess.run(
[
"chown",
"-R",
f"{folder.owner}:{folder.group}",
# Could we just chown the binded location instead?
true_location,
],
check=True,
)
except subprocess.CalledProcessError as error:
print(error.output)
error_message = (
f"Unable to set ownership of {true_location} :{error.output}"
)
raise MoveError(error_message)
def bind_folders(folders: List[OwnedPath], volume: BlockDevice) -> None:
for folder in folders:
try:
subprocess.run(
[
"mount",
"--bind",
location_at_volume(folder, volume.name),
folder.path,
],
check=True,
)
except subprocess.CalledProcessError as error:
print(error.output)
raise MoveError(f"Unable to mount new volume:{error.output}")