Compare commits

...

9 Commits

Author SHA1 Message Date
houkime 5e93e6499f Merge pull request 'redis-huey' (#84) from redis-huey into master
continuous-integration/drone/push Build is passing Details
Reviewed-on: #84
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
2024-03-20 14:19:07 +02:00
Houkime 9ee72c1fcb test(huey): make timeout more so that vm gets it in time
continuous-integration/drone/push Build is passing Details
2024-03-20 09:02:10 +00:00
Houkime 6f38b2309f fix(huey): adapt to new VM test environment
continuous-integration/drone/push Build is failing Details
2024-03-18 12:18:55 +00:00
Houkime baf7843349 test(huey): only import test task if it is a test 2024-03-18 12:18:55 +00:00
Houkime 8e48a5ad5f test(huey): add a scheduling test (expected-fails for now) 2024-03-18 12:18:55 +00:00
Houkime fde461b4b9 test(huey): test that redis socket connection works 2024-03-18 12:18:55 +00:00
Houkime 9954737791 use kill() instead of terminate in huey tests 2024-03-18 12:18:55 +00:00
Houkime 2b19633cbd test(huey): break out preparing the environment vars
I did it for testing redis socketing too, but I guess this will wait for
another time. Somehow it worked even without an actual redis socket and it was
creepy. Idk yet how one can best make redis to make sockets at arbitrary
temporary dirs without starting another redis.
2024-03-18 12:18:55 +00:00
Houkime 83592b7bf4 feature(huey): use RedisHuey 2024-03-18 12:18:55 +00:00
6 changed files with 174 additions and 35 deletions

View File

@ -27,6 +27,7 @@
python-lsp-server
pyflakes
typer # for strawberry
types-redis # for mypy
] ++ strawberry-graphql.optional-dependencies.cli));
vmtest-src-dir = "/root/source";

View File

@ -1,5 +1,12 @@
from os import environ
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs.test import test_job
from selfprivacy_api.backup.tasks import *
from selfprivacy_api.services.tasks import move_service
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task
from selfprivacy_api.jobs.test import test_job
if environ.get("TEST_MODE"):
from tests.test_huey import sum

View File

@ -1,16 +1,24 @@
"""MiniHuey singleton."""
import os
from huey import SqliteHuey
from os import environ
from huey import RedisHuey
from selfprivacy_api.utils.redis_pool import RedisPool
HUEY_DATABASE_NUMBER = 10
def immediate() -> bool:
if environ.get("HUEY_QUEUES_FOR_TESTS"):
return False
if environ.get("TEST_MODE"):
return True
return False
HUEY_DATABASE = "/etc/selfprivacy/tasks.db"
# Singleton instance containing the huey database.
test_mode = os.environ.get("TEST_MODE")
huey = SqliteHuey(
huey = RedisHuey(
"selfprivacy-api",
filename=HUEY_DATABASE if not test_mode else None,
immediate=test_mode == "true",
url=RedisPool.connection_url(dbnumber=HUEY_DATABASE_NUMBER),
immediate=immediate(),
utc=True,
)

View File

@ -1,8 +1,8 @@
"""
Redis pool module for selfprivacy_api
"""
from os import environ
import redis
from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
REDIS_SOCKET = "/run/redis-sp-api/redis.sock"
@ -14,20 +14,20 @@ class RedisPool(metaclass=SingletonMetaclass):
"""
def __init__(self):
if "USE_REDIS_PORT" in environ:
self._pool = redis.ConnectionPool(
host="127.0.0.1",
port=int(environ["USE_REDIS_PORT"]),
decode_responses=True,
)
else:
self._pool = redis.ConnectionPool.from_url(
f"unix://{REDIS_SOCKET}",
decode_responses=True,
)
self._pool = redis.ConnectionPool.from_url(
RedisPool.connection_url(dbnumber=0),
decode_responses=True,
)
self._pubsub_connection = self.get_connection()
@staticmethod
def connection_url(dbnumber: int) -> str:
"""
redis://[[username]:[password]]@localhost:6379/0
unix://[username@]/path/to/socket.sock?db=0[&password=password]
"""
return f"unix://{REDIS_SOCKET}?db={dbnumber}"
def get_connection(self):
"""
Get a connection from the pool.

View File

@ -99,23 +99,14 @@ def generic_userdata(mocker, tmpdir):
@pytest.fixture
def huey_database(mocker, shared_datadir):
"""Mock huey database."""
mock = mocker.patch(
"selfprivacy_api.utils.huey.HUEY_DATABASE", shared_datadir / "huey.db"
)
return mock
@pytest.fixture
def client(huey_database, redis_repo_with_tokens):
def client(redis_repo_with_tokens):
from selfprivacy_api.app import app
return TestClient(app)
@pytest.fixture
def authorized_client(huey_database, redis_repo_with_tokens):
def authorized_client(redis_repo_with_tokens):
"""Authorized test client fixture."""
from selfprivacy_api.app import app
@ -127,7 +118,7 @@ def authorized_client(huey_database, redis_repo_with_tokens):
@pytest.fixture
def wrong_auth_client(huey_database, redis_repo_with_tokens):
def wrong_auth_client(redis_repo_with_tokens):
"""Wrong token test client fixture."""
from selfprivacy_api.app import app

132
tests/test_huey.py Normal file
View File

@ -0,0 +1,132 @@
import pytest
import redis
from typing import List
import subprocess
from subprocess import Popen, check_output, TimeoutExpired
from os import environ, path, set_blocking
from io import BufferedReader
from huey.exceptions import HueyException
from selfprivacy_api.utils.huey import huey, immediate, HUEY_DATABASE_NUMBER
from selfprivacy_api.utils.redis_pool import RedisPool, REDIS_SOCKET
@huey.task()
def sum(a: int, b: int) -> int:
return a + b
def reset_huey_storage():
huey.storage = huey.create_storage()
def flush_huey_redis_forcefully():
url = RedisPool.connection_url(HUEY_DATABASE_NUMBER)
pool = redis.ConnectionPool.from_url(url, decode_responses=True)
connection = redis.Redis(connection_pool=pool)
connection.flushdb()
# TODO: may be useful in other places too, move to utils/ tests common if using it somewhere
def read_all_ready_output(stream: BufferedReader) -> str:
set_blocking(stream.fileno(), False)
output: List[bytes] = []
while True:
line = stream.readline()
raise ValueError(line)
if line == b"":
break
else:
output.append(line)
set_blocking(stream.fileno(), True)
result = b"".join(output)
return result.decode("utf-8")
@pytest.fixture()
def not_immediate():
assert environ["TEST_MODE"] == "true"
old_immediate = huey.immediate
environ["HUEY_QUEUES_FOR_TESTS"] = "Yes"
huey.immediate = False
assert huey.immediate is False
yield
del environ["HUEY_QUEUES_FOR_TESTS"]
huey.immediate = old_immediate
assert huey.immediate == old_immediate
@pytest.fixture()
def huey_socket_consumer(not_immediate):
"""
Same as above, but with socketed redis
"""
flush_huey_redis_forcefully()
command = ["huey_consumer.py", "selfprivacy_api.task_registry.huey"]
# First assert that consumer does not fail by itself
# Idk yet how to do it more elegantly
try:
check_output(command, timeout=2)
except TimeoutExpired:
pass
# Then open it for real
consumer_handle = Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert path.exists(REDIS_SOCKET)
yield consumer_handle
consumer_handle.kill()
def test_huey_over_redis_socket(huey_socket_consumer):
assert huey.immediate is False
assert immediate() is False
assert "unix" in RedisPool.connection_url(HUEY_DATABASE_NUMBER)
try:
assert (
RedisPool.connection_url(HUEY_DATABASE_NUMBER)
in huey.storage_kwargs.values()
)
except AssertionError:
raise ValueError(
"our test-side huey does not connect over socket: ", huey.storage_kwargs
)
result = sum(2, 5)
try:
assert result(blocking=True, timeout=100) == 7
except HueyException as error:
if "timed out" in str(error):
output = read_all_ready_output(huey_socket_consumer.stdout)
errorstream = read_all_ready_output(huey_socket_consumer.stderr)
raise TimeoutError(
f"Huey timed out: {str(error)}",
f"Consumer output: {output}",
f"Consumer errorstream: {errorstream}",
)
else:
raise error
@pytest.mark.xfail(reason="cannot yet schedule with sockets for some reason")
def test_huey_schedule(huey_queues_socket):
# We do not schedule tasks anywhere, but concerning that it fails.
sum.schedule((2, 5), delay=10)
try:
assert len(huey.scheduled()) == 1
except AssertionError:
raise ValueError("have wrong amount of scheduled tasks", huey.scheduled())