@@ -41,6 +41,39 @@ def _force_empty_queue(q):
4141 continue
4242
4343
44+ # OutputQueue wraps the process of sending data to a multiprocessing queue
45+ # so that we can simultaneously check for the "stop" signal when it's time
46+ # to shut down.
47+ class OutputQueue (object ):
48+ def __init__ (self , output_queue , stop ):
49+ self .output_queue = output_queue
50+ self .stop = stop
51+
52+ def __call__ (self , coord , data ):
53+ """
54+ Send data, associated with coordinate coord, to the queue. While also
55+ watching for a signal to stop. If the data is too large to send, then
56+ trap the MemoryError and exit the program.
57+ """
58+
59+ try :
60+ while not _non_blocking_put (self .output_queue , data ):
61+ if self .stop .is_set ():
62+ return True
63+
64+ except MemoryError :
65+ stacktrace = format_stacktrace_one_line ()
66+ self .logger .error (
67+ "MemoryError while sending %s to the queue. Stacktrace: %s" %
68+ (serialize_coord (coord ), stacktrace ))
69+ # memory error might not leave the malloc subsystem in a usable
70+ # state, so better to exit the whole worker here than crash this
71+ # thread, which would lock up the whole worker.
72+ sys .exit (1 )
73+
74+ return False
75+
76+
4477# The strategy with each worker is to loop on a thread event. When the
4578# main thread/process receives a kill signal, it will issue stops to
4679# each worker to signal that work should end.
@@ -63,7 +96,7 @@ class SqsQueueReader(object):
6396 def __init__ (self , sqs_queue , output_queue , logger , stop ,
6497 metatile_size , sqs_msgs_to_read_size = 10 ):
6598 self .sqs_queue = sqs_queue
66- self .output_queue = output_queue
99+ self .output = OutputQueue ( output_queue , stop )
67100 self .sqs_msgs_to_read_size = sqs_msgs_to_read_size
68101 self .logger = logger
69102 self .stop = stop
@@ -119,9 +152,8 @@ def __call__(self):
119152 metadata = metadata ,
120153 coord = msg .coord ,
121154 )
122- while not _non_blocking_put (self .output_queue , data ):
123- if self .stop .is_set ():
124- break
155+ if self .output (msg .coord , data ):
156+ break
125157
126158 self .sqs_queue .close ()
127159 self .logger .debug ('sqs queue reader stopped' )
@@ -143,6 +175,8 @@ def __init__(self, fetcher, input_queue, output_queue, io_pool,
143175
144176 def __call__ (self , stop ):
145177 saw_sentinel = False
178+ output = OutputQueue (self .output_queue , stop )
179+
146180 while not stop .is_set ():
147181 try :
148182 data = self .input_queue .get (timeout = timeout_seconds )
@@ -226,9 +260,8 @@ def __call__(self, stop):
226260 nominal_zoom = nominal_zoom ,
227261 )
228262
229- while not _non_blocking_put (self .output_queue , data ):
230- if stop .is_set ():
231- break
263+ if output (coord , data ):
264+ break
232265
233266 if not saw_sentinel :
234267 _force_empty_queue (self .input_queue )
@@ -253,6 +286,8 @@ def __call__(self, stop):
253286 # ignore ctrl-c interrupts when run from terminal
254287 signal .signal (signal .SIGINT , signal .SIG_IGN )
255288
289+ output = OutputQueue (self .output_queue , stop )
290+
256291 saw_sentinel = False
257292 while not stop .is_set ():
258293 try :
@@ -292,9 +327,8 @@ def __call__(self, stop):
292327 formatted_tiles = formatted_tiles ,
293328 )
294329
295- while not _non_blocking_put (self .output_queue , data ):
296- if stop .is_set ():
297- break
330+ if output (coord , data ):
331+ break
298332
299333 if not saw_sentinel :
300334 _force_empty_queue (self .input_queue )
@@ -316,6 +350,8 @@ def __init__(self, input_queue, output_queue, io_pool, store, logger,
316350 def __call__ (self , stop ):
317351 saw_sentinel = False
318352
353+ queue_output = OutputQueue (self .output_queue , stop )
354+
319355 while not stop .is_set ():
320356 try :
321357 data = self .input_queue .get (timeout = timeout_seconds )
@@ -374,9 +410,8 @@ def __call__(self, stop):
374410 metadata = metadata ,
375411 )
376412
377- while not _non_blocking_put (self .output_queue , data ):
378- if stop .is_set ():
379- break
413+ if queue_output (coord , data ):
414+ break
380415
381416 if not saw_sentinel :
382417 _force_empty_queue (self .input_queue )
0 commit comments