/[pdpsoft]/nl.nikhef.ndpf.mcfloat/trunk/mcfloat

Revision 2786 by templon, Thu May 14 18:23:19 2015 UTC
File size: 15586 byte(s)
this is the version described in the CHEP2015 article,
in production as of 13 may 2015.

#!/usr/bin/env python
from torque_utils import pbsnodes

# this config allows mcore to run up to full ATLAS allocation (07.04.2015)
# full allocation right now is 47% of "non-retired" capacity = knal + mars + car
# just configure 47% of each class.

MAXDRAIN = 32        # max num of nodes allowed to drain
MAXFREE = 49         # max num of free slots to tolerate
MAXFREEPERNODE = 3
NODESPERGRAB = 3     # number of nodes to grab each time around

# parameterize percent capacity -- 47% is too conservative, make this variable.

CAPACFRAC = 0.60
CANDIDATE_NODES = [ 'wn-car-0%02d.farm.nikhef.nl'  % (n) for n in range(1, int(CAPACFRAC*96)+1) ] + \
                  [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1, int(CAPACFRAC*18)+1) ] + \
                  [ 'wn-mars-0%02d.farm.nikhef.nl' % (n) for n in range(20, 20+int(CAPACFRAC*57)+1) ]
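
# a worked check of the resulting list sizes at CAPACFRAC = 0.60 (illustrative):
#   car:  int(0.60*96) = 57 -> wn-car-001 .. wn-car-057   (57 nodes)
#   knal: int(0.60*18) = 10 -> wn-knal-001 .. wn-knal-010 (10 nodes)
#   mars: int(0.60*57) = 34 -> wn-mars-020 .. wn-mars-054 (35 nodes)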

MCQUEUE = 'atlasmc'

# settings below appropriate for knal nodes (32 cores)
# ... want to keep num of draining nodes to MAXFREE / 3
# since giving back a node could be expensive:
# it might already be running several mc jobs

# MAXDRAIN = 16 # max num of nodes allowed to drain
# MAXFREE = 49 # max num of free slots to tolerate
# MAXFREEPERNODE = 3
# NODESPERGRAB = 1
# CANDIDATE_NODES = [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1,19) ]
# MCQUEUE = 'atlasmc'

# settings below more appropriate for smrt nodes (8 core)
# here giving a node back doesn't really matter, it wasn't
# running any mc jobs yet anyway. better to grab lots
# of nodes and give back the ones that drain the slowest if
# too many slots are free.

# MAXDRAIN = 16 # max num of nodes allowed to drain
# MAXFREE = 49 # max num of free slots to tolerate
# CANDIDATE_NODES = [ 'wn-smrt-0%02d.farm.nikhef.nl' % (n) for n in range(1,73) ]

UNDERPOP_NODES_FILE = ".nodes_with_too_few_jobs"
TORQUE = "stro.nikhef.nl"

import os
import pickle
import sys
import subprocess

def getmcjobinfo(nodes):

    # input is the list of nodes for which to get info
    # function returns tuple (jobs_waiting, node_info)
    # jobs_waiting boolean value : are there MC jobs waiting to be run (state Q)? (independent of nodes!)
    # node_info: list of tuples (wn_name, wn, num_running_mc, num_free_slots)
    # where wn is the node object for the machine with name wn_name, num_running_mc is the number of
    # mc jobs running on that node, and num_free_slots is the number of unused job slots on that node
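    # e.g. a node_info entry might look like (illustrative):
    #   ('wn-car-003.farm.nikhef.nl', <node object>, 5, 2)
    # i.e. 5 running mc jobs and 2 unused slots on that node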

    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host']

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    qsfname = tempfile.mktemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")

    import time
    os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)
    now = time.mktime(time.localtime())

    jlist = torqueJobs.qs_parsefile(qsfname)

    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')
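
    # mapatts: project a raw qstat job dict onto the keys in inkeys and
    # normalize the values: time fields to hours or seconds via the tam
    # helpers, memory fields via tam.memconvert, state letters to
    # 'queued'/'running', and exec_host trimmed to the bare worker-node name.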
    def mapatts(indict, inkeys):
        sdict = tam.sub_dict(indict, inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs / 3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                statelett = sdict[k]
                if statelett in ['Q', 'W']:
                    sdict[k] = 'queued'
                elif statelett in ['R', 'E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict

    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == MCQUEUE:
            newj = mapatts(j, usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has

    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] == 'running':
            wn = j['exec_host']
            if wn in running_mc.keys():
                running_mc[wn] += 1
    rawlist = list()
    for n in nodes:
        rawlist.append((n.name, n, running_mc[n.name], n.freeCpu))
    from operator import itemgetter
    def srtrunningfree(item):
        # sort optimal for dropping nodes; for nodes with zero running mc jobs, want to
        # drop least-drained nodes (fewer free slots first in list); for nodes with
        # running mc jobs, want to shed as few nodes as possible so pick the more-drained
        # nodes first. this probably also works for shedding nodes when queue dries up.
        mcrun = item[2]
        emptyslots = item[3]
        if mcrun == 0:
            rank = emptyslots
        else:
            # the original "optimal" order was rank = 32*mcrun - emptyslots
            # (32 being just a number bigger than any expected emptyslots value);
            # the constants 7168 and 224 below only serve to enable a multi-level sort.
            rank = 7168 - 224*float(emptyslots)/mcrun + mcrun
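            # worked example (illustrative): a mostly-drained node with 1 mc job
            # and 8 free slots ranks 7168 - 224*8/1 + 1 = 5377, while a busy node
            # with 4 mc jobs and 8 free slots ranks 7168 - 224*8/4 + 4 = 6724;
            # the ascending sort therefore sheds the mostly-drained node first.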
        return rank
    slist = sorted(rawlist, key=srtrunningfree)

    return waitingJobs, slist
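
# remove_from_mc_pool: hand a node back to the regular pool by restoring its
# 'el6' property and dropping the 'mc' property via qmgr.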
def remove_from_mc_pool(node):
    # print "adding el6 tag back to node", node.name
    if "el6" not in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write('set node ' + node.name +
                         ' properties += el6\n')
        proc.stdin.write('print node ' + node.name +
                         ' properties\n')
        out = proc.communicate()[0]
        # print out
    if "mc" in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write('set node ' + node.name +
                         ' properties -= mc\n')
        proc.stdin.write('print node ' + node.name +
                         ' properties\n')
        out = proc.communicate()[0]
        # print out
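
# add_to_mc_pool: dedicate a node to the multicore pool by swapping its
# 'el6' property for 'mc' (the inverse of remove_from_mc_pool).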
def add_to_mc_pool(node):
    # print "node props b4:", grabbed_node.properties
    if "el6" in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write('set node ' + node.name +
                         ' properties -= el6\n')
        proc.stdin.write('print node ' + node.name +
                         ' properties\n')
        out = proc.communicate()[0]

    if "mc" not in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write('set node ' + node.name +
                         ' properties += mc\n')
        proc.stdin.write('print node ' + node.name +
                         ' properties\n')
        out = proc.communicate()[0]

import optparse

usage = "usage: %prog [-A] [-i] [-n] [-l logfile] [-L loglevel]"

p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-A", action="store_true", dest="addall", default=False,
             help="add all eligible nodes to multicore pool")
p.add_option("-l", action="store", dest="logfile", default=None,
             help="log actions and information to LOGFILE (default stdout)")
p.add_option("-L", action="store", dest="loglevel", default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ..., default INFO)")
p.add_option("-n", action="store_true", dest="noopt", default=False,
             help="don't do anything, just print what would have been done")
p.add_option("-i", action="store_true", dest="info", default=False,
             help="print info on all nodes in multicore pool")
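
# example invocations (illustrative; filenames are placeholders):
#   mcfloat -n -L DEBUG     dry run, log what would have been done
#   mcfloat -l mcfloat.log  log actions and information to a file
#   mcfloat -i              print per-node mc job info and exit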

import logging

opts, args = p.parse_args()

# assuming loglevel is bound to the string value obtained from the
# command line argument. Convert to upper case to allow the user to
# specify --log=DEBUG or --log=debug

numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level, filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)
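
# nodes flagged as underpopulated on the previous run are carried between
# runs in a pickle file holding a plain list of node names, e.g.
# ['wn-car-003.farm.nikhef.nl'] (illustrative).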
if os.path.isfile(UNDERPOP_NODES_FILE):
    pkl_file = open(UNDERPOP_NODES_FILE, 'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " +
              repr(nodes_too_few_jobs_last_run))

wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

for node in wnodes:
    if node.name in CANDIDATE_NODES and node.state.count('offline') == 0:
        mcnodes.append(node)
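
# tally draining capacity among nodes already dedicated to mc: a dedicated
# node has the 'mc' property and no 'el6' property; any free slot on such a
# node counts as a draining slot.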
draining_slots = 0
mcdedicated = list()
draining_nodes = 0

for node in mcnodes:
    if 'el6' not in node.properties and 'mc' in node.properties:
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0: draining_nodes += 1

if opts.info:
    waiting, node_tuples = getmcjobinfo(mcdedicated)
    for t in node_tuples:
        if t[2] == 0:
            print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])
        else:
            print "%28s has %2d running mc jobs, %2d empty slots, ratio %4.1f" % (t[0], t[2], t[3], float(t[3])/t[2])
    sys.exit(0)
candidate_nodes = list()
for n in mcnodes:
    if n not in mcdedicated:
        candidate_nodes.append(n)

if opts.addall:
    for grabbed_node in candidate_nodes:
        logging.info("%s added to mc node pool" % (grabbed_node.name))
        # candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(grabbed_node)
    sys.exit(0)

# check for dedicated nodes with too few jobs

nodes_with_too_few_jobs = list()

for node in mcdedicated:
    logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
    if node.freeCpu > MAXFREEPERNODE:
        nodes_with_too_few_jobs.append(node)

logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) +
              " nodes with too few jobs")

nodes_consistently_underpopulated = list()
for n in nodes_with_too_few_jobs:
    if n.name in nodes_too_few_jobs_last_run:
        nodes_consistently_underpopulated.append(n)

undcount = len(nodes_consistently_underpopulated)
if undcount > 0:
    logging.debug("there are " + repr(undcount) +
                  " nodes with too few jobs that were also marked last run")

import random

removed_a_node = False

if undcount > 0:
    logging.info("nodes consistently underpopulated:")
    for n in nodes_consistently_underpopulated:
        logging.info(" " + n.name)
    if undcount > 1:
        remcount = undcount / 2  # number to remove
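        # Python 2 integer division: e.g. undcount = 5 -> remcount = 2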
        logging.info("going to remove " + repr(remcount) + " from mc pool")
    else:
        remcount = 1

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated)

    for node_t in node_tuples[:remcount]:
        nname, nnode, running_mc, unused_slots = node_t
        logging.info("dumped %d empty slots, %d mc jobs on %s" %
                     (unused_slots, running_mc, nname))
        if not opts.noopt: remove_from_mc_pool(nnode)
        nodes_with_too_few_jobs.remove(nnode)
        nodes_consistently_underpopulated.remove(nnode)
        removed_a_node = True

namelist = list()
if len(nodes_with_too_few_jobs) > 0:
    logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) +
                  "with too few jobs on list for next run:")

for n in nodes_with_too_few_jobs:
    logging.debug(n.name)
    namelist.append(n.name)
if not opts.noopt:
    pkl_file = open(UNDERPOP_NODES_FILE, 'wb')
    pickle.dump(namelist, pkl_file)
    pkl_file.close()
if len(nodes_with_too_few_jobs) > 0 or removed_a_node:
    sys.exit(0)

# if we survive to this point, all nodes 'dedicated' are being efficiently
# used. are we draining less than the configured max?

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " +
              repr(draining_slots) + " unused slots")
if (MAXFREE - draining_slots) >= MAXFREEPERNODE:
    logging.debug("%d unused slots are permitted, so %d more" % (MAXFREE, MAXFREE - draining_slots))
    logging.debug("headroom of at least %d slots means we can try to grab another node" % (MAXFREEPERNODE))
    if draining_nodes < MAXDRAIN:
        # first check if there are actually any waiting jobs to run; if not, it makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" %
                      (MAXDRAIN, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        for nn in range(min(NODESPERGRAB, len(candidate_nodes))):
            # logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
            grabbed_node = random.choice(candidate_nodes)
            logging.info("%s added to mc node pool" % (grabbed_node.name))
            candidate_nodes.remove(grabbed_node)
            if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (MAXDRAIN))
        logging.debug("Doing nothing now")

elif draining_slots > MAXFREE:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated)

    slots_to_recover = draining_slots - MAXFREE
    logging.info("unused slot limit (%d) exceeded by %d " %
                 (MAXFREE, slots_to_recover) + ": remove node(s) from mc pool")
    slots_recovered = 0
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" %
                         (unused_slots, running_mc, nname))
            if not opts.noopt: remove_from_mc_pool(nnode)
            slots_recovered += unused_slots
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, MAXFREE))
    logging.debug("difference is %d which is less than %d" %
                  (MAXFREE - draining_slots, MAXFREEPERNODE))
    logging.debug("so: doing nothing now.")
