Skip to content

Commit f29f043

Browse files
committed
Allow setting number of processes/threads through env.
1 parent d2499f4 commit f29f043

File tree

5 files changed

+102
-22
lines changed

5 files changed

+102
-22
lines changed

README.md

+25-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Seemlessly integrate [Dramatiq][dramatiq] with your Django project!
2020

2121
## Installation
2222

23-
To install, ensure both Django Dramtiq and Dramatiq are installed, along with RabbitMQ:
23+
To install, ensure both Django Dramatiq and Dramatiq are installed, along with RabbitMQ:
2424

2525
pip install django-dramatiq 'dramatiq[rabbitmq]'
2626

@@ -104,7 +104,31 @@ DRAMATIQ_AUTODISCOVER_MODULES = ["tasks", "services"]
104104
Django Dramatiq comes with a management command you can use to
105105
auto-discover task modules and run workers:
106106

107+
```sh
107108
python manage.py rundramatiq
109+
```
110+
111+
By default, `rundramatiq` will adjust the number of processes/threads used
112+
by Dramatiq based on the number of detected CPUs: one process will be launched
113+
per CPU, and each process will have 8 worker threads.
114+
115+
The default number of processes, threads per process can be overridden through
116+
environment variables, which take precedence over the defaults:
117+
118+
```sh
119+
export DRAMATIQ_NPROCS=2 DRAMATIQ_NTHREADS=2
120+
python manage.py rundramatiq
121+
```
122+
123+
Or alternatively through command line arguments, which take precedence over the
124+
defaults and any environment variables:
125+
126+
```sh
127+
python manage.py rundramatiq -p 2 -t 2
128+
```
129+
130+
This is useful e.g. to facilitate faster Dramatiq restarts in your development
131+
environment.
108132

109133
If your project for some reason has apps with modules named `tasks` that
110134
are not intended for use with Dramatiq, you can ignore them:

django_dramatiq/management/commands/rundramatiq.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,14 @@
1111
from django.core.management.base import BaseCommand
1212
from django.utils.module_loading import module_has_submodule
1313

14-
#: The number of available CPUs.
15-
CPU_COUNT = multiprocessing.cpu_count()
16-
THREAD_COUNT = 8
14+
from django_dramatiq.utils import getenv_int
15+
16+
17+
# Number of processes to use. Default: one per CPU.
18+
NPROCS = getenv_int("DRAMATIQ_NPROCS", default=multiprocessing.cpu_count)
19+
20+
# Number of threads per process to use. Default: 8.
21+
NTHREADS = getenv_int("DRAMATIQ_NTHREADS", 8)
1722

1823

1924
class Command(BaseCommand):
@@ -49,13 +54,13 @@ def add_arguments(self, parser):
4954
)
5055
parser.add_argument(
5156
"--processes", "-p",
52-
default=CPU_COUNT,
57+
default=NPROCS,
5358
type=int,
5459
help="The number of processes to run",
5560
)
5661
parser.add_argument(
5762
"--threads", "-t",
58-
default=THREAD_COUNT,
63+
default=NTHREADS,
5964
type=int,
6065
help="The number of threads per process to use",
6166
)

django_dramatiq/utils.py

+22
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,28 @@
1+
import logging
2+
import os
3+
14
from django.utils.module_loading import import_string
25

36

7+
def getenv_int(varname, default=None):
8+
"""Retrieves an environment variable as an int."""
9+
envstr = os.getenv(varname, None)
10+
11+
if envstr is not None:
12+
try:
13+
return int(envstr)
14+
except ValueError:
15+
if default is None:
16+
raise
17+
msgf = "Invalid value for %s: %r. Reverting to default."
18+
logging.warning(msgf, varname, envstr)
19+
20+
if callable(default):
21+
return default()
22+
else:
23+
return default
24+
25+
426
def load_middleware(path_or_obj, **kwargs):
527
if isinstance(path_or_obj, str):
628
return import_string(path_or_obj)(**kwargs)

tests/test_rundramatiq_command.py

+16-16
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ def test_rundramatiq_can_run_dramatiq(execvp_mock):
3636
assert "Discovered tasks module: 'tests.testapp3.tasks.other_tasks'" in buff.getvalue()
3737

3838
# And execvp should be called with the appropriate arguments
39-
cores = str(rundramatiq.CPU_COUNT)
40-
threads = str(rundramatiq.THREAD_COUNT)
39+
cores = str(rundramatiq.NPROCS)
40+
threads = str(rundramatiq.NTHREADS)
4141
expected_exec_name = "dramatiq"
4242
expected_exec_path = os.path.join(
4343
os.path.dirname(sys.executable),
@@ -67,8 +67,8 @@ def test_rundramatiq_can_run_dramatiq_reload(execvp_mock):
6767
call_command("rundramatiq", "--reload", stdout=buff)
6868

6969
# Then execvp should be called with the appropriate arguments
70-
cores = str(rundramatiq.CPU_COUNT)
71-
threads = str(rundramatiq.THREAD_COUNT)
70+
cores = str(rundramatiq.NPROCS)
71+
threads = str(rundramatiq.NTHREADS)
7272
expected_exec_name = "dramatiq"
7373
expected_exec_path = os.path.join(
7474
os.path.dirname(sys.executable),
@@ -99,8 +99,8 @@ def test_rundramatiq_can_run_dramatiq_with_polling(execvp_mock):
9999
call_command("rundramatiq", "--reload", "--reload-use-polling", stdout=buff)
100100

101101
# Then execvp should be called with the appropriate arguments
102-
cores = str(rundramatiq.CPU_COUNT)
103-
threads = str(rundramatiq.THREAD_COUNT)
102+
cores = str(rundramatiq.NPROCS)
103+
threads = str(rundramatiq.NTHREADS)
104104
expected_exec_name = "dramatiq"
105105
expected_exec_path = os.path.join(
106106
os.path.dirname(sys.executable),
@@ -132,8 +132,8 @@ def test_rundramatiq_can_run_dramatiq_with_only_some_queues(execvp_mock):
132132
call_command("rundramatiq", "--queues", "A B C", stdout=buff)
133133

134134
# Then execvp should be called with the appropriate arguments
135-
cores = str(rundramatiq.CPU_COUNT)
136-
threads = str(rundramatiq.THREAD_COUNT)
135+
cores = str(rundramatiq.NPROCS)
136+
threads = str(rundramatiq.NTHREADS)
137137
expected_exec_name = "dramatiq"
138138
expected_exec_path = os.path.join(
139139
os.path.dirname(sys.executable),
@@ -164,8 +164,8 @@ def test_rundramatiq_can_run_dramatiq_with_specified_pid_file(execvp_mock):
164164
call_command("rundramatiq", "--pid-file", "drama.pid", stdout=buff)
165165

166166
# Then execvp should be called with the appropriate arguments
167-
cores = str(rundramatiq.CPU_COUNT)
168-
threads = str(rundramatiq.THREAD_COUNT)
167+
cores = str(rundramatiq.NPROCS)
168+
threads = str(rundramatiq.NTHREADS)
169169
expected_exec_name = "dramatiq"
170170
expected_exec_path = os.path.join(
171171
os.path.dirname(sys.executable),
@@ -196,8 +196,8 @@ def test_rundramatiq_can_run_dramatiq_with_specified_log_file(execvp_mock):
196196
call_command("rundramatiq", "--log-file", "drama.log", stdout=buff)
197197

198198
# Then execvp should be called with the appropriate arguments
199-
cores = str(rundramatiq.CPU_COUNT)
200-
threads = str(rundramatiq.THREAD_COUNT)
199+
cores = str(rundramatiq.NPROCS)
200+
threads = str(rundramatiq.NTHREADS)
201201
expected_exec_name = "dramatiq"
202202
expected_exec_path = os.path.join(
203203
os.path.dirname(sys.executable),
@@ -244,8 +244,8 @@ def test_rundramatiq_can_ignore_modules(execvp_mock, settings):
244244
assert "Ignored tasks module: 'tests.testapp3.tasks.utils.not_a_task'" in buff.getvalue()
245245

246246
# And execvp should be called with the appropriate arguments
247-
cores = str(rundramatiq.CPU_COUNT)
248-
threads = str(rundramatiq.THREAD_COUNT)
247+
cores = str(rundramatiq.NPROCS)
248+
threads = str(rundramatiq.NTHREADS)
249249
expected_exec_name = "dramatiq"
250250
expected_exec_path = os.path.join(
251251
os.path.dirname(sys.executable),
@@ -272,8 +272,8 @@ def test_rundramatiq_can_fork(execvp_mock, settings):
272272
call_command("rundramatiq", "--fork-function", "a", "--fork-function", "b", stdout=buff)
273273

274274
# Then execvp should be called with the appropriate arguments
275-
cores = str(rundramatiq.CPU_COUNT)
276-
threads = str(rundramatiq.THREAD_COUNT)
275+
cores = str(rundramatiq.NPROCS)
276+
threads = str(rundramatiq.NTHREADS)
277277
expected_exec_name = "dramatiq"
278278
expected_exec_path = os.path.join(
279279
os.path.dirname(sys.executable),

tests/test_utils.py

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import os
2+
3+
import pytest
4+
5+
from django_dramatiq.utils import getenv_int
6+
7+
8+
@pytest.mark.parametrize("value, default, expected", (
9+
("42", None, 42),
10+
("invalid", 69, 69),
11+
("invalid", None, ValueError),
12+
("invalid", lambda: 96, 96),
13+
(None, 19, 19),
14+
(None, lambda: 78, 78),
15+
(None, 'hello', 'hello'), # returned default is not checked to be an int
16+
(None, lambda: 'world', 'world') # idem
17+
))
18+
def test_getenv_int(value, default, expected):
19+
varname = "TEST_ENV_20250204"
20+
if value is not None:
21+
os.environ[varname] = value
22+
else:
23+
os.environ.pop(varname, None)
24+
25+
if isinstance(expected, type) and issubclass(expected, Exception):
26+
with pytest.raises(expected):
27+
getenv_int(varname, default)
28+
else:
29+
assert getenv_int(varname, default) == expected

0 commit comments

Comments
 (0)