From 2b19633cbde057dd37ef51d19b52f20751941531 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Fri, 19 Jan 2024 15:03:00 +0000 Subject: [PATCH] test(huey): break out preparing the environment vars I did it for testing redis socketing too, but I guess this will wait for another time. Somehow it worked even without an actual redis socket and it was creepy. Idk yet how one can best make redis to make sockets at arbitrary temporary dirs without starting another redis. --- tests/test_huey.py | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/tests/test_huey.py b/tests/test_huey.py index 49103f2..fc711ac 100644 --- a/tests/test_huey.py +++ b/tests/test_huey.py @@ -1,11 +1,9 @@ import pytest from subprocess import Popen -from os import environ +from os import environ, path - -# from selfprivacy_api.backup.util import output_yielder -from selfprivacy_api.utils.huey import huey +from selfprivacy_api.utils.huey import huey, immediate @huey.task() @@ -14,28 +12,37 @@ def sum(a: int, b: int) -> int: @pytest.fixture() -def huey_queues(): - """ - Full, not-immediate, queued huey, with consumer starting and stopping. - IMPORTANT: Assumes tests are run from the project directory. - The above is needed by consumer to find our huey setup. - """ +def not_immediate(): old_immediate = huey.immediate - environ["HUEY_QUEUES_FOR_TESTS"] = "Yes" - command = ["huey_consumer.py", "selfprivacy_api.task_registry.huey"] huey.immediate = False assert huey.immediate is False - consumer_handle = Popen(command) - yield huey + yield - consumer_handle.terminate() del environ["HUEY_QUEUES_FOR_TESTS"] huey.immediate = old_immediate assert huey.immediate == old_immediate +@pytest.fixture() +def huey_queues(not_immediate): + """ + Full, not-immediate, queued huey, with consumer starting and stopping. + IMPORTANT: Assumes tests are run from the project directory. + The above is needed by consumer to find our huey setup. + """ + command = ["huey_consumer.py", "selfprivacy_api.task_registry.huey"] + consumer_handle = Popen(command) + + yield huey + + consumer_handle.terminate() + + def test_huey(huey_queues): + assert huey.immediate is False + assert immediate() is False + result = sum(2, 5) assert result(blocking=True) == 7