From f7b7e5a0be6995bc671a750395142cff5fa5a745 Mon Sep 17 00:00:00 2001
From: Houkime <>
Date: Wed, 16 Nov 2022 13:54:54 +0000
Subject: [PATCH 1/4] migrate Jobs to redis

---
 selfprivacy_api/jobs/__init__.py | 114 +++++++++++++++++++------
 tests/test_jobs.py               |   3 +-
 2 files changed, 71 insertions(+), 46 deletions(-)

diff --git a/selfprivacy_api/jobs/__init__.py b/selfprivacy_api/jobs/__init__.py
index 0dcfd66..7ecc8c9 100644
--- a/selfprivacy_api/jobs/__init__.py
+++ b/selfprivacy_api/jobs/__init__.py
@@ -17,13 +17,16 @@ A job is a dictionary with the following keys:
 import typing
 import datetime
 from uuid import UUID
-import json
 import uuid
 from enum import Enum
 
 from pydantic import BaseModel
 
-from selfprivacy_api.utils import ReadUserData, UserDataFiles, WriteUserData
+from selfprivacy_api.utils.redis_pool import RedisPool
+import asyncio
+
+
+loop = asyncio.get_event_loop()
 
 
 class JobStatus(Enum):
@@ -66,8 +69,11 @@ class Jobs:
         """
         Reset the jobs list.
         """
-        with WriteUserData(UserDataFiles.JOBS) as user_data:
-            user_data["jobs"] = []
+        r = RedisPool().get_connection()
+        jobs = Jobs.get_jobs()
+        for job in jobs:
+            loop.run_until_complete(r.delete(redis_key_from_uuid(job.uid)))
+            loop.run_until_complete(r.delete("jobs"))
 
     @staticmethod
     def add(
@@ -95,13 +101,10 @@
             error=None,
             result=None,
         )
-        with WriteUserData(UserDataFiles.JOBS) as user_data:
-            try:
-                if "jobs" not in user_data:
-                    user_data["jobs"] = []
-                user_data["jobs"].append(json.loads(job.json()))
-            except json.decoder.JSONDecodeError:
-                user_data["jobs"] = [json.loads(job.json())]
+        r = RedisPool().get_connection()
+        store_job_as_hash(r, redis_key_from_uuid(job.uid), job)
+        coroutine = r.lpush("jobs", redis_key_from_uuid(job.uid))
+        loop.run_until_complete(coroutine)
         return job
 
     @staticmethod
@@ -116,13 +119,10 @@ class Jobs:
         """
         Remove a job from the jobs list.
        """
-        with WriteUserData(UserDataFiles.JOBS) as user_data:
-            if "jobs" not in user_data:
-                user_data["jobs"] = []
-            for i, j in enumerate(user_data["jobs"]):
-                if j["uid"] == job_uuid:
-                    del user_data["jobs"][i]
-                    return True
+        r = RedisPool().get_connection()
+        key = redis_key_from_uuid(job_uuid)
+        loop.run_until_complete(r.delete(key))
+        loop.run_until_complete(r.lrem("jobs", 0, key))
         return False
 
     @staticmethod
@@ -154,13 +154,10 @@
         if status in (JobStatus.FINISHED, JobStatus.ERROR):
             job.finished_at = datetime.datetime.now()
 
-        with WriteUserData(UserDataFiles.JOBS) as user_data:
-            if "jobs" not in user_data:
-                user_data["jobs"] = []
-            for i, j in enumerate(user_data["jobs"]):
-                if j["uid"] == str(job.uid):
-                    user_data["jobs"][i] = json.loads(job.json())
-                    break
+        r = RedisPool().get_connection()
+        key = redis_key_from_uuid(job.uid)
+        if exists_sync(r, key):
+            store_job_as_hash(r, key, job)
 
         return job
 
@@ -169,12 +166,10 @@
         """
         Get a job from the jobs list.
         """
-        with ReadUserData(UserDataFiles.JOBS) as user_data:
-            if "jobs" not in user_data:
-                user_data["jobs"] = []
-            for job in user_data["jobs"]:
-                if job["uid"] == uid:
-                    return Job(**job)
+        r = RedisPool().get_connection()
+        key = redis_key_from_uuid(uid)
+        if exists_sync(r, key):
+            return job_from_hash(r, key)
         return None
 
     @staticmethod
@@ -182,23 +177,54 @@
         """
         Get the jobs list.
""" - with ReadUserData(UserDataFiles.JOBS) as user_data: - try: - if "jobs" not in user_data: - user_data["jobs"] = [] - return [Job(**job) for job in user_data["jobs"]] - except json.decoder.JSONDecodeError: - return [] + r = RedisPool().get_connection() + jobs = loop.run_until_complete(r.lrange("jobs", 0, -1)) + return [job_from_hash(r, job_key) for job_key in jobs] @staticmethod def is_busy() -> bool: """ Check if there is a job running. """ - with ReadUserData(UserDataFiles.JOBS) as user_data: - if "jobs" not in user_data: - user_data["jobs"] = [] - for job in user_data["jobs"]: - if job["status"] == JobStatus.RUNNING.value: - return True + for job in Jobs.get_jobs(): + if job["status"] == JobStatus.RUNNING.value: + return True return False + + +def redis_key_from_uuid(uuid): + return "jobs:" + str(uuid) + + +def store_job_as_hash(r, redis_key, model): + for key, value in model.dict().items(): + if isinstance(value, uuid.UUID): + value = str(value) + if isinstance(value, datetime.datetime): + value = value.isoformat() + if isinstance(value, JobStatus): + value = value.value + coroutine = r.hset(redis_key, key, str(value)) + loop.run_until_complete(coroutine) + + +def job_from_hash(r, redis_key): + if exists_sync(r, redis_key): + job_dict = loop.run_until_complete(r.hgetall(redis_key)) + for date in [ + "created_at", + "updated_at", + "finished_at", + ]: + if job_dict[date] != "None": + job_dict[date] = datetime.datetime.fromisoformat(job_dict[date]) + for key in job_dict.keys(): + if job_dict[key] == "None": + job_dict[key] = None + + return Job(**job_dict) + return None + + +def exists_sync(r, key): + return loop.run_until_complete(r.exists(key)) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 87f1386..65d58e4 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -1,14 +1,13 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument -import json import pytest -from selfprivacy_api.utils import WriteUserData, ReadUserData from selfprivacy_api.jobs import Jobs, JobStatus def test_jobs(authorized_client, jobs_file, shared_datadir): jobs = Jobs() + jobs.reset() assert jobs.get_jobs() == [] test_job = jobs.add( From 5afa2338ca3513d4d5e0c15783d503762951ca81 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 23 Nov 2022 12:32:46 +0000 Subject: [PATCH 2/4] Migrate Jobs to redis Jobs API shall now use redis to store and retrieve jobs. This will make it possible to add pubsub for jobs updates. For now it uses blocking api of redis. 
---
 selfprivacy_api/jobs/__init__.py    | 24 +++++++++---------------
 selfprivacy_api/utils/redis_pool.py |  2 +-
 2 files changed, 10 insertions(+), 16 deletions(-)

diff --git a/selfprivacy_api/jobs/__init__.py b/selfprivacy_api/jobs/__init__.py
index 7ecc8c9..7c16afd 100644
--- a/selfprivacy_api/jobs/__init__.py
+++ b/selfprivacy_api/jobs/__init__.py
@@ -23,10 +23,6 @@ from enum import Enum
 from pydantic import BaseModel
 
 from selfprivacy_api.utils.redis_pool import RedisPool
-import asyncio
-
-
-loop = asyncio.get_event_loop()
 
 
 class JobStatus(Enum):
@@ -72,8 +68,8 @@
         r = RedisPool().get_connection()
         jobs = Jobs.get_jobs()
         for job in jobs:
-            loop.run_until_complete(r.delete(redis_key_from_uuid(job.uid)))
-            loop.run_until_complete(r.delete("jobs"))
+            r.delete(redis_key_from_uuid(job.uid))
+            r.delete("jobs")
 
     @staticmethod
     def add(
@@ -103,8 +99,7 @@
         )
         r = RedisPool().get_connection()
         store_job_as_hash(r, redis_key_from_uuid(job.uid), job)
-        coroutine = r.lpush("jobs", redis_key_from_uuid(job.uid))
-        loop.run_until_complete(coroutine)
+        r.lpush("jobs", redis_key_from_uuid(job.uid))
         return job
 
     @staticmethod
@@ -121,8 +116,8 @@
         """
         r = RedisPool().get_connection()
         key = redis_key_from_uuid(job_uuid)
-        loop.run_until_complete(r.delete(key))
-        loop.run_until_complete(r.lrem("jobs", 0, key))
+        r.delete(key)
+        r.lrem("jobs", 0, key)
         return False
 
     @staticmethod
@@ -178,7 +173,7 @@
         Get the jobs list.
         """
         r = RedisPool().get_connection()
-        jobs = loop.run_until_complete(r.lrange("jobs", 0, -1))
+        jobs = r.lrange("jobs", 0, -1)
         return [job_from_hash(r, job_key) for job_key in jobs]
 
     @staticmethod
@@ -204,13 +199,12 @@ def store_job_as_hash(r, redis_key, model):
         value = value.isoformat()
         if isinstance(value, JobStatus):
             value = value.value
-        coroutine = r.hset(redis_key, key, str(value))
-        loop.run_until_complete(coroutine)
+        r.hset(redis_key, key, str(value))
 
 
 def job_from_hash(r, redis_key):
     if exists_sync(r, redis_key):
-        job_dict = loop.run_until_complete(r.hgetall(redis_key))
+        job_dict = r.hgetall(redis_key)
         for date in [
             "created_at",
             "updated_at",
@@ -227,4 +221,4 @@ def job_from_hash(r, redis_key):
 
 
 def exists_sync(r, key):
-    return loop.run_until_complete(r.exists(key))
+    return r.exists(key)
diff --git a/selfprivacy_api/utils/redis_pool.py b/selfprivacy_api/utils/redis_pool.py
index e4e98ac..a87049c 100644
--- a/selfprivacy_api/utils/redis_pool.py
+++ b/selfprivacy_api/utils/redis_pool.py
@@ -1,7 +1,7 @@
 """
 Redis pool module for selfprivacy_api
 """
-import redis.asyncio as redis
+import redis
 from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
 
 REDIS_SOCKET = "/run/redis-sp-api/redis.sock"

From 244851c7cc818f6471bc5c66ff595e73e5d14287 Mon Sep 17 00:00:00 2001
From: Houkime <>
Date: Wed, 23 Nov 2022 15:04:39 +0000
Subject: [PATCH 3/4] jobs: remove 'jobs' list, and use 'jobs:' prefix

Less complexity, easier to add redis-native TTL
---
 selfprivacy_api/jobs/__init__.py | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)

diff --git a/selfprivacy_api/jobs/__init__.py b/selfprivacy_api/jobs/__init__.py
index 7c16afd..4fa820c 100644
--- a/selfprivacy_api/jobs/__init__.py
+++ b/selfprivacy_api/jobs/__init__.py
@@ -69,7 +69,6 @@
         r = RedisPool().get_connection()
         jobs = Jobs.get_jobs()
         for job in jobs:
             r.delete(redis_key_from_uuid(job.uid))
-            r.delete("jobs")
 
     @staticmethod
@@ -99,7 +98,6 @@
         )
         r = RedisPool().get_connection()
         store_job_as_hash(r, redis_key_from_uuid(job.uid), job)
-        r.lpush("jobs", redis_key_from_uuid(job.uid))
         return job
 
     @staticmethod
@@ -117,7 +115,6 @@
         """
         r = RedisPool().get_connection()
         key = redis_key_from_uuid(job_uuid)
         r.delete(key)
-        r.lrem("jobs", 0, key)
         return False
 
@@ -151,7 +148,7 @@
 
         r = RedisPool().get_connection()
         key = redis_key_from_uuid(job.uid)
-        if exists_sync(r, key):
+        if r.exists(key):
             store_job_as_hash(r, key, job)
 
         return job
@@ -163,7 +160,7 @@
         """
         r = RedisPool().get_connection()
         key = redis_key_from_uuid(uid)
-        if exists_sync(r, key):
+        if r.exists(key):
             return job_from_hash(r, key)
         return None
 
@@ -173,7 +170,7 @@
         Get the jobs list.
         """
         r = RedisPool().get_connection()
-        jobs = r.lrange("jobs", 0, -1)
+        jobs = r.keys("jobs:*")
         return [job_from_hash(r, job_key) for job_key in jobs]
 
     @staticmethod
@@ -203,7 +200,7 @@ def store_job_as_hash(r, redis_key, model):
 
 
 def job_from_hash(r, redis_key):
-    if exists_sync(r, redis_key):
+    if r.exists(redis_key):
         job_dict = r.hgetall(redis_key)
         for date in [
             "created_at",
@@ -218,7 +215,3 @@ def job_from_hash(r, redis_key):
 
         return Job(**job_dict)
     return None
-
-
-def exists_sync(r, key):
-    return r.exists(key)

From d6ef01c0c7f91665925a20fde4ec3c37e9e5a6dc Mon Sep 17 00:00:00 2001
From: Houkime <>
Date: Wed, 23 Nov 2022 16:29:50 +0000
Subject: [PATCH 4/4] Add TTL to storage of finished or failed jobs

Defaulting to 10 days.
---
 selfprivacy_api/jobs/__init__.py |  4 ++++
 tests/test_jobs.py               | 14 ++++++++++++++
 2 files changed, 18 insertions(+)

diff --git a/selfprivacy_api/jobs/__init__.py b/selfprivacy_api/jobs/__init__.py
index 4fa820c..9e4d961 100644
--- a/selfprivacy_api/jobs/__init__.py
+++ b/selfprivacy_api/jobs/__init__.py
@@ -24,6 +24,8 @@ from pydantic import BaseModel
 
 from selfprivacy_api.utils.redis_pool import RedisPool
 
+JOB_EXPIRATION_SECONDS = 10 * 24 * 60 * 60  # ten days
+
 
 class JobStatus(Enum):
     """
@@ -150,6 +152,8 @@
         r = RedisPool().get_connection()
         key = redis_key_from_uuid(job.uid)
         if r.exists(key):
             store_job_as_hash(r, key, job)
+            if status in (JobStatus.FINISHED, JobStatus.ERROR):
+                r.expire(key, JOB_EXPIRATION_SECONDS)
 
         return job
 
diff --git a/tests/test_jobs.py b/tests/test_jobs.py
index 65d58e4..371dca4 100644
--- a/tests/test_jobs.py
+++ b/tests/test_jobs.py
@@ -3,6 +3,7 @@
 import pytest
 
 from selfprivacy_api.jobs import Jobs, JobStatus
+import selfprivacy_api.jobs as jobsmodule
 
 
 def test_jobs(authorized_client, jobs_file, shared_datadir):
@@ -30,6 +31,19 @@ def test_jobs(authorized_client, jobs_file, shared_datadir):
 
     assert jobs.get_jobs() == [test_job]
 
+    backup = jobsmodule.JOB_EXPIRATION_SECONDS
+    jobsmodule.JOB_EXPIRATION_SECONDS = 0
+
+    jobs.update(
+        job=test_job,
+        status=JobStatus.FINISHED,
+        status_text="Yaaay!",
+        progress=100,
+    )
+
+    assert jobs.get_jobs() == []
+    jobsmodule.JOB_EXPIRATION_SECONDS = backup
+
 
 @pytest.fixture
 def mock_subprocess_run(mocker):
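
A closing note on the TTL mechanics the last patch relies on: EXPIRE is
tracked server-side per key, so finished and failed jobs disappear without
any cleanup code in the API, and a non-positive timeout deletes the key
immediately, which is what the test exploits by setting
JOB_EXPIRATION_SECONDS to 0. A minimal sketch with the blocking client
(key name hypothetical, socket path from redis_pool.py):

    import redis

    r = redis.Redis(unix_socket_path="/run/redis-sp-api/redis.sock", decode_responses=True)
    r.hset("jobs:example", "status", "FINISHED")
    r.expire("jobs:example", 10 * 24 * 60 * 60)  # JOB_EXPIRATION_SECONDS: ten days
    print(r.ttl("jobs:example"))                 # 864000, counting down server-side
    r.expire("jobs:example", 0)                  # non-positive timeout deletes the key
    print(r.exists("jobs:example"))              # 0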