diff --git a/selfprivacy_api/jobs/__init__.py b/selfprivacy_api/jobs/__init__.py index 0dcfd66..9e4d961 100644 --- a/selfprivacy_api/jobs/__init__.py +++ b/selfprivacy_api/jobs/__init__.py @@ -17,13 +17,14 @@ A job is a dictionary with the following keys: import typing import datetime from uuid import UUID -import json import uuid from enum import Enum from pydantic import BaseModel -from selfprivacy_api.utils import ReadUserData, UserDataFiles, WriteUserData +from selfprivacy_api.utils.redis_pool import RedisPool + +JOB_EXPIRATION_SECONDS = 10 * 24 * 60 * 60 # ten days class JobStatus(Enum): @@ -66,8 +67,10 @@ class Jobs: """ Reset the jobs list. """ - with WriteUserData(UserDataFiles.JOBS) as user_data: - user_data["jobs"] = [] + r = RedisPool().get_connection() + jobs = Jobs.get_jobs() + for job in jobs: + r.delete(redis_key_from_uuid(job.uid)) @staticmethod def add( @@ -95,13 +98,8 @@ class Jobs: error=None, result=None, ) - with WriteUserData(UserDataFiles.JOBS) as user_data: - try: - if "jobs" not in user_data: - user_data["jobs"] = [] - user_data["jobs"].append(json.loads(job.json())) - except json.decoder.JSONDecodeError: - user_data["jobs"] = [json.loads(job.json())] + r = RedisPool().get_connection() + store_job_as_hash(r, redis_key_from_uuid(job.uid), job) return job @staticmethod @@ -116,13 +114,9 @@ class Jobs: """ Remove a job from the jobs list. """ - with WriteUserData(UserDataFiles.JOBS) as user_data: - if "jobs" not in user_data: - user_data["jobs"] = [] - for i, j in enumerate(user_data["jobs"]): - if j["uid"] == job_uuid: - del user_data["jobs"][i] - return True + r = RedisPool().get_connection() + key = redis_key_from_uuid(job_uuid) + r.delete(key) return False @staticmethod @@ -154,13 +148,12 @@ class Jobs: if status in (JobStatus.FINISHED, JobStatus.ERROR): job.finished_at = datetime.datetime.now() - with WriteUserData(UserDataFiles.JOBS) as user_data: - if "jobs" not in user_data: - user_data["jobs"] = [] - for i, j in enumerate(user_data["jobs"]): - if j["uid"] == str(job.uid): - user_data["jobs"][i] = json.loads(job.json()) - break + r = RedisPool().get_connection() + key = redis_key_from_uuid(job.uid) + if r.exists(key): + store_job_as_hash(r, key, job) + if status in (JobStatus.FINISHED, JobStatus.ERROR): + r.expire(key, JOB_EXPIRATION_SECONDS) return job @@ -169,12 +162,10 @@ class Jobs: """ Get a job from the jobs list. """ - with ReadUserData(UserDataFiles.JOBS) as user_data: - if "jobs" not in user_data: - user_data["jobs"] = [] - for job in user_data["jobs"]: - if job["uid"] == uid: - return Job(**job) + r = RedisPool().get_connection() + key = redis_key_from_uuid(uid) + if r.exists(key): + return job_from_hash(r, key) return None @staticmethod @@ -182,23 +173,49 @@ class Jobs: """ Get the jobs list. """ - with ReadUserData(UserDataFiles.JOBS) as user_data: - try: - if "jobs" not in user_data: - user_data["jobs"] = [] - return [Job(**job) for job in user_data["jobs"]] - except json.decoder.JSONDecodeError: - return [] + r = RedisPool().get_connection() + jobs = r.keys("jobs:*") + return [job_from_hash(r, job_key) for job_key in jobs] @staticmethod def is_busy() -> bool: """ Check if there is a job running. """ - with ReadUserData(UserDataFiles.JOBS) as user_data: - if "jobs" not in user_data: - user_data["jobs"] = [] - for job in user_data["jobs"]: - if job["status"] == JobStatus.RUNNING.value: - return True + for job in Jobs.get_jobs(): + if job["status"] == JobStatus.RUNNING.value: + return True return False + + +def redis_key_from_uuid(uuid): + return "jobs:" + str(uuid) + + +def store_job_as_hash(r, redis_key, model): + for key, value in model.dict().items(): + if isinstance(value, uuid.UUID): + value = str(value) + if isinstance(value, datetime.datetime): + value = value.isoformat() + if isinstance(value, JobStatus): + value = value.value + r.hset(redis_key, key, str(value)) + + +def job_from_hash(r, redis_key): + if r.exists(redis_key): + job_dict = r.hgetall(redis_key) + for date in [ + "created_at", + "updated_at", + "finished_at", + ]: + if job_dict[date] != "None": + job_dict[date] = datetime.datetime.fromisoformat(job_dict[date]) + for key in job_dict.keys(): + if job_dict[key] == "None": + job_dict[key] = None + + return Job(**job_dict) + return None diff --git a/selfprivacy_api/utils/redis_pool.py b/selfprivacy_api/utils/redis_pool.py index e4e98ac..a87049c 100644 --- a/selfprivacy_api/utils/redis_pool.py +++ b/selfprivacy_api/utils/redis_pool.py @@ -1,7 +1,7 @@ """ Redis pool module for selfprivacy_api """ -import redis.asyncio as redis +import redis from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass REDIS_SOCKET = "/run/redis-sp-api/redis.sock" diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 87f1386..371dca4 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -1,14 +1,14 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument -import json import pytest -from selfprivacy_api.utils import WriteUserData, ReadUserData from selfprivacy_api.jobs import Jobs, JobStatus +import selfprivacy_api.jobs as jobsmodule def test_jobs(authorized_client, jobs_file, shared_datadir): jobs = Jobs() + jobs.reset() assert jobs.get_jobs() == [] test_job = jobs.add( @@ -31,6 +31,19 @@ def test_jobs(authorized_client, jobs_file, shared_datadir): assert jobs.get_jobs() == [test_job] + backup = jobsmodule.JOB_EXPIRATION_SECONDS + jobsmodule.JOB_EXPIRATION_SECONDS = 0 + + jobs.update( + job=test_job, + status=JobStatus.FINISHED, + status_text="Yaaay!", + progress=100, + ) + + assert jobs.get_jobs() == [] + jobsmodule.JOB_EXPIRATION_SECONDS = backup + @pytest.fixture def mock_subprocess_run(mocker):