forked from cea-hpc/clustershell
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWorkerExecTest.py
179 lines (154 loc) · 7.44 KB
/
WorkerExecTest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# ClusterShell.Worker.ExecWorker test suite
# First version by A. Degremont 2014-07-10
"""Unit test for ExecWorker"""
import os
import unittest
from TLib import HOSTNAME, make_temp_file, make_temp_filename, make_temp_dir
from ClusterShell.Event import EventHandler
from ClusterShell.Worker.Exec import ExecWorker, WorkerError
from ClusterShell.Task import task_self
class ExecTest(unittest.TestCase):
def execw(self, **kwargs):
"""helper method to spawn and run ExecWorker"""
worker = ExecWorker(**kwargs)
task_self().schedule(worker)
task_self().run()
return worker
def test_no_nodes(self):
"""test ExecWorker with a simple command without nodes"""
self.execw(nodes=None, handler=None, command="echo ok")
self.assertEqual(task_self().max_retcode(), None)
def test_shell_syntax(self):
"""test ExecWorker with a command using shell syntax"""
cmd = "echo -n 1; echo -n 2"
self.execw(nodes='localhost', handler=None, command=cmd)
self.assertEqual(task_self().max_retcode(), 0)
self.assertEqual(task_self().node_buffer('localhost'), b'12')
def test_one_node(self):
"""test ExecWorker with a simple command on localhost"""
self.execw(nodes='localhost', handler=None, command="echo ok")
self.assertEqual(task_self().max_retcode(), 0)
self.assertEqual(task_self().node_buffer('localhost'), b'ok')
def test_one_node_error(self):
"""test ExecWorker with an error command on localhost"""
self.execw(nodes='localhost', handler=None, command="false")
self.assertEqual(task_self().max_retcode(), 1)
self.assertEqual(task_self().node_buffer('localhost'), b'')
@unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'")
def test_timeout(self):
"""test ExecWorker with a timeout"""
nodes = "localhost,%s" % HOSTNAME
self.execw(nodes=nodes, handler=None, command="sleep 1", timeout=0.2)
self.assertEqual(task_self().max_retcode(), None)
self.assertEqual(task_self().num_timeout(), 2)
def test_node_placeholder(self):
"""test ExecWorker with several nodes and %h (host)"""
nodes = "localhost,%s" % HOSTNAME
self.execw(nodes=nodes, handler=None, command="echo %h")
self.assertEqual(task_self().max_retcode(), 0)
self.assertEqual(task_self().node_buffer('localhost'), b'localhost')
self.assertEqual(task_self().node_buffer(HOSTNAME), HOSTNAME.encode('utf-8'))
def test_bad_placeholder(self):
"""test ExecWorker with unknown placeholder pattern"""
self.assertRaises(WorkerError, self.execw,
nodes="localhost", handler=None, command="echo %x")
self.assertRaises(WorkerError, self.execw,
nodes="localhost", handler=None, command="echo %")
@unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'")
def test_rank_placeholder(self):
"""test ExecWorker with several nodes and %n (rank)"""
nodes = "localhost,%s" % HOSTNAME
self.execw(nodes=nodes, handler=None, command="echo %n")
self.assertEqual(task_self().max_retcode(), 0)
self.assertEqual(set(bytes(msg) for msg, _ in task_self().iter_buffers()),
set([b'0', b'1']))
def test_copy(self):
"""test copying with an ExecWorker and host placeholder"""
src = make_temp_file(b"data")
dstdir = make_temp_dir()
dstpath = os.path.join(dstdir.name, os.path.basename(src.name))
try:
pattern = dstpath + ".%h"
self.execw(nodes='localhost', handler=None, source=src.name,
dest=pattern)
self.assertEqual(task_self().max_retcode(), 0)
self.assertTrue(os.path.isfile(dstpath + '.localhost'))
finally:
os.unlink(dstpath + '.localhost')
dstdir.cleanup()
def test_copy_preserve(self):
"""test copying with an ExecWorker (preserve=True)"""
src = make_temp_file(b"data")
past_time = 443757600
os.utime(src.name, (past_time, past_time))
dstpath = make_temp_filename()
try:
self.execw(nodes='localhost', handler=None, source=src.name,
dest=dstpath, preserve=True)
self.assertEqual(task_self().max_retcode(), 0)
self.assertTrue(os.stat(dstpath).st_mtime, past_time)
finally:
os.unlink(dstpath)
def test_copy_directory(self):
"""test copying directory with an ExecWorker"""
srcdir = make_temp_dir()
dstdir = make_temp_dir()
ref1 = make_temp_file(b"data1", dir=srcdir.name)
pathdstsrcdir = os.path.join(dstdir.name, os.path.basename(srcdir.name))
pathdst1 = os.path.join(pathdstsrcdir, os.path.basename(ref1.name))
try:
self.execw(nodes='localhost', handler=None, source=srcdir.name,
dest=dstdir.name)
self.assertEqual(task_self().max_retcode(), 0)
self.assertTrue(os.path.isdir(pathdstsrcdir))
self.assertTrue(os.path.isfile(pathdst1))
with open(pathdst1) as dst1:
self.assertEqual(dst1.readlines()[0], "data1")
finally:
os.unlink(pathdst1)
os.rmdir(pathdstsrcdir)
ref1.close()
dstdir.cleanup()
srcdir.cleanup()
def test_copy_wrong_directory(self):
"""test copying wrong directory with an ExecWorker"""
srcdir = make_temp_dir()
dst = make_temp_file(b"data")
ref1 = make_temp_file(b"data1", dir=srcdir.name)
try:
self.execw(nodes='localhost', handler=None, source=srcdir.name,
dest=dst.name, stderr=True)
self.assertEqual(task_self().max_retcode(), 1)
self.assertTrue(len(task_self().node_error("localhost")) > 0)
self.assertTrue(os.path.isfile(ref1.name))
finally:
ref1.close()
srcdir.cleanup()
def test_rcopy_wrong_directory(self):
"""test ExecWorker reverse copying with wrong directory"""
with make_temp_dir() as dstbasedirname:
dstdir = os.path.join(dstbasedirname, "wrong")
src = make_temp_file(b"data")
self.assertRaises(ValueError, self.execw, nodes='localhost',
handler=None, source=src.name, dest=dstdir,
stderr=True, reverse=True)
def test_abort_on_read(self):
"""test ExecWorker.abort() on read"""
class TestH(EventHandler):
def ev_read(self, worker, node, sname, msg):
worker.abort()
worker.abort() # safe but no effect
self.execw(nodes='localhost', handler=TestH(),
command="echo ok; tail -f /dev/null")
self.assertEqual(task_self().max_retcode(), None)
self.assertEqual(task_self().node_buffer('localhost'), b'ok')
def test_abort_on_close(self):
"""test ExecWorker.abort() on close"""
class TestH(EventHandler):
def ev_close(self, worker, timedout):
worker.abort()
worker.abort() # safe but no effect
self.execw(nodes='localhost', handler=TestH(),
command="echo ok; sleep .1")
self.assertEqual(task_self().max_retcode(), 0)
self.assertEqual(task_self().node_buffer('localhost'), b'ok')