
Contents of /nl.nikhef.ndpf.mcfloat/trunk/mcfloat



Revision 3150
Mon Jan 30 10:54:04 2017 UTC by templon
File size: 19359 byte(s)
Log message: move START_CHOC from 1 to 11; Dennis had reserved chocs 1..10 for Jos Vermaseren


#!/usr/bin/env python
# $Id$
# Source: $URL$
# J. A. Templon, NIKHEF/PDP 2014

# Class to hold parameters:

class mcfloat_cfg:
    def __init__(self):  # default values; adjust for your site
        self.maxdrain = 32
        self.maxfree = 49
        self.maxfreepernode = 3
        self.nodespergrab = 3
        self.capacityfrac = 0.39
        self.mcqueue = 'atlasmc'
        self.underpop_nodes_file = "/var/db/nodes_with_too_few_jobs"
        self.torque = "korf.nikhef.nl"
    def __repr__(self):
        return "Capacity Fraction %4.2f\nMax Drain %d\nMax Free %d\nMax Free per Node %d" % (self.capacityfrac,
                                                                                             self.maxdrain,
                                                                                             self.maxfree,
                                                                                             self.maxfreepernode)

MCFLOAT_CONFIG_FILE = "/var/db/mcfloat_config"

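# The live configuration is persisted as a pickle in MCFLOAT_CONFIG_FILE;
# the option handlers further down (-f, -m, -d, -r) mutate it and write it
# back. A minimal sketch of inspecting it by hand (assumes the file exists
# and was written by this script):
#
#   import pickle
#   cfg = pickle.load(open("/var/db/mcfloat_config", 'rb'))
#   print cfg   # __repr__ above shows the four main knobs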
# the above config allows mcore to run up to full ATLAS allocation (07.04.2015)
# full allocation right now is 47% of "non-retired" capacity = knal + mars + car
# just configure 47% of each class.

# override when lsgrid is running (legacy comments, might be useful)

# MAXFREEPERNODE = 29  # allow lsgrid 30-core jobs
# MAXFREE = 149        # max num of free slots to tolerate

# override when bbmri is running

# MAXFREE = 149        # max num of free slots to tolerate
# MAXFREEPERNODE = 9   # allow bbmri 10-core jobs

import os
import pickle
import sys
import subprocess

def getmcjobinfo(nodes, cfg):

    # input is the list of nodes for which to get info
    # function returns tuple (jobs_waiting, node_info)
    # jobs_waiting boolean value : are there MC jobs waiting to be run (state Q)? (independent of nodes!)
    # node_info: list of tuples (wn_name, wn, num_running_mc, num_free_slots)
    # where wn is the node object for the machine with name wn_name, num_running_mc is the number of
    # mc jobs running on that node, and num_free_slots is the number of unused job slots on that node

    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host']

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    qsfname = tempfile.mktemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")

    import time
    os.system('/usr/bin/qstat -f @' + cfg.torque + ' > ' + qsfname)
    now = time.mktime(time.localtime())

    jlist = torqueJobs.qs_parsefile(qsfname)

    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

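    # mapatts: reduce a raw qstat job dictionary to the keys in usedkeys,
    # with the torqueAttMappers conversions applied: times to hours (tfl)
    # or seconds (tfl2), memory fields to MiB (mfl), job_state Q/W ->
    # 'queued' and R/E -> 'running', and exec_host trimmed to the bare node
    # name. tfl, mfl and tfl2 are attribute-name collections from the
    # torqueAttMappers module.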
    def mapatts(indict, inkeys):
        sdict = tam.sub_dict(indict, inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs / 3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                statelett = sdict[k]
                if statelett in ['Q', 'W']:
                    sdict[k] = 'queued'
                elif statelett in ['R', 'E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict

    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == cfg.mcqueue:
            newj = mapatts(j, usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has

    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] == 'running':
            wn = j['exec_host']
            if wn in running_mc.keys():
                running_mc[wn] += 1
    rawlist = list()
    for n in nodes:
        rawlist.append((n.name, n, running_mc[n.name], n.freeCpu))

    def srtrunningfree(item):
        # sort optimal for dropping nodes; for nodes with zero running mc jobs, want to
        # drop least-drained nodes (fewer free slots first in list); for nodes with
        # running mc jobs, want to shed as few nodes as possible so pick the more-drained
        # nodes first. this probably also works for shedding nodes when queue dries up.
        mcrun = item[2]
        emptyslots = item[3]
        if mcrun == 0:
            rank = emptyslots
        else:
            # the original "optimal" order: rank = 32*mcrun - emptyslots  # 32 just a number bigger than expected emptyslots value
            rank = 7168 - 224 * float(emptyslots) / mcrun + mcrun  # constants 7168 and 224 only serve to enable a multi-level sort.
        return rank
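    # Worked example of the ranking (illustrative numbers, not from a real
    # farm): 0 running mc jobs with 2 free slots ranks 2; 0 running with
    # 8 free ranks 8; 2 running with 4 free ranks 7168 - 224*4/2 + 2 = 6722;
    # 2 running with 8 free ranks 7168 - 224*8/2 + 2 = 6274. The ascending
    # sort below therefore puts nodes with no running mc jobs first (fewest
    # free slots first), then nodes with running mc jobs, most-drained first.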
    slist = sorted(rawlist, key=srtrunningfree)

    return waitingJobs, slist

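# Pool membership is steered via Torque node properties: nodes carrying the
# "mc" property accept multicore jobs, nodes carrying "el6" take the regular
# load (a site-specific convention; the property names are NIKHEF's). The
# two helpers below flip these properties by piping commands into qmgr, the
# scripted equivalent of e.g.
#
#   qmgr -c 'set node wn-mars-001.farm.nikhef.nl properties += mc'
#
# (node name illustrative).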
def remove_from_mc_pool(node):
    # print "adding el6 tag back to node", node.name
    if "el6" not in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        proc.stdin.write('set node ' + node.name + ' properties += el6\n')
        proc.stdin.write('print node ' + node.name + ' properties\n')
        out = proc.communicate()[0]
        # print out
    if "mc" in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        proc.stdin.write('set node ' + node.name + ' properties -= mc\n')
        proc.stdin.write('print node ' + node.name + ' properties\n')
        out = proc.communicate()[0]
        # print out

def add_to_mc_pool(node):
    # print "node props b4:", grabbed_node.properties
    if "el6" in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        proc.stdin.write('set node ' + node.name + ' properties -= el6\n')
        proc.stdin.write('print node ' + node.name + ' properties\n')
        out = proc.communicate()[0]

    if "mc" not in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        proc.stdin.write('set node ' + node.name + ' properties += mc\n')
        proc.stdin.write('print node ' + node.name + ' properties\n')
        out = proc.communicate()[0]

import optparse

usage = "usage: %prog [options]"

p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-A", action="store_true", dest="addall", default=False,
             help="add all eligible nodes to multicore pool")

p.add_option("-D", action="store_true", dest="delall", default=False,
             help="delete all nodes from multicore pool")

p.add_option("-R", action="store_true", dest="remove_strays_from_pool", default=False,
             help="remove mc pool nodes that are not one of the configured candidate nodes")

p.add_option("-l", action="store", dest="logfile", default=None,
             help="log actions and information to LOGFILE (default stdout)")

p.add_option("-L", action="store", dest="loglevel", default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ..., default INFO)")

p.add_option("-n", action="store_true", dest="noopt", default=False,
             help="don't do anything, just print what would have been done")

p.add_option("-i", action="store_true", dest="info", default=False,
             help="print info on all nodes in multicore pool")

p.add_option("-d", action="store", dest="new_max_drain", default=None,
             help="maximum number of nodes allowed in draining state")

p.add_option("-m", action="store", dest="new_max_free", default=None,
             help="maximum number of unused slots to tolerate in the multicore pool")

p.add_option("-f", action="store", dest="newfrac", default=None,
             help="fraction (out of 1.0) of in-service nodes to commit to multicore pool")

p.add_option("-r", action="store_true", dest="reset_config", default=False,
             help="reset mcfloat config to the default")

p.add_option("-q", action="store_true", dest="querycfg", default=False,
             help="print current config and exit")

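# Typical invocations (illustrative examples, not from the revision log):
#
#   mcfloat                # normal adjustment run, e.g. from cron
#   mcfloat -n -L DEBUG    # dry run: log what would be done, change nothing
#   mcfloat -f 0.47        # persist a new capacity fraction and exit
#   mcfloat -q             # show the current persisted configuration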
import logging

opts, args = p.parse_args()

# assuming loglevel is bound to the string value obtained from the
# command line argument. Convert to upper case to allow the user to
# specify --log=DEBUG or --log=debug

numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level, filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)

if os.path.isfile(MCFLOAT_CONFIG_FILE):
    pkl_file = open(MCFLOAT_CONFIG_FILE, 'rb')
    cfg = pickle.load(pkl_file)
    pkl_file.close()
else:
    cfg = mcfloat_cfg()  # default values

if opts.querycfg:
    print "Current mcfloat configuration:"
    print cfg
    sys.exit(0)

config_changed = False

# bundle all config changers here

if opts.newfrac:
    cfg.capacityfrac = float(opts.newfrac)
    if cfg.capacityfrac < 0 or cfg.capacityfrac > 1:
        raise ValueError('Invalid capacity fraction: %s' % opts.newfrac)
    logging.info("Changing capacity fraction to %s" % opts.newfrac)
    config_changed = True

if opts.new_max_free:
    cfg.maxfree = int(opts.new_max_free)
    if cfg.maxfree < 0:
        raise ValueError('Invalid max free slots: %s' % opts.new_max_free)
    logging.info("Changing max free slots to %s" % opts.new_max_free)
    config_changed = True

if opts.new_max_drain:
    cfg.maxdrain = int(opts.new_max_drain)
    if cfg.maxdrain < 0:
        raise ValueError('Invalid max num draining nodes: %s' % opts.new_max_drain)
    logging.info("Changing max number draining nodes to %s" % opts.new_max_drain)
    config_changed = True

if opts.reset_config:
    cfg = mcfloat_cfg()  # default values
    logging.info("Reset mcfloat config to default values")
    config_changed = True

## below after all config changers

if config_changed:
    logging.info("writing out changed mcfloat config to config file")
    if not opts.noopt:
        pkl_file = open(MCFLOAT_CONFIG_FILE, 'wb')
        pickle.dump(cfg, pkl_file)
        pkl_file.close()
    sys.exit(0)

if opts.delall:
    cfrac = 1
else:
    cfrac = cfg.capacityfrac
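# With -D the fraction is forced to 1, presumably so that the node ranges
# below (and hence CANDIDATE_NODES) span every node in each class during a
# full teardown; the -D loop itself removes any node carrying the "mc"
# property regardless.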

# NUM_CAR = int(cfrac * 96)
NUM_KNAL = int(cfrac * 18)
NUM_MARS = int(cfrac * 58)
NUM_CHOC = int(cfrac * 58)

# there is no check whether START + NUM < number of nodes
# so careful when changing START from 1

# START_CAR = 1
START_KNAL = 1
START_MARS = 1
START_CHOC = 11

# RANGE_CAR = range(START_CAR, START_CAR + NUM_CAR + 1)
RANGE_KNAL = range(START_KNAL, START_KNAL + NUM_KNAL + 1)
RANGE_MARS = range(START_MARS, START_MARS + NUM_MARS + 1)
RANGE_CHOC = range(START_CHOC, START_CHOC + NUM_CHOC + 1)

CANDIDATE_NODES = ['wn-knal-0%02d.farm.nikhef.nl' % (n) for n in RANGE_KNAL] + \
                  ['wn-mars-0%02d.farm.nikhef.nl' % (n) for n in RANGE_MARS] + \
                  ['wn-choc-0%02d.farm.nikhef.nl' % (n) for n in RANGE_CHOC]
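# With the default capacityfrac of 0.39 this gives int(0.39*18) = 7 for knal
# and int(0.39*58) = 22 each for mars and choc; since each range runs from
# START through START + NUM inclusive, every class contributes NUM + 1
# candidate hostnames (e.g. wn-knal-001 ... wn-knal-008).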

from torque_utils import pbsnodes

wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

if opts.remove_strays_from_pool:
    for node in wnodes:
        if node.name not in CANDIDATE_NODES and 'mc' in node.properties:
            remove_from_mc_pool(node)
            logging.info("%s deleted from mc node pool" % (node.name))
    sys.exit(0)

for node in wnodes:
    if node.name in CANDIDATE_NODES and node.state.count('offline') == 0:
        mcnodes.append(node)

if os.path.isfile(cfg.underpop_nodes_file):
    pkl_file = open(cfg.underpop_nodes_file, 'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " + \
              repr(nodes_too_few_jobs_last_run))

draining_slots = 0
mcdedicated = list()
draining_nodes = 0

for node in mcnodes:
    if 'el6' not in node.properties and 'mc' in node.properties:
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0: draining_nodes += 1

if opts.info:
    waiting, node_tuples = getmcjobinfo(mcdedicated, cfg)
    for t in node_tuples:
        if t[2] == 0:
            print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])
        else:
            print "%28s has %2d running mc jobs, %2d empty slots, ratio %4.1f" % (t[0], t[2], t[3], float(t[3]) / t[2])
    sys.exit(0)
candidate_nodes = list()
for n in mcnodes:
    if n not in mcdedicated:
        candidate_nodes.append(n)

if opts.delall:
    for grabbed_node in wnodes:
        if 'mc' in grabbed_node.properties:
            logging.info("%s deleted from mc node pool" % (grabbed_node.name))
            if not opts.noopt: remove_from_mc_pool(grabbed_node)
    sys.exit(0)

if opts.addall:
    for grabbed_node in candidate_nodes:
        logging.info("%s added to mc node pool" % (grabbed_node.name))
        # candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(grabbed_node)
    sys.exit(0)

# check for dedicated nodes with too few jobs

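# Removal is done with two-pass hysteresis: a node with more than
# maxfreepernode free slots is only *marked* on this run (its name is
# pickled to underpop_nodes_file); it is removed from the pool only if it
# shows up underpopulated again on the next run, presumably so that one
# slow-filling scheduling cycle cannot shrink the pool.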
nodes_with_too_few_jobs = list()

for node in mcdedicated:
    logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
    if node.freeCpu > cfg.maxfreepernode:
        nodes_with_too_few_jobs.append(node)

logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \
              " nodes with too few jobs")

nodes_consistently_underpopulated = list()
for n in nodes_with_too_few_jobs:
    if n.name in nodes_too_few_jobs_last_run:
        nodes_consistently_underpopulated.append(n)

undcount = len(nodes_consistently_underpopulated)
if undcount > 0:
    logging.debug("there are " + repr(undcount) + \
                  " nodes with too few jobs that were also marked last run")

import random

removed_a_node = False

if undcount > 0:
    logging.info("nodes consistently underpopulated:")
    for n in nodes_consistently_underpopulated:
        logging.info(" " + n.name)
    if undcount > 1:
        remcount = undcount / 2  # number to remove
        logging.info("going to remove " + repr(remcount) + " from mc pool")
    else:
        remcount = 1

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated, cfg)

    for node_t in node_tuples[:remcount]:
        nname, nnode, running_mc, unused_slots = node_t
        logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                     (unused_slots, running_mc, nname))
        if not opts.noopt: remove_from_mc_pool(nnode)
        nodes_with_too_few_jobs.remove(nnode)
        nodes_consistently_underpopulated.remove(nnode)
        removed_a_node = True

namelist = list()
if len(nodes_with_too_few_jobs) > 0:
    logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) + \
                  "with too few jobs on list for next run:")

for n in nodes_with_too_few_jobs:
    logging.debug(n.name)
    namelist.append(n.name)
if not opts.noopt:
    pkl_file = open(cfg.underpop_nodes_file, 'wb')
    pickle.dump(namelist, pkl_file)
    pkl_file.close()
if len(nodes_with_too_few_jobs) > 0 or removed_a_node:
    sys.exit(0)

# if we survive to this point, all nodes 'dedicated' are being efficiently
# used. are we draining less than the configured max?

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \
              repr(draining_slots) + " unused slots")
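# Worked example with the default config (maxfree=49, maxfreepernode=3,
# maxdrain=32): at 40 draining slots the headroom is 49 - 40 = 9 >= 3, so
# the first branch tries to grab node(s); at 48 draining slots the headroom
# is 1 < 3 and 48 <= 49, so the final else does nothing; at 55 draining
# slots the elif sheds nodes until 55 - 49 = 6 slots are recovered.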
if (cfg.maxfree - draining_slots) >= cfg.maxfreepernode:
    logging.debug("%d unused slots are permitted, so %d more may drain" % (cfg.maxfree, cfg.maxfree - draining_slots))
    logging.debug("headroom of %d+ slots means we can try to grab another node" % (cfg.maxfreepernode))
    if draining_nodes < cfg.maxdrain:
        # first check if there are actually any waiting jobs to run; if not makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated, cfg)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" % \
                      (cfg.maxdrain, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        for nn in range(min(cfg.nodespergrab, len(candidate_nodes))):
            # logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
            grabbed_node = random.choice(candidate_nodes)
            logging.info("%s added to mc node pool" % (grabbed_node.name))
            candidate_nodes.remove(grabbed_node)
            if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (cfg.maxdrain))
        logging.debug("Doing nothing now")

elif draining_slots > cfg.maxfree:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated, cfg)
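    # node_tuples is in shed-friendly order (see srtrunningfree): nodes
    # without running mc jobs first, then the most-drained busy nodes, so
    # popping from the front recovers the most slots while evicting the
    # fewest running mc jobs. The pop() below cannot exhaust the list: the
    # total of unused slots over mcdedicated is draining_slots, which
    # exceeds slots_to_recover.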

    slots_to_recover = draining_slots - cfg.maxfree
    logging.info("unused slot limit (%d) exceeded by %d " \
                 % (cfg.maxfree, slots_to_recover) + ": remove node(s) from mc pool")
    slots_recovered = 0
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                         (unused_slots, running_mc, nname))
            if not opts.noopt: remove_from_mc_pool(nnode)
            slots_recovered += unused_slots
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, cfg.maxfree))
    logging.debug("difference is %d which is less than %d" % \
                  (cfg.maxfree - draining_slots, cfg.maxfreepernode))
    logging.debug("so: doing nothing now.")

Properties

Name Value
svn:executable *
svn:keywords Id URL
