selfprivacy-rest-api/tests/test_graphql/test_backup.py

282 lines
7.9 KiB
Python

import pytest
import os.path as path
from os import makedirs
from os import remove
from os import listdir
from datetime import datetime, timedelta, timezone
from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.backup import Backups
import selfprivacy_api.backup.providers as providers
from selfprivacy_api.backup.providers import AbstractBackupProvider
from selfprivacy_api.backup.providers.backblaze import Backblaze
from selfprivacy_api.backup.tasks import start_backup
from selfprivacy_api.backup.storage import Storage
TESTFILE_BODY = "testytest!"
REPO_NAME = "test_backup"
@pytest.fixture(scope="function")
def backups(tmpdir):
Backups.reset()
test_repo_path = path.join(tmpdir, "totallyunrelated")
Backups.set_localfile_repo(test_repo_path)
@pytest.fixture()
def backups_backblaze(generic_userdata):
Backups.reset()
@pytest.fixture()
def raw_dummy_service(tmpdir, backups):
service_dir = path.join(tmpdir, "test_service")
makedirs(service_dir)
testfile_path = path.join(service_dir, "testfile.txt")
with open(testfile_path, "w") as file:
file.write(TESTFILE_BODY)
# we need this to not change get_location() much
class TestDummyService(DummyService, location=service_dir):
pass
service = TestDummyService()
return service
@pytest.fixture()
def dummy_service(tmpdir, backups, raw_dummy_service):
service = raw_dummy_service
repo_path = path.join(tmpdir, "test_repo")
assert not path.exists(repo_path)
# assert not repo_path
Backups.init_repo(service)
return service
@pytest.fixture()
def memory_backup() -> AbstractBackupProvider:
ProviderClass = providers.get_provider(BackupProvider.MEMORY)
assert ProviderClass is not None
memory_provider = ProviderClass(login="", key="")
assert memory_provider is not None
return memory_provider
@pytest.fixture()
def file_backup(tmpdir) -> AbstractBackupProvider:
test_repo_path = path.join(tmpdir, "test_repo")
ProviderClass = providers.get_provider(BackupProvider.FILE)
assert ProviderClass is not None
provider = ProviderClass(test_repo_path)
assert provider is not None
return provider
def test_config_load(generic_userdata):
Backups.reset()
provider = Backups.provider()
assert provider is not None
assert isinstance(provider, Backblaze)
assert provider.login == "ID"
assert provider.key == "KEY"
def test_select_backend():
provider = providers.get_provider(BackupProvider.BACKBLAZE)
assert provider is not None
assert provider == Backblaze
def test_file_backend_init(file_backup):
file_backup.backuper.init("somerepo")
def test_backup_simple_file(raw_dummy_service, file_backup):
# temporarily incomplete
service = raw_dummy_service
assert service is not None
assert file_backup is not None
name = service.get_id()
file_backup.backuper.init(name)
def test_backup_service(dummy_service, backups):
assert Backups.get_last_backed_up(dummy_service) is None
Backups.back_up(dummy_service)
now = datetime.now(timezone.utc)
date = Backups.get_last_backed_up(dummy_service)
assert date is not None
assert now > date
assert now - date < timedelta(minutes=1)
def test_no_repo(memory_backup):
with pytest.raises(ValueError):
assert memory_backup.backuper.get_snapshots("") == []
def test_one_snapshot(backups, dummy_service):
Backups.back_up(dummy_service)
snaps = Backups.get_snapshots(dummy_service)
assert len(snaps) == 1
snap = snaps[0]
assert snap.service_name == dummy_service.get_id()
def test_backup_returns_snapshot(backups, dummy_service):
service_folder = dummy_service.get_location()
provider = Backups.provider()
name = dummy_service.get_id()
snapshot = provider.backuper.start_backup(service_folder, name)
assert snapshot.id is not None
assert snapshot.service_name == name
assert snapshot.created_at is not None
def test_restore(backups, dummy_service):
service_folder = dummy_service.get_location()
file_to_nuke = listdir(service_folder)[0]
assert file_to_nuke is not None
path_to_nuke = path.join(service_folder, file_to_nuke)
Backups.back_up(dummy_service)
snap = Backups.get_snapshots(dummy_service)[0]
assert snap is not None
assert path.exists(path_to_nuke)
remove(path_to_nuke)
assert not path.exists(path_to_nuke)
Backups.restore_service_from_snapshot(dummy_service, snap.id)
assert path.exists(path_to_nuke)
def test_sizing(backups, dummy_service):
Backups.back_up(dummy_service)
snap = Backups.get_snapshots(dummy_service)[0]
size = Backups.service_snapshot_size(dummy_service, snap.id)
assert size is not None
assert size > 0
def test_init_tracking(backups, raw_dummy_service):
assert Backups.is_initted(raw_dummy_service) is False
Backups.init_repo(raw_dummy_service)
assert Backups.is_initted(raw_dummy_service) is True
def test_backup_service_task(backups, dummy_service):
handle = start_backup(dummy_service)
handle(blocking=True)
snaps = Backups.get_snapshots(dummy_service)
assert len(snaps) == 1
def test_autobackup_enable_service(backups, dummy_service):
assert not Backups.is_autobackup_enabled(dummy_service)
Backups.enable_autobackup(dummy_service)
assert Backups.is_autobackup_enabled(dummy_service)
Backups.disable_autobackup(dummy_service)
assert not Backups.is_autobackup_enabled(dummy_service)
def test_set_autobackup_period(backups):
assert Backups.autobackup_period_minutes() is None
Backups.set_autobackup_period_minutes(2)
assert Backups.autobackup_period_minutes() == 2
Backups.disable_all_autobackup()
assert Backups.autobackup_period_minutes() is None
Backups.set_autobackup_period_minutes(3)
assert Backups.autobackup_period_minutes() == 3
Backups.set_autobackup_period_minutes(0)
assert Backups.autobackup_period_minutes() is None
Backups.set_autobackup_period_minutes(3)
assert Backups.autobackup_period_minutes() == 3
Backups.set_autobackup_period_minutes(-1)
assert Backups.autobackup_period_minutes() is None
# Storage
def test_snapshots_caching(backups, dummy_service):
Backups.back_up(dummy_service)
# we test indirectly that we do redis calls instead of shell calls
start = datetime.now()
for i in range(10):
snapshots = Backups.get_snapshots(dummy_service)
assert len(snapshots) == 1
assert datetime.now() - start < timedelta(seconds=0.5)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
Storage.delete_cached_snapshot(cached_snapshots[0])
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 0
snapshots = Backups.get_snapshots(dummy_service)
assert len(snapshots) == 1
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
# Storage
def test_init_tracking_caching(backups, raw_dummy_service):
assert Storage.has_init_mark(raw_dummy_service) is False
Storage.mark_as_init(raw_dummy_service)
assert Storage.has_init_mark(raw_dummy_service) is True
assert Backups.is_initted(raw_dummy_service) is True
# Storage
def test_init_tracking_caching2(backups, raw_dummy_service):
assert Storage.has_init_mark(raw_dummy_service) is False
Backups.init_repo(raw_dummy_service)
assert Storage.has_init_mark(raw_dummy_service) is True
# Storage
def test_provider_storage(backups_backblaze):
Backups.reset()
provider = Backups.provider()
assert provider is not None
assert isinstance(provider, Backblaze)
assert provider.login == "ID"
assert provider.key == "KEY"
Storage.store_provider(provider)
restored_provider = Backups.load_provider_redis()
assert isinstance(restored_provider, Backblaze)
assert restored_provider.login == "ID"
assert restored_provider.key == "KEY"