/[pdpsoft]/nl.nikhef.ndpf.mcfloat/trunk/mcfloat
ViewVC logotype

Contents of /nl.nikhef.ndpf.mcfloat/trunk/mcfloat

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2841 - (show annotations) (download)
Mon Jan 25 18:45:02 2016 UTC (6 years, 6 months ago) by templon
File size: 16182 byte(s)
version used in production as of 25 jan 2016

#!/usr/bin/env python
# mcfloat: monitor the multicore (mc) job pool on the batch system and
# grow or shrink the set of nodes dedicated to it as demand requires.
from torque_utils import pbsnodes

# this config allows mcore to run up to full ATLAS allocation (07.04.2015)
# full allocation right now is 47% of "non-retired" capacity = knal + mars + car
# just configure 47% of each class.

MAXDRAIN = 32           # max num of nodes allowed to drain at any one time
MAXFREE = 49            # max num of free (unused) job slots to tolerate
MAXFREEPERNODE = 3      # per-node free-slot threshold; standard for four-core jobs
NODESPERGRAB = 3        # number of nodes to grab each time around

# override when lsgrid is running

# MAXFREEPERNODE = 29    # allow lsgrid 30-core jobs
# MAXFREE = 149          # max num of free slots to tolerate

# override when bbmri is running

# MAXFREE = 149          # max num of free slots to tolerate
# MAXFREEPERNODE = 9     # allow bbmri 10-core jobs

# parameterize percent capacity -- 47% is too conservative, make this variable.


# original car was [ 'wn-car-0%02d.farm.nikhef.nl' % (n) for n in range( 96-int(CAPACFRAC*96), 97) ]
# temporarily changed due to low-numbered cars offline and reserved and such.

CAPACFRAC = 0.85

# per-class node counts derived from the capacity fraction;
# 96 car, 18 knal and 58 mars nodes exist in total
NUM_CAR = int(CAPACFRAC * 96)
NUM_KNAL = int(CAPACFRAC * 18)
NUM_MARS = int(CAPACFRAC * 58)

# hostnames of all nodes eligible to join the mc pool
CANDIDATE_NODES = [ 'wn-car-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_CAR+1) ] + \
                  [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_KNAL+1) ] + \
                  [ 'wn-mars-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_MARS+1) ]

MCQUEUE = 'atlasmc'     # torque queue name for multicore jobs

# settings below appropriate for knal nodes (32 cores)
# ... want to keep num of draining nodes
# to MAXFREE / 3 since giving back a node could be expensive.
# it might already be running several mc jobs

# MAXDRAIN = 16          # max num of nodes allowed to drain
# MAXFREE = 49           # max num of free slots to tolerate
# MAXFREEPERNODE = 3
# NODESPERGRAB = 1
# CANDIDATE_NODES = [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1,19) ]
# MCQUEUE = 'atlasmc'

# settings below more appropriate for smrt nodes (8 core)
# here giving a node back doesn't really matter, it wasn't
# running any mc jobs yet anyway. better to grab lots
# of nodes and give back the ones that drain the slowest if
# too many slots are free.

# MAXDRAIN = 16          # max num of nodes allowed to drain
# MAXFREE = 49           # max num of free slots to tolerate
# CANDIDATE_NODES = [ 'wn-smrt-0%02d.farm.nikhef.nl' % (n) for n in range(1,73) ]

# pickle file recording nodes flagged as underpopulated on the previous run
UNDERPOP_NODES_FILE = ".nodes_with_too_few_jobs"
TORQUE = "stro.nikhef.nl"       # torque server to query with qstat
65
66 import os
67 import pickle
68 import sys
69 import subprocess
70
def getmcjobinfo(nodes):
    """Collect multicore-queue job information for the given nodes.

    Parameters:
        nodes -- list of node objects (must expose .name and .freeCpu)

    Returns a tuple (jobs_waiting, node_info):
        jobs_waiting -- boolean, True if there are MC jobs waiting to run
                        (state Q/W), independent of the node list
        node_info    -- list of tuples (wn_name, wn, num_running_mc,
                        num_free_slots) where wn is the node object named
                        wn_name, sorted so that the most attractive nodes
                        to drop from the pool come first
    """

    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host' ]

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    # mkstemp instead of the deprecated mktemp: creates the file atomically
    # and so avoids the classic tmpfile race/symlink vulnerability.
    fd, qsfname = tempfile.mkstemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")
    os.close(fd)

    os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)

    jlist = torqueJobs.qs_parsefile(qsfname)

    # keep the most recent raw qstat output around for post-mortem debugging
    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

    def mapatts(indict, inkeys):
        # project the raw torque job dict onto 'inkeys' and normalize values:
        # times to hours/seconds, memory to MiB, state letters to
        # 'queued'/'running', and exec_host to the bare node name.
        sdict = tam.sub_dict(indict, inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs / 3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                statelett = sdict[k]
                if statelett in ['Q', 'W']:
                    sdict[k] = 'queued'
                elif statelett in ['R', 'E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                # 'wn-x/3' -> 'wn-x': strip the slot suffix
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict

    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == MCQUEUE:
            newj = mapatts(j, usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has
    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] == 'running':
            wn = j['exec_host']
            if wn in running_mc:  # direct membership test, no keys() list copy
                running_mc[wn] += 1

    rawlist = list()
    for n in nodes:
        rawlist.append((n.name, n, running_mc[n.name], n.freeCpu))

    def srtrunningfree(item):
        # sort optimal for dropping nodes; for nodes with zero running mc jobs,
        # want to drop least-drained nodes (fewer free slots first in list);
        # for nodes with running mc jobs, want to shed as few nodes as possible
        # so pick the more-drained nodes first. this probably also works for
        # shedding nodes when queue dries up.
        mcrun = item[2]
        emptyslots = item[3]
        if mcrun == 0:
            rank = emptyslots
        else:
            # the original "optimal" order: rank = 32*mcrun - emptyslots
            # constants 7168 and 224 only serve to enable a multi-level sort.
            rank = 7168 - 224 * float(emptyslots) / mcrun + mcrun
        return rank

    slist = sorted(rawlist, key=srtrunningfree)

    return waitingJobs, slist
162
def remove_from_mc_pool(node):
    """Return *node* to the general pool via qmgr: restore its 'el6'
    property (if missing) and drop its 'mc' property (if present).

    Parameters:
        node -- node object exposing .name and .properties
    """
    def _qmgr_property(op):
        # apply a single 'properties <op>' change on this node via qmgr,
        # then echo the resulting property list (output currently unused,
        # kept for debugging).
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write('set node ' + node.name +
                         ' properties ' + op + '\n')
        proc.stdin.write('print node ' + node.name +
                         ' properties\n')
        return proc.communicate()[0]

    if "el6" not in node.properties:
        _qmgr_property('+= el6')
    if "mc" in node.properties:
        _qmgr_property('-= mc')
185
def add_to_mc_pool(node):
    """Dedicate *node* to the multicore pool via qmgr: drop its 'el6'
    property (if present) and add the 'mc' property (if missing).

    Parameters:
        node -- node object exposing .name and .properties
    """
    def _qmgr_property(op):
        # apply a single 'properties <op>' change on this node via qmgr,
        # then echo the resulting property list (output currently unused,
        # kept for debugging).
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write('set node ' + node.name +
                         ' properties ' + op + '\n')
        proc.stdin.write('print node ' + node.name +
                         ' properties\n')
        return proc.communicate()[0]

    if "el6" in node.properties:
        _qmgr_property('-= el6')
    if "mc" not in node.properties:
        _qmgr_property('+= mc')
207
import optparse

usage = "usage: %prog [-d debug_level] [-n]"

p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-A", action="store_true", dest="addall", default=False,
             help="add all eligible nodes to multicore pool")
p.add_option("-l", action="store", dest="logfile", default=None,
             help="log actions and information to LOGFILE (default stdout)")
p.add_option("-L", action="store", dest="loglevel", default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ..., default INFO)")
p.add_option("-n", action="store_true", dest="noopt", default=False,
             help="don't do anything, just print what would have been done")
p.add_option("-i", action="store_true", dest="info", default=False,
             help="print info on all nodes in multicore pool")

import logging

opts, args = p.parse_args()

# convert the command-line loglevel string to the numeric level the logging
# module expects; upper() allows the user to specify -L DEBUG or -L debug
numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    # bug fix: this previously referenced the undefined name 'loglevel',
    # which raised a NameError instead of the intended ValueError
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level, filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)
243
# Load the node names flagged as underpopulated on the previous run;
# absence of the pickle file means a clean slate.
if os.path.isfile(UNDERPOP_NODES_FILE):
    pkl_file = open(UNDERPOP_NODES_FILE,'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " + \
              repr(nodes_too_few_jobs_last_run) )

wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

# mcnodes: configured candidate hostnames that are not currently offline
for node in wnodes:
    if node.name in CANDIDATE_NODES and node.state.count('offline') == 0:
        mcnodes.append(node)

draining_slots = 0
mcdedicated = list()
draining_nodes = 0

# a node is dedicated to mc when it carries the 'mc' property and lacks
# 'el6'; its free slots count as "draining" capacity
for node in mcnodes:
    if 'el6' not in node.properties and 'mc' in node.properties :
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0 : draining_nodes += 1
272
# -i: print a per-node summary of the dedicated pool and exit
if opts.info:
    waiting, node_tuples = getmcjobinfo(mcdedicated)
    for t in node_tuples:
        if t[2] == 0:
            print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])
        else:
            print "%28s has %2d running mc jobs, %2d empty slots, ratio %4.1f" % (t[0], t[2], t[3], float(t[3])/t[2])
    sys.exit(0)
# candidate_nodes: eligible nodes not yet dedicated to mc
candidate_nodes = list()
for n in mcnodes:
    if n not in mcdedicated:
        candidate_nodes.append(n)

# -A: dedicate every candidate at once and exit (unless -n dry-run)
if opts.addall:
    for grabbed_node in candidate_nodes:
        logging.info("%s added to mc node pool" % (grabbed_node.name))
#        candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(grabbed_node)
    sys.exit(0)
292
# check for dedicated nodes with too few jobs

nodes_with_too_few_jobs = list()

for node in mcdedicated:
    logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
    if node.freeCpu > MAXFREEPERNODE:
        nodes_with_too_few_jobs.append(node)

logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \
              " nodes with too few jobs")

# a node is "consistently" underpopulated when it was also flagged on the
# previous run (per the pickle file)
nodes_consistently_underpopulated = list()
for n in nodes_with_too_few_jobs:
    if n.name in nodes_too_few_jobs_last_run:
        nodes_consistently_underpopulated.append(n)

undcount = len(nodes_consistently_underpopulated)
if undcount > 0:
    logging.debug("there are " + repr(undcount) + \
                  " nodes with too few jobs that were also marked last run" )

import random

removed_a_node = False

# shed roughly half of the consistently-underpopulated nodes (at least one),
# preferring the nodes that getmcjobinfo ranks first for dropping
if undcount > 0:
    logging.info("nodes consistently underpopulated:")
    for n in nodes_consistently_underpopulated:
        logging.info("    " + n.name)
    if undcount > 1:
        remcount = undcount / 2  # number to remove (py2 integer division)
        logging.info("going to remove " + repr(remcount) + " from mc pool")
    else:
        remcount = 1

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated)

    for node_t in node_tuples[:remcount]:
        nname, nnode, running_mc, unused_slots = node_t
        logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                     (unused_slots, running_mc, nname) )
        if not opts.noopt : remove_from_mc_pool(nnode)
        nodes_with_too_few_jobs.remove(nnode)
        nodes_consistently_underpopulated.remove(nnode)
        removed_a_node = True

# record the (possibly now smaller) underpopulated set for the next run;
# writing an empty list clears stale marks
namelist = list()
if len(nodes_with_too_few_jobs) > 0:
    logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) + \
                  "with too few jobs on list for next run:")

for n in nodes_with_too_few_jobs:
    logging.debug(n.name)
    namelist.append(n.name)
if not opts.noopt:
    pkl_file = open(UNDERPOP_NODES_FILE,'wb')
    pickle.dump(namelist,pkl_file)
    pkl_file.close()
# anything flagged or removed this round: stop here, re-evaluate next run
if len(nodes_with_too_few_jobs) > 0 or removed_a_node :
    sys.exit(0)
356
# if we survive to this point, all nodes 'dedicated' are being efficiently
# used. are we draining less than the configured max?

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \
              repr(draining_slots) + " unused slots")
# headroom for at least one more node's worth of free slots: consider growing
if (MAXFREE - draining_slots) >= MAXFREEPERNODE:
    logging.debug("%d unused slots are permitted, so %d more" % (MAXFREE, MAXFREE - draining_slots))
    logging.debug("headroom of more than %d+ slots means we can try to grab another node" % (MAXFREEPERNODE) )
    if draining_nodes < MAXDRAIN :
        # first check if there are actually any waiting jobs to run; if not makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" % \
                      (MAXDRAIN, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        # grab up to NODESPERGRAB nodes, chosen at random from the candidates
        for nn in range(min(NODESPERGRAB,len(candidate_nodes))):
#            logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
            grabbed_node=random.choice(candidate_nodes)
            logging.info("%s added to mc node pool" % (grabbed_node.name))
            candidate_nodes.remove(grabbed_node)
            if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (MAXDRAIN))
        logging.debug("Doing nothing now")

# too many free slots overall: shed nodes until enough slots are recovered
elif draining_slots > MAXFREE:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated)

    slots_to_recover = draining_slots - MAXFREE
    logging.info("unused slot limit (%d) exceeded by %d " \
                 % (MAXFREE, slots_to_recover) + ": remove node(s) from mc pool")
    slots_recovered = 0
    # node_tuples is sorted so the best nodes to drop come first
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                         (unused_slots, running_mc, nname) )
            if not opts.noopt : remove_from_mc_pool(nnode)
            slots_recovered += unused_slots
# within tolerances on both sides: nothing to do
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, MAXFREE))
    logging.debug("difference is %d which is less than %d" % \
                  (MAXFREE - draining_slots, MAXFREEPERNODE) )
    logging.debug("so: doing nothing now.")

Properties

Name Value
svn:executable *

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28