Skip to content

Commit 4e111ba

Browse files
sfayerandresailer
andcommitted
fix: Handle exceptions from ThreadPoolExecutors
Co-authored-by: Andre Sailer <[email protected]>
1 parent fd21b17 commit 4e111ba

File tree

5 files changed

+34
-6
lines changed

5 files changed

+34
-6
lines changed

integration_tests.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import sys
1010
import tempfile
1111
import time
12-
from concurrent.futures import ThreadPoolExecutor
12+
from concurrent.futures import ThreadPoolExecutor, as_completed
1313
from contextlib import contextmanager
1414
from typing import Optional
1515

@@ -486,10 +486,15 @@ def logs(pattern: str = "*", lines: int = 10, follow: bool = True):
486486
if follow:
487487
base_cmd += ["-f"]
488488
with ThreadPoolExecutor(len(services)) as pool:
489+
futures = []
489490
for service in fnmatch.filter(services, pattern):
490491
cmd = base_cmd + [f"{runit_dir}/{service}/log/current"]
491492
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=None, text=True)
492-
pool.submit(_log_popen_stdout, p)
493+
futures.append(pool.submit(_log_popen_stdout, p))
494+
for res in as_completed(futures):
495+
err = res.exception()
496+
if err:
497+
raise err
493498

494499

495500
class TestExit(typer.Exit):

src/DIRAC/Core/DISET/private/MessageBroker.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,14 @@ def __receiveMsgDataAndQueue(self, trid):
181181
"from %s : %s" % (self.__trPool.get(trid).getFormattedCredentials(), result["Message"]),
182182
)
183183
return self.removeTransport(trid)
184-
self.__threadPool.submit(self.__processIncomingData, trid, result)
184+
185+
def err_handler(res):
186+
err = res.exception()
187+
if err:
188+
self.__log.exception("Exception in receiveMsgDataAndQueue thread", lException=err)
189+
190+
future = self.__threadPool.submit(self.__processIncomingData, trid, result)
191+
future.add_done_callback(err_handler)
185192
return S_OK()
186193

187194
def __processIncomingData(self, trid, receivedResult):

src/DIRAC/Core/DISET/private/Service.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,14 @@ def handleConnection(self, clientTransport):
338338
if not self.activityMonitoring:
339339
self._stats["connections"] += 1
340340
self._monitor.setComponentExtraParam("queries", self._stats["connections"])
341-
self._threadPool.submit(self._processInThread, clientTransport)
341+
342+
def err_handler(result):
343+
err = result.exception()
344+
if err:
345+
gLogger.exception("Exception in handleConnection thread", lException=err)
346+
347+
future = self._threadPool.submit(self._processInThread, clientTransport)
348+
future.add_done_callback(err_handler)
342349

343350
@property
344351
def wantsThrottle(self):

src/DIRAC/FrameworkSystem/Agent/MyProxyRenewalAgent.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,8 @@ def execute(self):
7272
userDN = record[0]
7373
userGroup = record[1]
7474
futures.append(executor.submit(self.__renewProxyForCredentials, userDN, userGroup))
75+
for res in concurrent.futures.as_completed(futures):
76+
err = res.exception()
77+
if err:
78+
raise err
7579
return S_OK()

src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import random
2424
import socket
2525
from collections import defaultdict
26-
from concurrent.futures import ThreadPoolExecutor
26+
from concurrent.futures import ThreadPoolExecutor, as_completed
2727

2828
import DIRAC
2929
from DIRAC import S_OK, gConfig
@@ -1079,8 +1079,13 @@ def updatePilotStatus(self):
10791079
# Threads aim at overcoming such issues and thus 1 thread per queue is created to
10801080
# update the status of pilots in transient states
10811081
with ThreadPoolExecutor(max_workers=len(self.queueDict)) as executor:
1082+
futures = []
10821083
for queue in self.queueDict:
1083-
executor.submit(self._updatePilotStatusPerQueue, queue, proxy)
1084+
futures.append(executor.submit(self._updatePilotStatusPerQueue, queue, proxy))
1085+
for res in as_completed(futures):
1086+
err = res.exception()
1087+
if err:
1088+
self.log.exception("Update pilot status thread failed", lException=err)
10841089

10851090
# The pilot can be in Done state set by the job agent check if the output is retrieved
10861091
for queue in self.queueDict:

0 commit comments

Comments
 (0)