Skip to content

Commit 039bd53

Browse files
committed
Merge branch 'release/0.5.2'
2 parents b3fe337 + 9ec4c43 commit 039bd53

File tree

7 files changed

+329
-16
lines changed

7 files changed

+329
-16
lines changed

.github/workflows/test.yml

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,24 +30,14 @@ jobs:
3030
- name: Run lint check
3131
run: poetry run pre-commit run -a ${{ matrix.cmd }}
3232
pytest:
33-
services:
34-
redis:
35-
image: bitnami/redis:6.2.5
36-
env:
37-
ALLOW_EMPTY_PASSWORD: "yes"
38-
options: >-
39-
--health-cmd="redis-cli ping"
40-
--health-interval=5s
41-
--health-timeout=5s
42-
--health-retries=30
43-
ports:
44-
- 6379:6379
4533
strategy:
4634
matrix:
4735
py_version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
4836
runs-on: "ubuntu-latest"
4937
steps:
5038
- uses: actions/checkout@v4
39+
- name: Set up Redis instance and Redis cluster
40+
run: docker-compose up -d
5141
- name: Set up Python
5242
uses: actions/setup-python@v2
5343
with:

docker-compose.yml

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
version: '3.2'
2+
3+
services:
4+
redis:
5+
image: bitnami/redis:6.2.5
6+
environment:
7+
ALLOW_EMPTY_PASSWORD: "yes"
8+
healthcheck:
9+
test: ["CMD", "redis-cli", "ping"]
10+
interval: 5s
11+
timeout: 5s
12+
retries: 3
13+
start_period: 10s
14+
ports:
15+
- 7000:6379
16+
redis-node-0:
17+
image: docker.io/bitnami/redis-cluster:7.2
18+
environment:
19+
ALLOW_EMPTY_PASSWORD: "yes"
20+
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"
21+
22+
redis-node-1:
23+
image: docker.io/bitnami/redis-cluster:7.2
24+
environment:
25+
ALLOW_EMPTY_PASSWORD: "yes"
26+
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"
27+
28+
redis-node-2:
29+
image: docker.io/bitnami/redis-cluster:7.2
30+
environment:
31+
ALLOW_EMPTY_PASSWORD: "yes"
32+
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"
33+
34+
redis-node-3:
35+
image: docker.io/bitnami/redis-cluster:7.2
36+
environment:
37+
ALLOW_EMPTY_PASSWORD: "yes"
38+
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"
39+
40+
redis-node-4:
41+
image: docker.io/bitnami/redis-cluster:7.2
42+
environment:
43+
ALLOW_EMPTY_PASSWORD: "yes"
44+
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"
45+
46+
redis-node-5:
47+
image: docker.io/bitnami/redis-cluster:7.2
48+
depends_on:
49+
- redis-node-0
50+
- redis-node-1
51+
- redis-node-2
52+
- redis-node-3
53+
- redis-node-4
54+
environment:
55+
ALLOW_EMPTY_PASSWORD: "yes"
56+
REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"
57+
REDIS_CLUSTER_REPLICAS: 1
58+
REDIS_CLUSTER_CREATOR: "yes"
59+
ports:
60+
- 7001:6379

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq-redis"
3-
version = "0.5.1"
3+
version = "0.5.2"
44
description = "Redis integration for taskiq"
55
authors = ["taskiq-team <[email protected]>"]
66
readme = "README.md"

taskiq_redis/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
"""Package for redis integration."""
2-
from taskiq_redis.redis_backend import RedisAsyncResultBackend
2+
from taskiq_redis.redis_backend import (
3+
RedisAsyncClusterResultBackend,
4+
RedisAsyncResultBackend,
5+
)
36
from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker
47
from taskiq_redis.schedule_source import RedisScheduleSource
58

69
__all__ = [
10+
"RedisAsyncClusterResultBackend",
711
"RedisAsyncResultBackend",
812
"ListQueueBroker",
913
"PubSubBroker",

taskiq_redis/redis_backend.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Dict, Optional, TypeVar, Union
33

44
from redis.asyncio import ConnectionPool, Redis
5+
from redis.asyncio.cluster import RedisCluster
56
from taskiq import AsyncResultBackend
67
from taskiq.abc.result_backend import TaskiqResult
78

@@ -134,3 +135,122 @@ async def get_result(
134135
taskiq_result.log = None
135136

136137
return taskiq_result
138+
139+
140+
class RedisAsyncClusterResultBackend(AsyncResultBackend[_ReturnType]):
141+
"""Async result backend based on redis cluster."""
142+
143+
def __init__(
144+
self,
145+
redis_url: str,
146+
keep_results: bool = True,
147+
result_ex_time: Optional[int] = None,
148+
result_px_time: Optional[int] = None,
149+
) -> None:
150+
"""
151+
Constructs a new result backend.
152+
153+
:param redis_url: url to redis cluster.
154+
:param keep_results: flag to not remove results from Redis after reading.
155+
:param result_ex_time: expire time in seconds for result.
156+
:param result_px_time: expire time in milliseconds for result.
157+
158+
:raises DuplicateExpireTimeSelectedError: if result_ex_time
159+
and result_px_time are selected.
160+
:raises ExpireTimeMustBeMoreThanZeroError: if result_ex_time
161+
and result_px_time are equal zero.
162+
"""
163+
self.redis: RedisCluster[bytes] = RedisCluster.from_url(redis_url)
164+
self.keep_results = keep_results
165+
self.result_ex_time = result_ex_time
166+
self.result_px_time = result_px_time
167+
168+
unavailable_conditions = any(
169+
(
170+
self.result_ex_time is not None and self.result_ex_time <= 0,
171+
self.result_px_time is not None and self.result_px_time <= 0,
172+
),
173+
)
174+
if unavailable_conditions:
175+
raise ExpireTimeMustBeMoreThanZeroError(
176+
"You must select one expire time param and it must be more than zero.",
177+
)
178+
179+
if self.result_ex_time and self.result_px_time:
180+
raise DuplicateExpireTimeSelectedError(
181+
"Choose either result_ex_time or result_px_time.",
182+
)
183+
184+
async def shutdown(self) -> None:
185+
"""Closes redis connection."""
186+
await self.redis.aclose() # type: ignore[attr-defined]
187+
await super().shutdown()
188+
189+
async def set_result(
190+
self,
191+
task_id: str,
192+
result: TaskiqResult[_ReturnType],
193+
) -> None:
194+
"""
195+
Sets task result in redis.
196+
197+
Dumps TaskiqResult instance into the bytes and writes
198+
it to redis.
199+
200+
:param task_id: ID of the task.
201+
:param result: TaskiqResult instance.
202+
"""
203+
redis_set_params: Dict[str, Union[str, bytes, int]] = {
204+
"name": task_id,
205+
"value": pickle.dumps(result),
206+
}
207+
if self.result_ex_time:
208+
redis_set_params["ex"] = self.result_ex_time
209+
elif self.result_px_time:
210+
redis_set_params["px"] = self.result_px_time
211+
212+
await self.redis.set(**redis_set_params) # type: ignore
213+
214+
async def is_result_ready(self, task_id: str) -> bool:
215+
"""
216+
Returns whether the result is ready.
217+
218+
:param task_id: ID of the task.
219+
220+
:returns: True if the result is ready else False.
221+
"""
222+
return bool(await self.redis.exists(task_id)) # type: ignore[attr-defined]
223+
224+
async def get_result(
225+
self,
226+
task_id: str,
227+
with_logs: bool = False,
228+
) -> TaskiqResult[_ReturnType]:
229+
"""
230+
Gets result from the task.
231+
232+
:param task_id: task's id.
233+
:param with_logs: if True it will download task's logs.
234+
:raises ResultIsMissingError: if there is no result when trying to get it.
235+
:return: task's return value.
236+
"""
237+
if self.keep_results:
238+
result_value = await self.redis.get( # type: ignore[attr-defined]
239+
name=task_id,
240+
)
241+
else:
242+
result_value = await self.redis.getdel( # type: ignore[attr-defined]
243+
name=task_id,
244+
)
245+
246+
if result_value is None:
247+
raise ResultIsMissingError
248+
249+
taskiq_result: TaskiqResult[_ReturnType] = pickle.loads( # noqa: S301
250+
result_value,
251+
)
252+
253+
if not with_logs:
254+
taskiq_result.log = None
255+
256+
return taskiq_result

tests/conftest.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,18 @@ def redis_url() -> str:
2525
2626
:return: URL string.
2727
"""
28-
return os.environ.get("TEST_REDIS_URL", "redis://localhost")
28+
return os.environ.get("TEST_REDIS_URL", "redis://localhost:7000")
29+
30+
31+
@pytest.fixture
32+
def redis_cluster_url() -> str:
33+
"""
34+
URL to connect to redis cluster.
35+
36+
It tries to get it from environ,
37+
and return default one if the variable is
38+
not set.
39+
40+
:return: URL string.
41+
"""
42+
return os.environ.get("TEST_REDIS_CLUSTER_URL", "redis://localhost:7001")

tests/test_result_backend.py

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import pytest
44
from taskiq import TaskiqResult
55

6-
from taskiq_redis import RedisAsyncResultBackend
6+
from taskiq_redis import RedisAsyncClusterResultBackend, RedisAsyncResultBackend
77
from taskiq_redis.exceptions import ResultIsMissingError
88

99

@@ -130,3 +130,128 @@ async def test_keep_results_after_reading(redis_url: str) -> None:
130130
res2 = await result_backend.get_result(task_id=task_id)
131131
assert res1 == res2
132132
await result_backend.shutdown()
133+
134+
135+
@pytest.mark.anyio
136+
async def test_set_result_success_cluster(redis_cluster_url: str) -> None:
137+
"""
138+
Tests that results can be set without errors in cluster mode.
139+
140+
:param redis_url: redis URL.
141+
"""
142+
result_backend = RedisAsyncClusterResultBackend( # type: ignore
143+
redis_url=redis_cluster_url,
144+
)
145+
task_id = uuid.uuid4().hex
146+
result: "TaskiqResult[int]" = TaskiqResult(
147+
is_err=True,
148+
log="My Log",
149+
return_value=11,
150+
execution_time=112.2,
151+
)
152+
await result_backend.set_result(
153+
task_id=task_id,
154+
result=result,
155+
)
156+
157+
fetched_result = await result_backend.get_result(
158+
task_id=task_id,
159+
with_logs=True,
160+
)
161+
assert fetched_result.log == "My Log"
162+
assert fetched_result.return_value == 11
163+
assert fetched_result.execution_time == 112.2
164+
assert fetched_result.is_err
165+
await result_backend.shutdown()
166+
167+
168+
@pytest.mark.anyio
169+
async def test_fetch_without_logs_cluster(redis_cluster_url: str) -> None:
170+
"""
171+
Check if fetching value without logs works fine.
172+
173+
:param redis_url: redis URL.
174+
"""
175+
result_backend = RedisAsyncClusterResultBackend( # type: ignore
176+
redis_url=redis_cluster_url,
177+
)
178+
task_id = uuid.uuid4().hex
179+
result: "TaskiqResult[int]" = TaskiqResult(
180+
is_err=True,
181+
log="My Log",
182+
return_value=11,
183+
execution_time=112.2,
184+
)
185+
await result_backend.set_result(
186+
task_id=task_id,
187+
result=result,
188+
)
189+
190+
fetched_result = await result_backend.get_result(
191+
task_id=task_id,
192+
with_logs=False,
193+
)
194+
assert fetched_result.log is None
195+
assert fetched_result.return_value == 11
196+
assert fetched_result.execution_time == 112.2
197+
assert fetched_result.is_err
198+
await result_backend.shutdown()
199+
200+
201+
@pytest.mark.anyio
202+
async def test_remove_results_after_reading_cluster(redis_cluster_url: str) -> None:
203+
"""
204+
Check if removing results after reading works fine.
205+
206+
:param redis_url: redis URL.
207+
"""
208+
result_backend = RedisAsyncClusterResultBackend( # type: ignore
209+
redis_url=redis_cluster_url,
210+
keep_results=False,
211+
)
212+
task_id = uuid.uuid4().hex
213+
result: "TaskiqResult[int]" = TaskiqResult(
214+
is_err=True,
215+
log="My Log",
216+
return_value=11,
217+
execution_time=112.2,
218+
)
219+
await result_backend.set_result(
220+
task_id=task_id,
221+
result=result,
222+
)
223+
224+
await result_backend.get_result(task_id=task_id)
225+
with pytest.raises(ResultIsMissingError):
226+
await result_backend.get_result(task_id=task_id)
227+
228+
await result_backend.shutdown()
229+
230+
231+
@pytest.mark.anyio
232+
async def test_keep_results_after_reading_cluster(redis_cluster_url: str) -> None:
233+
"""
234+
Check if keeping results after reading works fine.
235+
236+
:param redis_url: redis URL.
237+
"""
238+
result_backend = RedisAsyncClusterResultBackend( # type: ignore
239+
redis_url=redis_cluster_url,
240+
keep_results=True,
241+
)
242+
task_id = uuid.uuid4().hex
243+
result: "TaskiqResult[int]" = TaskiqResult(
244+
is_err=True,
245+
log="My Log",
246+
return_value=11,
247+
execution_time=112.2,
248+
)
249+
await result_backend.set_result(
250+
task_id=task_id,
251+
result=result,
252+
)
253+
254+
res1 = await result_backend.get_result(task_id=task_id)
255+
res2 = await result_backend.get_result(task_id=task_id)
256+
assert res1 == res2
257+
await result_backend.shutdown()

0 commit comments

Comments
 (0)