Skip to content

Commit b8b1558

Browse files
authored
Merge pull request #1119 from moreati/ci-resourcewarnings
CI: Reliability, eliminate a race condition and some resource leaks
2 parents 16c602a + d032c59 commit b8b1558

11 files changed

+80
-36
lines changed

docs/changelog.rst

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ Unreleased
2929
* :gh:issue:`905` Initial support for templated ``ansible_ssh_args``,
3030
``ansible_ssh_common_args``, and ``ansible_ssh_extra_args`` variables.
3131
NB: play or task scoped variables will probably still fail.
32+
* :gh:issue:`694` CI: Fixed a race condition and some resource leaks causing
33+
some intermittent failures when running the test suite.
3234

3335

3436
v0.3.9 (2024-08-13)

mitogen/parent.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -2542,7 +2542,7 @@ def _signal_child(self, signum):
25422542
# because it is setuid, so this is best-effort only.
25432543
LOG.debug('%r: sending %s', self.proc, SIGNAL_BY_NUM[signum])
25442544
try:
2545-
os.kill(self.proc.pid, signum)
2545+
self.proc.send_signal(signum)
25462546
except OSError:
25472547
e = sys.exc_info()[1]
25482548
if e.args[0] != errno.EPERM:
@@ -2662,6 +2662,17 @@ def poll(self):
26622662
"""
26632663
raise NotImplementedError()
26642664

2665+
def send_signal(self, sig):
2666+
os.kill(self.pid, sig)
2667+
2668+
def terminate(self):
2669+
"Ask the process to gracefully shutdown."
2670+
self.send_signal(signal.SIGTERM)
2671+
2672+
def kill(self):
2673+
"Ask the operating system to forcefully destroy the process."
2674+
self.send_signal(signal.SIGKILL)
2675+
26652676

26662677
class PopenProcess(Process):
26672678
"""
@@ -2678,6 +2689,9 @@ def __init__(self, proc, stdin, stdout, stderr=None):
26782689
def poll(self):
26792690
return self.proc.poll()
26802691

2692+
def send_signal(self, sig):
2693+
self.proc.send_signal(sig)
2694+
26812695

26822696
class ModuleForwarder(object):
26832697
"""

mitogen/unix.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -143,19 +143,23 @@ def on_shutdown(self, broker):
143143
def on_accept_client(self, sock):
144144
sock.setblocking(True)
145145
try:
146-
pid, = struct.unpack('>L', sock.recv(4))
146+
data = sock.recv(4)
147+
pid, = struct.unpack('>L', data)
147148
except (struct.error, socket.error):
148-
LOG.error('listener: failed to read remote identity: %s',
149-
sys.exc_info()[1])
149+
LOG.error('listener: failed to read remote identity, got %d bytes: %s',
150+
len(data), sys.exc_info()[1])
151+
sock.close()
150152
return
151153

152154
context_id = self._router.id_allocator.allocate()
153155
try:
156+
# FIXME #1109 send() returns number of bytes sent, check it
154157
sock.send(struct.pack('>LLL', context_id, mitogen.context_id,
155158
os.getpid()))
156159
except socket.error:
157160
LOG.error('listener: failed to assign identity to PID %d: %s',
158161
pid, sys.exc_info()[1])
162+
sock.close()
159163
return
160164

161165
context = mitogen.parent.Context(self._router, context_id)

tests/connection_test.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
import os
23
import signal
34
import sys
@@ -54,7 +55,9 @@ def do_detach(econtext):
5455
class DetachReapTest(testlib.RouterMixin, testlib.TestCase):
5556
def test_subprocess_preserved_on_shutdown(self):
5657
c1 = self.router.local()
58+
c1_stream = self.router.stream_by_id(c1.context_id)
5759
pid = c1.call(os.getpid)
60+
self.assertEqual(pid, c1_stream.conn.proc.pid)
5861

5962
l = mitogen.core.Latch()
6063
mitogen.core.listen(c1, 'disconnect', l.put)
@@ -64,8 +67,8 @@ def test_subprocess_preserved_on_shutdown(self):
6467
self.broker.shutdown()
6568
self.broker.join()
6669

67-
os.kill(pid, 0) # succeeds if process still alive
70+
self.assertIsNone(os.kill(pid, 0)) # succeeds if process still alive
6871

6972
# now clean up
70-
os.kill(pid, signal.SIGTERM)
71-
os.waitpid(pid, 0)
73+
c1_stream.conn.proc.terminate()
74+
c1_stream.conn.proc.proc.wait()

tests/create_child_test.py

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def close_proc(proc):
7676
proc.stdout.close()
7777
if proc.stderr:
7878
proc.stderr.close()
79+
proc.proc.wait()
7980

8081

8182
def wait_read(fp, n):

tests/data/importer/six_brokenpkg/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,4 @@
5353
else:
5454
from . import _six as six
5555
six_py_file = '{0}.py'.format(os.path.splitext(six.__file__)[0])
56-
exec(open(six_py_file, 'rb').read())
56+
with open(six_py_file, 'rb') as f: exec(f.read())

tests/id_allocation_test.py

+3
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,6 @@ def test_slave_allocates_id(self):
2727
# Subsequent master allocation does not collide
2828
c2 = self.router.local()
2929
self.assertEqual(1002, c2.context_id)
30+
31+
context.shutdown()
32+
c2.shutdown()

tests/reaper_test.py

+9-11
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010

1111

1212
class ReaperTest(testlib.TestCase):
13-
@mock.patch('os.kill')
14-
def test_calc_delay(self, kill):
13+
def test_calc_delay(self):
1514
broker = mock.Mock()
1615
proc = mock.Mock()
1716
proc.poll.return_value = None
@@ -24,29 +23,28 @@ def test_calc_delay(self, kill):
2423
self.assertEqual(752, int(1000 * reaper._calc_delay(5)))
2524
self.assertEqual(1294, int(1000 * reaper._calc_delay(6)))
2625

27-
@mock.patch('os.kill')
28-
def test_reap_calls(self, kill):
26+
def test_reap_calls(self):
2927
broker = mock.Mock()
3028
proc = mock.Mock()
3129
proc.poll.return_value = None
3230

3331
reaper = mitogen.parent.Reaper(broker, proc, True, True)
3432

3533
reaper.reap()
36-
self.assertEqual(0, kill.call_count)
34+
self.assertEqual(0, proc.send_signal.call_count)
3735

3836
reaper.reap()
39-
self.assertEqual(1, kill.call_count)
37+
self.assertEqual(1, proc.send_signal.call_count)
4038

4139
reaper.reap()
4240
reaper.reap()
4341
reaper.reap()
44-
self.assertEqual(1, kill.call_count)
42+
self.assertEqual(1, proc.send_signal.call_count)
4543

4644
reaper.reap()
47-
self.assertEqual(2, kill.call_count)
45+
self.assertEqual(2, proc.send_signal.call_count)
4846

49-
self.assertEqual(kill.mock_calls, [
50-
mock.call(proc.pid, signal.SIGTERM),
51-
mock.call(proc.pid, signal.SIGKILL),
47+
self.assertEqual(proc.send_signal.mock_calls, [
48+
mock.call(signal.SIGTERM),
49+
mock.call(signal.SIGKILL),
5250
])

tests/ssh_test.py

+1
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ def test_verbose_enabled(self):
190190
self.dockerized_ssh.port,
191191
)
192192
self.assertEqual(name, context.name)
193+
context.shutdown(wait=True)
193194

194195

195196
class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase):

tests/testlib.py

+28-6
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,17 @@ def data_path(suffix):
146146
return path
147147

148148

149+
def retry(fn, on, max_attempts, delay):
150+
for i in range(max_attempts):
151+
try:
152+
return fn()
153+
except on:
154+
if i >= max_attempts - 1:
155+
raise
156+
else:
157+
time.sleep(delay)
158+
159+
149160
def threading__thread_is_alive(thread):
150161
"""Return whether the thread is alive (Python version compatibility shim).
151162
@@ -562,18 +573,24 @@ def wait_for_sshd(self):
562573
wait_for_port(self.get_host(), self.port, pattern='OpenSSH')
563574

564575
def check_processes(self):
565-
args = ['docker', 'exec', self.container_name, 'ps', '-o', 'comm=']
576+
# Get Accounting name (ucomm) & command line (args) of each process
577+
# in the container. No truncation (-ww). No column headers (foo=).
578+
ps_output = subprocess.check_output([
579+
'docker', 'exec', self.container_name,
580+
'ps', '-w', '-w', '-o', 'ucomm=', '-o', 'args=',
581+
])
582+
ps_lines = ps_output.decode().splitlines()
583+
processes = [tuple(line.split(None, 1)) for line in ps_lines]
566584
counts = {}
567-
for comm in subprocess.check_output(args).decode().splitlines():
568-
comm = comm.strip()
569-
counts[comm] = counts.get(comm, 0) + 1
585+
for ucomm, _ in processes:
586+
counts[ucomm] = counts.get(ucomm, 0) + 1
570587

571588
if counts != {'ps': 1, 'sshd': 1}:
572589
assert 0, (
573590
'Docker container %r contained extra running processes '
574591
'after test completed: %r' % (
575592
self.container_name,
576-
counts
593+
processes,
577594
)
578595
)
579596

@@ -630,7 +647,12 @@ def setUpClass(cls):
630647

631648
@classmethod
632649
def tearDownClass(cls):
633-
cls.dockerized_ssh.check_processes()
650+
retry(
651+
cls.dockerized_ssh.check_processes,
652+
on=AssertionError,
653+
max_attempts=5,
654+
delay=0.1,
655+
)
634656
cls.dockerized_ssh.close()
635657
super(DockerMixin, cls).tearDownClass()
636658

tests/unix_test.py

+7-11
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,13 @@ class ListenerTest(testlib.RouterMixin, testlib.TestCase):
6565

6666
def test_constructor_basic(self):
6767
listener = self.klass.build_stream(router=self.router)
68-
capture = testlib.LogCapturer()
69-
capture.start()
70-
try:
71-
self.assertFalse(mitogen.unix.is_path_dead(listener.protocol.path))
72-
os.unlink(listener.protocol.path)
73-
# ensure we catch 0 byte read error log message
74-
self.broker.shutdown()
75-
self.broker.join()
76-
self.broker_shutdown = True
77-
finally:
78-
capture.stop()
68+
self.assertFalse(mitogen.unix.is_path_dead(listener.protocol.path))
69+
os.unlink(listener.protocol.path)
70+
71+
# ensure we catch 0 byte read error log message
72+
self.broker.shutdown()
73+
self.broker.join()
74+
self.broker_shutdown = True
7975

8076

8177
class ClientTest(testlib.TestCase):

0 commit comments

Comments
 (0)