Merge pull request 'Forgetting' (#46) from backups-forget into master

Reviewed-on: #46
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
Inex Code 2023-08-14 05:29:12 +03:00
commit 36e915907f
11 changed files with 461 additions and 45 deletions

View File

@@ -2,6 +2,7 @@
This module contains the controller class for backups.
"""
from datetime import datetime, timedelta
import os
from os import statvfs
from typing import List, Optional
@@ -43,6 +44,13 @@ DEFAULT_JSON_PROVIDER = {
"bucket": "",
}
BACKUP_PROVIDER_ENVS = {
"kind": "BACKUP_KIND",
"login": "BACKUP_LOGIN",
"key": "BACKUP_KEY",
"location": "BACKUP_LOCATION",
}
class NotDeadError(AssertionError):
"""
@@ -132,6 +140,24 @@ class Backups:
Storage.store_provider(none_provider)
return none_provider
@staticmethod
def set_provider_from_envs():
for env in BACKUP_PROVIDER_ENVS.values():
if env not in os.environ.keys():
raise ValueError(
f"Cannot set backup provider from envs, there is no {env} set"
)
kind_str = os.environ[BACKUP_PROVIDER_ENVS["kind"]]
kind_enum = BackupProviderEnum[kind_str]
provider = Backups._construct_provider(
kind=kind_enum,
login=os.environ[BACKUP_PROVIDER_ENVS["login"]],
key=os.environ[BACKUP_PROVIDER_ENVS["key"]],
location=os.environ[BACKUP_PROVIDER_ENVS["location"]],
)
Storage.store_provider(provider)
@staticmethod
def _construct_provider(
kind: BackupProviderEnum,
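For illustration, a minimal usage sketch of the env-driven setup; the values are the same placeholders test_setting_from_envs below uses:

import os

os.environ["BACKUP_KIND"] = "BACKBLAZE"
os.environ["BACKUP_LOGIN"] = "ID"
os.environ["BACKUP_KEY"] = "KEY"
os.environ["BACKUP_LOCATION"] = "selfprivacy"

Backups.set_provider_from_envs()  # raises ValueError if any variable is missing
assert Backups.provider().login == "ID"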
@@ -210,6 +236,14 @@ class Backups:
Backups.provider().backupper.init()
Storage.mark_as_init()
@staticmethod
def erase_repo() -> None:
"""
Completely empties the remote
"""
Backups.provider().backupper.erase_repo()
Storage.mark_as_uninitted()
@staticmethod
def is_initted() -> bool:
"""
@@ -405,10 +439,18 @@ class Backups:
@staticmethod
def forget_snapshot(snapshot: Snapshot) -> None:
"""Deletes a snapshot from the storage"""
"""Deletes a snapshot from the repo and from cache"""
Backups.provider().backupper.forget_snapshot(snapshot.id)
Storage.delete_cached_snapshot(snapshot)
@staticmethod
def forget_all_snapshots():
"""deliberately erase all snapshots we made"""
# there is no dedicated optimized command for this,
# but maybe we can have a multi-erase
for snapshot in Backups.get_all_snapshots():
Backups.forget_snapshot(snapshot)
@staticmethod
def force_snapshot_cache_reload() -> None:
"""

View File

@@ -36,6 +36,11 @@ class AbstractBackupper(ABC):
"""Initialize the repository"""
raise NotImplementedError
@abstractmethod
def erase_repo(self) -> None:
"""Completely empties the remote"""
raise NotImplementedError
@abstractmethod
def restore_from_backup(
self,

View File

@@ -23,6 +23,11 @@ class NoneBackupper(AbstractBackupper):
def init(self):
raise NotImplementedError
def erase_repo(self) -> None:
"""Completely empties the remote"""
# this one is already empty
pass
def restore_from_backup(self, snapshot_id: str, folders: List[str], verify=True):
"""Restore a target folder using a snapshot"""
raise NotImplementedError

View File

@@ -1,9 +1,11 @@
from __future__ import annotations
import subprocess
import json
import datetime
import tempfile
from typing import List
from typing import List, TypeVar, Callable
from collections.abc import Iterable
from json.decoder import JSONDecodeError
from os.path import exists, join
@@ -21,6 +23,25 @@ from selfprivacy_api.backup.local_secret import LocalBackupSecret
SHORT_ID_LEN = 8
T = TypeVar("T", bound=Callable)
def unlocked_repo(func: T) -> T:
"""unlock repo and retry if it appears to be locked"""
def inner(self: ResticBackupper, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except Exception as e:
if "unable to create lock" in str(e):
self.unlock()
return func(self, *args, **kwargs)
else:
raise e
# Above, we manually guarantee that the type returned is compatible.
return inner # type: ignore
class ResticBackupper(AbstractBackupper):
def __init__(self, login_flag: str, key_flag: str, storage_type: str) -> None:
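A self-contained toy demonstrating the retry-once behavior of the decorator; FakeBackupper is purely illustrative, and the error text mimics the message restic emits when a repository is locked:

class FakeBackupper:
    def __init__(self):
        self.locked = True

    def unlock(self):
        self.locked = False

    @unlocked_repo
    def snapshots(self) -> list:
        if self.locked:
            raise ValueError("unable to create lock")
        return []

# The first call raises, the decorator calls unlock() and retries, so this passes:
assert FakeBackupper().snapshots() == []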
@@ -40,20 +61,25 @@ class ResticBackupper(AbstractBackupper):
def restic_repo(self) -> str:
# https://restic.readthedocs.io/en/latest/030_preparing_a_new_repo.html#other-services-via-rclone
# https://forum.rclone.org/t/can-rclone-be-run-solely-with-command-line-options-no-config-no-env-vars/6314/5
return f"rclone:{self.storage_type}{self.repo}"
return f"rclone:{self.rclone_repo()}"
def rclone_repo(self) -> str:
return f"{self.storage_type}{self.repo}"
def rclone_args(self):
return "rclone.args=serve restic --stdio " + self.backend_rclone_args()
return "rclone.args=serve restic --stdio " + " ".join(
self.backend_rclone_args()
)
def backend_rclone_args(self) -> str:
acc_arg = ""
key_arg = ""
def backend_rclone_args(self) -> list[str]:
args = []
if self.account != "":
acc_arg = f"{self.login_flag} {self.account}"
acc_args = [self.login_flag, self.account]
args.extend(acc_args)
if self.key != "":
key_arg = f"{self.key_flag} {self.key}"
return f"{acc_arg} {key_arg}"
key_args = [self.key_flag, self.key]
args.extend(key_args)
return args
def _password_command(self):
return f"echo {LocalBackupSecret.get()}"
@@ -79,6 +105,27 @@ class ResticBackupper(AbstractBackupper):
command.extend(ResticBackupper.__flatten_list(args))
return command
def erase_repo(self) -> None:
"""Fully erases repo on remote, can be reinitted again"""
command = [
"rclone",
"purge",
self.rclone_repo(),
]
backend_args = self.backend_rclone_args()
if backend_args:
command.extend(backend_args)
with subprocess.Popen(command, stdout=subprocess.PIPE, shell=False) as handle:
output = handle.communicate()[0].decode("utf-8")
if handle.returncode != 0:
raise ValueError(
"purge exited with errorcode",
handle.returncode,
":",
output,
)
def mount_repo(self, mount_directory):
mount_command = self.restic_command("mount", mount_directory)
mount_command.insert(0, "nohup")
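With the same assumed Backblaze-style values as above, erase_repo() ends up running approximately:

# rclone purge :b2:bucket --b2-account ID --b2-key KEY

rclone purge removes the path with all of its contents, which is why the controller calls Storage.mark_as_uninitted() right after.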
@@ -116,6 +163,7 @@ class ResticBackupper(AbstractBackupper):
result.append(item)
return result
@unlocked_repo
def start_backup(self, folders: List[str], tag: str) -> Snapshot:
"""
Start backup with restic
@@ -139,8 +187,10 @@
raise ValueError("No service with id ", tag)
job = get_backup_job(service)
output = []
try:
for raw_message in output_yielder(backup_command):
output.append(raw_message)
message = self.parse_message(
raw_message,
job,
@@ -151,7 +201,13 @@
tag,
)
except ValueError as error:
raise ValueError("Could not create a snapshot: ", messages) from error
raise ValueError(
"Could not create a snapshot: ",
str(error),
output,
"parsed messages:",
messages,
) from error
@staticmethod
def _snapshot_from_backup_messages(messages, repo_name) -> Snapshot:
@@ -200,23 +256,72 @@ class ResticBackupper(AbstractBackupper):
if "created restic repository" not in output:
raise ValueError("cannot init a repo: " + output)
@unlocked_repo
def is_initted(self) -> bool:
command = self.restic_command(
"check",
"--json",
)
with subprocess.Popen(
command,
stdout=subprocess.PIPE,
shell=False,
stderr=subprocess.STDOUT,
) as handle:
output = handle.communicate()[0].decode("utf-8")
if not ResticBackupper.has_json(output):
if handle.returncode != 0:
if "unable to create lock" in output:
raise ValueError("Stale lock detected: ", output)
return False
# raise NotImplementedError("error(big): " + output)
return True
def unlock(self) -> None:
"""Remove stale locks."""
command = self.restic_command(
"unlock",
)
with subprocess.Popen(
command,
stdout=subprocess.PIPE,
shell=False,
stderr=subprocess.STDOUT,
) as handle:
# communicate() forces the process to complete so that returncode is defined
output = handle.communicate()[0].decode("utf-8")
if handle.returncode != 0:
raise ValueError("cannot unlock the backup repository: ", output)
def lock(self) -> None:
"""
Introduce a stale lock.
Mainly for testing purposes.
Double lock is supposed to fail
"""
command = self.restic_command(
"check",
)
# using temporary cache in /run/user/1000/restic-check-cache-817079729
# repository 9639c714 opened (repository version 2) successfully, password is correct
# created new cache in /run/user/1000/restic-check-cache-817079729
# create exclusive lock for repository
# load indexes
# check all packs
# check snapshots, trees and blobs
# [0:00] 100.00% 1 / 1 snapshots
# no errors were found
try:
for line in output_yielder(command):
if "indexes" in line:
break
if "unable" in line:
raise ValueError(line)
except Exception as e:
raise ValueError("could not lock repository") from e
@unlocked_repo
def restored_size(self, snapshot_id: str) -> int:
"""
Size of a snapshot
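Why lock() leaves a stale lock at all: it starts restic check and breaks out of the output early, and output_yielder (see its change further down in this diff) kills the process on GeneratorExit, so the exclusive lock that check acquired is never released. A sketch of the intended interplay, with backupper standing for a ResticBackupper instance:

backupper.lock()               # restic check is killed mid-run; its exclusive lock remains
assert backupper.is_initted()  # @unlocked_repo catches the lock error, unlocks, retries
backupper.unlock()             # explicit cleanup: restic unlock removes stale locks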
@@ -230,6 +335,7 @@ class ResticBackupper(AbstractBackupper):
with subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=False,
) as handle:
output = handle.communicate()[0].decode("utf-8")
@@ -239,6 +345,7 @@ class ResticBackupper(AbstractBackupper):
except ValueError as error:
raise ValueError("cannot restore a snapshot: " + output) from error
@unlocked_repo
def restore_from_backup(
self,
snapshot_id,
@@ -277,7 +384,10 @@ class ResticBackupper(AbstractBackupper):
)
with subprocess.Popen(
restore_command, stdout=subprocess.PIPE, shell=False
restore_command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=False,
) as handle:
# for some reason restore does not support
@@ -297,6 +407,7 @@ class ResticBackupper(AbstractBackupper):
output,
)
@unlocked_repo
def forget_snapshot(self, snapshot_id) -> None:
"""
Either removes snapshot or marks it for deletion later,
@@ -332,10 +443,7 @@ class ResticBackupper(AbstractBackupper):
) # none should be impossible after communicate
if handle.returncode != 0:
raise ValueError(
"forget exited with errorcode",
handle.returncode,
":",
output,
"forget exited with errorcode", handle.returncode, ":", output, err
)
def _load_snapshots(self) -> object:
@@ -361,8 +469,9 @@ class ResticBackupper(AbstractBackupper):
try:
return ResticBackupper.parse_json_output(output)
except ValueError as error:
raise ValueError("Cannot load snapshots: ") from error
raise ValueError("Cannot load snapshots: ", output) from error
@unlocked_repo
def get_snapshots(self) -> List[Snapshot]:
"""Get all snapshots from the repo"""
snapshots = []

View File

@@ -21,7 +21,7 @@ REDIS_SNAPSHOT_CACHE_EXPIRE_SECONDS = 24 * 60 * 60 # one day
REDIS_SNAPSHOTS_PREFIX = "backups:snapshots:"
REDIS_LAST_BACKUP_PREFIX = "backups:last-backed-up:"
REDIS_INITTED_CACHE_PREFIX = "backups:initted_services:"
REDIS_INITTED_CACHE = "backups:repo_initted"
REDIS_PROVIDER_KEY = "backups:provider"
REDIS_AUTOBACKUP_PERIOD_KEY = "backups:autobackup_period"
@@ -38,9 +38,9 @@ class Storage:
"""Deletes all backup related data from redis"""
redis.delete(REDIS_PROVIDER_KEY)
redis.delete(REDIS_AUTOBACKUP_PERIOD_KEY)
redis.delete(REDIS_INITTED_CACHE)
prefixes_to_clean = [
REDIS_INITTED_CACHE_PREFIX,
REDIS_SNAPSHOTS_PREFIX,
REDIS_LAST_BACKUP_PREFIX,
]
@@ -162,11 +162,16 @@ class Storage:
@staticmethod
def has_init_mark() -> bool:
"""Returns True if the repository was initialized"""
if redis.exists(REDIS_INITTED_CACHE_PREFIX):
if redis.exists(REDIS_INITTED_CACHE):
return True
return False
@staticmethod
def mark_as_init():
"""Marks the repository as initialized"""
redis.set(REDIS_INITTED_CACHE_PREFIX, 1)
redis.set(REDIS_INITTED_CACHE, 1)
@staticmethod
def mark_as_uninitted():
"""Marks the repository as initialized"""
redis.delete(REDIS_INITTED_CACHE)
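The init mark is now a single fixed key instead of the old per-service prefix; a minimal sketch of its lifecycle:

Storage.mark_as_init()         # SET backups:repo_initted 1
assert Storage.has_init_mark()
Storage.mark_as_uninitted()    # DEL backups:repo_initted
assert not Storage.has_init_mark()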

View File

@@ -1,8 +1,10 @@
import subprocess
from os.path import exists
from typing import Generator
def output_yielder(command):
def output_yielder(command) -> Generator[str, None, None]:
"""Note: If you break during iteration, it kills the process"""
with subprocess.Popen(
command,
shell=False,
@@ -10,9 +12,15 @@ def output_yielder(command):
stderr=subprocess.STDOUT,
universal_newlines=True,
) as handle:
for line in iter(handle.stdout.readline, ""):
if "NOTICE:" not in line:
yield line
if handle is None or handle.stdout is None:
raise ValueError("could not run command: ", command)
try:
for line in iter(handle.stdout.readline, ""):
if "NOTICE:" not in line:
yield line
except GeneratorExit:
handle.kill()
def sync(src_path: str, dest_path: str):
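A usage sketch of the new early-exit behavior, which is exactly the pattern lock() above relies on; the argv is illustrative:

for line in output_yielder(["restic", "check"]):
    if "indexes" in line:
        break   # GeneratorExit fires in the generator -> handle.kill(), no orphaned process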

View File

@@ -27,4 +27,4 @@ async def get_token_header(
def get_api_version() -> str:
"""Get API version"""
return "2.2.1"
return "2.3.0"

View File

@@ -157,6 +157,35 @@ class BackupMutations:
job=job_to_api_job(job),
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def forget_snapshot(self, snapshot_id: str) -> GenericMutationReturn:
"""Forget a snapshot.
Makes it inaccessible from the server.
After some time, the (encrypted) data will become unrecoverable
from the backup server as well, but not immediately"""
snap = Backups.get_snapshot_by_id(snapshot_id)
if snap is None:
return GenericMutationReturn(
success=False,
code=404,
message=f"snapshot {snapshot_id} not found",
)
try:
Backups.forget_snapshot(snap)
return GenericMutationReturn(
success=True,
code=200,
message="",
)
except Exception as error:
return GenericMutationReturn(
success=False,
code=400,
message=str(error),
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def force_snapshots_reload(self) -> GenericMutationReturn:
"""Force snapshots reload"""

View File

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name="selfprivacy_api",
version="2.2.1",
version="2.3.0",
packages=find_packages(),
scripts=[
"selfprivacy_api/app.py",

View File

@@ -94,6 +94,18 @@ mutation TestRestoreService($snapshot_id: String!) {
}
"""
API_FORGET_MUTATION = """
mutation TestForgetSnapshot($snapshot_id: String!) {
backup {
forgetSnapshot(snapshotId: $snapshot_id) {
success
message
code
}
}
}
"""
API_SNAPSHOTS_QUERY = """
allSnapshots {
id
@@ -143,6 +155,17 @@ def api_backup(authorized_client, service):
return response
def api_forget(authorized_client, snapshot_id):
response = authorized_client.post(
"/graphql",
json={
"query": API_FORGET_MUTATION,
"variables": {"snapshot_id": snapshot_id},
},
)
return response
def api_set_period(authorized_client, period):
response = authorized_client.post(
"/graphql",
@@ -370,3 +393,30 @@ def test_reload_snapshots(authorized_client, dummy_service):
snaps = api_snapshots(authorized_client)
assert len(snaps) == 1
def test_forget_snapshot(authorized_client, dummy_service):
response = api_backup(authorized_client, dummy_service)
data = get_data(response)["backup"]["startBackup"]
snaps = api_snapshots(authorized_client)
assert len(snaps) == 1
response = api_forget(authorized_client, snaps[0]["id"])
data = get_data(response)["backup"]["forgetSnapshot"]
assert_ok(data)
snaps = api_snapshots(authorized_client)
assert len(snaps) == 0
def test_forget_nonexistent_snapshot(authorized_client, dummy_service):
snaps = api_snapshots(authorized_client)
assert len(snaps) == 0
response = api_forget(authorized_client, "898798uekiodpjoiweoiwuoeirueor")
data = get_data(response)["backup"]["forgetSnapshot"]
assert data["code"] == 404
assert data["success"] is False
snaps = api_snapshots(authorized_client)
assert len(snaps) == 0

View File

@@ -1,4 +1,5 @@
import pytest
import os
import os.path as path
from os import makedirs
from os import remove
@@ -18,10 +19,11 @@ from selfprivacy_api.jobs import Jobs, JobStatus
from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.backup import Backups
from selfprivacy_api.backup import Backups, BACKUP_PROVIDER_ENVS
import selfprivacy_api.backup.providers as providers
from selfprivacy_api.backup.providers import AbstractBackupProvider
from selfprivacy_api.backup.providers.backblaze import Backblaze
from selfprivacy_api.backup.providers.none import NoBackups
from selfprivacy_api.backup.util import sync
from selfprivacy_api.backup.backuppers.restic_backupper import ResticBackupper
from selfprivacy_api.backup.jobs import add_backup_job, add_restore_job
@@ -37,14 +39,34 @@ TESTFILE_2_BODY = "testissimo!"
REPO_NAME = "test_backup"
@pytest.fixture(scope="function")
def backups(tmpdir):
Backups.reset()
test_repo_path = path.join(tmpdir, "totallyunrelated")
def prepare_localfile_backups(temp_dir):
test_repo_path = path.join(temp_dir, "totallyunrelated")
assert not path.exists(test_repo_path)
Backups.set_localfile_repo(test_repo_path)
@pytest.fixture(scope="function")
def backups_local(tmpdir):
Backups.reset()
prepare_localfile_backups(tmpdir)
Jobs.reset()
Backups.init_repo()
@pytest.fixture(scope="function")
def backups(tmpdir):
# for those tests that are supposed to pass with any repo
Backups.reset()
if BACKUP_PROVIDER_ENVS["kind"] in os.environ.keys():
Backups.set_provider_from_envs()
else:
prepare_localfile_backups(tmpdir)
Jobs.reset()
# assert not repo_path
Backups.init_repo()
yield
Backups.erase_repo()
@pytest.fixture()
@@ -80,11 +102,6 @@ def raw_dummy_service(tmpdir):
@pytest.fixture()
def dummy_service(tmpdir, backups, raw_dummy_service) -> Service:
service = raw_dummy_service
repo_path = path.join(tmpdir, "test_repo")
assert not path.exists(repo_path)
# assert not repo_path
Backups.init_repo()
# register our service
services.services.append(service)
@@ -129,6 +146,53 @@ def test_config_load(generic_userdata):
assert provider.backupper.key == "KEY"
def test_reset_sets_to_none1():
Backups.reset()
provider = Backups.provider()
assert provider is not None
assert isinstance(provider, NoBackups)
def test_reset_sets_to_none2(backups):
# now with something set up first (by the backups fixture)
Backups.reset()
provider = Backups.provider()
assert provider is not None
assert isinstance(provider, NoBackups)
def test_setting_from_envs(tmpdir):
Backups.reset()
environment_stash = {}
if BACKUP_PROVIDER_ENVS["kind"] in os.environ.keys():
# we are running under special envs, stash them before rewriting them
for key in BACKUP_PROVIDER_ENVS.values():
environment_stash[key] = os.environ[key]
os.environ[BACKUP_PROVIDER_ENVS["kind"]] = "BACKBLAZE"
os.environ[BACKUP_PROVIDER_ENVS["login"]] = "ID"
os.environ[BACKUP_PROVIDER_ENVS["key"]] = "KEY"
os.environ[BACKUP_PROVIDER_ENVS["location"]] = "selfprivacy"
Backups.set_provider_from_envs()
provider = Backups.provider()
assert provider is not None
assert isinstance(provider, Backblaze)
assert provider.login == "ID"
assert provider.key == "KEY"
assert provider.location == "selfprivacy"
assert provider.backupper.account == "ID"
assert provider.backupper.key == "KEY"
if environment_stash != {}:
for key in BACKUP_PROVIDER_ENVS.values():
os.environ[key] = environment_stash[key]
else:
for key in BACKUP_PROVIDER_ENVS.values():
del os.environ[key]
def test_json_reset(generic_userdata):
Backups.reset(reset_json=False)
provider = Backups.provider()
@@ -158,6 +222,19 @@ def test_file_backend_init(file_backup):
file_backup.backupper.init()
def test_reinit_after_purge(backups):
assert Backups.is_initted() is True
Backups.erase_repo()
assert Backups.is_initted() is False
with pytest.raises(ValueError):
Backups.get_all_snapshots()
Backups.init_repo()
assert Backups.is_initted() is True
assert len(Backups.get_all_snapshots()) == 0
def test_backup_simple_file(raw_dummy_service, file_backup):
# temporarily incomplete
service = raw_dummy_service
@@ -258,9 +335,12 @@ def test_sizing(backups, dummy_service):
assert size > 0
def test_init_tracking(backups, raw_dummy_service):
def test_init_tracking(backups, tmpdir):
assert Backups.is_initted() is True
Backups.reset()
assert Backups.is_initted() is False
separate_dir = tmpdir / "out_of_the_way"
prepare_localfile_backups(separate_dir)
Backups.init_repo()
assert Backups.is_initted() is True
@@ -552,8 +632,38 @@ def test_snapshots_caching(backups, dummy_service):
assert len(cached_snapshots) == 1
def lowlevel_forget(snapshot_id):
Backups.provider().backupper.forget_snapshot(snapshot_id)
# Storage
def test_snapshots_cache_invalidation(backups, dummy_service):
Backups.back_up(dummy_service)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
Storage.invalidate_snapshot_storage()
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 0
Backups.force_snapshot_cache_reload()
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
snap = cached_snapshots[0]
lowlevel_forget(snap.id)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
Backups.force_snapshot_cache_reload()
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 0
# Storage
def test_init_tracking_caching(backups, raw_dummy_service):
assert Storage.has_init_mark() is True
Backups.reset()
assert Storage.has_init_mark() is False
Storage.mark_as_init()
@@ -563,7 +673,12 @@ def test_init_tracking_caching(backups, raw_dummy_service):
# Storage
def test_init_tracking_caching2(backups, raw_dummy_service):
def test_init_tracking_caching2(backups, tmpdir):
assert Storage.has_init_mark() is True
Backups.reset()
assert Storage.has_init_mark() is False
separate_dir = tmpdir / "out_of_the_way"
prepare_localfile_backups(separate_dir)
assert Storage.has_init_mark() is False
Backups.init_repo()
@@ -643,3 +758,51 @@ def test_move_blocks_backups(backups, dummy_service, restore_strategy):
with pytest.raises(ValueError):
Backups.restore_snapshot(snap, restore_strategy)
def test_double_lock_unlock(backups, dummy_service):
# notice that introducing stale locks is only safe for other tests if we erase the repo in between,
# which we do at the time of writing this test
Backups.provider().backupper.lock()
with pytest.raises(ValueError):
Backups.provider().backupper.lock()
Backups.provider().backupper.unlock()
Backups.provider().backupper.lock()
Backups.provider().backupper.unlock()
Backups.provider().backupper.unlock()
def test_operations_while_locked(backups, dummy_service):
# Stale lock prevention test
# consider making it fully at the level of backupper?
# because this is where prevention lives?
# Backups singleton is here only so that we can run this against B2, S3 and whatever
# But maybe it is not necessary (if restic treats them uniformly enough)
Backups.provider().backupper.lock()
snap = Backups.back_up(dummy_service)
assert snap is not None
Backups.provider().backupper.lock()
# using lowlevel to make sure no caching interferes
assert Backups.provider().backupper.is_initted() is True
Backups.provider().backupper.lock()
assert Backups.snapshot_restored_size(snap.id) > 0
Backups.provider().backupper.lock()
Backups.restore_snapshot(snap)
Backups.provider().backupper.lock()
Backups.forget_snapshot(snap)
Backups.provider().backupper.lock()
assert Backups.provider().backupper.get_snapshots() == []
# check that no locks were left
Backups.provider().backupper.lock()
Backups.provider().backupper.unlock()