/[pdpsoft]/nl.nikhef.ndpf.mcfloat/trunk/mcfloat
ViewVC logotype

Annotation of /nl.nikhef.ndpf.mcfloat/trunk/mcfloat

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2841 - (hide annotations) (download)
Mon Jan 25 18:45:02 2016 UTC (6 years, 8 months ago) by templon
File size: 16182 byte(s)
version used in production as of 25 jan 2016

1 templon 2708 #!/usr/bin/env python
2     from torque_utils import pbsnodes
3    
# This configuration lets mcore grow to the full ATLAS allocation (07.04.2015).
# The full allocation at that time was 47% of "non-retired" capacity
# (= knal + mars + car), configured as 47% of each node class; the fraction
# is now parameterized via CAPACFRAC below.

MAXDRAIN = 32        # max num of nodes allowed to drain
MAXFREE = 49         # max num of free slots to tolerate
MAXFREEPERNODE = 3   # standard for four-core jobs
NODESPERGRAB = 3     # number of nodes to grab each time around

# Manual overrides used when lsgrid is running:
#
# MAXFREEPERNODE = 29   # allow lsgrid 30-core jobs
# MAXFREE = 149         # max num of free slots to tolerate

# Manual overrides used when bbmri is running:
#
# MAXFREE = 149         # max num of free slots to tolerate
# MAXFREEPERNODE = 9    # allow bbmri 10-core jobs

# Fraction of each node class dedicated to the pool -- 47% proved too
# conservative, so it is a tunable here.
CAPACFRAC = 0.85

# Original car selection was
#   [ 'wn-car-0%02d.farm.nikhef.nl' % (n) for n in range( 96-int(CAPACFRAC*96), 97) ]
# temporarily changed due to low-numbered cars offline and reserved and such.

NUM_CAR = int(CAPACFRAC * 96)
NUM_KNAL = int(CAPACFRAC * 18)
NUM_MARS = int(CAPACFRAC * 58)

# Candidate worker nodes, in class order: car, then knal, then mars.
_NODE_CLASSES = [('car', NUM_CAR), ('knal', NUM_KNAL), ('mars', NUM_MARS)]
CANDIDATE_NODES = ['wn-%s-0%02d.farm.nikhef.nl' % (cls, n)
                   for (cls, count) in _NODE_CLASSES
                   for n in range(1, count + 1)]

MCQUEUE = 'atlasmc'

# Historical settings kept for reference -- knal nodes (32 cores):
# want to keep the number of draining nodes to MAXFREE / 3 since giving
# back a node could be expensive; it might already run several mc jobs.
#
# MAXDRAIN = 16 # max num of nodes allowed to drain
# MAXFREE = 49 # max num of free slots to tolerate
# MAXFREEPERNODE = 3
# NODESPERGRAB = 1
# CANDIDATE_NODES = [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1,19) ]
# MCQUEUE = 'atlasmc'

# Historical settings kept for reference -- smrt nodes (8 cores):
# giving a node back doesn't really matter there, it wasn't running any
# mc jobs yet anyway; better to grab lots of nodes and give back the
# ones that drain the slowest if too many slots are free.
#
# MAXDRAIN = 16 # max num of nodes allowed to drain
# MAXFREE = 49 # max num of free slots to tolerate
# CANDIDATE_NODES = [ 'wn-smrt-0%02d.farm.nikhef.nl' % (n) for n in range(1,73) ]

# State file carrying the under-populated node names between runs.
UNDERPOP_NODES_FILE = ".nodes_with_too_few_jobs"
# Torque server to query with qstat.
TORQUE = "stro.nikhef.nl"
65    
66     import os
67     import pickle
68     import sys
69     import subprocess
70    
def getmcjobinfo(nodes):

    # input is the list of nodes for which to get info
    # function returns tuple (jobs_waiting, node_info)
    # jobs_waiting boolean value : are there MC jobs waiting to be run (state Q)? (independent of nodes!)
    # node_info: list of tuples (wn_name, wn, num_running_mc, num_free_slots)
    # where wn is the node object for the machine with name wn_name, num_running_mc is the number of
    # mc jobs running on that node, and num_free_slots is the number of unused job slots on that node

    # job attributes we care about from the qstat output
    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host' ]

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    # NOTE(review): tempfile.mktemp is race-prone (file may be created by
    # someone else between name generation and use); mkstemp would be safer.
    qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp")

    import time
    # snapshot the full qstat state of the torque server into the temp file
    os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)
    now = time.mktime(time.localtime())   # NOTE(review): 'now' is never used below

    jlist = torqueJobs.qs_parsefile(qsfname)

    # keep the most recent snapshot around for post-mortem debugging
    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

    def mapatts(indict,inkeys):
        # Project the raw job dict onto 'inkeys' and normalize the values:
        # tam.tfl fields hh:mm:ss -> hours, tam.mfl fields -> MiB, tam.tfl2
        # fields -> seconds, job_state letters -> 'queued'/'running', and
        # exec_host -> bare node name (text before the first '/').
        # Falsy values are dropped from the result.
        sdict = tam.sub_dict(indict,inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs/3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                statelett = sdict[k]
                if statelett in ['Q','W']:
                    sdict[k] = 'queued'
                elif statelett in ['R','E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict

    # restrict to jobs in the multicore queue; remember whether any are queued
    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == MCQUEUE:
            newj = mapatts(j,usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has

    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] =='running':
            wn = j['exec_host']
            if wn in running_mc.keys():
                running_mc[wn] += 1
    rawlist = list()
    for n in nodes:
        rawlist.append( (n.name, n, running_mc[n.name], n.freeCpu) )
    from operator import itemgetter   # NOTE(review): unused import, kept as-is
    def srtrunningfree(item):
        # sort optimal for dropping nodes; for nodes with zero running mc jobs, want to
        # drop least-drained nodes (fewer free slots first in list); for nodes with
        # running mc jobs, want to shed as few nodes as possible so pick the more-drained
        # nodes first. this probably also works for shedding nodes when queue dries up.
        mcrun = item[2]
        emptyslots = item[3]
        if mcrun == 0:
            rank = emptyslots
        else:
            # the original "optimal" order: rank = 32*mcrun - emptyslots # 32 just a number bigger than expected emptyslots value
            rank = 7168 - 224*float(emptyslots)/mcrun + mcrun # constants 7168 and 224 only serve to enable a multi-level sort.
        return rank
    slist = sorted(rawlist, key=srtrunningfree)

    return waitingJobs, slist
162    
def remove_from_mc_pool(node):
    # Return a node to the general batch pool: put the generic "el6"
    # property back if it is missing, then strip the "mc" property if
    # present.  Each property edit is sent to the batch server through
    # /usr/bin/qmgr; the trailing "print node" makes qmgr echo the
    # resulting property list on stdout (kept for debugging).
    changes = []
    if "el6" not in node.properties :
        changes.append('+= el6')
    if "mc" in node.properties :
        changes.append('-= mc')
    for change in changes:
        qmgr = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        qmgr.stdin.write('set node ' + node.name + ' properties ' + change + '\n')
        qmgr.stdin.write('print node ' + node.name + ' properties\n')
        out = qmgr.communicate()[0]
        # print out
185    
def add_to_mc_pool(node):
    # Dedicate a node to the multicore pool: drop the generic "el6"
    # property if set, then add the "mc" property if missing.  Property
    # edits go to the batch server through /usr/bin/qmgr; the trailing
    # "print node" makes qmgr echo the resulting property list.
    changes = []
    if "el6" in node.properties :
        changes.append('-= el6')
    if "mc" not in node.properties :
        changes.append('+= mc')
    for change in changes:
        qmgr = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        qmgr.stdin.write('set node ' + node.name + ' properties ' + change + '\n')
        qmgr.stdin.write('print node ' + node.name + ' properties\n')
        out = qmgr.communicate()[0]
207    
import optparse

# Command-line interface: -A add all, -l logfile, -L loglevel, -n dry-run,
# -i print pool info.
usage = "usage: %prog [-d debug_level] [-n]"

p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-A",action="store_true",dest="addall",default=False,
             help="add all eligible nodes to multicore pool")
p.add_option("-l",action="store",dest="logfile",default=None,
             help="log actions and information to LOGFILE (default stdout)")
p.add_option("-L",action="store",dest="loglevel",default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ..., default INFO")
p.add_option("-n",action="store_true",dest="noopt",default=False,
             help="don't do anything, just print what would have been done")
p.add_option("-i",action="store_true",dest="info",default=False,
             help="print info on all nodes in multicore pool")

import logging

opts, args = p.parse_args()

# assuming loglevel is bound to the string value obtained from the
# command line argument. Convert to upper case to allow the user to
# specify --log=DEBUG or --log=debug

numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    # BUG FIX: the original raised with the undefined name 'loglevel',
    # which produced a NameError instead of the intended ValueError.
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level,filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)
243    
# Load the set of node names marked as under-populated on the previous run
# (written as a pickled list of names at the end of a run).
if os.path.isfile(UNDERPOP_NODES_FILE):
    pkl_file = open(UNDERPOP_NODES_FILE,'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " + \
              repr(nodes_too_few_jobs_last_run) )

wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

# mcnodes: candidate-class nodes that are currently online
for node in wnodes:
    if node.name in CANDIDATE_NODES and node.state.count('offline') == 0:
        mcnodes.append(node)

draining_slots = 0
mcdedicated = list()
draining_nodes = 0

# mcdedicated: nodes already dedicated to mc (tagged 'mc', no 'el6');
# count their free slots (draining_slots) and how many have any free slot.
for node in mcnodes:
    if 'el6' not in node.properties and 'mc' in node.properties :
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0 : draining_nodes += 1

# -i: just report the per-node mc job / free slot counts and exit.
if opts.info:
    waiting, node_tuples = getmcjobinfo(mcdedicated)
    for t in node_tuples:
        if t[2] == 0:
            print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])
        else:
            print "%28s has %2d running mc jobs, %2d empty slots, ratio %4.1f" % (t[0], t[2], t[3], float(t[3])/t[2])
    sys.exit(0)
# candidate_nodes: online candidate-class nodes not yet dedicated to mc
candidate_nodes = list()
for n in mcnodes:
    if n not in mcdedicated:
        candidate_nodes.append(n)

# -A: grab every remaining candidate node into the mc pool and exit.
if opts.addall:
    for grabbed_node in candidate_nodes:
        logging.info("%s added to mc node pool" % (grabbed_node.name))
        # candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(grabbed_node)
    sys.exit(0)
292    
293 templon 2708 # check for dedicated nodes with too few jobs
294    
295     nodes_with_too_few_jobs = list()
296    
297     for node in mcdedicated:
298     logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
299 templon 2770 if node.freeCpu > MAXFREEPERNODE:
300 templon 2708 nodes_with_too_few_jobs.append(node)
301    
302     logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \
303     " nodes with too few jobs")
304    
305     nodes_consistently_underpopulated = list()
306     for n in nodes_with_too_few_jobs:
307     if n.name in nodes_too_few_jobs_last_run:
308     nodes_consistently_underpopulated.append(n)
309    
310     undcount = len(nodes_consistently_underpopulated)
311     if undcount > 0:
312     logging.debug("there are " + repr(undcount) + \
313     " nodes with too few jobs that were also marked last run" )
314    
315     import random
316    
317     removed_a_node = False
318    
319     if undcount > 0:
320     logging.info("nodes consistently underpopulated:")
321     for n in nodes_consistently_underpopulated:
322     logging.info(" " + n.name)
323     if undcount > 1:
324     remcount = undcount / 2 # number to remove
325     logging.info("going to remove " + repr(remcount) + " from mc pool")
326     else:
327     remcount = 1
328    
329     # find out how many running mc jobs each node has
330    
331     waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated)
332    
333     for node_t in node_tuples[:remcount]:
334     nname, nnode, running_mc, unused_slots = node_t
335     logging.info("dumped %d empty slots, %d mc jobs on %s" % \
336     (unused_slots, running_mc, nname) )
337     if not opts.noopt : remove_from_mc_pool(nnode)
338     nodes_with_too_few_jobs.remove(nnode)
339     nodes_consistently_underpopulated.remove(nnode)
340     removed_a_node = True
341    
342     namelist = list()
343     if len(nodes_with_too_few_jobs) > 0:
344     logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) + \
345     "with too few jobs on list for next run:")
346    
347     for n in nodes_with_too_few_jobs:
348     logging.debug(n.name)
349     namelist.append(n.name)
350     if not opts.noopt:
351     pkl_file = open(UNDERPOP_NODES_FILE,'wb')
352     pickle.dump(namelist,pkl_file)
353     pkl_file.close()
354     if len(nodes_with_too_few_jobs) > 0 or removed_a_node :
355     sys.exit(0)
356    
# if we survive to this point, all nodes 'dedicated' are being efficiently
# used. are we draining less than the configured max?

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \
              repr(draining_slots) + " unused slots")
# Case 1: enough headroom (at least one node's worth of free slots under
# the MAXFREE budget) -- consider grabbing more nodes.
if (MAXFREE - draining_slots) >= MAXFREEPERNODE:
    logging.debug("%d unused slots are permitted, so %d more" % (MAXFREE, MAXFREE - draining_slots))
    logging.debug("headroom of more than %d+ slots means we can try to grab another node" % (MAXFREEPERNODE) )
    if draining_nodes < MAXDRAIN :
        # first check if there are actually any waiting jobs to run; if not makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" % \
                      (MAXDRAIN, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        # grab up to NODESPERGRAB nodes, chosen at random from the candidates
        for nn in range(min(NODESPERGRAB,len(candidate_nodes))):
            # logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
            grabbed_node=random.choice(candidate_nodes)
            logging.info("%s added to mc node pool" % (grabbed_node.name))
            candidate_nodes.remove(grabbed_node)
            if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (MAXDRAIN))
        logging.debug("Doing nothing now")

# Case 2: over the free-slot budget -- give nodes back until enough
# draining slots have been recovered.
elif draining_slots > MAXFREE:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated)

    slots_to_recover = draining_slots - MAXFREE
    logging.info("unused slot limit (%d) exceeded by %d " \
                 % (MAXFREE, slots_to_recover) + ": remove node(s) from mc pool")
    slots_recovered = 0
    # node_tuples is sorted so the cheapest nodes to shed come first
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                         (unused_slots, running_mc, nname) )
            if not opts.noopt : remove_from_mc_pool(nnode)
            slots_recovered += unused_slots
# Case 3: within budget but without a full node's worth of headroom -- leave
# the pool alone.
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, MAXFREE))
    logging.debug("difference is %d which is less than %d" % \
                  (MAXFREE - draining_slots, MAXFREEPERNODE) )
    logging.debug("so: doing nothing now.")

Properties

Name Value
svn:executable *

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28