selfprivacy-rest-api/selfprivacy_api/backup/jobs.py

57 lines
1.6 KiB
Python

from typing import Optional, List
from selfprivacy_api.jobs import Jobs, Job, JobStatus
from selfprivacy_api.services.service import Service
def job_type_prefix(service: Service) -> str:
return f"services.{service.get_id()}"
def backup_job_type(service: Service) -> str:
return f"{job_type_prefix(service)}.backup"
def get_jobs_by_service(service: Service) -> List[Job]:
result = []
for job in Jobs.get_jobs():
if job.type_id.startswith(job_type_prefix(service)) and job.status in [
JobStatus.CREATED,
JobStatus.RUNNING,
]:
result.append(job)
return result
def is_something_queued_for(service: Service) -> bool:
return len(get_jobs_by_service(service)) != 0
def add_backup_job(service: Service) -> Job:
if is_something_queued_for(service):
message = (
f"Cannot start a backup of {service.get_id()}, another operation is queued: "
+ get_jobs_by_service(service)[0].type_id
)
raise ValueError(message)
display_name = service.get_display_name()
job = Jobs.add(
type_id=backup_job_type(service),
name=f"Backup {display_name}",
description=f"Backing up {display_name}",
)
return job
def get_job_by_type(type_id: str) -> Optional[Job]:
for job in Jobs.get_jobs():
if job.type_id == type_id and job.status in [
JobStatus.CREATED,
JobStatus.RUNNING,
]:
return job
def get_backup_job(service: Service) -> Optional[Job]:
return get_job_by_type(backup_job_type(service))