/[pdpsoft]/nl.nikhef.ndpf.mcfloat/trunk/mcfloat
ViewVC logotype

Contents of /nl.nikhef.ndpf.mcfloat/trunk/mcfloat

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2842 - (show annotations) (download)
Mon Jan 25 19:59:44 2016 UTC (6 years, 8 months ago) by templon
File size: 16466 byte(s)
new version that allows parameters to be set via command line; avoid edits

1 #!/usr/bin/env python
2
3 # Class to hold parameters:
4
class mcfloat_cfg:
    """Tunable parameters for the mcfloat pool manager.

    A fresh instance carries the site defaults below; a pickled instance
    loaded from disk may override any of them.
    """

    # Site defaults -- adjust for your site.
    _DEFAULTS = {
        'maxdrain'            : 32,               # max nodes allowed to be draining at once
        'maxfree'             : 49,               # max unused slots to tolerate in the pool
        'maxfreepernode'      : 3,                # per-node unused-slot threshold
        'nodespergrab'        : 3,                # how many nodes to grab per expansion
        'capacityfrac'        : 0.47,             # fraction of capacity devoted to mc pool
        'mcqueue'             : 'atlasmc',        # torque queue holding multicore jobs
        'underpop_nodes_file' : "/var/db/nodes_with_too_few_jobs",
        'torque'              : "stro.nikhef.nl", # torque server to query
    }

    def __init__(self):
        # Install every default as a plain instance attribute.
        for attr, value in self._DEFAULTS.items():
            setattr(self, attr, value)
15
# Path of the pickled mcfloat_cfg instance that overrides the defaults above;
# it is rewritten whenever a command-line option changes the configuration.
MCFLOAT_CONFIG_FILE = "/var/db/mcfloat_config"

# the above config allows mcore to run up to full ATLAS allocation (07.04.2015)
# full allocation right now is 47% of "non-retired" capacity = knal + mars + car
# just configure 47% of each class.

# override when lsgrid is running (legacy comments, might be useful)

# MAXFREEPERNODE = 29   # allow lsgrid 30-core jobs
# MAXFREE = 149         # max num of free slots to tolerate

# override when bbmri is running

# MAXFREE = 149         # max num of free slots to tolerate
# MAXFREEPERNODE = 9    # allow bbmri 10-core jobs
31
32 import os
33 import pickle
34 import sys
35 import subprocess
36
def getmcjobinfo(nodes, cfg):
    """Query the torque server for multicore-queue job information.

    Input is the list of node objects for which to get info; cfg supplies
    the torque server name (cfg.torque) and the mc queue name (cfg.mcqueue).

    Returns the tuple (jobs_waiting, node_info):
      jobs_waiting : boolean; are there MC jobs waiting to be run (state Q)?
                     (independent of the nodes argument!)
      node_info    : list of tuples (wn_name, wn, num_running_mc, num_free_slots)
                     where wn is the node object for the machine with name
                     wn_name, num_running_mc is the number of mc jobs running
                     on that node, and num_free_slots is the number of unused
                     job slots on that node; sorted so the cheapest nodes to
                     drop from the pool come first (see srtrunningfree).
    """

    # job attributes we keep from the full qstat output
    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host' ]

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    # NOTE(review): tempfile.mktemp is race-prone (deprecated); kept as-is.
    qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp")

    import time
    # dump the full qstat output for the configured server into the temp file
    os.system('/usr/bin/qstat -f @' + cfg.torque + ' > ' + qsfname)
    now = time.mktime(time.localtime())   # NOTE(review): 'now' is unused below

    jlist = torqueJobs.qs_parsefile(qsfname)

    # keep the raw qstat snapshot around for post-mortem inspection
    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

    def mapatts(indict,inkeys):
        # Reduce a parsed job dict to the keys in inkeys, converting values
        # via torqueAttMappers: tfl times to hours, mfl memory to MiB, tfl2
        # times to seconds, job_state letters to 'queued'/'running', and
        # exec_host to a bare worker-node name.  Falsy values are dropped.
        sdict = tam.sub_dict(indict,inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs/3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                # collapse torque state letters onto two coarse states
                statelett = sdict[k]
                if statelett in ['Q','W']:
                    sdict[k] = 'queued'
                elif statelett in ['R','E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                # keep only the node name: everything before the first '/'
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict

    # collect all jobs in the configured mc queue, noting queued ones
    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == cfg.mcqueue:
            newj = mapatts(j,usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has

    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] =='running':
            wn = j['exec_host']
            if wn in running_mc.keys():
                running_mc[wn] += 1
    rawlist = list()
    for n in nodes:
        rawlist.append( (n.name, n, running_mc[n.name], n.freeCpu) )
    from operator import itemgetter   # NOTE(review): imported but unused
    def srtrunningfree(item):
        # sort optimal for dropping nodes; for nodes with zero running mc jobs, want to
        # drop least-drained nodes (fewer free slots first in list); for nodes with
        # running mc jobs, want to shed as few nodes as possible so pick the more-drained
        # nodes first. this probably also works for shedding nodes when queue dries up.
        mcrun = item[2]
        emptyslots = item[3]
        if mcrun == 0:
            rank = emptyslots
        else:
            # the original "optimal" order: rank = 32*mcrun - emptyslots # 32 just a number bigger than expected emptyslots value
            rank = 7168 - 224*float(emptyslots)/mcrun + mcrun # constants 7168 and 224 only serve to enable a multi-level sort.
        return rank
    slist = sorted(rawlist, key=srtrunningfree)

    return waitingJobs, slist
128
def remove_from_mc_pool(node):
    """Hand *node* back to the general pool.

    Via qmgr: restore the 'el6' property if it is missing, and drop the
    'mc' property if it is present.  Each change is followed by a
    'print node ... properties' so the resulting property list comes back
    on qmgr's stdout (captured but not printed).
    """

    def _qmgr_properties(op, prop):
        # Feed one 'set node <name> properties <op> <prop>' command to qmgr,
        # then echo the node's properties back; return qmgr's stdout.
        qm = subprocess.Popen(['/usr/bin/qmgr'],
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,)
        qm.stdin.write('set node ' + node.name +
                       ' properties ' + op + ' ' + prop + '\n')
        qm.stdin.write('print node ' + node.name + ' properties\n')
        return qm.communicate()[0]

    if "el6" not in node.properties:
        _qmgr_properties('+=', 'el6')
    if "mc" in node.properties:
        _qmgr_properties('-=', 'mc')
151
def add_to_mc_pool(node):
    """Dedicate *node* to the multicore pool.

    Via qmgr: drop the 'el6' property if present, and add the 'mc'
    property if missing.  Each change is followed by a
    'print node ... properties' so the resulting property list comes back
    on qmgr's stdout (captured but not printed).
    """

    def _qmgr_properties(op, prop):
        # Feed one 'set node <name> properties <op> <prop>' command to qmgr,
        # then echo the node's properties back; return qmgr's stdout.
        qm = subprocess.Popen(['/usr/bin/qmgr'],
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,)
        qm.stdin.write('set node ' + node.name +
                       ' properties ' + op + ' ' + prop + '\n')
        qm.stdin.write('print node ' + node.name + ' properties\n')
        return qm.communicate()[0]

    if "el6" in node.properties:
        _qmgr_properties('-=', 'el6')
    if "mc" not in node.properties:
        _qmgr_properties('+=', 'mc')
173
import optparse

usage = "usage: %prog [-d debug_level] [-n]"

# Command-line interface; all pool-management behavior is driven by these flags.
p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-A", action="store_true", dest="addall", default=False,
             help="add all eligible nodes to multicore pool")
p.add_option("-D", action="store_true", dest="delall", default=False,
             help="delete all nodes from multicore pool")
p.add_option("-l", action="store", dest="logfile", default=None,
             help="log actions and information to LOGFILE (default stdout)")
# bug fix: help string previously had an unbalanced '(' ("... default INFO")
p.add_option("-L", action="store", dest="loglevel", default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ..., default INFO)")
p.add_option("-n", action="store_true", dest="noopt", default=False,
             help="don't do anything, just print what would have been done")
p.add_option("-i", action="store_true", dest="info", default=False,
             help="print info on all nodes in multicore pool")
p.add_option("-f", action="store", dest="newfrac", default=None,
             help="fraction (out of 1.0) of in-service nodes to commit to multicore pool")
195
import logging

opts, args = p.parse_args()

# Convert the -L/--loglevel string to the numeric level understood by the
# logging module; upper-casing allows the user to specify -L DEBUG or
# -L debug interchangeably.

numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    # bug fix: previously referenced the undefined name 'loglevel', which
    # raised NameError instead of the intended ValueError.
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level,filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)
213
# Load the pickled configuration if one exists; otherwise fall back to
# the compiled-in defaults.
if os.path.isfile(MCFLOAT_CONFIG_FILE):
    pkl_file = open(MCFLOAT_CONFIG_FILE,'rb')
    # NOTE(review): pickle.load assumes this file is trustworthy -- confirm
    # its ownership/permissions, since unpickling untrusted data is unsafe.
    cfg = pickle.load(pkl_file)
    pkl_file.close()
else:
    cfg = mcfloat_cfg() # default values

config_changed = False

# bundle all config changers here

if opts.newfrac:
    cfg.capacityfrac = float(opts.newfrac)
    if cfg.capacityfrac < 0 or cfg.capacityfrac > 1:
        raise ValueError('Invalid capacity fraction: %s' % opts.newfrac)
    config_changed = True

## below after all config changers

# Persist any configuration change and stop; the new values take effect on
# the next run (-n suppresses the write).
if config_changed:
    logging.info("writing out changed mcfloat config to config file")
    if not opts.noopt:
        pkl_file = open(MCFLOAT_CONFIG_FILE,'wb')
        pickle.dump(cfg,pkl_file)
        pkl_file.close()
    sys.exit(0)
240
# When deleting everything (-D), use the full capacity so every dedicated
# node falls inside CANDIDATE_NODES; otherwise use the configured fraction.
if opts.delall:
    cfrac = 1
else:
    cfrac = cfg.capacityfrac

# Number of nodes of each hardware class (car / knal / mars) that may be
# committed to the mc pool; the multipliers are the class sizes.
NUM_CAR  = int(cfrac * 96)
NUM_KNAL = int(cfrac * 18)
NUM_MARS = int(cfrac * 58)

CANDIDATE_NODES = [ 'wn-car-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_CAR+1) ] + \
                  [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_KNAL+1) ] + \
                  [ 'wn-mars-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_MARS+1) ]

# Names of nodes flagged as underpopulated on the previous run (pickled list).
if os.path.isfile(cfg.underpop_nodes_file):
    pkl_file = open(cfg.underpop_nodes_file,'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " + \
              repr(nodes_too_few_jobs_last_run) )
263
from torque_utils import pbsnodes

# Snapshot of all worker nodes known to torque.
wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

# Restrict to candidate nodes that are not offline.
for node in wnodes:
    if node.name in CANDIDATE_NODES and node.state.count('offline') == 0:
        mcnodes.append(node)

draining_slots = 0    # total unused slots across dedicated nodes
mcdedicated = list()  # nodes currently dedicated to the mc pool
draining_nodes = 0    # dedicated nodes with at least one unused slot

# A node counts as "dedicated" when its torque properties have 'mc' set and
# 'el6' removed (the states produced by add_to_mc_pool / remove_from_mc_pool).
for node in mcnodes:
    if 'el6' not in node.properties and 'mc' in node.properties :
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0 : draining_nodes += 1
284
# -i: just report the state of the dedicated nodes and exit.
if opts.info:
    waiting, node_tuples = getmcjobinfo(mcdedicated,cfg)
    for t in node_tuples:
        if t[2] == 0:
            print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])
        else:
            print "%28s has %2d running mc jobs, %2d empty slots, ratio %4.1f" % (t[0], t[2], t[3], float(t[3])/t[2])
    sys.exit(0)

# Eligible nodes that are not yet dedicated to the mc pool.
candidate_nodes = list()
for n in mcnodes:
    if n not in mcdedicated:
        candidate_nodes.append(n)

# -D: release every dedicated node back to the general pool and exit
# (-n logs what would happen without touching qmgr).
if opts.delall:
    for grabbed_node in mcdedicated:
        logging.info("%s deleted from mc node pool" % (grabbed_node.name))
        if not opts.noopt: remove_from_mc_pool(grabbed_node)
    sys.exit(0)

# -A: dedicate every eligible candidate node to the mc pool and exit.
if opts.addall:
    for grabbed_node in candidate_nodes:
        logging.info("%s added to mc node pool" % (grabbed_node.name))
#        candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(grabbed_node)
    sys.exit(0)
310
# check for dedicated nodes with too few jobs

nodes_with_too_few_jobs = list()

for node in mcdedicated:
    logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
    if node.freeCpu > cfg.maxfreepernode:
        nodes_with_too_few_jobs.append(node)

logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \
              " nodes with too few jobs")

# A node is only eligible for removal when it was underpopulated on this
# run AND the previous one, so a single bad sample does not shed a node.
nodes_consistently_underpopulated = list()
for n in nodes_with_too_few_jobs:
    if n.name in nodes_too_few_jobs_last_run:
        nodes_consistently_underpopulated.append(n)

undcount = len(nodes_consistently_underpopulated)
if undcount > 0:
    logging.debug("there are " + repr(undcount) + \
                  " nodes with too few jobs that were also marked last run" )

import random

removed_a_node = False

if undcount > 0:
    logging.info("nodes consistently underpopulated:")
    for n in nodes_consistently_underpopulated:
        logging.info(" " + n.name)
    if undcount > 1:
        remcount = undcount / 2 # number to remove (integer division; Python 2)
        logging.info("going to remove " + repr(remcount) + " from mc pool")
    else:
        remcount = 1

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated,cfg)

    # node_tuples is sorted so the cheapest nodes to drop come first;
    # remove the first remcount of them from the pool.
    for node_t in node_tuples[:remcount]:
        nname, nnode, running_mc, unused_slots = node_t
        logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                     (unused_slots, running_mc, nname) )
        if not opts.noopt : remove_from_mc_pool(nnode)
        nodes_with_too_few_jobs.remove(nnode)
        nodes_consistently_underpopulated.remove(nnode)
        removed_a_node = True

# Remember (by name) which nodes are still underpopulated for the next run.
namelist = list()
if len(nodes_with_too_few_jobs) > 0:
    logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) + \
                  "with too few jobs on list for next run:")

    for n in nodes_with_too_few_jobs:
        logging.debug(n.name)
        namelist.append(n.name)
    if not opts.noopt:
        pkl_file = open(cfg.underpop_nodes_file,'wb')
        pickle.dump(namelist,pkl_file)
        pkl_file.close()
# If anything was marked or removed this run, stop here; pool growth is
# only considered when all dedicated nodes are efficiently used.
if len(nodes_with_too_few_jobs) > 0 or removed_a_node :
    sys.exit(0)
374
# if we survive to this point, all nodes 'dedicated' are being efficiently
# used. are we draining less than the configured max?

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \
              repr(draining_slots) + " unused slots")
# Enough headroom for at least one more node's worth of free slots:
# consider growing the pool.
if (cfg.maxfree - draining_slots) >= cfg.maxfreepernode:
    logging.debug("%d unused slots are permitted, so %d more" % (cfg.maxfree, cfg.maxfree - draining_slots))
    logging.debug("headroom of more than %d+ slots means we can try to grab another node" % (cfg.maxfreepernode) )
    if draining_nodes < cfg.maxdrain :
        # first check if there are actually any waiting jobs to run; if not makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated,cfg)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" % \
                      (cfg.maxdrain, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        # grab up to nodespergrab nodes, chosen at random from the candidates
        for nn in range(min(cfg.nodespergrab,len(candidate_nodes))):
            grabbed_node=random.choice(candidate_nodes)
            logging.info("%s added to mc node pool" % (grabbed_node.name))
            candidate_nodes.remove(grabbed_node)
            if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (cfg.maxdrain))
        logging.debug("Doing nothing now")

# Too many unused slots overall: shed drained nodes until enough slots
# are recovered.
elif draining_slots > cfg.maxfree:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated,cfg)

    # bug fix: this previously read cfg.MAXFREE, which does not exist on
    # mcfloat_cfg (the attribute is lower-case 'maxfree'), so this branch
    # always died with AttributeError.
    slots_to_recover = draining_slots - cfg.maxfree
    logging.info("unused slot limit (%d) exceeded by %d " \
                 % (cfg.maxfree, slots_to_recover) + ": remove node(s) from mc pool")
    slots_recovered = 0
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                         (unused_slots, running_mc, nname) )
            if not opts.noopt : remove_from_mc_pool(nnode)
            slots_recovered += unused_slots

# Within limits on both sides: nothing to do this run.
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, cfg.maxfree))
    logging.debug("difference is %d which is less than %d" % \
                  (cfg.maxfree - draining_slots, cfg.maxfreepernode) )
    logging.debug("so: doing nothing now.")

Properties

Name Value
svn:executable *

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28