Contents of /nl.nikhef.ndpf.mcfloat/tags/REL-1.1.1/mcfloat

Revision 2864 - Sun Feb 14 11:04:53 2016 UTC by templon
File size: 18761 byte(s)
Tag release 1.1.1
#!/usr/bin/env python
# $Id$
# Source: $URL$
# J. A. Templon, NIKHEF/PDP 2014

# Class to hold parameters:

class mcfloat_cfg:
    def __init__(self):   # default values; adjust for your site
        self.maxdrain = 32
        self.maxfree = 49
        self.maxfreepernode = 3
        self.nodespergrab = 3
        self.capacityfrac = 0.47
        self.mcqueue = 'atlasmc'
        self.underpop_nodes_file = "/var/db/nodes_with_too_few_jobs"
        self.torque = "stro.nikhef.nl"
    def __repr__(self):
        return "Capacity Fraction %4.2f\nMax Drain %d\nMax Free %d\nMax Free per Node %d" % (self.capacityfrac,
                                                                                             self.maxdrain,
                                                                                             self.maxfree,
                                                                                             self.maxfreepernode)

MCFLOAT_CONFIG_FILE = "/var/db/mcfloat_config"

# the above config allows mcore to run up to full ATLAS allocation (07.04.2015)
# full allocation right now is 47% of "non-retired" capacity = knal + mars + car
# just configure 47% of each class.
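# worked example: with capacityfrac = 0.47, the car class below gets
# NUM_CAR = int(0.47 * 96) = 45.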

# override when lsgrid is running (legacy comments, might be useful)

# MAXFREEPERNODE = 29   # allow lsgrid 30-core jobs
# MAXFREE = 149         # max num of free slots to tolerate

# override when bbmri is running

# MAXFREE = 149         # max num of free slots to tolerate
# MAXFREEPERNODE = 9    # allow bbmri 10-core jobs

import os
import pickle
import sys
import subprocess

def getmcjobinfo(nodes, cfg):

    # input is the list of nodes for which to get info
    # function returns tuple (jobs_waiting, node_info)
    # jobs_waiting boolean value : are there MC jobs waiting to be run (state Q)? (independent of nodes!)
    # node_info: list of tuples (wn_name, wn, num_running_mc, num_free_slots)
    # where wn is the node object for the machine with name wn_name, num_running_mc is the number of
    # mc jobs running on that node, and num_free_slots is the number of unused job slots on that node
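    # illustrative return value (node names follow the CANDIDATE_NODES pattern):
    #   (True, [('wn-car-025.farm.nikhef.nl', <node>, 0, 2),
    #           ('wn-knal-001.farm.nikhef.nl', <node>, 4, 8), ...])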

    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host']

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    qsfname = tempfile.mktemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")

    import time
    os.system('/usr/bin/qstat -f @' + cfg.torque + ' > ' + qsfname)
    now = time.mktime(time.localtime())

    jlist = torqueJobs.qs_parsefile(qsfname)

    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

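    # mapatts: trim a raw qstat job dict down to the keys in inkeys, converting
    # time-like fields to hours or seconds, memory fields to MiB, job_state
    # letters to 'queued'/'running', and exec_host to a bare node name;
    # empty values are dropped.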
    def mapatts(indict, inkeys):
        sdict = tam.sub_dict(indict, inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs/3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                statelett = sdict[k]
                if statelett in ['Q', 'W']:
                    sdict[k] = 'queued'
                elif statelett in ['R', 'E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict

    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == cfg.mcqueue:
            newj = mapatts(j, usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has

    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] == 'running':
            wn = j['exec_host']
            if wn in running_mc.keys():
                running_mc[wn] += 1
    rawlist = list()
    for n in nodes:
        rawlist.append( (n.name, n, running_mc[n.name], n.freeCpu) )
    from operator import itemgetter
    def srtrunningfree(item):
        # sort optimal for dropping nodes; for nodes with zero running mc jobs, want to
        # drop least-drained nodes (fewer free slots first in list); for nodes with
        # running mc jobs, want to shed as few nodes as possible so pick the more-drained
        # nodes first. this probably also works for shedding nodes when queue dries up.
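        # worked example: a node with 0 running mc jobs and 2 free slots gets
        # rank 2 (shed early); one with 4 running mc jobs and 8 free slots gets
        # rank 7168 - 224*8/4 + 4 = 6724 (shed late); lower rank sorts first.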
        mcrun = item[2]
        emptyslots = item[3]
        if mcrun == 0:
            rank = emptyslots
        else:
            # the original "optimal" order: rank = 32*mcrun - emptyslots  # 32 just a number bigger than expected emptyslots value
            rank = 7168 - 224*float(emptyslots)/mcrun + mcrun  # constants 7168 and 224 only serve to enable a multi-level sort.
        return rank
    slist = sorted(rawlist, key=srtrunningfree)

    return waitingJobs, slist

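# pool membership is driven by Torque node properties: a node in the multicore
# pool carries the 'mc' property instead of 'el6'. the two helpers below toggle
# those properties by piping commands to qmgr.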
def remove_from_mc_pool(node):
    # print "adding el6 tag back to node", node.name
    if "el6" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += el6\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out
    if "mc" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out

def add_to_mc_pool(node):
    # print "node props b4:", grabbed_node.properties
    if "el6" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= el6\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]

    if "mc" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]

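# main program: parse command-line options, load (or update) the persisted
# config, then grow or shrink the multicore pool as needed.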
import optparse

usage = "usage: %prog [-L log_level] [-n]"

p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-A", action="store_true", dest="addall", default=False,
             help="add all eligible nodes to multicore pool")

p.add_option("-D", action="store_true", dest="delall", default=False,
             help="delete all nodes from multicore pool")

p.add_option("-m", action="store", dest="new_max_free", default=None,
             help="maximum number of unused slots to tolerate in the multicore pool")

p.add_option("-l", action="store", dest="logfile", default=None,
             help="log actions and information to LOGFILE (default stdout)")

p.add_option("-L", action="store", dest="loglevel", default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ..., default INFO)")

p.add_option("-n", action="store_true", dest="noopt", default=False,
             help="don't do anything, just print what would have been done")

p.add_option("-i", action="store_true", dest="info", default=False,
             help="print info on all nodes in multicore pool")

p.add_option("-f", action="store", dest="newfrac", default=None,
             help="fraction (out of 1.0) of in-service nodes to commit to multicore pool")

p.add_option("-r", action="store_true", dest="reset_config", default=False,
             help="reset mcfloat config to the default")

p.add_option("-R", action="store_true", dest="remove_strays_from_pool", default=False,
             help="remove mc pool nodes that are not one of the configured candidate nodes")

p.add_option("-q", action="store_true", dest="querycfg", default=False,
             help="print current config and exit")

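# illustrative invocation (log file path is just an example):
#   mcfloat -L DEBUG -l /var/log/mcfloat.log
# add -n for a dry run that only reports what would have been done.
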
import logging

opts, args = p.parse_args()

# assuming loglevel is bound to the string value obtained from the
# command line argument. Convert to upper case to allow the user to
# specify --log=DEBUG or --log=debug

numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level, filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)

if os.path.isfile(MCFLOAT_CONFIG_FILE):
    pkl_file = open(MCFLOAT_CONFIG_FILE,'rb')
    cfg = pickle.load(pkl_file)
    pkl_file.close()
else:
    cfg = mcfloat_cfg()  # default values

if opts.querycfg:
    print "Current mcfloat configuration:"
    print cfg
    sys.exit(0)

config_changed = False

# bundle all config changers here
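# e.g. 'mcfloat -f 0.5' updates capacityfrac, writes the pickled config back to
# MCFLOAT_CONFIG_FILE, and exits without touching the pool.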

if opts.newfrac:
    cfg.capacityfrac = float(opts.newfrac)
    if cfg.capacityfrac < 0 or cfg.capacityfrac > 1:
        raise ValueError('Invalid capacity fraction: %s' % opts.newfrac)
    logging.info("Setting capacity fraction to %s" % opts.newfrac)
    config_changed = True

if opts.new_max_free:
    cfg.maxfree = int(opts.new_max_free)
    if cfg.maxfree < 0:
        raise ValueError('Invalid max free slots: %s' % opts.new_max_free)
    logging.info("Setting max free slots to %s" % opts.new_max_free)
    config_changed = True

if opts.reset_config:
    cfg = mcfloat_cfg()  # default values
    logging.info("Reset mcfloat config to default values")
    config_changed = True

## below after all config changers

if config_changed:
    logging.info("writing out changed mcfloat config to config file")
    if not opts.noopt:
        pkl_file = open(MCFLOAT_CONFIG_FILE,'wb')
        pickle.dump(cfg,pkl_file)
        pkl_file.close()
    sys.exit(0)

if opts.delall:
    cfrac = 1
else:
    cfrac = cfg.capacityfrac

NUM_CAR = int(cfrac * 96)
NUM_KNAL = int(cfrac * 18)
NUM_MARS = int(cfrac * 58)

# there is no check whether START + NUM < number of nodes
# so careful when changing START from 1

START_CAR = 25
START_KNAL = 1
START_MARS = 1

RANGE_CAR = range(START_CAR, START_CAR + NUM_CAR + 1)
RANGE_KNAL = range(START_KNAL, START_KNAL + NUM_KNAL + 1)
RANGE_MARS = range(START_MARS, START_MARS + NUM_MARS + 1)

CANDIDATE_NODES = [ 'wn-car-0%02d.farm.nikhef.nl' % (n) \
                    for n in RANGE_CAR ] + \
                  [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) \
                    for n in RANGE_KNAL ] + \
                  [ 'wn-mars-0%02d.farm.nikhef.nl' % (n) \
                    for n in RANGE_MARS ]

from torque_utils import pbsnodes

wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

if opts.remove_strays_from_pool:
    for node in wnodes:
        if node.name not in CANDIDATE_NODES and 'mc' in node.properties :
            remove_from_mc_pool(node)
            logging.info("%s deleted from mc node pool" % (node.name))
    sys.exit(0)

for node in wnodes:
    if node.name in CANDIDATE_NODES and node.state.count('offline') == 0:
        mcnodes.append(node)

if os.path.isfile(cfg.underpop_nodes_file):
    pkl_file = open(cfg.underpop_nodes_file,'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " + \
              repr(nodes_too_few_jobs_last_run) )

draining_slots = 0
mcdedicated = list()
draining_nodes = 0

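# a node counts as 'dedicated' to the mc pool when it carries 'mc' and has lost
# 'el6'; its free slots count as draining capacity.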
for node in mcnodes:
    if 'el6' not in node.properties and 'mc' in node.properties :
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0 : draining_nodes += 1

if opts.info:
    waiting, node_tuples = getmcjobinfo(mcdedicated,cfg)
    for t in node_tuples:
        if t[2] == 0:
            print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])
        else:
            print "%28s has %2d running mc jobs, %2d empty slots, ratio %4.1f" % (t[0], t[2], t[3], float(t[3])/t[2])
    sys.exit(0)
candidate_nodes = list()
for n in mcnodes:
    if n not in mcdedicated:
        candidate_nodes.append(n)

if opts.delall:
    for grabbed_node in mcdedicated:
        logging.info("%s deleted from mc node pool" % (grabbed_node.name))
        if not opts.noopt: remove_from_mc_pool(grabbed_node)
    sys.exit(0)

if opts.addall:
    for grabbed_node in candidate_nodes:
        logging.info("%s added to mc node pool" % (grabbed_node.name))
        # candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(grabbed_node)
    sys.exit(0)

# check for dedicated nodes with too few jobs
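# a node is only dropped after being underpopulated on two consecutive runs:
# names flagged now are pickled to cfg.underpop_nodes_file and compared against
# the list saved by the previous run.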

nodes_with_too_few_jobs = list()

for node in mcdedicated:
    logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
    if node.freeCpu > cfg.maxfreepernode:
        nodes_with_too_few_jobs.append(node)

logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \
              " nodes with too few jobs")

nodes_consistently_underpopulated = list()
for n in nodes_with_too_few_jobs:
    if n.name in nodes_too_few_jobs_last_run:
        nodes_consistently_underpopulated.append(n)

undcount = len(nodes_consistently_underpopulated)
if undcount > 0:
    logging.debug("there are " + repr(undcount) + \
                  " nodes with too few jobs that were also marked last run" )

import random

removed_a_node = False

if undcount > 0:
    logging.info("nodes consistently underpopulated:")
    for n in nodes_consistently_underpopulated:
        logging.info(" " + n.name)
    if undcount > 1:
        remcount = undcount / 2  # number to remove
        logging.info("going to remove " + repr(remcount) + " from mc pool")
    else:
        remcount = 1

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated,cfg)

    for node_t in node_tuples[:remcount]:
        nname, nnode, running_mc, unused_slots = node_t
        logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                     (unused_slots, running_mc, nname) )
        if not opts.noopt : remove_from_mc_pool(nnode)
        nodes_with_too_few_jobs.remove(nnode)
        nodes_consistently_underpopulated.remove(nnode)
        removed_a_node = True

namelist = list()
if len(nodes_with_too_few_jobs) > 0:
    logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) + \
                  "with too few jobs on list for next run:")

    for n in nodes_with_too_few_jobs:
        logging.debug(n.name)
        namelist.append(n.name)
    if not opts.noopt:
        pkl_file = open(cfg.underpop_nodes_file,'wb')
        pickle.dump(namelist,pkl_file)
        pkl_file.close()
if len(nodes_with_too_few_jobs) > 0 or removed_a_node :
    sys.exit(0)

# if we survive to this point, all nodes 'dedicated' are being efficiently
# used. are we draining less than the configured max?
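# three cases follow: enough headroom below maxfree -> try to grab more nodes;
# draining more than maxfree slots -> shed nodes; otherwise leave the pool alone.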

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \
              repr(draining_slots) + " unused slots")
if (cfg.maxfree - draining_slots) >= cfg.maxfreepernode:
    logging.debug("%d unused slots are permitted, so %d more" % (cfg.maxfree, cfg.maxfree - draining_slots))
    logging.debug("headroom of more than %d+ slots means we can try to grab another node" % (cfg.maxfreepernode) )
    if draining_nodes < cfg.maxdrain :
        # first check if there are actually any waiting jobs to run; if not makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated,cfg)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" % \
                      (cfg.maxdrain, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        for nn in range(min(cfg.nodespergrab,len(candidate_nodes))):
            # logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
            grabbed_node = random.choice(candidate_nodes)
            logging.info("%s added to mc node pool" % (grabbed_node.name))
            candidate_nodes.remove(grabbed_node)
            if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (cfg.maxdrain))
        logging.debug("Doing nothing now")

elif draining_slots > cfg.maxfree:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated,cfg)

    slots_to_recover = draining_slots - cfg.maxfree
    logging.info("unused slot limit (%d) exceeded by %d " \
                 % (cfg.maxfree, slots_to_recover) + ": remove node(s) from mc pool")
    slots_recovered = 0
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                         (unused_slots, running_mc, nname) )
            if not opts.noopt : remove_from_mc_pool(nnode)
            slots_recovered += unused_slots
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, cfg.maxfree))
    logging.debug("difference is %d which is less than %d" % \
                  (cfg.maxfree - draining_slots, cfg.maxfreepernode) )
    logging.debug("so: doing nothing now.")

Properties

Name            Value
svn:executable  *
svn:keywords    Id URL
