
Contents of /nl.nikhef.ndpf.mcfloat/trunk/mcfloat



Revision 3332 - Sat Oct 9 15:44:34 2021 UTC by templon
File size: 21044 byte(s)
fixed: -n option had no effect for -R flag


#!/usr/bin/env python
# $Id$
# Source: $URL$
# J. A. Templon, NIKHEF/PDP 2014

# Class to hold parameters:

class mcfloat_cfg:
    def __init__(self):  # default values; adjust for your site
        self.maxdrain = 32
        self.maxfree = 49
        self.maxfreepernode = 3
        self.nodespergrab = 3
        self.capacityfrac = 0.48
        self.mcqueue = 'atlasmc7'
        self.underpop_nodes_file = "/var/db/nodes_with_too_few_jobs"
        self.torque = "korf.nikhef.nl"
    def __repr__(self):
        return ("Capacity Fraction %4.2f\nMax Drain %d\nMax Free %d\n" + \
                "Max Free per Node %d") % (self.capacityfrac,
                                           self.maxdrain,
                                           self.maxfree,
                                           self.maxfreepernode)

MCFLOAT_CONFIG_FILE = "/var/db/mcfloat_config"
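
# Example (illustrative): with the defaults above, the -q option prints the pickled
# configuration via __repr__ as
#
#   Capacity Fraction 0.48
#   Max Drain 32
#   Max Free 49
#   Max Free per Node 3
#
# deleting /var/db/mcfloat_config or running with -r falls back to these defaults.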
26
27 # the above config allows mcore to run up to full ATLAS allocation (07.04.2015)
28 # full allocation right now is 47% of "non-retired" capacity = knal + mars + car
29 # just configure 47% of each class.
30
31 # override when lsgrid is running (legacy comments, might be useful)
32
33 # MAXFREEPERNODE = 29 # allow lsgrid 30-core jobs
34 # MAXFREE = 149 # max num of free slots to tolerate
35
36 # override when bbmri is running
37
38 # MAXFREE = 149 # max num of free slots to tolerate
39 # MAXFREEPERNODE = 9 # allow bbmri 10-core jobs
40
41 import os
42 import pickle
43 import sys
44 import subprocess
45
46 def getmcjobinfo(nodes, cfg):
47
48 # input is the list of nodes for which to get info
49 # function returns tuple (jobs_waiting, node_info)
50 # jobs_waiting boolean value : are there MC jobs waiting to be run (state Q)? (independent of nodes!)
51 # node_info: list of tuples (wn_name, wn, num_running_mc, num_free_slots)
52 # where wn is the node object for the machine with name wn_name, num_running_mc is the number of
53 # mc jobs running on that node, and num_free_slots is the number of unused job slots on that node
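
    # illustrative example of the returned tuple (node names and counts are invented):
    #   (True, [('wn-sate-001.farm.nikhef.nl', <node object>, 0, 2),
    #           ('wn-lot-012.farm.nikhef.nl',  <node object>, 6, 5), ...])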

    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host']

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    qsfname = tempfile.mktemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")

    import time
    os.system('/usr/bin/qstat -f @' + cfg.torque + ' > ' + qsfname)
    now = time.mktime(time.localtime())

    jlist = torqueJobs.qs_parsefile(qsfname)

    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

    def mapatts(indict, inkeys):
        sdict = tam.sub_dict(indict, inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs/3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                statelett = sdict[k]
                if statelett in ['Q', 'W']:
                    sdict[k] = 'queued'
                elif statelett in ['R', 'E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict
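
    # a mapped entry for a running multicore job looks roughly like this
    # (illustrative values, not taken from a real qstat dump):
    #   {'queue': 'atlasmc7', 'job_state': 'running',
    #    'exec_host': 'wn-sate-001.farm.nikhef.nl', 'euser': 'someuser', ...}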

    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == cfg.mcqueue:
            newj = mapatts(j, usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has

    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] == 'running':
            wn = j['exec_host']
            if wn in running_mc.keys():
                running_mc[wn] += 1
    rawlist = list()
    for n in nodes:
        rawlist.append( (n.name, n, running_mc[n.name], n.freeCpu) )
    from operator import itemgetter
    def srtrunningfree(item):
        # sort optimal for dropping nodes; for nodes with zero running mc jobs, want to
        # drop least-drained nodes (fewer free slots first in list); for nodes with
        # running mc jobs, want to shed as few nodes as possible so pick the more-drained
        # nodes first. this probably also works for shedding nodes when queue dries up.
        mcrun = item[2]
        emptyslots = item[3]
        if mcrun == 0:
            rank = emptyslots
        else:
            # the original "optimal" order: rank = 32*mcrun - emptyslots  # 32 just a number bigger than expected emptyslots value
            rank = 7168 - 224*float(emptyslots)/mcrun + mcrun  # constants 7168 and 224 only serve to enable a multi-level sort.
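            # worked example (numbers are illustrative): 2 running mc jobs and 8 empty
            # slots gives 7168 - 224*8/2 + 2 = 6274, while 2 running jobs and only 4
            # empty slots gives 7168 - 448 + 2 = 6722, so among nodes with running mc
            # jobs the more-drained one sorts first; nodes with zero mc jobs are ranked
            # simply by their number of empty slots.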
        return rank
    slist = sorted(rawlist, key=srtrunningfree)

    return waitingJobs, slist

def remove_from_mc_pool(node):
    # print "adding el7 tag back to node", node.name
    if "el7" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += el7\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out
    if "mc" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out

def add_to_mc_pool(node):
    # print "node props b4:", grabbed_node.properties
    if "el7" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= el7\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]

    if "mc" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
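
# For reference: each helper above makes the same property changes one could make by
# hand with qmgr, e.g. (node name is only an example)
#   qmgr -c "set node wn-sate-001.farm.nikhef.nl properties += mc"
# the interactive pipe just batches the 'set' and 'print' statements in one session.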

import optparse

usage = "usage: %prog [options]"

p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-A", action="store_true", dest="addall", default=False,
             help="add all eligible nodes to multicore pool")

p.add_option("-D", action="store_true", dest="delall", default=False,
             help="delete all nodes from multicore pool")

p.add_option("-E", action="store_true", dest="add_everything", default=False,
             help="add all nodes (even non-candidates) to the multicore pool")

p.add_option("-R", action="store_true", dest="remove_strays_from_pool", default=False,
             help="remove mc pool nodes that are not one of the configured candidate nodes")

p.add_option("-l", action="store", dest="logfile", default=None,
             help="log actions and information to LOGFILE (default stdout)")

p.add_option("-L", action="store", dest="loglevel", default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ..., default INFO)")

p.add_option("-n", action="store_true", dest="noopt", default=False,
             help="don't do anything, just print what would have been done")

p.add_option("-i", action="store_true", dest="info", default=False,
             help="print info on all nodes in multicore pool")

p.add_option("-d", action="store", dest="new_max_drain", default=None,
             help="maximum number of nodes allowed in draining state")

p.add_option("-m", action="store", dest="new_max_free", default=None,
             help="maximum number of unused slots to tolerate in the multicore pool")

p.add_option("-p", action="store", dest="new_max_free_per_node", default=None,
             help="maximum number of unused slots per node to tolerate in the multicore pool")

p.add_option("-f", action="store", dest="newfrac", default=None,
             help="fraction (out of 1.0) of in-service nodes to commit to multicore pool")

p.add_option("-r", action="store_true", dest="reset_config", default=False,
             help="reset mcfloat config to the default")

p.add_option("-q", action="store_true", dest="querycfg", default=False,
             help="print current config and exit")
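
# typical invocations (illustrative, assuming the script is installed as 'mcfloat'):
#   mcfloat               # normal run: adjust the multicore pool as needed
#   mcfloat -n -L DEBUG   # dry run with debug logging, show what would be done
#   mcfloat -q            # print the current persisted configuration and exit
#   mcfloat -f 0.25 -n    # preview a capacity-fraction change without writing it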

import logging

opts, args = p.parse_args()

# assuming loglevel is bound to the string value obtained from the
# command line argument. Convert to upper case to allow the user to
# specify --log=DEBUG or --log=debug

numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level, filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)

if os.path.isfile(MCFLOAT_CONFIG_FILE):
    pkl_file = open(MCFLOAT_CONFIG_FILE, 'rb')
    cfg = pickle.load(pkl_file)
    pkl_file.close()
else:
    cfg = mcfloat_cfg()  # default values

if opts.querycfg:
    print "Current mcfloat configuration:"
    print cfg
    sys.exit(0)

config_changed = False

# bundle all config changers here

if opts.newfrac:
    cfg.capacityfrac = float(opts.newfrac)
    if cfg.capacityfrac < 0 or cfg.capacityfrac > 1:
        raise ValueError('Invalid capacity fraction: %s' % opts.newfrac)
    logging.info("Changing capacity fraction to %s" % opts.newfrac)
    config_changed = True

if opts.new_max_free:
    cfg.maxfree = int(opts.new_max_free)
    if cfg.maxfree < 0:
        raise ValueError('Invalid max free slots: %s' % opts.new_max_free)
    logging.info("Changing max free slots to %s" % opts.new_max_free)
    config_changed = True

if opts.new_max_free_per_node:
    cfg.maxfreepernode = int(opts.new_max_free_per_node)
    if cfg.maxfreepernode < 0:
        raise ValueError('Invalid max free slots per node: %s' % opts.new_max_free_per_node)
    logging.info("Changing max free slots per node to %s" % opts.new_max_free_per_node)
    config_changed = True

if opts.new_max_drain:
    cfg.maxdrain = int(opts.new_max_drain)
    if cfg.maxdrain < 0:
        raise ValueError('Invalid max num draining nodes: %s' % opts.new_max_drain)
    logging.info("Changing max number draining nodes to %s" % opts.new_max_drain)
    config_changed = True

if opts.reset_config:
    cfg = mcfloat_cfg()  # default values
    logging.info("Reset mcfloat config to default values")
    config_changed = True

## below after all config changers

if config_changed:
    logging.info("writing out changed mcfloat config to config file")
    if not opts.noopt:
        pkl_file = open(MCFLOAT_CONFIG_FILE, 'wb')
        pickle.dump(cfg, pkl_file)
        pkl_file.close()
    sys.exit(0)

if opts.delall:
    cfrac = 1
else:
    cfrac = cfg.capacityfrac

# NUM_CAR  = int(cfrac * 96)
# NUM_KNAL = int(cfrac * 18)
# NUM_MARS = int(cfrac * 58)

NUM_CHOC = int(cfrac * 58)
NUM_PEP  = int(cfrac * 13)
NUM_SATE = int(cfrac * 54)
NUM_TAAI = int(cfrac * 12)
NUM_LOT  = int(cfrac * 48)  # lots start at 010 and go to 057
NUM_SNEL = int(cfrac * 32)
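
# with the default capacityfrac of 0.48 this works out to, for example,
# NUM_SATE = int(0.48 * 54) = 25 and NUM_PEP = int(0.48 * 13) = 6; int() truncates,
# so each class contributes at most the configured fraction of its nodes.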

# there is no check whether START + NUM < number of nodes
# so careful when changing START from 1

# START_CAR  = 1
# START_KNAL = 1
# START_MARS = 1

START_CHOC = 1
START_PEP  = 1
START_SATE = 1
START_TAAI = 1
START_LOT  = 10
START_SNEL = 1

# RANGE_CAR  = range(START_CAR,  START_CAR  + NUM_CAR + 1)
# RANGE_KNAL = range(START_KNAL, START_KNAL + NUM_KNAL)
# RANGE_MARS = range(START_MARS, START_MARS + NUM_MARS)

RANGE_CHOC = range(START_CHOC, START_CHOC + NUM_CHOC)
RANGE_PEP  = range(START_PEP,  START_PEP  + NUM_PEP)
RANGE_SATE = range(START_SATE, START_SATE + NUM_SATE)
RANGE_TAAI = range(START_TAAI, START_TAAI + NUM_TAAI)
RANGE_LOT  = range(START_LOT,  START_LOT  + NUM_LOT)
RANGE_SNEL = range(START_SNEL, START_SNEL + NUM_SNEL)

CANDIDATE_NODES = [ 'wn-choc-0%02d.farm.nikhef.nl' % (n) for n in RANGE_CHOC ] + \
                  [ 'wn-pep-0%02d.farm.nikhef.nl'  % (n) for n in RANGE_PEP  ] + \
                  [ 'wn-sate-0%02d.farm.nikhef.nl' % (n) for n in RANGE_SATE ] + \
                  [ 'wn-taai-0%02d.farm.nikhef.nl' % (n) for n in RANGE_TAAI ] + \
                  [ 'wn-lot-0%02d.farm.nikhef.nl'  % (n) for n in RANGE_LOT  ] + \
                  [ 'wn-snel-0%02d.farm.nikhef.nl' % (n) for n in RANGE_SNEL ]
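
# illustrative expansion with the defaults: 'wn-sate-001.farm.nikhef.nl' through
# 'wn-sate-025.farm.nikhef.nl', and 'wn-lot-010.farm.nikhef.nl' through
# 'wn-lot-032.farm.nikhef.nl' (START_LOT = 10, NUM_LOT = int(0.48 * 48) = 23).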

from torque_utils import pbsnodes

wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

if opts.add_everything:
    for gn in [n for n in wnodes if 'offline' not in n.state if ('mc' not in n.properties
                                                                 and 'el7' in n.properties)]:
        logging.info("%s added to mc node pool" % (gn.name))
        # candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(gn)
    sys.exit(0)

if opts.remove_strays_from_pool:
    for node in wnodes:
        if node.name not in CANDIDATE_NODES and 'mc' in node.properties :
            if not opts.noopt: remove_from_mc_pool(node)
            logging.info("%s deleted from mc node pool" % (node.name))
    sys.exit(0)

for node in wnodes:
    if node.name in CANDIDATE_NODES and (node.state.count('offline') == 0 and
                                         ('mc' in node.properties or
                                          'el7' in node.properties)):
        mcnodes.append(node)

if os.path.isfile(cfg.underpop_nodes_file):
    pkl_file = open(cfg.underpop_nodes_file, 'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " + \
              repr(nodes_too_few_jobs_last_run))

draining_slots = 0
mcdedicated = list()
draining_nodes = 0

for node in wnodes:
    if 'el7' not in node.properties and 'mc' in node.properties :
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0 : draining_nodes += 1
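
# convention used above: a node carrying the 'mc' property but not 'el7' is dedicated
# to the multicore pool; its free CPUs count as draining slots, and any such node with
# at least one free slot counts as a draining node.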

if opts.info:
    waiting, node_tuples = getmcjobinfo(mcdedicated, cfg)
    for t in node_tuples:
        if t[2] == 0:
            print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])
        else:
            print "%28s has %2d running mc jobs, %2d empty slots, ratio %4.1f" % (t[0], t[2], t[3], float(t[3])/t[2])
    sys.exit(0)

candidate_nodes = list()
for n in mcnodes:
    if n not in mcdedicated:
        candidate_nodes.append(n)

if opts.delall:
    for grabbed_node in wnodes:
        if 'mc' in grabbed_node.properties :
            logging.info("%s deleted from mc node pool" % (grabbed_node.name))
            if not opts.noopt: remove_from_mc_pool(grabbed_node)
    sys.exit(0)

if opts.addall:
    for grabbed_node in candidate_nodes:
        logging.info("%s added to mc node pool" % (grabbed_node.name))
        # candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(grabbed_node)
    sys.exit(0)

# check for dedicated nodes with too few jobs

nodes_with_too_few_jobs = list()

for node in mcdedicated:
    logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
    if node.freeCpu > cfg.maxfreepernode:
        nodes_with_too_few_jobs.append(node)

logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \
              " nodes with too few jobs")

nodes_consistently_underpopulated = list()
for n in nodes_with_too_few_jobs:
    if n.name in nodes_too_few_jobs_last_run:
        nodes_consistently_underpopulated.append(n)

undcount = len(nodes_consistently_underpopulated)
if undcount > 0:
    logging.debug("there are " + repr(undcount) + \
                  " nodes with too few jobs that were also marked last run")

import random

removed_a_node = False

if undcount > 0:
    logging.info("nodes consistently underpopulated:")
    for n in nodes_consistently_underpopulated:
        logging.info(" " + n.name)
    if undcount > 1:
        remcount = undcount / 2  # number to remove
        logging.info("going to remove " + repr(remcount) + " from mc pool")
    else:
        remcount = 1

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated, cfg)

    for node_t in node_tuples[:remcount]:
        nname, nnode, running_mc, unused_slots = node_t
        logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                     (unused_slots, running_mc, nname))
        if not opts.noopt : remove_from_mc_pool(nnode)
        nodes_with_too_few_jobs.remove(nnode)
        nodes_consistently_underpopulated.remove(nnode)
        removed_a_node = True

namelist = list()
if len(nodes_with_too_few_jobs) > 0:
    logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) + \
                  "with too few jobs on list for next run:")

for n in nodes_with_too_few_jobs:
    logging.debug(n.name)
    namelist.append(n.name)
if not opts.noopt:
    pkl_file = open(cfg.underpop_nodes_file, 'wb')
    pickle.dump(namelist, pkl_file)
    pkl_file.close()
if len(nodes_with_too_few_jobs) > 0 or removed_a_node :
    sys.exit(0)

# if we survive to this point, all nodes 'dedicated' are being efficiently
# used. are we draining less than the configured max?

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \
              repr(draining_slots) + " unused slots")
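
# worked example with the default limits (maxfree 49, maxfreepernode 3, maxdrain 32);
# the draining_slots values are illustrative:
#   draining_slots = 40 -> headroom 49 - 40 = 9 >= 3, so try to grab another node
#                          (provided fewer than 32 nodes are draining and jobs are waiting)
#   draining_slots = 60 -> 60 > 49, so shed nodes until 60 - 49 = 11 slots are recovered
#   draining_slots = 47 -> headroom 2 < 3 and 47 <= 49, so do nothing this run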
if (cfg.maxfree - draining_slots) >= cfg.maxfreepernode:
    logging.debug("%d unused slots are permitted, so %d more" % (cfg.maxfree, cfg.maxfree - draining_slots))
    logging.debug("headroom of more than %d+ slots means we can try to grab another node" % (cfg.maxfreepernode))
    if draining_nodes < cfg.maxdrain :
        # first check if there are actually any waiting jobs to run; if not makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated, cfg)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" % \
                      (cfg.maxdrain, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        for nn in range(min(cfg.nodespergrab, len(candidate_nodes))):
            # logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
            grabbed_node = random.choice(candidate_nodes)
            logging.info("%s added to mc node pool" % (grabbed_node.name))
            candidate_nodes.remove(grabbed_node)
            if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (cfg.maxdrain))
        logging.debug("Doing nothing now")

elif draining_slots > cfg.maxfree:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated, cfg)

    slots_to_recover = draining_slots - cfg.maxfree
    logging.info("unused slot limit (%d) exceeded by %d " \
                 % (cfg.maxfree, slots_to_recover) + ": remove node(s) from mc pool")
    slots_recovered = 0
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                         (unused_slots, running_mc, nname))
            if not opts.noopt : remove_from_mc_pool(nnode)
            slots_recovered += unused_slots
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, cfg.maxfree))
    logging.debug("difference is %d which is less than %d" % \
                  (cfg.maxfree - draining_slots, cfg.maxfreepernode))
    logging.debug("so: doing nothing now.")

Properties

svn:executable = *
svn:keywords = Id URL
