11
11
logger = logging .getLogger (__name__ )
12
12
13
13
14
- DEFAULT_QUEUE_NAME = ' default'
14
+ DEFAULT_QUEUE_NAME = " default"
15
15
16
16
17
17
def process_job (queue_name ):
@@ -22,7 +22,14 @@ def process_job(queue_name):
22
22
if not job :
23
23
return
24
24
25
- logger .info ('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s' , job .name , queue_name , job .pk , job .state , job .next_task )
25
+ logger .info (
26
+ 'Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s' ,
27
+ job .name ,
28
+ queue_name ,
29
+ job .pk ,
30
+ job .state ,
31
+ job .next_task ,
32
+ )
26
33
job .state = Job .STATES .PROCESSING
27
34
job .save ()
28
35
@@ -40,18 +47,30 @@ def process_job(queue_name):
40
47
41
48
failure_hook_name = job .get_failure_hook_name ()
42
49
if failure_hook_name :
43
- logger .info ("Running failure hook %s for job id=%s" , failure_hook_name , job .pk )
50
+ logger .info (
51
+ "Running failure hook %s for job id=%s" , failure_hook_name , job .pk
52
+ )
44
53
failure_hook_function = import_string (failure_hook_name )
45
54
failure_hook_function (job , exception )
46
55
else :
47
56
logger .info ("No failure hook for job id=%s" , job .pk )
48
57
49
- logger .info ('Updating job: name="%s" id=%s state=%s next_task=%s' , job .name , job .pk , job .state , job .next_task or 'none' )
58
+ logger .info (
59
+ 'Updating job: name="%s" id=%s state=%s next_task=%s' ,
60
+ job .name ,
61
+ job .pk ,
62
+ job .state ,
63
+ job .next_task or "none" ,
64
+ )
50
65
51
66
try :
52
67
job .save ()
53
68
except :
54
- logger .error ('Failed to save job: id=%s org=%s' , job .pk , job .workspace .get ('organisation_id' ))
69
+ logger .error (
70
+ "Failed to save job: id=%s org=%s" ,
71
+ job .pk ,
72
+ job .workspace .get ("organisation_id" ),
73
+ )
55
74
raise
56
75
57
76
@@ -67,7 +86,11 @@ def __init__(self, name, rate_limit_in_seconds):
67
86
68
87
def do_work (self ):
69
88
sleep (1 )
70
- if self .last_job_finished and (timezone .now () - self .last_job_finished ).total_seconds () < self .rate_limit_in_seconds :
89
+ if (
90
+ self .last_job_finished
91
+ and (timezone .now () - self .last_job_finished ).total_seconds ()
92
+ < self .rate_limit_in_seconds
93
+ ):
71
94
return
72
95
73
96
process_job (self .queue_name )
@@ -79,14 +102,20 @@ class Command(BaseCommand):
79
102
help = "Run a queue worker process"
80
103
81
104
def add_arguments (self , parser ):
82
- parser .add_argument ('queue_name' , nargs = '?' , default = 'default' , type = str )
83
- parser .add_argument ('rate_limit' , help = 'The rate limit in seconds. The default rate limit is 1 job per second.' , nargs = '?' , default = 1 , type = int )
105
+ parser .add_argument ("queue_name" , nargs = "?" , default = "default" , type = str )
84
106
parser .add_argument (
85
- '--dry-run' ,
86
- action = 'store_true' ,
87
- dest = 'dry_run' ,
107
+ "rate_limit" ,
108
+ help = "The rate limit in seconds. The default rate limit is 1 job per second." ,
109
+ nargs = "?" ,
110
+ default = 1 ,
111
+ type = int ,
112
+ )
113
+ parser .add_argument (
114
+ "--dry-run" ,
115
+ action = "store_true" ,
116
+ dest = "dry_run" ,
88
117
default = False ,
89
- help = "Don't actually start the worker. Used for testing."
118
+ help = "Don't actually start the worker. Used for testing." ,
90
119
)
91
120
92
121
def handle (self , * args , ** options ):
@@ -96,14 +125,17 @@ def handle(self, *args, **options):
96
125
if len (args ) != 1 :
97
126
raise CommandError ("Please supply a single queue job name" )
98
127
99
- queue_name = options [' queue_name' ]
100
- rate_limit_in_seconds = options [' rate_limit' ]
128
+ queue_name = options [" queue_name" ]
129
+ rate_limit_in_seconds = options [" rate_limit" ]
101
130
102
- self .stdout .write ("Starting job worker for queue \" %s\" with rate limit %s/s" % (queue_name , rate_limit_in_seconds ))
131
+ self .stdout .write (
132
+ 'Starting job worker for queue "%s" with rate limit %s/s'
133
+ % (queue_name , rate_limit_in_seconds )
134
+ )
103
135
104
136
worker = Worker (queue_name , rate_limit_in_seconds )
105
137
106
- if options [' dry_run' ]:
138
+ if options [" dry_run" ]:
107
139
return
108
140
109
141
worker .run ()
0 commit comments