forked from cea-hpc/clustershell
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathStreamWorkerTest.py
333 lines (261 loc) · 11.5 KB
/
StreamWorkerTest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
"""
Unit test for StreamWorker
"""
import os
import unittest
from ClusterShell.Worker.Worker import StreamWorker, WorkerError
from ClusterShell.Task import task_self
from ClusterShell.Event import EventHandler
class StreamTest(unittest.TestCase):
def run_worker(self, worker):
"""helper method to schedule and run a worker"""
task_self().schedule(worker)
task_self().run()
def test_001_empty(self):
"""test empty StreamWorker"""
# that makes no sense but well...
# handler=None is supported by base Worker class
worker = StreamWorker(handler=None)
self.run_worker(worker)
# GH Issue #488:
# An unconfigured engine client does not abort by itself...
worker.abort()
# Check that we are in a clean state now
self.assertEqual(len(task_self()._engine._clients), 0)
def test_002_pipe_readers(self):
"""test StreamWorker bound to several pipe readers"""
streams = { "pipe1_reader": b"Some data to read from a pipe",
"stderr": b"Error data to read using special keyword stderr",
"pipe2_reader": b"Other data to read from another pipe",
"pipe3_reader": b"Cool data to read from a third pipe" }
class TestH(EventHandler):
def __init__(self, testcase):
self.snames = set()
self.testcase = testcase
def ev_read(self, worker, node, sname, msg):
self.recv_msg(sname, msg)
def recv_msg(self, sname, msg):
self.testcase.assertTrue(len(self.snames) < len(streams))
self.testcase.assertEqual(streams[sname], msg)
self.snames.add(sname)
if len(self.snames) == len(streams):
# before finishing, try to add another pipe at
# runtime: this is NOT allowed
rfd, wfd = os.pipe()
self.testcase.assertRaises(WorkerError,
worker.set_reader, "pipe4_reader", rfd)
self.testcase.assertRaises(WorkerError,
worker.set_writer, "pipe4_writer", wfd)
os.close(rfd)
os.close(wfd)
# create a StreamWorker instance bound to several pipes
hdlr = TestH(self)
worker = StreamWorker(handler=hdlr)
for sname in streams.keys():
rfd, wfd = os.pipe()
worker.set_reader(sname, rfd)
os.write(wfd, streams[sname])
os.close(wfd)
self.run_worker(worker)
# check that all ev_read have been received
self.assertEqual(set(("pipe1_reader", "pipe2_reader", "pipe3_reader",
"stderr")), hdlr.snames)
def test_003_io_pipes(self):
"""test StreamWorker bound to pipe readers and writers"""
# os.write -> pipe1 -> worker -> pipe2 -> os.read
class TestH(EventHandler):
def __init__(self, testcase):
self.testcase = testcase
self.worker = None
self.pickup_count = 0
self.hup_count = 0
def ev_pickup(self, worker, node):
self.pickup_count += 1
def ev_read(self, worker, node, sname, msg):
self.testcase.assertEqual(sname, "pipe1")
worker.write(msg, "pipe2")
def ev_timer(self, timer):
# call set_write_eof on specific stream after some delay
worker = self.worker
self.worker = 'DONE'
worker.set_write_eof("pipe2")
def ev_hup(self, worker, node, rc):
# ev_hup called at the end (after set_write_eof is called)
self.hup_count += 1
self.testcase.assertEqual(self.worker, 'DONE')
# no rc code should be set
self.testcase.assertEqual(rc, None)
# create a StreamWorker instance bound to several pipes
hdlr = TestH(self)
worker = StreamWorker(handler=hdlr)
hdlr.worker = worker
rfd1, wfd1 = os.pipe()
worker.set_reader("pipe1", rfd1)
os.write(wfd1, b"Some data\n")
os.close(wfd1)
rfd2, wfd2 = os.pipe()
worker.set_writer("pipe2", wfd2)
timer1 = task_self().timer(1.0, handler=hdlr)
self.run_worker(worker)
self.assertEqual(os.read(rfd2, 1024), b"Some data")
os.close(rfd2)
# wfd2 should be closed by CS
self.assertRaises(OSError, os.close, wfd2)
# rfd1 should be closed by CS
self.assertRaises(OSError, os.close, rfd1)
# check pickup/hup
self.assertEqual(hdlr.hup_count, 1)
self.assertEqual(hdlr.pickup_count, 1)
self.assertTrue(task_self().max_retcode() is None)
def test_004_timeout_on_open_stream(self):
"""test StreamWorker with timeout set on open stream"""
# Create worker set with timeout
worker = StreamWorker(handler=None, timeout=0.5)
# Create pipe stream
rfd1, wfd1 = os.pipe()
worker.set_reader("pipe1", rfd1, closefd=False)
# Write some chars without line break (worst case)
os.write(wfd1, b"Some data")
# TEST: Do not close wfd1 to simulate open stream
# Need to enable pipe1_msgtree
task_self().set_default("pipe1_msgtree", True)
self.run_worker(worker)
# Timeout occurred - read buffer should have been flushed
self.assertEqual(worker.read(sname="pipe1"), b"Some data")
# closefd was set, we should be able to close pipe fds
os.close(rfd1)
os.close(wfd1)
def test_005_timeout_events(self):
"""test StreamWorker with timeout set (event based)"""
class TestH(EventHandler):
def __init__(self, testcase):
self.testcase = testcase
self.ev_pickup_called = False
self.ev_read_called = False
self.ev_hup_called = False
self.ev_timeout_called = False
def ev_pickup(self, worker, node):
self.ev_pickup_called = True
def ev_read(self, worker, node, sname, msg):
self.ev_read_called = True
self.testcase.assertEqual(sname, "pipe1")
self.testcase.assertEqual(msg, b"Some data")
def ev_hup(self, worker, node, rc):
# ev_hup is called but no rc code should be set
self.ev_hup_called = True
self.testcase.assertEqual(rc, None)
def ev_close(self, worker, timedout):
if timedout:
self.ev_timeout_called = True
hdlr = TestH(self)
worker = StreamWorker(handler=hdlr, timeout=0.5)
# Create pipe stream with closefd set (default)
rfd1, wfd1 = os.pipe()
worker.set_reader("pipe1", rfd1)
# Write some chars without line break (worst case)
os.write(wfd1, b"Some data")
# TEST: Do not close wfd1 to simulate open stream
self.run_worker(worker)
self.assertTrue(hdlr.ev_timeout_called)
self.assertTrue(hdlr.ev_read_called)
self.assertTrue(hdlr.ev_pickup_called)
self.assertTrue(hdlr.ev_hup_called)
# rfd1 should be already closed by CS
self.assertRaises(OSError, os.close, rfd1)
os.close(wfd1)
def test_006_worker_abort_on_written(self):
"""test StreamWorker abort on ev_written"""
# This test creates a writable StreamWorker that will abort after the
# first write, to check whether ev_written is generated in the right
# place.
class TestH(EventHandler):
def __init__(self, testcase, rfd):
self.testcase = testcase
self.rfd = rfd
self.check_written = 0
def ev_written(self, worker, node, sname, size):
self.check_written += 1
self.testcase.assertEqual(os.read(self.rfd, 1024), b"initial")
worker.abort()
worker.abort() # safe but no effect
rfd, wfd = os.pipe()
hdlr = TestH(self, rfd)
worker = StreamWorker(handler=hdlr)
worker.set_writer("test", wfd) # closefd=True
worker.write(b"initial", "test")
self.run_worker(worker)
self.assertEqual(hdlr.check_written, 1)
os.close(rfd)
def test_007_worker_abort_on_written_eof(self):
"""test StreamWorker abort on ev_written (with EOF)"""
# This test is similar to previous test test_006 but does
# write() + set_write_eof().
class TestH(EventHandler):
def __init__(self, testcase, rfd):
self.testcase = testcase
self.rfd = rfd
self.check_written = 0
def ev_written(self, worker, node, sname, size):
self.check_written += 1
self.testcase.assertEqual(os.read(self.rfd, 1024), b"initial")
worker.abort()
worker.abort() # safe but no effect
rfd, wfd = os.pipe()
hdlr = TestH(self, rfd)
worker = StreamWorker(handler=hdlr)
worker.set_writer("test", wfd) # closefd=True
worker.write(b"initial", "test")
worker.set_write_eof()
self.run_worker(worker)
self.assertEqual(hdlr.check_written, 1)
os.close(rfd)
def test_008_broken_pipe_on_write(self):
"""test StreamWorker with broken pipe on write()"""
# This test creates a writable StreamWorker that will close the read
# side of the pipe just after the first write to generate a broken
# pipe error.
class TestH(EventHandler):
def __init__(self, testcase, rfd):
self.testcase = testcase
self.rfd = rfd
self.check_hup = 0
self.check_written = 0
def ev_hup(self, worker, node, rc):
self.check_hup += 1
def ev_written(self, worker, node, sname, size):
self.check_written += 1
self.testcase.assertEqual(os.read(self.rfd, 1024), b"initial")
# close reader, that will stop the StreamWorker
os.close(self.rfd)
# The following write call used to raise broken pipe before
# version 1.7.2.
worker.write(b"final")
rfd, wfd = os.pipe()
hdlr = TestH(self, rfd)
worker = StreamWorker(handler=hdlr)
worker.set_writer("test", wfd) # closefd=True
worker.write(b"initial", "test")
self.run_worker(worker)
self.assertEqual(hdlr.check_hup, 1)
self.assertEqual(hdlr.check_written, 1)
def test_009_worker_abort_on_close(self):
"""test StreamWorker abort() on closing worker"""
class TestH(EventHandler):
def __init__(self, testcase, rfd):
self.testcase = testcase
self.rfd = rfd
self.check_close = 0
def ev_close(self, worker, timedout):
self.check_close += 1
self.testcase.assertFalse(timedout)
os.close(self.rfd)
worker.abort()
worker.abort() # safe but no effect
rfd, wfd = os.pipe()
hdlr = TestH(self, rfd)
worker = StreamWorker(handler=hdlr)
worker.set_writer("test", wfd) # closefd=True
worker.write(b"initial", "test")
worker.set_write_eof()
self.run_worker(worker)
self.assertEqual(hdlr.check_close, 1)