#!/usr/bin/env python
from torque_utils import pbsnodes

# this config allows mcore to run up to full ATLAS allocation (07.04.2015)
# full allocation right now is 47% of "non-retired" capacity = knal + mars + car
# just configure 47% of each class.

MAXDRAIN = 32          # max num of nodes allowed to drain
MAXFREE = 49           # max num of free slots to tolerate
MAXFREEPERNODE = 3     # standard for four-core jobs
NODESPERGRAB = 3       # number of nodes to grab each time around
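
# How these knobs interact below: a dedicated node with more than MAXFREEPERNODE
# free slots counts as underpopulated; MAXFREE caps the total number of unused
# slots tolerated across the mc pool; MAXDRAIN caps how many nodes may be
# draining at once; NODESPERGRAB is how many extra nodes are grabbed per run
# when there is headroom.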

# override when lsgrid is running

# MAXFREEPERNODE = 29    # allow lsgrid 30-core jobs
# MAXFREE = 149          # max num of free slots to tolerate

# override when bbmri is running

# MAXFREE = 149          # max num of free slots to tolerate
# MAXFREEPERNODE = 9     # allow bbmri 10-core jobs

# parameterize percent capacity -- 47% is too conservative, make this variable.


# original car was [ 'wn-car-0%02d.farm.nikhef.nl' % (n) for n in range( 96-int(CAPACFRAC*96), 97) ]
# temporarily changed due to low-numbered cars offline and reserved and such.

CAPACFRAC = 0.85

NUM_CAR = int(CAPACFRAC * 96)
NUM_KNAL = int(CAPACFRAC * 18)
NUM_MARS = int(CAPACFRAC * 58)

CANDIDATE_NODES = [ 'wn-car-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_CAR+1) ] + \
                  [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_KNAL+1) ] + \
                  [ 'wn-mars-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_MARS+1) ]
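
# Illustrative arithmetic for the current setting: CAPACFRAC = 0.85 gives
# int(0.85*96) = 81 car, int(0.85*18) = 15 knal and int(0.85*58) = 49 mars
# nodes, i.e. 145 candidate worker nodes in total.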

MCQUEUE = 'atlasmc'

# settings below appropriate for knal nodes (32 cores)
# ... want to keep num of draining nodes
# to MAXFREE / 3 since giving back a node could be expensive.
# it might already be running several mc jobs

# MAXDRAIN = 16          # max num of nodes allowed to drain
# MAXFREE = 49           # max num of free slots to tolerate
# MAXFREEPERNODE = 3
# NODESPERGRAB = 1
# CANDIDATE_NODES = [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1,19) ]
# MCQUEUE = 'atlasmc'

# settings below more appropriate for smrt nodes (8 cores)
# here giving a node back doesn't really matter, it wasn't
# running any mc jobs yet anyway. better to grab lots
# of nodes and give back the ones that drain the slowest if
# too many slots are free.

# MAXDRAIN = 16          # max num of nodes allowed to drain
# MAXFREE = 49           # max num of free slots to tolerate
# CANDIDATE_NODES = [ 'wn-smrt-0%02d.farm.nikhef.nl' % (n) for n in range(1,73) ]

UNDERPOP_NODES_FILE = ".nodes_with_too_few_jobs"
TORQUE = "stro.nikhef.nl"

import os
import pickle
import sys
import subprocess

def getmcjobinfo(nodes):

    # input is the list of nodes for which to get info
    # function returns tuple (jobs_waiting, node_info)
    # jobs_waiting boolean value : are there MC jobs waiting to be run (state Q)? (independent of nodes!)
    # node_info: list of tuples (wn_name, wn, num_running_mc, num_free_slots)
    # where wn is the node object for the machine with name wn_name, num_running_mc is the number of
    # mc jobs running on that node, and num_free_slots is the number of unused job slots on that node
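    #
    # e.g. a returned node_info entry might look like
    # ('wn-car-001.farm.nikhef.nl', <node object>, 4, 2): four running mc jobs
    # and two free slots on that node (values purely illustrative).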

    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host' ]

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp")

    import time
    os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)
    now = time.mktime(time.localtime())

    jlist = torqueJobs.qs_parsefile(qsfname)

    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')
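
    # Snapshot flow above: dump 'qstat -f' for the whole Torque server to a
    # temporary file, parse it with torqueJobs.qs_parsefile, and keep the raw
    # output as ~/tmp/qstat.last.txt for later inspection.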

    def mapatts(indict,inkeys):
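        # Normalize the raw qstat attributes for one job: time fields (tam.tfl)
        # go through hms() and are stored as hours, memory fields (tam.mfl)
        # through memconvert() (MiB, judging by the variable name), other time
        # fields (tam.tfl2) through tconv() as seconds; job_state letters are
        # mapped to 'queued'/'running', and exec_host is truncated at the first
        # '/' so only the worker node name remains.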
        sdict = tam.sub_dict(indict,inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs/3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                statelett = sdict[k]
                if statelett in ['Q','W']:
                    sdict[k] = 'queued'
                elif statelett in ['R','E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict

    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == MCQUEUE:
            newj = mapatts(j,usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has

    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] =='running':
            wn = j['exec_host']
            if wn in running_mc.keys():
                running_mc[wn] += 1
    rawlist = list()
    for n in nodes:
        rawlist.append( (n.name, n, running_mc[n.name], n.freeCpu) )
    from operator import itemgetter
    def srtrunningfree(item):
        # sort optimal for dropping nodes; for nodes with zero running mc jobs, want to
        # drop least-drained nodes (fewer free slots first in list); for nodes with
        # running mc jobs, want to shed as few nodes as possible so pick the more-drained
        # nodes first. this probably also works for shedding nodes when queue dries up.
        mcrun = item[2]
        emptyslots = item[3]
        if mcrun == 0:
            rank = emptyslots
        else:
            # the original "optimal" order: rank = 32*mcrun - emptyslots # 32 just a number bigger than expected emptyslots value
            rank = 7168 - 224*float(emptyslots)/mcrun + mcrun # constants 7168 and 224 only serve to enable a multi-level sort.
        return rank
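    # Worked example of the ranking (numbers illustrative): a node with 0
    # running mc jobs and 5 free slots gets rank 5, so idle dedicated nodes
    # always sort to the front. A node with 2 mc jobs and 8 free slots ranks
    # 7168 - 224*8/2 + 2 = 6274, one with 2 mc jobs and 4 free slots ranks
    # 7168 - 224*4/2 + 2 = 6722, so among busy nodes the more-drained one comes
    # first. Nodes are shed from the front of the sorted list below.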
    slist = sorted(rawlist, key=srtrunningfree)

    return waitingJobs, slist

def remove_from_mc_pool(node):
    # print "adding el6 tag back to node", node.name
    if "el6" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += el6\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out
    if "mc" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out

def add_to_mc_pool(node):
    # print "node props b4:", grabbed_node.properties
    if "el6" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= el6\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]

    if "mc" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
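
# Pool membership convention used throughout this script: a worker node whose
# Torque properties include 'mc' but not 'el6' is dedicated to the multicore
# pool. The two helpers above toggle those properties by piping 'set node ...'
# commands into qmgr on stdin.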

import optparse

usage = "usage: %prog [-A] [-n] [-i] [-l logfile] [-L loglevel]"

p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-A",action="store_true",dest="addall",default=False,
             help="add all eligible nodes to multicore pool")
p.add_option("-l",action="store",dest="logfile",default=None,
             help="log actions and information to LOGFILE (default stdout)")
p.add_option("-L",action="store",dest="loglevel",default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ..., default INFO)")
p.add_option("-n",action="store_true",dest="noopt",default=False,
             help="don't do anything, just print what would have been done")
p.add_option("-i",action="store_true",dest="info",default=False,
             help="print info on all nodes in multicore pool")
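
# Illustrative invocations using the options defined above: '-n -L DEBUG' shows
# what would be done without touching any node, '-i' just prints per-node
# multicore info and exits, '-A' adds every eligible node to the pool.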

import logging

opts, args = p.parse_args()

# assuming loglevel is bound to the string value obtained from the
# command line argument. Convert to upper case to allow the user to
# specify --log=DEBUG or --log=debug

numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level,filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)

if os.path.isfile(UNDERPOP_NODES_FILE):
    pkl_file = open(UNDERPOP_NODES_FILE,'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " + \
              repr(nodes_too_few_jobs_last_run) )

wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

for node in wnodes:
    if node.name in CANDIDATE_NODES and node.state.count('offline') == 0:
        mcnodes.append(node)

draining_slots = 0
mcdedicated = list()
draining_nodes = 0

for node in mcnodes:
    if 'el6' not in node.properties and 'mc' in node.properties :
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0 : draining_nodes += 1

if opts.info:
    waiting, node_tuples = getmcjobinfo(mcdedicated)
    for t in node_tuples:
        if t[2] == 0:
            print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])
        else:
            print "%28s has %2d running mc jobs, %2d empty slots, ratio %4.1f" % (t[0], t[2], t[3], float(t[3])/t[2])
    sys.exit(0)
candidate_nodes = list()
for n in mcnodes:
    if n not in mcdedicated:
        candidate_nodes.append(n)

if opts.addall:
    for grabbed_node in candidate_nodes:
        logging.info("%s added to mc node pool" % (grabbed_node.name))
        # candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(grabbed_node)
    sys.exit(0)

# check for dedicated nodes with too few jobs
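# A node is only handed back after it has shown too few jobs on two consecutive
# runs: this run's underpopulated nodes are compared against the set pickled in
# UNDERPOP_NODES_FILE by the previous run, and only that intersection is
# eligible for removal.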

nodes_with_too_few_jobs = list()

for node in mcdedicated:
    logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
    if node.freeCpu > MAXFREEPERNODE:
        nodes_with_too_few_jobs.append(node)

logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \
              " nodes with too few jobs")

nodes_consistently_underpopulated = list()
for n in nodes_with_too_few_jobs:
    if n.name in nodes_too_few_jobs_last_run:
        nodes_consistently_underpopulated.append(n)

undcount = len(nodes_consistently_underpopulated)
if undcount > 0:
    logging.debug("there are " + repr(undcount) + \
                  " nodes with too few jobs that were also marked last run" )

import random

removed_a_node = False

if undcount > 0:
    logging.info("nodes consistently underpopulated:")
    for n in nodes_consistently_underpopulated:
        logging.info(" " + n.name)
    if undcount > 1:
        remcount = undcount / 2 # number to remove
        logging.info("going to remove " + repr(remcount) + " from mc pool")
    else:
        remcount = 1

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated)

    for node_t in node_tuples[:remcount]:
        nname, nnode, running_mc, unused_slots = node_t
        logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                     (unused_slots, running_mc, nname) )
        if not opts.noopt : remove_from_mc_pool(nnode)
        nodes_with_too_few_jobs.remove(nnode)
        nodes_consistently_underpopulated.remove(nnode)
        removed_a_node = True

namelist = list()
if len(nodes_with_too_few_jobs) > 0:
    logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) + \
                  "with too few jobs on list for next run:")

    for n in nodes_with_too_few_jobs:
        logging.debug(n.name)
        namelist.append(n.name)
if not opts.noopt:
    pkl_file = open(UNDERPOP_NODES_FILE,'wb')
    pickle.dump(namelist,pkl_file)
    pkl_file.close()
if len(nodes_with_too_few_jobs) > 0 or removed_a_node :
    sys.exit(0)

# if we survive to this point, all nodes 'dedicated' are being efficiently
# used. are we draining less than the configured max?

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \
              repr(draining_slots) + " unused slots")
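
# Three cases below (with the defaults MAXFREE = 49, MAXFREEPERNODE = 3,
# MAXDRAIN = 32): if draining_slots <= 46 there is room for at least one more
# node's worth of free slots, so try to grab node(s), provided fewer than
# MAXDRAIN nodes are draining and mc jobs are actually waiting; if
# draining_slots > 49, too many slots are idle and nodes are shed; otherwise
# leave the pool alone.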
if (MAXFREE - draining_slots) >= MAXFREEPERNODE:
    logging.debug("%d unused slots are permitted, so %d more can still be tolerated" % (MAXFREE, MAXFREE - draining_slots))
    logging.debug("headroom of at least %d slots means we can try to grab another node" % (MAXFREEPERNODE) )
    if draining_nodes < MAXDRAIN :
        # first check if there are actually any waiting jobs to run; if not, it makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" % \
                      (MAXDRAIN, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        for nn in range(min(NODESPERGRAB,len(candidate_nodes))):
            # logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
            grabbed_node=random.choice(candidate_nodes)
            logging.info("%s added to mc node pool" % (grabbed_node.name))
            candidate_nodes.remove(grabbed_node)
            if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (MAXDRAIN))
        logging.debug("Doing nothing now")

elif draining_slots > MAXFREE:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated)

    slots_to_recover = draining_slots - MAXFREE
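    # Illustrative numbers: with MAXFREE = 49 and, say, 60 draining slots,
    # slots_to_recover is 11; nodes are popped from the front of the sorted
    # list (idle ones first, then the most-drained busy ones) until at least
    # 11 unused slots have been removed from the pool.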
    logging.info("unused slot limit (%d) exceeded by %d " \
                 % (MAXFREE, slots_to_recover) + ": remove node(s) from mc pool")
    slots_recovered = 0
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                         (unused_slots, running_mc, nname) )
            if not opts.noopt : remove_from_mc_pool(nnode)
            slots_recovered += unused_slots
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, MAXFREE))
    logging.debug("difference is %d which is less than %d" % \
                  (MAXFREE - draining_slots, MAXFREEPERNODE) )
    logging.debug("so: doing nothing now.")