forked from cea-hpc/clustershell
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTaskThreadSuspendTest.py
73 lines (57 loc) · 1.94 KB
/
TaskThreadSuspendTest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# ClusterShell test suite
# Written by S. Thiell
"""Unit test for ClusterShell in multithreaded environments"""
import random
import time
import threading
import unittest
from ClusterShell.Task import *
from ClusterShell.Event import EventHandler
class TaskThreadSuspendTest(unittest.TestCase):
def tearDown(self):
task_cleanup()
def testSuspendMiscTwoTasks(self):
"""test task suspend/resume (2 tasks)"""
task = task_self()
task2 = Task()
task2.shell("sleep 4 && echo thr1")
task2.resume()
w = task.shell("sleep 1 && echo thr0", key=0)
task.resume()
self.assertEqual(task.key_buffer(0), b"thr0")
self.assertEqual(w.read(), b"thr0")
assert task2 != task
task2.suspend()
time.sleep(10)
task2.resume()
task_wait()
task2.shell("echo suspend_test", key=1)
task2.resume()
task_wait()
self.assertEqual(task2.key_buffer(1), b"suspend_test")
def _thread_delayed_unsuspend_func(self, task):
"""thread used to unsuspend task during task_wait()"""
time_th = int(random.random()*6+5)
#print "TIME unsuspend thread=%d" % time_th
time.sleep(time_th)
self.resumed = True
task.resume()
def testThreadTaskWaitWithSuspend(self):
"""test task_wait() with suspended tasks"""
task = Task()
self.resumed = False
threading.Thread(None, self._thread_delayed_unsuspend_func,
args=(task,)).start()
time_sh = int(random.random()*4)
#print "TIME shell=%d" % time_sh
task.shell("sleep %d" % time_sh)
task.resume()
time.sleep(1)
suspended = task.suspend()
for i in range(1, 4):
task = Task()
task.shell("sleep %d" % i)
task.resume()
time.sleep(1)
task_wait()
self.assertTrue(self.resumed or suspended == False)