
Contents of /nl.nikhef.ndpf.mcfloat/trunk/mcfloat



Revision 2785 - Wed May 13 13:32:01 2015 UTC - by templon
File size: 15432 byte(s)
This is the version that was in production from March 2015 until May 13 2015.

#!/usr/bin/env python
from torque_utils import pbsnodes

# this config allows mcore to run up to full ATLAS allocation (07.04.2015)
# full allocation right now is 47% of "non-retired" capacity = knal + mars + car
# just configure 47% of each class.

MAXDRAIN = 32        # max num of nodes allowed to drain
MAXFREE = 49         # max num of free slots to tolerate
MAXFREEPERNODE = 3
NODESPERGRAB = 3     # number of nodes to grab each time around

# parameterize percent capacity -- 47% is too conservative, make this variable.

CAPACFRAC = 0.60
CANDIDATE_NODES = [ 'wn-car-0%02d.farm.nikhef.nl' % (n) for n in range(1, int(CAPACFRAC*96)+1) ] + \
                  [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1,int(CAPACFRAC*18)+1) ] + \
                  [ 'wn-mars-0%02d.farm.nikhef.nl' % (n) for n in range(20,20+int(CAPACFRAC*57)+1) ]
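# Editor's note (derived from the expression above, not part of the original):
# with CAPACFRAC = 0.60 the list expands to wn-car-001..wn-car-057,
# wn-knal-001..wn-knal-010 and wn-mars-020..wn-mars-054.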

MCQUEUE = 'atlasmc'

# settings below appropriate for knal nodes (32 cores)
# ... want to keep num of draining nodes
# to MAXFREE / 3 since giving back a node could be expensive.
# it might already be running several mc jobs

# MAXDRAIN = 16      # max num of nodes allowed to drain
# MAXFREE = 49       # max num of free slots to tolerate
# MAXFREEPERNODE = 3
# NODESPERGRAB = 1
# CANDIDATE_NODES = [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1,19) ]
# MCQUEUE = 'atlasmc'

# settings below more appropriate for smrt nodes (8 core)
# here giving a node back doesn't really matter, it wasn't
# running any mc jobs yet anyway. better to grab lots
# of nodes and give back the ones that drain the slowest if
# too many slots are free.

# MAXDRAIN = 16      # max num of nodes allowed to drain
# MAXFREE = 49       # max num of free slots to tolerate
# CANDIDATE_NODES = [ 'wn-smrt-0%02d.farm.nikhef.nl' % (n) for n in range(1,73) ]

UNDERPOP_NODES_FILE = ".nodes_with_too_few_jobs"
TORQUE = "stro.nikhef.nl"

import os
import pickle
import sys
import subprocess

def getmcjobinfo(nodes):

    # input is the list of nodes for which to get info
    # function returns tuple (jobs_waiting, node_info)
    # jobs_waiting boolean value : are there MC jobs waiting to be run (state Q)? (independent of nodes!)
    # node_info: list of tuples (wn_name, wn, num_running_mc, num_free_slots)
    # where wn is the node object for the machine with name wn_name, num_running_mc is the number of
    # mc jobs running on that node, and num_free_slots is the number of unused job slots on that node

    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host' ]

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp")

    import time
    os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)
    now = time.mktime(time.localtime())

    jlist = torqueJobs.qs_parsefile(qsfname)

    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

    def mapatts(indict,inkeys):
        sdict = tam.sub_dict(indict,inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs/3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                statelett = sdict[k]
                if statelett in ['Q','W']:
                    sdict[k] = 'queued'
                elif statelett in ['R','E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict

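    # Illustrative sketch of what mapatts does to one raw qstat entry
    # (hypothetical values, not from the original): a job with
    # job_state 'R' and exec_host 'wn-car-042.farm.nikhef.nl/3+...' comes back as
    # {'job_state': 'running', 'exec_host': 'wn-car-042.farm.nikhef.nl', ...},
    # i.e. state letters are folded to 'queued'/'running' and exec_host is
    # truncated at the first '/'.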
    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == MCQUEUE:
            newj = mapatts(j,usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has

    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] =='running':
            wn = j['exec_host']
            if wn in running_mc.keys():
                running_mc[wn] += 1
    rawlist = list()
    for n in nodes:
        rawlist.append( (n.name, n, running_mc[n.name], n.freeCpu) )
    from operator import itemgetter
    def srtrunningfree(item):
        # sort optimal for dropping nodes; for nodes with zero running mc jobs, want to
        # drop least-drained nodes (fewer free slots first in list); for nodes with
        # running mc jobs, want to shed as few nodes as possible so pick the more-drained
        # nodes first. this probably also works for shedding nodes when queue dries up.
        mcrun = item[2]
        emptyslots = item[3]
        if mcrun == 0:
            rank = emptyslots
        else:
            rank = 32*mcrun - emptyslots  # 32 just a number bigger than expected emptyslots value
        return rank
    slist = sorted(rawlist, key=srtrunningfree)
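    # Worked example of the ordering this key produces (hypothetical nodes,
    # editor-supplied, not from the original source):
    #   0 mc jobs,  2 free slots -> rank  2   (given back first)
    #   0 mc jobs,  8 free slots -> rank  8
    #   1 mc job,  12 free slots -> rank 20   (32*1 - 12)
    #   2 mc jobs,  1 free slot  -> rank 63   (32*2 - 1, given back last)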

    return waitingJobs, slist

def remove_from_mc_pool(node):
    # print "adding el6 tag back to node", node.name
    if "el6" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += el6\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out
    if "mc" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out

def add_to_mc_pool(node):
    # print "node props b4:", grabbed_node.properties
    if "el6" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= el6\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]

    if "mc" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]

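# Editor's note, not part of the original: the two helpers above feed command
# pairs to qmgr on stdin; interactively the equivalent would be roughly
#   qmgr -c "set node wn-car-001.farm.nikhef.nl properties += mc"
# (node name here is a hypothetical example).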
import optparse

usage = "usage: %prog [-A] [-i] [-n] [-l logfile] [-L loglevel]"

p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-A",action="store_true",dest="addall",default=False,
             help="add all eligible nodes to multicore pool")
p.add_option("-l",action="store",dest="logfile",default=None,
             help="log actions and information to LOGFILE (default stdout)")
p.add_option("-L",action="store",dest="loglevel",default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ..., default INFO)")
p.add_option("-n",action="store_true",dest="noopt",default=False,
             help="don't do anything, just print what would have been done")
p.add_option("-i",action="store_true",dest="info",default=False,
             help="print info on all nodes in multicore pool")

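# Typical invocations (editor's examples, assuming the script is installed as "mcfloat"):
#   mcfloat -n -L DEBUG    # dry run: log every decision, change nothing
#   mcfloat -i             # report running mc jobs / empty slots per dedicated node
#   mcfloat -A             # force all candidate nodes into the multicore pool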
import logging

opts, args = p.parse_args()

# assuming loglevel is bound to the string value obtained from the
# command line argument. Convert to upper case to allow the user to
# specify --log=DEBUG or --log=debug

numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level,filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)

if os.path.isfile(UNDERPOP_NODES_FILE):
    pkl_file = open(UNDERPOP_NODES_FILE,'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " + \
              repr(nodes_too_few_jobs_last_run) )

wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

for node in wnodes:
    if node.name in CANDIDATE_NODES and node.state.count('offline') == 0:
        mcnodes.append(node)

draining_slots = 0
mcdedicated = list()
draining_nodes = 0

for node in mcnodes:
    if 'el6' not in node.properties and 'mc' in node.properties :
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0 : draining_nodes += 1

if opts.info:
    waiting, node_tuples = getmcjobinfo(mcdedicated)
    for t in node_tuples:
        if t[2] == 0:
            print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])
        else:
            print "%28s has %2d running mc jobs, %2d empty slots, ratio %4.1f" % (t[0], t[2], t[3], float(t[3])/t[2])
    sys.exit(0)
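# Sample of the -i report produced above (node names and counts are
# hypothetical, editor-supplied):
#      wn-car-042.farm.nikhef.nl has  3 running mc jobs,  5 empty slots, ratio  1.7
#     wn-knal-007.farm.nikhef.nl has  0 running mc jobs,  2 empty slots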
candidate_nodes = list()
for n in mcnodes:
    if n not in mcdedicated:
        candidate_nodes.append(n)

if opts.addall:
    for grabbed_node in candidate_nodes:
        logging.info("%s added to mc node pool" % (grabbed_node.name))
        # candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(grabbed_node)
    sys.exit(0)

# check for dedicated nodes with too few jobs

nodes_with_too_few_jobs = list()

for node in mcdedicated:
    logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
    if node.freeCpu > MAXFREEPERNODE:
        nodes_with_too_few_jobs.append(node)

logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \
              " nodes with too few jobs")

nodes_consistently_underpopulated = list()
for n in nodes_with_too_few_jobs:
    if n.name in nodes_too_few_jobs_last_run:
        nodes_consistently_underpopulated.append(n)

undcount = len(nodes_consistently_underpopulated)
if undcount > 0:
    logging.debug("there are " + repr(undcount) + \
                  " nodes with too few jobs that were also marked last run" )

import random

removed_a_node = False

if undcount > 0:
    logging.info("nodes consistently underpopulated:")
    for n in nodes_consistently_underpopulated:
        logging.info(" " + n.name)
    if undcount > 1:
        remcount = undcount / 2  # number to remove
        logging.info("going to remove " + repr(remcount) + " from mc pool")
    else:
        remcount = 1

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated)

    for node_t in node_tuples[:remcount]:
        nname, nnode, running_mc, unused_slots = node_t
        logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                     (unused_slots, running_mc, nname) )
        if not opts.noopt : remove_from_mc_pool(nnode)
        nodes_with_too_few_jobs.remove(nnode)
        nodes_consistently_underpopulated.remove(nnode)
        removed_a_node = True

namelist = list()
if len(nodes_with_too_few_jobs) > 0:
    logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) + \
                  "with too few jobs on list for next run:")

for n in nodes_with_too_few_jobs:
    logging.debug(n.name)
    namelist.append(n.name)
if not opts.noopt:
    pkl_file = open(UNDERPOP_NODES_FILE,'wb')
    pickle.dump(namelist,pkl_file)
    pkl_file.close()
if len(nodes_with_too_few_jobs) > 0 or removed_a_node :
    sys.exit(0)

# if we survive to this point, all nodes 'dedicated' are being efficiently
# used. are we draining less than the configured max?

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \
              repr(draining_slots) + " unused slots")
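# Decision sketch for the branches below with the current settings
# (MAXFREE=49, MAXFREEPERNODE=3, MAXDRAIN=32); numbers are editor-supplied
# examples, not from the original:
#   draining_slots = 40 -> headroom 9 >= 3: try to grab node(s) if jobs are waiting
#   draining_slots = 60 -> limit exceeded by 11: shed nodes until slots recovered
#   draining_slots = 48 -> headroom 1 < 3 and not over the limit: do nothing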
if (MAXFREE - draining_slots) >= MAXFREEPERNODE:
    logging.debug("%d unused slots are permitted, so %d more" % (MAXFREE, MAXFREE - draining_slots))
    logging.debug("headroom of more than %d+ slots means we can try to grab another node" % (MAXFREEPERNODE) )
    if draining_nodes < MAXDRAIN :
        # first check if there are actually any waiting jobs to run; if not makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" % \
                      (MAXDRAIN, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        for nn in range(min(NODESPERGRAB,len(candidate_nodes))):
            # logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
            grabbed_node=random.choice(candidate_nodes)
            logging.info("%s added to mc node pool" % (grabbed_node.name))
            candidate_nodes.remove(grabbed_node)
            if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (MAXDRAIN))
        logging.debug("Doing nothing now")

elif draining_slots > MAXFREE:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated)

    slots_to_recover = draining_slots - MAXFREE
    logging.info("unused slot limit (%d) exceeded by %d " \
                 % (MAXFREE, slots_to_recover) + ": remove node(s) from mc pool")
    slots_recovered = 0
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                         (unused_slots, running_mc, nname) )
            if not opts.noopt : remove_from_mc_pool(nnode)
            slots_recovered += unused_slots
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, MAXFREE))
    logging.debug("difference is %d which is less than %d" % \
                  (MAXFREE - draining_slots, MAXFREEPERNODE) )
    logging.debug("so: doing nothing now.")
