refactor(backups): snapshotlist and local secret groundwork

pull/35/head
Houkime 2023-02-17 15:55:19 +00:00 committed by Inex Code
parent e156e9cd58
commit 5371c7feef
4 changed files with 62 additions and 10 deletions

View File

@ -1,3 +1,7 @@
from typing import List
from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
from selfprivacy_api.services.service import Service
@ -41,3 +45,8 @@ class Backups(metaclass=SingletonMetaclass):
service.pre_backup()
self.provider.backuper.start_backup(folder, repo_name)
service.post_restore()
def get_snapshots(self, service: Service) -> List[Snapshot]:
repo_name = service.get_id()
return self.provider.backuper.get_snapshots(repo_name)

View File

@ -0,0 +1,30 @@
"""Handling of local secret used for encrypted backups.
Separated out for circular dependency reasons
"""
REDIS_KEY = "backup:local_secret"
class LocalBackupSecret:
@staticmethod
def get():
"""A secret string which backblaze/other clouds do not know.
Serves as encryption key.
TODO: generate and save in redis
"""
return "TEMPORARY_SECRET"
@staticmethod
def reset():
pass
def exists():
pass
@staticmethod
def _generate():
pass
@staticmethod
def _store(secret: str):
pass

View File

@ -6,6 +6,8 @@ from typing import List
from selfprivacy_api.backup.backuper import AbstractBackuper
from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.backup.local_secret import LocalBackupSecret
class ResticBackuper(AbstractBackuper):
def __init__(self, login_flag: str, key_flag: str, type: str):
@ -37,6 +39,9 @@ class ResticBackuper(AbstractBackuper):
return f"{acc_arg} {key_arg}"
def _password_command(self):
return f"echo {LocalBackupSecret.get()}"
def restic_command(self, repo_name: str, *args):
command = [
"restic",
@ -44,6 +49,8 @@ class ResticBackuper(AbstractBackuper):
self.rclone_args(),
"-r",
self.restic_repo(repo_name),
"--password-command",
self._password_command(),
]
if args != []:
command.extend(args)
@ -87,6 +94,7 @@ class ResticBackuper(AbstractBackuper):
def _load_snapshots(self, repo_name) -> object:
"""
Load list of snapshots from repository
raises Value Error if repo does not exist
"""
listing_command = self.restic_command(
repo_name,
@ -102,13 +110,12 @@ class ResticBackuper(AbstractBackuper):
) as backup_listing_process_descriptor:
output = backup_listing_process_descriptor.communicate()[0].decode("utf-8")
if "Is there a repository at the following location?" in output:
raise ValueError("No repository! : " + output)
try:
return self.parse_snapshot_output(output)
except ValueError:
if "Is there a repository at the following location?" in output:
return []
self.error_message = output
return []
except ValueError as e:
raise ValueError("Cannot load snapshots: ") from e
def get_snapshots(self, repo_name) -> List[Snapshot]:
"""Get all snapshots from the repo"""
@ -119,7 +126,7 @@ class ResticBackuper(AbstractBackuper):
return snapshots
def parse_snapshot_output(self, output: str) -> object:
if "[" not in output:
raise ValueError("There is no json in the restic snapshot output")
starting_index = output.find("[")
json.loads(output[starting_index:])
self.snapshot_list = json.loads(output[starting_index:])
print(output)
return json.loads(output[starting_index:])

View File

@ -59,5 +59,11 @@ def test_backup_service(test_service, backups):
backups.back_up(test_service)
def test_no_snapshots(memory_backup):
assert memory_backup.backuper.get_snapshots("") == []
def test_no_repo(memory_backup):
with pytest.raises(ValueError):
assert memory_backup.backuper.get_snapshots("") == []
# def test_one_snapshot(backups, test_service):
# backups.back_up(test_service)
# assert len(backups.get_snapshots(test_service)) == 1