Commit cb3557c

Merge pull request #71 from anomaly/drop-celery
Moves to using TaskIQ with RabbitMQ and Redis, fixes MinIO implementation, introduces container health checks
2 parents: 9c2bb1c + 13a5456

25 files changed: +674 -598 lines

.env.development (+6 -5)

@@ -6,9 +6,11 @@ POSTGRES_HOST=db
 POSTGRES_PORT=5432
 POSTGRES_DB=anomaly_labs
 
-# Logger
-FLUENTD_HOST=fluent-bit
-FLUENTD_PORT=24224
+# RabbitMQ
+RABBITMQ_DEFAULT_USER=rabbitmq
+RABBITMQ_DEFAULT_PASS=rabbitmq
+RABBITMQ_NODE_PORT=5672
+RABBITMQ_HOST=rabbitmq
 
 # Memory store for Cookies and queues
 REDIS_HOST=redis
@@ -21,8 +23,7 @@ S3_PORT=9000
 S3_ACCESS_KEY=minioadminaccess
 S3_SECRET_KEY=minioadminsecret
 S3_REGION=any
-S3_USE_SSL=False
-
+S3_USE_SSL=True
 
 # Email relay used by the application
 SMTP_HOST=smtp.clicksend.com
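The README changes further down reference a `config.py` module and a `config.amqp_dsn` value that consume these settings. As a rough sketch of how the new `RABBITMQ_*` variables might be surfaced — the `Config` class and `amqp_dsn` property here are illustrative assumptions, not the template's actual code:

```python
# Illustrative sketch only: a pydantic (v1) settings object that could read
# the RABBITMQ_* values from .env.development; the real config.py may differ.
from pydantic import BaseSettings


class Config(BaseSettings):
    # Field names match the environment variable names above,
    # so BaseSettings picks them up from the environment
    RABBITMQ_DEFAULT_USER: str = "rabbitmq"
    RABBITMQ_DEFAULT_PASS: str = "rabbitmq"
    RABBITMQ_HOST: str = "rabbitmq"
    RABBITMQ_NODE_PORT: int = 5672

    @property
    def amqp_dsn(self) -> str:
        # DSN of the form amqp://user:pass@host:port expected by the broker
        return (
            f"amqp://{self.RABBITMQ_DEFAULT_USER}:{self.RABBITMQ_DEFAULT_PASS}"
            f"@{self.RABBITMQ_HOST}:{self.RABBITMQ_NODE_PORT}"
        )


config = Config()
```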

.gitignore (+3 -4)

@@ -4,6 +4,9 @@ __pycache__/
 *.py[cod]
 *$py.class
 
+# Certificates for development
+.cert
+
 # C extensions
 *.so
 
@@ -113,10 +116,6 @@ ipython_config.py
 # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
 __pypackages__/
 
-# Celery stuff
-celerybeat-schedule
-celerybeat.pid
-
 # SageMath parsed files
 *.sage.py

README.md (+92 -46)

@@ -6,7 +6,7 @@ This lab aims to outline a recipe for building a standardised Python server that
 - [X] A Redis server for development
 - [ ] Healthcheck endpoint that will validate that the API can get to the database
 - [X] Worker processes that will process tasks in the background (using Celery)
-- [ ] Provide `Dockerfile` for development and production
+- [X] Provide `Dockerfile` for development and production
 - [ ] Log aggregation and monitoring ([parseable](https://github.com/parseablehq/parseable))
 - [X] ~~CSRF protection~~ see [#52](https://github.com/anomaly/lab-python-server/issues/52), also see [official guide](https://fastapi.tiangolo.com/tutorial/cors/)
 - [X] Basic endpoints for authentication (JWT and OTP based) - along with recommendations for encryption algorithms
@@ -55,15 +55,26 @@ The above is wrapped up as a `Task` endpoints, you need to supply the length of
 task crypt:hash -- 32
 ```
 
+## Exposed ports for development
+
+If you are using the development `docker-compose.yml` it exposes the following ports to the host machine:
+
+- `5432` - standard port for `postgres` so you can use a developer tool to inspect the database
+- `15672` - RabbitMQ web dashboard (HTTP)
+- `9000` - MinIO web server for exchanging S3 compatible objects (HTTPS, see configuration details)
+- `9001` - MinIO web dashboard (HTTPS)
+
+> Some of these ports should not be exposed in production
+
 ## Python packages
 
 The following Python packages make the standard set of tools for our projects:
 
 - [**SQLAlchemy**](https://www.sqlalchemy.org) - A Python object relational mapper (ORM)
 - [**alembic**](https://alembic.sqlalchemy.org/en/latest/) - A database migration tool
 - [**FastAPI**](http://fastapi.tiangolo.com) - A fast, simple, and flexible framework for building HTTP APIs
-- [**Celery**](https://docs.celeryq.dev/en/stable/getting-started/introduction.html) - A task queue
-- **fluent-logger** - A Python logging library that supports fluentd
+- [**pydantic**](https://docs.pydantic.dev) - A data validation library that is central to the design of FastAPI
+- [**TaskIQ**](https://taskiq-python.github.io/) - An `asyncio` compatible task queue processor that uses RabbitMQ and Redis, with a FastAPI-like design (e.g. dependency injection)
 - [**pendulum**](https://pendulum.eustace.io) - A timezone aware datetime library
 - [**pyotp**](https://pyauth.github.io/pyotp/) - A One-Time Password (OTP) generator
@@ -98,13 +109,13 @@ Directory structure for our application:
 ├─ tests/
 ├─ labs
 |  └─ routers/ -- FastAPI routers
-|  └─ tasks/ -- Celery tasks
+|  └─ tasks/ -- TaskIQ tasks
 |  └─ models/ -- SQLAlchemy models
 |  └─ schema/ -- Pydantic schemas
 |  └─ alembic/ -- Alembic migrations
 |  └─ __init__.py
 |  └─ api.py
-|  └─ celery.py
+|  └─ broker.py
 |  └─ config.py
 |  └─ db.py
 |  └─ utils.py
@@ -248,64 +259,62 @@ which would result in the client generating a function like `someSpecificIdYouDe
 
 For consistency the FastAPI docs show a wrapper function that [globally re-writes](https://fastapi.tiangolo.com/advanced/path-operation-advanced-configuration/?h=operation_id#using-the-path-operation-function-name-as-the-operationid) the `operation_id` to the function name. This does put the onus on the developer to name the function correctly.
 
-## Celery based workers
+## TaskIQ based tasks
 
-> *WARNING:* Celery currently *DOES NOT* have support for `asyncio` which comes in the way of our stack, please follow [Issue 21](https://github.com/anomaly/lab-python-server/issues/21) for information on current work arounds and recommendations. We are also actively working with the Celery team to get this resolved.
+The project uses [`TaskIQ`](https://taskiq-python.github.io) to manage task queues. TaskIQ supports `asyncio`, has FastAPI-like design ideas such as [dependency injection](https://taskiq-python.github.io/guide/state-and-deps.html), and can be tightly [coupled with FastAPI](https://taskiq-python.github.io/guide/taskiq-with-fastapi.html).
 
-The projects use `Celery` to manage a queue backed by `redis` to schedule and process background tasks. The celery app is run a separate container. In development we use [watchdog](https://github.com/gorakhargosh/watchdog) to watch for changes to the Python files, this is obviously uncessary in production.
 
-The celery app is configured in `celery.py` which reads from the `redis` configuration in `config.py`.
+TaskIQ is configured as recommended for production use, with [taskiq-aio-pika](https://pypi.org/project/taskiq-aio-pika/) as the broker and [taskiq-redis](https://pypi.org/project/taskiq-redis/) as the result backend.
 
-Each task is defined in the `tasks` package with appropriate subpackages.
+`broker.py` in the root of the project configures the broker using:
 
-To schedule tasks, the API endpoints need to import the task
 ```python
-from ...tasks.email import verification_email
+broker = AioPikaBroker(
+    config.amqp_dsn,
+    result_backend=redis_result_backend
+)
 ```
-and call the `apply_async` method on the task:
+
+`api.py` uses `FastAPI` events to `start` and `shutdown` the broker. As their documentation notes:
+
+> Calling the startup method is necessary. If you don't call it, you may get undefined behaviour.
+
 ```python
-verification_email.apply_async()
+# TaskIQ configuration so we can share FastAPI dependencies in tasks
+@app.on_event("startup")
+async def app_startup():
+    if not broker.is_worker_process:
+        await broker.startup()
+
+# On shutdown, we need to shut down the broker
+@app.on_event("shutdown")
+async def app_shutdown():
+    if not broker.is_worker_process:
+        await broker.shutdown()
 ```
 
-A pieced together example of scheduling a task:
+We recommend creating a `tasks.py` file under each router directory to keep the tasks associated with each router group next to them. Tasks can be defined by simply applying the `task` decorator from the `broker`:
 
 ```python
-from fastapi import APIRouter, Request, Depends
-from sqlalchemy.ext.asyncio import AsyncSession
-
-from ...db import session_context, session_context
-from ...tasks.email import verification_email
-from ...config import config
+@broker.task
+async def send_account_verification_email() -> None:
+    import logging
+    logging.error("Kicking off send_account_verification_email")
+```
 
-router = APIRouter()
+To kick one off, simply use the `kiq` method from the FastAPI handlers:
 
+```python
 @router.get("/verify")
-async def log(request: Request):
+async def verify_user(request: Request):
     """Verify an account
     """
-    verification_email.apply_async()
+    await send_account_verification_email.kiq()
     return {"message": "hello world"}
 ```
 
-You can send position arguments to the task, for example:
-
-```python
-verification_email.apply_async(args=[user_id])
-```
-
-which would be recieved by the task as `user_id` as a positional argument.
-
-> We recommend reading design documentation for the `Celery` project [here](https://docs.celeryproject.org/en/latest/userguide/tasks.html), the general principle is send meta data that the task can use to complete the task not complete, heavy objects. i.e send an ID with some context as opposed to a fully formed object.
-
-### Monitoring the queue
+There are various powerful options for queuing tasks; both scheduled and periodic tasks are supported.
 
-Celery can be monitored using the `flower` package, it provides a web based interfaces. There's also a text based interface available via the command line interface:
-
-```sh
-celery -A labs.celery:app events
-```
-
-you can alternatively use the wrapped Task command `task dev:qwatch`, this is portable across projects if you copy the template.
 
 ## SQLAlchemy wisdom
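The `broker.py` fragment in the hunk above uses `AioPikaBroker` and `redis_result_backend` without showing their imports. A minimal sketch of what the complete module could look like, assuming the packages named in the README (`taskiq-aio-pika`, `taskiq-redis`) and a `config` object exposing `amqp_dsn` plus a hypothetical `redis_dsn`:

```python
# Sketch of a complete broker.py under the assumptions stated above.
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

from .config import config  # assumed to expose amqp_dsn and redis_dsn

# Task results are written to Redis so callers can await task outcomes
redis_result_backend = RedisAsyncResultBackend(config.redis_dsn)

# RabbitMQ (via aio-pika) carries the task messages themselves
broker = AioPikaBroker(
    config.amqp_dsn,
    result_backend=redis_result_backend,
)
```

The worker container then points at this module, which is what the `docker-compose.prod.yml` change further down does with `taskiq worker ${PROJ_NAME}.broker:broker`.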

@@ -487,15 +496,52 @@ INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
 INFO [alembic.runtime.migration] Will assume transactional DDL.
 INFO [alembic.runtime.migration] Running upgrade -> 4b2dfa16da8f, init db
 ```
+### Joining back with `HEAD`
+
+`task db:heads`
+
+## MinIO wisdom
+
+MinIO is able to run with `TLS` enabled; all you have to do is provide it a certificate. By default MinIO looks for certificates in `${HOME}/.minio/certs`. You can generate certificates and mount them into the container:
+
+```yaml
+volumes:
+  - minio-data:/data
+  - .cert:/root/.minio/certs
+```
+
+This will result in the dashboard being available via `HTTPS`, and the signed URLs will be TLS enabled.
+
+Since we use `TLS` enabled endpoints for development, running MinIO in secure mode will satisfy any browser security policies.
+
+### S3FileMetadata
+
+The template provides a `SQLAlchemy` table called `S3FileMetadata`, which is used to store metadata about file uploads.
+
+The client sends a request with the file `name`, `size` and `mime type`; the endpoint creates an `S3FileMetadata` and returns a pre-signed upload URL that the client must post the contents of the file to.
+
+The client can take as long as it needs to upload the contents, but must begin uploading within the signed lifetime, e.g. five minutes from when the URL is generated.
+
+The template is designed to schedule a task to check if the object made it to the store. It continues to check this for a period of time and marks the file as available if the contents are found on the object store.
+
+The client must keep polling the server to see if the file is eventually available.
 
 ## Taskfile
 
 [Task](https://taskfile.dev) is a task runner / build tool that aims to be simpler and easier to use than, for example, GNU Make. While it's useful to know the actual commands, it's easy to use a tool like Task to make things easier on a daily basis:
 
-- `task db:revision -- "commit message"` - creates a new revision in the database and uses the parameter as the commit message
-- `task db:migrate` - migrates the schema to the latest version
-- `task dev:psql` - postgres shell in the database container
-- `task dev:pyshell` - get a `python` session on the api container which should have access to the entire application
+- `eject` - eject the project from a template
+- `build:image` - builds a publishable docker image
+- `crypt:hash` - generate a random cryptographic hash
+- `db:alembic` - arbitrary alembic command in the container
+- `db:heads` - shows the HEAD SHA for alembic migrations
+- `db:init` - initialise the database schema
+- `db:migrate` - migrates models to HEAD
+- `db:rev` - create a database migration, pass a string as commit string
+- `dev:psql` - postgres shell on the db container
+- `dev:pyshell` - get a python session on the api container
+- `dev:sh` - get a bash session on the api container
+- `dev:test` - runs tests inside the server container
 
 ## Docker in Development
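The `S3FileMetadata` upload flow added above is described only in prose. A hedged sketch of the pre-signed URL step using the `minio` client — the endpoint, credentials wiring, bucket name, object key and expiry are invented for illustration:

```python
# Illustration only: issuing a pre-signed upload URL for a client.
# Endpoint, credentials, bucket and key are assumptions, not template code.
from datetime import timedelta

from minio import Minio

client = Minio(
    "minio:9000",                  # host:port from the S3_* settings
    access_key="minioadminaccess",
    secret_key="minioadminsecret",
    secure=True,                   # S3_USE_SSL=True now that certs are mounted
)

# The client must begin uploading within this signed lifetime
upload_url = client.presigned_put_object(
    "uploads",                     # hypothetical bucket
    "s3filemetadata-object-key",   # hypothetical key stored on S3FileMetadata
    expires=timedelta(minutes=5),
)
```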

Taskfile.yml (+2 -6)

@@ -25,10 +25,6 @@ tasks:
     desc: get a bash session on the api container
     cmds:
       - docker compose exec api sh -c "bash"
-  dev:qwatch:
-    desc: get a list of celery events
-    cmds:
-      - docker compose exec api sh -c "celery -A {{.PROJ_NAME}}.celery:app events"
   crypt:hash:
     desc: generate a random cryptographic hash
     cmds:
@@ -46,11 +42,11 @@ tasks:
     cmds:
       - docker compose exec api sh -c "alembic -c /opt/$PROJ_NAME/alembic.ini upgrade head"
   db:heads:
-    desc: shows the HEAD SHA
+    desc: shows the HEAD SHA for alembic migrations
     cmds:
       - docker compose exec api sh -c "alembic -c /opt/$PROJ_NAME/alembic.ini heads"
   db:alembic:
-    desc: arbitrary alembic command
+    desc: arbitrary alembic command in the container
     cmds:
       - docker compose exec api sh -c "alembic -c /opt/$PROJ_NAME/alembic.ini {{.CLI_ARGS}}"
   eject:

docker-compose.prod.yml (+9 -22)

@@ -95,11 +95,6 @@ services:
     volumes:
       - /opt/data/redis:/data
 
-  # fluent-bit:
-  #   image: fluent/fluent-bit:1.5
-  #   container_name: ${PROJ_NAME}-fluentbit
-  #   restart: unless-stopped
-
   # Application API:
   # - In development we read secrets from .env
   # - Provides a FastAPI based API that runs using uvicorn in development
@@ -130,30 +125,22 @@ services:
       - redis
       - db
 
-  # Worker: is a celery based worker process that runs in the background
+  # TaskIQ worker
   worker:
     container_name: ${PROJ_NAME}-worker
-    image: anomalyhq/${PROJ_NAME}-server:${VERSION}
-    command: ["celery", "--app=${PROJ_NAME}.celery.app", "worker", "--pool=gevent", "--loglevel=INFO", "--queues=celery"]
-    cap_drop:
-      - "all"
+    build:
+      context: .
+      dockerfile: Dockerfile
+    command: ["taskiq", "worker", "${PROJ_NAME}.broker:broker"]
     env_file:
-      - .env
+      - .env.development
     restart: unless-stopped
+    volumes:
+      - ./src/${PROJ_NAME}:/opt/${PROJ_NAME}
     depends_on:
+      - db
       - redis
 
-  beat:
-    container_name: ${PROJ_NAME}-beat
-    image: anomalyhq/${PROJ_NAME}}-server:${VERSION}
-    command: ["celery", "--app=${PROJ_NAME}.celery.app", "beat", "--loglevel=INFO"]
-    cap_drop:
-      - "all"
-    env_file:
-      - .env
-    restart: unless-stopped
-    depends_on:
-      - redis
 
 networks:
   default: