forked from cea-hpc/clustershell
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTLib.py
151 lines (126 loc) · 4.69 KB
/
TLib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
Unit test library
"""
import os
import socket
import sys
import time
from tempfile import mkstemp
from tempfile import TemporaryFile, NamedTemporaryFile, TemporaryDirectory
try:
import configparser
except ImportError:
import ConfigParser as configparser
from io import BytesIO, StringIO
# Names exported by a star-import of this module. TBytesIO is not listed
# here, so code that needs it has to import it by name.
__all__ = ['HOSTNAME', 'load_cfg', 'make_temp_filename', 'make_temp_file',
'make_temp_dir', 'CLI_main']
# Short hostname of the local machine: everything before the first dot of
# the name reported by socket.gethostname().
HOSTNAME = socket.gethostname().partition('.')[0]
class TBytesIO(BytesIO):
    """In-memory byte stream used in place of stdout/stderr during tests.

    Text given to the constructor or to write() is encoded with
    ``str.encode()`` before being stored, so code under test can write
    strings while the tests compare the captured output against bytes.
    """

    def __init__(self, initial_bytes=None):
        """Create the stream, optionally pre-filled with *initial_bytes*.

        *initial_bytes* may be bytes or text; text (including the empty
        string) is encoded first.
        """
        if isinstance(initial_bytes, str):
            initial_bytes = initial_bytes.encode()
        BytesIO.__init__(self, initial_bytes)

    def write(self, s):
        """Append *s* to the stream and return the number of bytes written.

        *s* may be text, which is encoded first, or a bytes-like object,
        which is stored unchanged.
        """
        if isinstance(s, str):
            s = s.encode()
        # Propagate the count like any other io stream's write() does.
        return BytesIO.write(self, s)

    def isatty(self):
        """Return False: this stream is never attached to a terminal."""
        return False
def load_cfg(name):
    """Return a new ConfigParser fed with the test configuration *name*.

    Two candidate locations are handed to ``ConfigParser.read()``, in this
    order: ``~/.clustershell/tests/<name>`` (with ``~`` expanded) and
    ``/etc/clustershell/tests/<name>``.
    """
    user_path = os.path.expanduser('~/.clustershell/tests/%s' % name)
    system_path = '/etc/clustershell/tests/%s' % name
    parser = configparser.ConfigParser()
    parser.read([user_path, system_path])
    return parser
#
# Temp files and directories
#
def make_temp_filename(suffix=''):
    """Create an empty temporary file and return its path.

    The name starts with ``cs-test-``. A non-empty *suffix* is appended to
    the name, separated by a dash (one is added unless *suffix* already
    starts with one). The file is created by ``mkstemp()`` and is not
    removed here; only its descriptor is closed.
    """
    leading = suffix[:1]
    if leading and leading != '-':
        suffix = '-' + suffix
    handle, path = mkstemp(suffix, prefix='cs-test-')
    # Only the name is wanted: release the descriptor right away.
    os.close(handle)
    return path
def make_temp_file(text, suffix='', dir=None):
    """Write *text* to a new named temporary file and return the file object.

    *text* must be bytes. The file name starts with ``cs-test-`` and ends
    with *suffix*; *dir* selects the directory, as for
    ``NamedTemporaryFile``. The content is flushed before returning, and the
    object is returned still open, with its path available as ``.name``.
    """
    assert type(text) is bytes
    handle = NamedTemporaryFile(dir=dir, suffix=suffix, prefix='cs-test-')
    handle.write(text)
    # Push the data out so it can be read back through the file name.
    handle.flush()
    return handle
def make_temp_dir(suffix=''):
    """Create a temporary directory and return its TemporaryDirectory object.

    The directory name starts with ``cs-test-``. A non-empty *suffix* is
    appended to the name, separated by a dash (one is added unless *suffix*
    already starts with one).
    """
    leading = suffix[:1]
    if leading and leading != '-':
        suffix = '-' + suffix
    return TemporaryDirectory(suffix=suffix, prefix='cs-test-')
#
# CLI tests
#
def CLI_main(test, main, args, stdin, expected_stdout, expected_rc=0,
             expected_stderr=None):
    """Call a CLI ``main()`` in-process and check its output and exit status.

    Calling main() directly (instead of spawning a process) allows code
    coverage checks. sys.stdin, sys.stdout, sys.stderr and sys.argv are
    replaced for the duration of the call and always restored afterwards.

    Arguments:
        test: object providing assertEqual() and assertTrue(), used to
            report mismatches.
        main: callable invoked without arguments.
        args: value assigned to sys.argv while main() runs.
        stdin: input data, or None to leave sys.stdin untouched. Bytes are
            served from a temporary file; any other value is wrapped in a
            StringIO so that it is read in text mode.
        expected_stdout: None to skip the check, an object with a search()
            method (compiled regexp) that must find a match in the captured
            stdout, or a value the captured stdout (bytes) must equal.
        expected_rc: expected exit status, or None to skip the check. The
            status is -1 when main() returns without raising SystemExit.
        expected_stderr: None to skip the check, an object with a match()
            method (compiled regexp) that must match the start of the
            captured stderr, or bytes the captured stderr must end with.

    The exit status follows the interpreter's handling of SystemExit: a
    code of None means 0, an integer (or a string holding one) is used
    as-is, and any other code is written to the captured stderr with a
    status of 1.
    """
    rc = -1

    saved_stdin = sys.stdin
    saved_stdout = sys.stdout
    saved_stderr = sys.stderr
    saved_argv = sys.argv

    # Output: ClusterShell writes to stdout/stderr using strings, but the
    # tests expect bytes. TBytesIO is a wrapper that does the conversion
    # until all tests are migrated to strings.
    out = TBytesIO()
    err = TBytesIO()

    try:
        try:
            # Input: if defined, the stdin argument specifies input data.
            # This is done inside the try block so that a failure while
            # preparing the input still restores the standard streams.
            if stdin is not None:
                if type(stdin) is bytes:
                    # Bytes input is served from a temporary file.
                    sys.stdin = TemporaryFile()
                    sys.stdin.write(stdin)
                    sys.stdin.seek(0)  # ready to be read
                else:
                    # Text input: sys.stdin should be read in text mode
                    # for some tests (eg. Nodeset).
                    sys.stdin = StringIO(stdin)
            sys.stdout = out
            sys.stderr = err
            sys.argv = args
            main()
        except SystemExit as exc:
            code = exc.code
            if code is None:
                # plain sys.exit() means success
                rc = 0
            elif isinstance(code, int):
                rc = int(code)
            else:
                try:
                    # keep accepting numeric strings such as "2"
                    rc = int(str(code))
                except ValueError:
                    # Same as the interpreter: the object is printed on
                    # stderr and the exit status is 1.
                    err.write('%s\n' % (code,))
                    rc = 1
        finally:
            sys.stdout = saved_stdout
            sys.stderr = saved_stderr
            sys.argv = saved_argv
            # close the replacement stdin if one was installed
            if sys.stdin is not saved_stdin:
                sys.stdin.close()
            sys.stdin = saved_stdin

        if expected_stdout is not None:
            captured_out = out.getvalue()
            # expected_stdout might be a compiled regexp or bytes
            if hasattr(expected_stdout, 'search'):
                if not expected_stdout.search(captured_out):
                    # search failed; use assertEqual() to display
                    # expected/output
                    test.assertEqual(captured_out, expected_stdout.pattern)
            else:
                test.assertEqual(captured_out, expected_stdout)

        if expected_stderr is not None:
            captured_err = err.getvalue()
            # expected_stderr might be a compiled regexp or bytes
            if hasattr(expected_stderr, 'match'):
                if not expected_stderr.match(captured_err):
                    # match failed; use assertEqual() to display
                    # expected/output
                    test.assertEqual(captured_err, expected_stderr.pattern)
            else:
                # check the end as stderr messages are often prefixed with
                # argv[0]
                test.assertTrue(captured_err.endswith(expected_stderr),
                                captured_err + b' != ' + expected_stderr)

        if expected_rc is not None:
            test.assertEqual(rc, expected_rc,
                             "rc=%d err=%s" % (rc, err.getvalue()))
    finally:
        out.close()
        err.close()