fix: final typos fix
continuous-integration/drone/push Build is failing Details

pull/21/head
dettlaff 2023-11-15 16:47:04 +04:00
parent 3be5816f51
commit 03e6d45279
1 changed files with 14 additions and 12 deletions

View File

@ -1,6 +1,6 @@
import re import re
import subprocess import subprocess
from typing import Tuple, Iterable, IO, Any, Optional from typing import Tuple, Iterable
from selfprivacy_api.utils.huey import huey from selfprivacy_api.utils.huey import huey
@ -14,7 +14,7 @@ RESULT_WAS_NOT_FOUND_ERROR = "We are sorry, garbage collection result was not fo
CLEAR_COMPLETED = "Cleaning completed." CLEAR_COMPLETED = "Cleaning completed."
def run_nix_store_print_dead() -> IO[Any]: def run_nix_store_print_dead() -> str:
subprocess.run( subprocess.run(
["nix-env", "-p", "/nix/var/nix/profiles/system", "--delete-generations old"], ["nix-env", "-p", "/nix/var/nix/profiles/system", "--delete-generations old"],
check=False, check=False,
@ -25,10 +25,11 @@ def run_nix_store_print_dead() -> IO[Any]:
) )
def run_nix_collect_garbage() -> Optional[IO[bytes]]: def run_nix_collect_garbage() -> Iterable[bytes]:
return subprocess.Popen( process = subprocess.Popen(
["nix-store", "--gc"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT ["nix-store", "--gc"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
).stdout )
return process.stdout if process.stdout else iter([])
def parse_line(line: str): def parse_line(line: str):
@ -69,7 +70,7 @@ def process_stream(job: Job, stream: Iterable[bytes], total_dead_packages: int)
if percent - prev_progress >= 5: if percent - prev_progress >= 5:
Jobs.update( Jobs.update(
job=Job, job=job,
status=JobStatus.RUNNING, status=JobStatus.RUNNING,
progress=percent, progress=percent,
status_text="Cleaning...", status_text="Cleaning...",
@ -79,14 +80,14 @@ def process_stream(job: Job, stream: Iterable[bytes], total_dead_packages: int)
elif "store paths deleted," in line: elif "store paths deleted," in line:
status = parse_line(line) status = parse_line(line)
Jobs.update( Jobs.update(
job=Job, job=job,
status=status[0], status=status[0],
status_text=status[1], status_text=status[1],
result=status[2], result=status[2],
) )
def get_dead_packages(output) -> Tuple[int, int]: def get_dead_packages(output) -> Tuple[int, float]:
dead = len(re.findall("/nix/store/", output)) dead = len(re.findall("/nix/store/", output))
percent = 0 percent = 0
if dead != 0: if dead != 0:
@ -97,7 +98,7 @@ def get_dead_packages(output) -> Tuple[int, int]:
@huey.task() @huey.task()
def calculate_and_clear_dead_packages(job: Job): def calculate_and_clear_dead_packages(job: Job):
Jobs.update( Jobs.update(
job=Job, job=job,
status=JobStatus.RUNNING, status=JobStatus.RUNNING,
progress=0, progress=0,
status_text="Calculate the number of dead packages...", status_text="Calculate the number of dead packages...",
@ -109,7 +110,7 @@ def calculate_and_clear_dead_packages(job: Job):
if dead_packages == 0: if dead_packages == 0:
Jobs.update( Jobs.update(
job=Job, job=job,
status=JobStatus.FINISHED, status=JobStatus.FINISHED,
status_text="Nothing to clear", status_text="Nothing to clear",
result="System is clear", result="System is clear",
@ -117,13 +118,14 @@ def calculate_and_clear_dead_packages(job: Job):
return return
Jobs.update( Jobs.update(
job=Job, job=job,
status=JobStatus.RUNNING, status=JobStatus.RUNNING,
progress=0, progress=0,
status_text=f"Found {dead_packages} packages to remove!", status_text=f"Found {dead_packages} packages to remove!",
) )
process_stream(job, run_nix_collect_garbage(), package_equal_to_percent) stream = run_nix_collect_garbage()
process_stream(job, stream, dead_packages)
def start_nix_collect_garbage() -> Job: def start_nix_collect_garbage() -> Job: