-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathsync.py
397 lines (314 loc) · 13.2 KB
/
sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
#
# Copyright 2013-2020 Ghent University
#
# This file is part of vsc-administration,
# originally created by the HPC team of Ghent University (http://ugent.be/hpc/en),
# with support of Ghent University (http://ugent.be/hpc),
# the Flemish Supercomputer Centre (VSC) (https://www.vscentrum.be),
# the Flemish Research Foundation (FWO) (http://www.fwo.be/en)
# and the Department of Economy, Science and Innovation (EWI) (http://www.ewi-vlaanderen.be/en).
#
# https://github.com/hpcugent/vsc-administration
#
# All rights reserved.
#
"""
Functions to deploy users to slurm.
"""
import logging
from enum import Enum
from vsc.accountpage.wrappers import mkNamedTupleInstance
from vsc.config.base import ANTWERPEN, BRUSSEL, GENT, LEUVEN, INSTITUTE_VOS_GENT
from vsc.utils.missing import namedtuple_with_defaults
from vsc.utils.run import asyncloop
SLURM_SACCT_MGR = "/usr/bin/sacctmgr"
SLURM_ORGANISATIONS = {
ANTWERPEN: 'uantwerpen',
BRUSSEL: 'vub',
GENT: 'ugent',
LEUVEN: 'kuleuven',
}
class SacctMgrException(Exception):
pass
class SyncTypes(Enum):
accounts = "accounts"
users = "users"
IGNORE_USERS = ["root"]
IGNORE_ACCOUNTS = ["root"]
SacctUserFields = ["User", "Def_Acct", "Admin", "Cluster", "Account", "Partition", "Share",
"MaxJobs", "MaxNodes", "MaxCPUs", "MaxSubmit", "MaxWall", "MaxCPUMins",
"QOS", "Def_QOS"]
SacctAccountFields = ["Account", "Descr", "Org", "Cluster", "Par_Name", "User", "Share",
"GrpJobs", "GrpNodes", "GrpCPUs", "GrpMem", "GrpSubmit", "GrpWall", "GrpCPUMins",
"MaxJobs", "MaxNodes", "MaxCPUs", "MaxSubmit", "MaxWall", "MaxCPUMins",
"QOS", "Def_QOS"]
SlurmAccount = namedtuple_with_defaults('SlurmAccount', SacctAccountFields)
SlurmUser = namedtuple_with_defaults('SlurmUser', SacctUserFields)
def mkSlurmAccount(fields):
"""Make a named tuple from the given fields."""
account = mkNamedTupleInstance(fields, SlurmAccount)
if account.Account in IGNORE_ACCOUNTS:
return None
return account
def mkSlurmUser(fields):
"""Make a named tuple from the given fields."""
user = mkNamedTupleInstance(fields, SlurmUser)
if user.User in IGNORE_USERS:
return None
return user
def parse_slurm_acct_line(header, line, info_type, user_field_number):
"""Parse the line into the correct data type."""
fields = line.split("|")
if info_type == SyncTypes.accounts:
if fields[user_field_number]:
# association information for a user. Users are processed later.
return None
creator = mkSlurmAccount
elif info_type == SyncTypes.users:
creator = mkSlurmUser
else:
return None
return creator(dict(zip(header, fields)))
def parse_slurm_acct_dump(lines, info_type):
"""Parse the accounts from the listing."""
acct_info = set()
header = [w.replace(' ', '_') for w in lines[0].rstrip().split("|")]
user_field_number = [h.lower() for h in header].index("user")
for line in lines[1:]:
line = line.rstrip()
try:
info = parse_slurm_acct_line(header, line, info_type, user_field_number)
# This fails when we get e.g., the users and look at the account lines.
# We should them just skip that line instead of raising an exception
if info:
acct_info.add(info)
except Exception as err:
logging.exception("Slurm acct sync: could not process line %s [%s]", line, err)
raise
return acct_info
def get_slurm_acct_info(info_type):
"""Get slurm account info for the given clusterself.
@param info_type: SyncTypes
"""
(exitcode, contents) = asyncloop([
SLURM_SACCT_MGR,
"-s",
"-P",
"list",
info_type.value,
])
if exitcode != 0:
raise SacctMgrException("Cannot run sacctmgr")
info = parse_slurm_acct_dump(contents.splitlines(), info_type)
return info
def create_add_account_command(account, parent, organisation, cluster):
"""
Creates the command to add the given account.
@param account: name of the account to add
@param parent: name of the parent account. If None then parent will be "root".
@param organisation: name of the organisation to which the account belongs.
@param cluster: cluster to which the account must be added
@returns: list comprising the command
"""
CREATE_ACCOUNT_COMMAND = [
SLURM_SACCT_MGR,
"-i", # commit immediately
"add",
"account",
account,
"Parent={0}".format(parent or "root"),
"Organization={0}".format(SLURM_ORGANISATIONS[organisation]),
"Cluster={0}".format(cluster),
]
logging.debug(
"Adding command to add account %s with Parent=%s Cluster=%s Organization=%s",
account,
parent,
cluster,
organisation,
)
return CREATE_ACCOUNT_COMMAND
def create_add_user_command(user, vo_id, cluster):
"""
Creates the command to add the given account.
@param account: name of the account to add
@param parent: name of the parent account. If None then parent will be "root".
@param organisation: name of the organisation to which the account belongs.
@param cluster: cluster to which the account must be added
@returns: list comprising the command
"""
CREATE_USER_COMMAND = [
SLURM_SACCT_MGR,
"-i", # commit immediately
"add",
"user",
user,
"Account={0}".format(vo_id),
"DefaultAccount={0}".format(vo_id),
"Cluster={0}".format(cluster)
]
logging.debug(
"Adding command to add user %s with Account=%s Cluster=%s",
user,
vo_id,
cluster,
)
return CREATE_USER_COMMAND
def create_change_user_command(user, current_vo_id, new_vo_id, cluster):
"""Creates the commands to change a user's account.
@returns: two lists comprising the commands
"""
add_user_command = create_add_user_command(user, new_vo_id, cluster)
remove_association_command = create_remove_association_command(user, current_vo_id, cluster)
logging.debug(
"Adding commands to change user %s on Cluster=%s from Account=%s to DefaultAccount=%s",
user,
cluster,
current_vo_id,
new_vo_id
)
return [add_user_command, remove_association_command]
def create_remove_user_command(user, cluster):
"""Create the command to remove a user.
@returns: list comprising the command
"""
REMOVE_USER_COMMAND = [
SLURM_SACCT_MGR,
"-i", # commit immediately
"delete",
"user",
"name={user}".format(user=user),
"Cluster={cluster}".format(cluster=cluster)
]
logging.debug(
"Adding command to remove user %s from Cluster=%s",
user,
cluster,
)
return REMOVE_USER_COMMAND
def create_remove_association_command(user, obsolete_vo_id, cluster):
"""Create the command to remove an association
@returns: a list comprising the command.
"""
REMOVE_ASSOCIATION_USER_COMMAND = [
SLURM_SACCT_MGR,
"-i", # commit immediately
"delete",
"user",
"name={0}".format(user),
"Account={0}".format(obsolete_vo_id),
"where",
"Cluster={0}".format(cluster),
]
return REMOVE_ASSOCIATION_USER_COMMAND
def slurm_institute_accounts(slurm_account_info, clusters):
"""Check for the presence of the institutes and their default VOs in the slurm account list.
@returns: list of sacctmgr commands to add the accounts to the clusters if needed
"""
commands = []
for cluster in clusters:
cluster_accounts = [acct.Account for acct in slurm_account_info if acct and acct.Cluster == cluster]
for (inst, vo) in INSTITUTE_VOS_GENT.items():
if inst not in cluster_accounts:
commands.append(
create_add_account_command(account=inst, parent=None, cluster=cluster, organisation=inst)
)
if vo not in cluster_accounts:
commands.append(
create_add_account_command(account=vo, parent=inst, cluster=cluster, organisation=inst)
)
return commands
def slurm_vo_accounts(account_page_vos, slurm_account_info, clusters):
"""Check for the presence of the new/changed VOs in the slurm account list.
@returns: list of sacctmgr commands to add the accounts for VOs if needed
"""
commands = []
for cluster in clusters:
cluster_accounts = [acct.Account for acct in slurm_account_info if acct and acct.Cluster == cluster]
for vo in account_page_vos:
if vo.vsc_id in INSTITUTE_VOS_GENT.values():
continue
if vo.vsc_id not in cluster_accounts:
commands.append(create_add_account_command(
account=vo.vsc_id,
parent=vo.institute['name'],
cluster=cluster,
organisation=vo.institute['name']
))
return commands
def slurm_user_accounts(vo_members, active_accounts, slurm_user_info, clusters, dry_run=False):
"""Check for the presence of the user in his/her account.
@returns: list of sacctmgr commands to add the users if needed.
"""
commands = []
active_vo_members = set()
reverse_vo_mapping = dict()
for (members, vo) in vo_members.values():
# basic set arithmetic: take the intersection of the RHS sets and make the union with the LHS set
active_vo_members |= members & active_accounts
for m in members:
reverse_vo_mapping[m] = (vo.vsc_id, vo.institute['name'])
for cluster in clusters:
cluster_users_acct = [
(user.User, user.Def_Acct, user.Account) for user in slurm_user_info if user and user.Cluster == cluster
]
cluster_users = set([u[0] for u in cluster_users_acct])
# these are the users that need to be removed as they are no longer an active user in any
# (including the institute default) VO
remove_users = cluster_users - active_vo_members
new_users = set()
changed_users = set()
moved_users = set()
obsolete_users = set()
for (vo_id, (members, vo)) in vo_members.items():
# these are users not yet in the Slurm DB for this cluster
new_users |= set([
(user, vo.vsc_id, vo.institute['name'])
for user in (members & active_accounts) - cluster_users
])
# these are the current Slurm users per Account, i.e., the VO currently being processed
slurm_acct_users = set([user for (user, def_acct, _) in cluster_users_acct if def_acct == vo_id])
# these are the users for which an earlier delete failed because they still had live job in their (at that
# time) current account (VO)
obsolete_users_vo = (set([user for (user, def_acct, acct) in cluster_users_acct
if acct == vo_id and def_acct != acct]) - members) & active_accounts
obsolete_users |= set([(u, vo_id) for u in obsolete_users_vo])
# these are the users that should no longer be in this account, but should not be removed
# we need to look up their new VO
# Again, basic set arithmetic. LHS is the intersection of the people we have left and the active users
changed_users_vo = (slurm_acct_users - members) & active_accounts
changed_users |= changed_users_vo
try:
moved_users |= set([(user, vo_id, reverse_vo_mapping[user]) for user in changed_users_vo])
except KeyError as err:
logging.warning("Found user not belonging to any VO in the reverse VO map: %s", err)
if dry_run:
for user in changed_users:
try:
moved_users.add((user, reverse_vo_mapping[user]))
except KeyError as err:
logging.warning("Dry run, cannot find up user %s in reverse VO map",
user)
logging.debug("%d new users", len(new_users))
logging.debug("%d removed users", len(remove_users))
logging.debug("%d changed users", len(moved_users))
commands.extend([create_add_user_command(
user=user,
vo_id=vo_id,
cluster=cluster) for (user, vo_id, _) in new_users
])
commands.extend([create_remove_user_command(user=user, cluster=cluster) for user in remove_users])
def flatten(ls):
"""Turns a list of lists (ls) into a list, a.k.a. flatten a list."""
return [item for l in ls for item in l]
commands.extend(flatten([create_change_user_command(
user=user,
current_vo_id=current_vo_id,
new_vo_id=new_vo_id,
cluster=cluster) for (user, current_vo_id, (new_vo_id, _)) in moved_users])
)
commands.extend([create_remove_association_command(
user=user,
obsolete_vo_id=vo_id,
cluster=cluster) for (user, vo_id) in obsolete_users])
return commands