test(huey): break out preparing the environment vars

I did it for testing redis socketing too, but I guess this will wait for
another time. Somehow it worked even without an actual redis socket and it was
creepy. Idk yet how one can best make redis to make sockets at arbitrary
temporary dirs without starting another redis.
redis-huey
Houkime 2024-01-19 15:03:00 +00:00
parent 83592b7bf4
commit 2b19633cbd
1 changed files with 22 additions and 15 deletions

View File

@ -1,11 +1,9 @@
import pytest import pytest
from subprocess import Popen from subprocess import Popen
from os import environ from os import environ, path
from selfprivacy_api.utils.huey import huey, immediate
# from selfprivacy_api.backup.util import output_yielder
from selfprivacy_api.utils.huey import huey
@huey.task() @huey.task()
@ -14,28 +12,37 @@ def sum(a: int, b: int) -> int:
@pytest.fixture() @pytest.fixture()
def huey_queues(): def not_immediate():
"""
Full, not-immediate, queued huey, with consumer starting and stopping.
IMPORTANT: Assumes tests are run from the project directory.
The above is needed by consumer to find our huey setup.
"""
old_immediate = huey.immediate old_immediate = huey.immediate
environ["HUEY_QUEUES_FOR_TESTS"] = "Yes" environ["HUEY_QUEUES_FOR_TESTS"] = "Yes"
command = ["huey_consumer.py", "selfprivacy_api.task_registry.huey"]
huey.immediate = False huey.immediate = False
assert huey.immediate is False assert huey.immediate is False
consumer_handle = Popen(command)
yield huey yield
consumer_handle.terminate()
del environ["HUEY_QUEUES_FOR_TESTS"] del environ["HUEY_QUEUES_FOR_TESTS"]
huey.immediate = old_immediate huey.immediate = old_immediate
assert huey.immediate == old_immediate assert huey.immediate == old_immediate
@pytest.fixture()
def huey_queues(not_immediate):
"""
Full, not-immediate, queued huey, with consumer starting and stopping.
IMPORTANT: Assumes tests are run from the project directory.
The above is needed by consumer to find our huey setup.
"""
command = ["huey_consumer.py", "selfprivacy_api.task_registry.huey"]
consumer_handle = Popen(command)
yield huey
consumer_handle.terminate()
def test_huey(huey_queues): def test_huey(huey_queues):
assert huey.immediate is False
assert immediate() is False
result = sum(2, 5) result = sum(2, 5)
assert result(blocking=True) == 7 assert result(blocking=True) == 7