diff --git a/flake.nix b/flake.nix index fee8e79..f8b81aa 100644 --- a/flake.nix +++ b/flake.nix @@ -27,6 +27,7 @@ python-lsp-server pyflakes typer # for strawberry + types-redis # for mypy ] ++ strawberry-graphql.optional-dependencies.cli)); vmtest-src-dir = "/root/source"; diff --git a/selfprivacy_api/task_registry.py b/selfprivacy_api/task_registry.py index a492e23..6e1518d 100644 --- a/selfprivacy_api/task_registry.py +++ b/selfprivacy_api/task_registry.py @@ -1,5 +1,12 @@ +from os import environ + from selfprivacy_api.utils.huey import huey -from selfprivacy_api.jobs.test import test_job + from selfprivacy_api.backup.tasks import * from selfprivacy_api.services.tasks import move_service from selfprivacy_api.jobs.upgrade_system import rebuild_system_task + +from selfprivacy_api.jobs.test import test_job + +if environ.get("TEST_MODE"): + from tests.test_huey import sum diff --git a/selfprivacy_api/utils/huey.py b/selfprivacy_api/utils/huey.py index 8e09446..1a7a29d 100644 --- a/selfprivacy_api/utils/huey.py +++ b/selfprivacy_api/utils/huey.py @@ -1,16 +1,24 @@ """MiniHuey singleton.""" -import os -from huey import SqliteHuey +from os import environ +from huey import RedisHuey + +from selfprivacy_api.utils.redis_pool import RedisPool + +HUEY_DATABASE_NUMBER = 10 + + +def immediate() -> bool: + if environ.get("HUEY_QUEUES_FOR_TESTS"): + return False + if environ.get("TEST_MODE"): + return True + return False -HUEY_DATABASE = "/etc/selfprivacy/tasks.db" # Singleton instance containing the huey database. - -test_mode = os.environ.get("TEST_MODE") - -huey = SqliteHuey( +huey = RedisHuey( "selfprivacy-api", - filename=HUEY_DATABASE if not test_mode else None, - immediate=test_mode == "true", + url=RedisPool.connection_url(dbnumber=HUEY_DATABASE_NUMBER), + immediate=immediate(), utc=True, ) diff --git a/selfprivacy_api/utils/redis_pool.py b/selfprivacy_api/utils/redis_pool.py index 4bd6eda..3d35f01 100644 --- a/selfprivacy_api/utils/redis_pool.py +++ b/selfprivacy_api/utils/redis_pool.py @@ -1,8 +1,8 @@ """ Redis pool module for selfprivacy_api """ -from os import environ import redis + from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass REDIS_SOCKET = "/run/redis-sp-api/redis.sock" @@ -14,20 +14,20 @@ class RedisPool(metaclass=SingletonMetaclass): """ def __init__(self): - if "USE_REDIS_PORT" in environ: - self._pool = redis.ConnectionPool( - host="127.0.0.1", - port=int(environ["USE_REDIS_PORT"]), - decode_responses=True, - ) - - else: - self._pool = redis.ConnectionPool.from_url( - f"unix://{REDIS_SOCKET}", - decode_responses=True, - ) + self._pool = redis.ConnectionPool.from_url( + RedisPool.connection_url(dbnumber=0), + decode_responses=True, + ) self._pubsub_connection = self.get_connection() + @staticmethod + def connection_url(dbnumber: int) -> str: + """ + redis://[[username]:[password]]@localhost:6379/0 + unix://[username@]/path/to/socket.sock?db=0[&password=password] + """ + return f"unix://{REDIS_SOCKET}?db={dbnumber}" + def get_connection(self): """ Get a connection from the pool. diff --git a/tests/conftest.py b/tests/conftest.py index dceac72..f1c6e89 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -99,23 +99,14 @@ def generic_userdata(mocker, tmpdir): @pytest.fixture -def huey_database(mocker, shared_datadir): - """Mock huey database.""" - mock = mocker.patch( - "selfprivacy_api.utils.huey.HUEY_DATABASE", shared_datadir / "huey.db" - ) - return mock - - -@pytest.fixture -def client(huey_database, redis_repo_with_tokens): +def client(redis_repo_with_tokens): from selfprivacy_api.app import app return TestClient(app) @pytest.fixture -def authorized_client(huey_database, redis_repo_with_tokens): +def authorized_client(redis_repo_with_tokens): """Authorized test client fixture.""" from selfprivacy_api.app import app @@ -127,7 +118,7 @@ def authorized_client(huey_database, redis_repo_with_tokens): @pytest.fixture -def wrong_auth_client(huey_database, redis_repo_with_tokens): +def wrong_auth_client(redis_repo_with_tokens): """Wrong token test client fixture.""" from selfprivacy_api.app import app diff --git a/tests/test_huey.py b/tests/test_huey.py new file mode 100644 index 0000000..c741ae6 --- /dev/null +++ b/tests/test_huey.py @@ -0,0 +1,132 @@ +import pytest +import redis +from typing import List + +import subprocess +from subprocess import Popen, check_output, TimeoutExpired +from os import environ, path, set_blocking +from io import BufferedReader +from huey.exceptions import HueyException + +from selfprivacy_api.utils.huey import huey, immediate, HUEY_DATABASE_NUMBER +from selfprivacy_api.utils.redis_pool import RedisPool, REDIS_SOCKET + + +@huey.task() +def sum(a: int, b: int) -> int: + return a + b + + +def reset_huey_storage(): + huey.storage = huey.create_storage() + + +def flush_huey_redis_forcefully(): + url = RedisPool.connection_url(HUEY_DATABASE_NUMBER) + + pool = redis.ConnectionPool.from_url(url, decode_responses=True) + connection = redis.Redis(connection_pool=pool) + connection.flushdb() + + +# TODO: may be useful in other places too, move to utils/ tests common if using it somewhere +def read_all_ready_output(stream: BufferedReader) -> str: + set_blocking(stream.fileno(), False) + output: List[bytes] = [] + while True: + line = stream.readline() + raise ValueError(line) + if line == b"": + break + else: + output.append(line) + + set_blocking(stream.fileno(), True) + + result = b"".join(output) + return result.decode("utf-8") + + +@pytest.fixture() +def not_immediate(): + assert environ["TEST_MODE"] == "true" + + old_immediate = huey.immediate + environ["HUEY_QUEUES_FOR_TESTS"] = "Yes" + huey.immediate = False + assert huey.immediate is False + + yield + + del environ["HUEY_QUEUES_FOR_TESTS"] + huey.immediate = old_immediate + assert huey.immediate == old_immediate + + +@pytest.fixture() +def huey_socket_consumer(not_immediate): + """ + Same as above, but with socketed redis + """ + + flush_huey_redis_forcefully() + command = ["huey_consumer.py", "selfprivacy_api.task_registry.huey"] + + # First assert that consumer does not fail by itself + # Idk yet how to do it more elegantly + try: + check_output(command, timeout=2) + except TimeoutExpired: + pass + + # Then open it for real + consumer_handle = Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + assert path.exists(REDIS_SOCKET) + + yield consumer_handle + + consumer_handle.kill() + + +def test_huey_over_redis_socket(huey_socket_consumer): + assert huey.immediate is False + assert immediate() is False + + assert "unix" in RedisPool.connection_url(HUEY_DATABASE_NUMBER) + try: + assert ( + RedisPool.connection_url(HUEY_DATABASE_NUMBER) + in huey.storage_kwargs.values() + ) + except AssertionError: + raise ValueError( + "our test-side huey does not connect over socket: ", huey.storage_kwargs + ) + + result = sum(2, 5) + try: + assert result(blocking=True, timeout=100) == 7 + + except HueyException as error: + if "timed out" in str(error): + output = read_all_ready_output(huey_socket_consumer.stdout) + errorstream = read_all_ready_output(huey_socket_consumer.stderr) + raise TimeoutError( + f"Huey timed out: {str(error)}", + f"Consumer output: {output}", + f"Consumer errorstream: {errorstream}", + ) + else: + raise error + + +@pytest.mark.xfail(reason="cannot yet schedule with sockets for some reason") +def test_huey_schedule(huey_queues_socket): + # We do not schedule tasks anywhere, but concerning that it fails. + sum.schedule((2, 5), delay=10) + + try: + assert len(huey.scheduled()) == 1 + except AssertionError: + raise ValueError("have wrong amount of scheduled tasks", huey.scheduled())