From 5afa2338ca3513d4d5e0c15783d503762951ca81 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 23 Nov 2022 12:32:46 +0000 Subject: [PATCH] Migrate Jobs to redis Jobs API shall now use redis to store and retrieve jobs. This will make it possible to add pubsub for jobs updates. For now it uses blocking api of redis. --- selfprivacy_api/jobs/__init__.py | 24 +++++++++--------------- selfprivacy_api/utils/redis_pool.py | 2 +- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/selfprivacy_api/jobs/__init__.py b/selfprivacy_api/jobs/__init__.py index 7ecc8c9..7c16afd 100644 --- a/selfprivacy_api/jobs/__init__.py +++ b/selfprivacy_api/jobs/__init__.py @@ -23,10 +23,6 @@ from enum import Enum from pydantic import BaseModel from selfprivacy_api.utils.redis_pool import RedisPool -import asyncio - - -loop = asyncio.get_event_loop() class JobStatus(Enum): @@ -72,8 +68,8 @@ class Jobs: r = RedisPool().get_connection() jobs = Jobs.get_jobs() for job in jobs: - loop.run_until_complete(r.delete(redis_key_from_uuid(job.uid))) - loop.run_until_complete(r.delete("jobs")) + r.delete(redis_key_from_uuid(job.uid)) + r.delete("jobs") @staticmethod def add( @@ -103,8 +99,7 @@ class Jobs: ) r = RedisPool().get_connection() store_job_as_hash(r, redis_key_from_uuid(job.uid), job) - coroutine = r.lpush("jobs", redis_key_from_uuid(job.uid)) - loop.run_until_complete(coroutine) + r.lpush("jobs", redis_key_from_uuid(job.uid)) return job @staticmethod @@ -121,8 +116,8 @@ class Jobs: """ r = RedisPool().get_connection() key = redis_key_from_uuid(job_uuid) - loop.run_until_complete(r.delete(key)) - loop.run_until_complete(r.lrem("jobs", 0, key)) + r.delete(key) + r.lrem("jobs", 0, key) return False @staticmethod @@ -178,7 +173,7 @@ class Jobs: Get the jobs list. """ r = RedisPool().get_connection() - jobs = loop.run_until_complete(r.lrange("jobs", 0, -1)) + jobs = r.lrange("jobs", 0, -1) return [job_from_hash(r, job_key) for job_key in jobs] @staticmethod @@ -204,13 +199,12 @@ def store_job_as_hash(r, redis_key, model): value = value.isoformat() if isinstance(value, JobStatus): value = value.value - coroutine = r.hset(redis_key, key, str(value)) - loop.run_until_complete(coroutine) + r.hset(redis_key, key, str(value)) def job_from_hash(r, redis_key): if exists_sync(r, redis_key): - job_dict = loop.run_until_complete(r.hgetall(redis_key)) + job_dict = r.hgetall(redis_key) for date in [ "created_at", "updated_at", @@ -227,4 +221,4 @@ def job_from_hash(r, redis_key): def exists_sync(r, key): - return loop.run_until_complete(r.exists(key)) + return r.exists(key) diff --git a/selfprivacy_api/utils/redis_pool.py b/selfprivacy_api/utils/redis_pool.py index e4e98ac..a87049c 100644 --- a/selfprivacy_api/utils/redis_pool.py +++ b/selfprivacy_api/utils/redis_pool.py @@ -1,7 +1,7 @@ """ Redis pool module for selfprivacy_api """ -import redis.asyncio as redis +import redis from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass REDIS_SOCKET = "/run/redis-sp-api/redis.sock"