pull/13/head
Inex Code 2022-08-11 23:11:00 +04:00
parent dfd28ad0cd
commit a96f6bd067
8 changed files with 55 additions and 43 deletions

View File

@ -1,2 +1,3 @@
[MASTER] [MASTER]
init-hook="from pylint.config import find_pylintrc; import os, sys; sys.path.append(os.path.dirname(find_pylintrc()))" init-hook="from pylint.config import find_pylintrc; import os, sys; sys.path.append(os.path.dirname(find_pylintrc()))"
extension-pkg-whitelist=pydantic

View File

@ -3,6 +3,7 @@ from datetime import datetime
from typing import Optional from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
from selfprivacy_api.utils.auth import ( from selfprivacy_api.utils.auth import (
delete_token, delete_token,
generate_recovery_token, generate_recovery_token,
@ -42,14 +43,10 @@ def get_api_tokens_with_caller_flag(caller_token: str) -> list[TokenInfoWithIsCa
class NotFoundException(Exception): class NotFoundException(Exception):
"""Not found exception""" """Not found exception"""
pass
class CannotDeleteCallerException(Exception): class CannotDeleteCallerException(Exception):
"""Cannot delete caller exception""" """Cannot delete caller exception"""
pass
def delete_api_token(caller_token: str, token_name: str) -> None: def delete_api_token(caller_token: str, token_name: str) -> None:
"""Delete the token""" """Delete the token"""
@ -98,14 +95,10 @@ def get_api_recovery_token_status() -> RecoveryTokenStatus:
class InvalidExpirationDate(Exception): class InvalidExpirationDate(Exception):
"""Invalid expiration date exception""" """Invalid expiration date exception"""
pass
class InvalidUsesLeft(Exception): class InvalidUsesLeft(Exception):
"""Invalid uses left exception""" """Invalid uses left exception"""
pass
def get_new_api_recovery_key( def get_new_api_recovery_key(
expiration_date: Optional[datetime] = None, uses_left: Optional[int] = None expiration_date: Optional[datetime] = None, uses_left: Optional[int] = None

View File

@ -75,14 +75,13 @@ def get_ssh_settings() -> SshSettings:
return SshSettings( return SshSettings(
enable=settings.enable, enable=settings.enable,
password_authentication=settings.passwordAuthentication, password_authentication=settings.passwordAuthentication,
root_ssh_keys=settings.rootSshKeys, root_ssh_keys=settings.rootKeys,
) )
def get_system_timezone() -> str: def get_system_timezone() -> str:
"""Get system timezone""" """Get system timezone"""
with ReadUserData() as user_data: return system_actions.get_timezone()
return system_actions.get_timezone()
@strawberry.type @strawberry.type

View File

@ -35,7 +35,7 @@ class Bitwarden(Service):
return "Bitwarden is a password manager." return "Bitwarden is a password manager."
@staticmethod @staticmethod
def get_svg_icon(self) -> str: def get_svg_icon() -> str:
"""Read SVG icon from file and return it as base64 encoded string.""" """Read SVG icon from file and return it as base64 encoded string."""
with open("selfprivacy_api/services/bitwarden/bitwarden.svg", "rb") as f: with open("selfprivacy_api/services/bitwarden/bitwarden.svg", "rb") as f:
return base64.b64encode(f.read()).decode("utf-8") return base64.b64encode(f.read()).decode("utf-8")

View File

@ -1,9 +1,7 @@
"""Generic handler for moving services""" """Generic handler for moving services"""
import base64
import subprocess import subprocess
import time import time
import typing
import pathlib import pathlib
import shutil import shutil
@ -149,7 +147,7 @@ def move_service(
progress=20, progress=20,
) )
current_progress = 20 current_progress = 20
folder_percentage = 50 / len(folder_names) folder_percentage = 50 // len(folder_names)
for folder in folder_names: for folder in folder_names:
shutil.move( shutil.move(
f"/volumes/{old_volume}/{folder.name}", f"/volumes/{old_volume}/{folder.name}",

View File

@ -120,8 +120,8 @@ class MailServer(Service):
@staticmethod @staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]: def get_dns_records() -> typing.List[ServiceDnsRecord]:
dkim_record = get_dkim_key()
domain = get_domain() domain = get_domain()
dkim_record = get_dkim_key(domain)
ip4 = get_ip4() ip4 = get_ip4()
if dkim_record is None: if dkim_record is None:
@ -129,16 +129,16 @@ class MailServer(Service):
return [ return [
ServiceDnsRecord( ServiceDnsRecord(
type="MX", name=domain, data=domain, ttl=3600, priority=10 type="MX", name=domain, content=domain, ttl=3600, priority=10
), ),
ServiceDnsRecord( ServiceDnsRecord(
type="TXT", name="_dmarc", data=f"v=DMARC1; p=none", ttl=3600 type="TXT", name="_dmarc", content=f"v=DMARC1; p=none", ttl=3600
), ),
ServiceDnsRecord( ServiceDnsRecord(
type="TXT", name=domain, data=f"v=spf1 a mx ip4:{ip4} -all", ttl=3600 type="TXT", name=domain, content=f"v=spf1 a mx ip4:{ip4} -all", ttl=3600
), ),
ServiceDnsRecord( ServiceDnsRecord(
type="TXT", name="selector._domainkey", data=dkim_record, ttl=3600 type="TXT", name="selector._domainkey", content=dkim_record, ttl=3600
), ),
] ]

View File

@ -3,6 +3,8 @@ from abc import ABC, abstractmethod
from enum import Enum from enum import Enum
import typing import typing
from pydantic import BaseModel
from selfprivacy_api.utils.block_devices import BlockDevice from selfprivacy_api.utils.block_devices import BlockDevice
@ -16,12 +18,12 @@ class ServiceStatus(Enum):
OFF = "OFF" OFF = "OFF"
class ServiceDnsRecord: class ServiceDnsRecord(BaseModel):
type: str type: str
name: str name: str
content: str content: str
ttl: int ttl: int
priority: typing.Optional[int] priority: typing.Optional[int] = None
class Service(ABC): class Service(ABC):
@ -30,80 +32,99 @@ class Service(ABC):
can be installed, configured and used by a user. can be installed, configured and used by a user.
""" """
@staticmethod
@abstractmethod @abstractmethod
def get_id(self) -> str: def get_id() -> str:
pass pass
@staticmethod
@abstractmethod @abstractmethod
def get_display_name(self) -> str: def get_display_name() -> str:
pass pass
@staticmethod
@abstractmethod @abstractmethod
def get_description(self) -> str: def get_description() -> str:
pass pass
@staticmethod
@abstractmethod @abstractmethod
def get_svg_icon(self) -> str: def get_svg_icon() -> str:
pass pass
@staticmethod
@abstractmethod @abstractmethod
def is_movable() -> bool: def is_movable() -> bool:
pass pass
@staticmethod
@abstractmethod @abstractmethod
def is_required() -> bool: def is_required() -> bool:
pass pass
@staticmethod
@abstractmethod @abstractmethod
def is_enabled(self) -> bool: def is_enabled() -> bool:
pass pass
@staticmethod
@abstractmethod @abstractmethod
def get_status(self) -> ServiceStatus: def get_status() -> ServiceStatus:
pass pass
@staticmethod
@abstractmethod @abstractmethod
def enable(self): def enable():
pass pass
@staticmethod
@abstractmethod @abstractmethod
def disable(self): def disable():
pass pass
@staticmethod
@abstractmethod @abstractmethod
def stop(self): def stop():
pass pass
@staticmethod
@abstractmethod @abstractmethod
def start(self): def start():
pass pass
@staticmethod
@abstractmethod @abstractmethod
def restart(self): def restart():
pass pass
@staticmethod
@abstractmethod @abstractmethod
def get_configuration(self): def get_configuration():
pass pass
@staticmethod
@abstractmethod @abstractmethod
def set_configuration(self, config_items): def set_configuration(config_items):
pass pass
@staticmethod
@abstractmethod @abstractmethod
def get_logs(self): def get_logs():
pass pass
@staticmethod
@abstractmethod @abstractmethod
def get_storage_usage(self) -> int: def get_storage_usage() -> int:
pass pass
@staticmethod
@abstractmethod @abstractmethod
def get_dns_records(self) -> typing.List[ServiceDnsRecord]: def get_dns_records() -> typing.List[ServiceDnsRecord]:
pass pass
@staticmethod
@abstractmethod @abstractmethod
def get_location(self) -> str: def get_location() -> str:
pass pass
@abstractmethod @abstractmethod

View File

@ -5,7 +5,7 @@ import re
from typing import Optional from typing import Optional
def get_ip4() -> Optional[str]: def get_ip4() -> str:
"""Get IPv4 address""" """Get IPv4 address"""
try: try:
ip4 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode( ip4 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode(
@ -14,10 +14,10 @@ def get_ip4() -> Optional[str]:
ip4 = re.search(r"inet (\d+\.\d+\.\d+\.\d+)\/\d+", ip4) ip4 = re.search(r"inet (\d+\.\d+\.\d+\.\d+)\/\d+", ip4)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
ip4 = None ip4 = None
return ip4.group(1) if ip4 else None return ip4.group(1) if ip4 else ""
def get_ip6() -> Optional[str]: def get_ip6() -> str:
"""Get IPv6 address""" """Get IPv6 address"""
try: try:
ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode( ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode(
@ -26,4 +26,4 @@ def get_ip6() -> Optional[str]:
ip6 = re.search(r"inet6 (\S+)\/\d+", ip6) ip6 = re.search(r"inet6 (\S+)\/\d+", ip6)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
ip6 = None ip6 = None
return ip6.group(1) if ip6 else None return ip6.group(1) if ip6 else ""