fix(backups): consider failing services MORE and not try to stop them

pull/57/head
Houkime 2023-08-23 13:39:12 +00:00
parent 36e915907f
commit 0a852d8b50
3 changed files with 18 additions and 4 deletions

View File

@ -283,7 +283,7 @@ class StoppedService:
def __enter__(self) -> Service: def __enter__(self) -> Service:
self.original_status = self.service.get_status() self.original_status = self.service.get_status()
if self.original_status != ServiceStatus.INACTIVE: if self.original_status not in [ServiceStatus.INACTIVE, ServiceStatus.FAILED]:
self.service.stop() self.service.stop()
wait_until_true( wait_until_true(
lambda: self.service.get_status() == ServiceStatus.INACTIVE, lambda: self.service.get_status() == ServiceStatus.INACTIVE,

View File

@ -135,8 +135,12 @@ class DummyService(Service):
@classmethod @classmethod
def stop(cls): def stop(cls):
cls.set_status(ServiceStatus.DEACTIVATING) # simulate a failing service unable to stop
cls.change_status_with_async_delay(ServiceStatus.INACTIVE, cls.startstop_delay) if not cls.get_status() == ServiceStatus.FAILED:
cls.set_status(ServiceStatus.DEACTIVATING)
cls.change_status_with_async_delay(
ServiceStatus.INACTIVE, cls.startstop_delay
)
@classmethod @classmethod
def start(cls): def start(cls):

View File

@ -10,6 +10,7 @@ from subprocess import Popen
import selfprivacy_api.services as services import selfprivacy_api.services as services
from selfprivacy_api.services import Service, get_all_services from selfprivacy_api.services import Service, get_all_services
from selfprivacy_api.services.service import ServiceStatus
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.services.test_service import DummyService from selfprivacy_api.services.test_service import DummyService
@ -462,10 +463,19 @@ def restore_strategy(request) -> RestoreStrategy:
return RestoreStrategy.INPLACE return RestoreStrategy.INPLACE
@pytest.fixture(params=["failed", "healthy"])
def failed(request) -> bool:
if request.param == "failed":
return True
return False
def test_restore_snapshot_task( def test_restore_snapshot_task(
backups, dummy_service, restore_strategy, simulated_service_stopping_delay backups, dummy_service, restore_strategy, simulated_service_stopping_delay, failed
): ):
dummy_service.set_delay(simulated_service_stopping_delay) dummy_service.set_delay(simulated_service_stopping_delay)
if failed:
dummy_service.set_status(ServiceStatus.FAILED)
Backups.back_up(dummy_service) Backups.back_up(dummy_service)
snaps = Backups.get_snapshots(dummy_service) snaps = Backups.get_snapshots(dummy_service)