From f7b7e5a0be6995bc671a750395142cff5fa5a745 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 16 Nov 2022 13:54:54 +0000 Subject: [PATCH] migrate Jobs to redis --- selfprivacy_api/jobs/__init__.py | 114 +++++++++++++++++++------------ tests/test_jobs.py | 3 +- 2 files changed, 71 insertions(+), 46 deletions(-) diff --git a/selfprivacy_api/jobs/__init__.py b/selfprivacy_api/jobs/__init__.py index 0dcfd66..7ecc8c9 100644 --- a/selfprivacy_api/jobs/__init__.py +++ b/selfprivacy_api/jobs/__init__.py @@ -17,13 +17,16 @@ A job is a dictionary with the following keys: import typing import datetime from uuid import UUID -import json import uuid from enum import Enum from pydantic import BaseModel -from selfprivacy_api.utils import ReadUserData, UserDataFiles, WriteUserData +from selfprivacy_api.utils.redis_pool import RedisPool +import asyncio + + +loop = asyncio.get_event_loop() class JobStatus(Enum): @@ -66,8 +69,11 @@ class Jobs: """ Reset the jobs list. """ - with WriteUserData(UserDataFiles.JOBS) as user_data: - user_data["jobs"] = [] + r = RedisPool().get_connection() + jobs = Jobs.get_jobs() + for job in jobs: + loop.run_until_complete(r.delete(redis_key_from_uuid(job.uid))) + loop.run_until_complete(r.delete("jobs")) @staticmethod def add( @@ -95,13 +101,10 @@ class Jobs: error=None, result=None, ) - with WriteUserData(UserDataFiles.JOBS) as user_data: - try: - if "jobs" not in user_data: - user_data["jobs"] = [] - user_data["jobs"].append(json.loads(job.json())) - except json.decoder.JSONDecodeError: - user_data["jobs"] = [json.loads(job.json())] + r = RedisPool().get_connection() + store_job_as_hash(r, redis_key_from_uuid(job.uid), job) + coroutine = r.lpush("jobs", redis_key_from_uuid(job.uid)) + loop.run_until_complete(coroutine) return job @staticmethod @@ -116,13 +119,10 @@ class Jobs: """ Remove a job from the jobs list. """ - with WriteUserData(UserDataFiles.JOBS) as user_data: - if "jobs" not in user_data: - user_data["jobs"] = [] - for i, j in enumerate(user_data["jobs"]): - if j["uid"] == job_uuid: - del user_data["jobs"][i] - return True + r = RedisPool().get_connection() + key = redis_key_from_uuid(job_uuid) + loop.run_until_complete(r.delete(key)) + loop.run_until_complete(r.lrem("jobs", 0, key)) return False @staticmethod @@ -154,13 +154,10 @@ class Jobs: if status in (JobStatus.FINISHED, JobStatus.ERROR): job.finished_at = datetime.datetime.now() - with WriteUserData(UserDataFiles.JOBS) as user_data: - if "jobs" not in user_data: - user_data["jobs"] = [] - for i, j in enumerate(user_data["jobs"]): - if j["uid"] == str(job.uid): - user_data["jobs"][i] = json.loads(job.json()) - break + r = RedisPool().get_connection() + key = redis_key_from_uuid(job.uid) + if exists_sync(r, key): + store_job_as_hash(r, key, job) return job @@ -169,12 +166,10 @@ class Jobs: """ Get a job from the jobs list. """ - with ReadUserData(UserDataFiles.JOBS) as user_data: - if "jobs" not in user_data: - user_data["jobs"] = [] - for job in user_data["jobs"]: - if job["uid"] == uid: - return Job(**job) + r = RedisPool().get_connection() + key = redis_key_from_uuid(uid) + if exists_sync(r, key): + return job_from_hash(r, key) return None @staticmethod @@ -182,23 +177,54 @@ class Jobs: """ Get the jobs list. """ - with ReadUserData(UserDataFiles.JOBS) as user_data: - try: - if "jobs" not in user_data: - user_data["jobs"] = [] - return [Job(**job) for job in user_data["jobs"]] - except json.decoder.JSONDecodeError: - return [] + r = RedisPool().get_connection() + jobs = loop.run_until_complete(r.lrange("jobs", 0, -1)) + return [job_from_hash(r, job_key) for job_key in jobs] @staticmethod def is_busy() -> bool: """ Check if there is a job running. """ - with ReadUserData(UserDataFiles.JOBS) as user_data: - if "jobs" not in user_data: - user_data["jobs"] = [] - for job in user_data["jobs"]: - if job["status"] == JobStatus.RUNNING.value: - return True + for job in Jobs.get_jobs(): + if job["status"] == JobStatus.RUNNING.value: + return True return False + + +def redis_key_from_uuid(uuid): + return "jobs:" + str(uuid) + + +def store_job_as_hash(r, redis_key, model): + for key, value in model.dict().items(): + if isinstance(value, uuid.UUID): + value = str(value) + if isinstance(value, datetime.datetime): + value = value.isoformat() + if isinstance(value, JobStatus): + value = value.value + coroutine = r.hset(redis_key, key, str(value)) + loop.run_until_complete(coroutine) + + +def job_from_hash(r, redis_key): + if exists_sync(r, redis_key): + job_dict = loop.run_until_complete(r.hgetall(redis_key)) + for date in [ + "created_at", + "updated_at", + "finished_at", + ]: + if job_dict[date] != "None": + job_dict[date] = datetime.datetime.fromisoformat(job_dict[date]) + for key in job_dict.keys(): + if job_dict[key] == "None": + job_dict[key] = None + + return Job(**job_dict) + return None + + +def exists_sync(r, key): + return loop.run_until_complete(r.exists(key)) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 87f1386..65d58e4 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -1,14 +1,13 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument -import json import pytest -from selfprivacy_api.utils import WriteUserData, ReadUserData from selfprivacy_api.jobs import Jobs, JobStatus def test_jobs(authorized_client, jobs_file, shared_datadir): jobs = Jobs() + jobs.reset() assert jobs.get_jobs() == [] test_job = jobs.add(