
Annotation of /nl.nikhef.ndpf.mcfloat/trunk/mcfloat



Revision 2770
Thu Mar 12 12:54:43 2015 UTC by templon
File size: 14251 byte(s)
numerous changes, mostly improvements to the algorithm: make wiser choices about which
nodes to throw away (preserve drained and draining slots as much as possible).
also, the default config is now for 4-core jobs, with more nodes configured as candidates.

#!/usr/bin/env python
from torque_utils import pbsnodes

# this config allows mcore to run up to full ATLAS allocation (27.02.2015)
MAXDRAIN = 32          # max num of nodes allowed to drain
MAXFREE = 49           # max num of free slots to tolerate
MAXFREEPERNODE = 3
NODESPERGRAB = 3       # number of nodes to grab each time around
CANDIDATE_NODES = [ 'wn-car-0%02d.farm.nikhef.nl' % (n) for n in range(1,97) ] + \
                  [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1,19) ]
MCQUEUE = 'atlasmc'

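# For reference (comment only): the comprehension above expands to node names
# like 'wn-car-001.farm.nikhef.nl' through 'wn-car-096.farm.nikhef.nl',
# followed by 'wn-knal-001.farm.nikhef.nl' through 'wn-knal-018.farm.nikhef.nl'.
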
# settings below appropriate for knal nodes (32 cores)
# ... want to keep num of draining nodes
# to MAXFREE / 3 since giving back a node could be expensive.
# it might already be running several mc jobs

# MAXDRAIN = 16        # max num of nodes allowed to drain
# MAXFREE = 49         # max num of free slots to tolerate
# MAXFREEPERNODE = 3
# NODESPERGRAB = 1
# CANDIDATE_NODES = [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1,19) ]
# MCQUEUE = 'atlasmc'

# settings below more appropriate for smrt nodes (8 core)
# here giving a node back doesn't really matter, it wasn't
# running any mc jobs yet anyway. better to grab lots
# of nodes and give back the ones that drain the slowest if
# too many slots are free.

# MAXDRAIN = 16        # max num of nodes allowed to drain
# MAXFREE = 49         # max num of free slots to tolerate
# CANDIDATE_NODES = [ 'wn-smrt-0%02d.farm.nikhef.nl' % (n) for n in range(1,73) ]

UNDERPOP_NODES_FILE = ".nodes_with_too_few_jobs"
TORQUE = "stro.nikhef.nl"

import os
import pickle
import sys
import subprocess

def getmcjobinfo(nodes):

    # input is the list of nodes for which to get info
    # function returns tuple (jobs_waiting, node_info)
    # jobs_waiting boolean value : are there MC jobs waiting to be run (state Q)? (independent of nodes!)
    # node_info: list of tuples (wn_name, wn, num_running_mc, num_free_slots)
    # where wn is the node object for the machine with name wn_name, num_running_mc is the number of
    # mc jobs running on that node, and num_free_slots is the number of unused job slots on that node

    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host' ]

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp")

    import time
    os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)
    now = time.mktime(time.localtime())

    jlist = torqueJobs.qs_parsefile(qsfname)

    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

    def mapatts(indict,inkeys):
        sdict = tam.sub_dict(indict,inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs/3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                statelett = sdict[k]
                if statelett in ['Q','W']:
                    sdict[k] = 'queued'
                elif statelett in ['R','E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict

    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == MCQUEUE:
            newj = mapatts(j,usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has

    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] =='running':
            wn = j['exec_host']
            if wn in running_mc.keys():
                running_mc[wn] += 1
    rawlist = list()
    for n in nodes:
        rawlist.append( (n.name, n, running_mc[n.name], n.freeCpu) )
    from operator import itemgetter
    def srtrunningfree(item):
        # sort optimal for dropping nodes; for nodes with zero running mc jobs, want to
        # drop least-drained nodes (fewer free slots first in list); for nodes with
        # running mc jobs, want to shed as few nodes as possible so pick the more-drained
        # nodes first. this probably also works for shedding nodes when queue dries up.
        mcrun = item[2]
        emptyslots = item[3]
        if mcrun == 0:
            rank = emptyslots
        else:
            rank = 32*mcrun - emptyslots   # 32 just a number bigger than expected emptyslots value
        return rank
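    # Worked example (comment only): a node running 2 mc jobs with 5 free slots
    # gets rank 32*2 - 5 = 59, while a node with no mc jobs and 5 free slots
    # gets rank 5; after the ascending sort below, nodes not yet running mc
    # work come first and are the first to be handed back.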
    slist = sorted(rawlist, key=srtrunningfree)

    return waitingJobs, slist

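# Illustrative usage sketch (comment only): this is how getmcjobinfo is
# consumed further down in this script.
#
#   waiting, node_tuples = getmcjobinfo(mcdedicated)
#   if waiting:
#       nname, nnode, running_mc, unused_slots = node_tuples[0]
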
def remove_from_mc_pool(node):
    # print "adding el6 tag back to node", node.name
    if "el6" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += el6\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out
    if "mc" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out

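# The qmgr exchange above is roughly equivalent to running (node name purely
# illustrative):
#   qmgr -c 'set node wn-car-001.farm.nikhef.nl properties += el6'
#   qmgr -c 'set node wn-car-001.farm.nikhef.nl properties -= mc'
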
def add_to_mc_pool(node):
    # print "node props b4:", grabbed_node.properties
    if "el6" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= el6\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]

    if "mc" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]

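# Illustrative sketch (not called anywhere in this script): add_to_mc_pool and
# remove_from_mc_pool could share a hypothetical helper along these lines,
# piping one property change at a time through qmgr. The name and signature
# are assumptions, not part of the original tool.
def _change_node_property(node_name, change):
    # 'change' is a qmgr property clause such as 'properties += mc'
    proc = subprocess.Popen(['/usr/bin/qmgr'],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,)
    proc.stdin.write('set node ' + node_name + ' ' + change + '\n')
    return proc.communicate()[0]
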
import optparse

usage = "usage: %prog [-n] [-l logfile] [-L loglevel]"

p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-n",action="store_true",dest="noopt",default=False,
             help="don't do anything, just print what would have been done")
p.add_option("-l",action="store",dest="logfile",default=None,
             help="log actions and information to LOGFILE (default stdout)")
p.add_option("-L",action="store",dest="loglevel",default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ...; default INFO)")

import logging

opts, args = p.parse_args()

# loglevel is bound to the string value obtained from the command line
# argument; convert to upper case to allow the user to specify
# -L DEBUG or -L debug

numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level,filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)

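# Example invocations (illustrative; the log file path is hypothetical):
#   ./mcfloat -n -L DEBUG              dry run, only report what would be done
#   ./mcfloat -l /var/log/mcfloat.log  act, writing log messages to a file
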
if os.path.isfile(UNDERPOP_NODES_FILE):
    pkl_file = open(UNDERPOP_NODES_FILE,'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " + \
              repr(nodes_too_few_jobs_last_run) )

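# (UNDERPOP_NODES_FILE holds a plain pickled list of node name strings written
# near the end of this script; a value like ['wn-car-003.farm.nikhef.nl'] is
# only an example)
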
wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

for node in wnodes:
    if node.name in CANDIDATE_NODES and node.state.count('offline') == 0:
        mcnodes.append(node)

draining_slots = 0
mcdedicated = list()
draining_nodes = 0

for node in mcnodes:
    if 'el6' not in node.properties and 'mc' in node.properties :
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0 : draining_nodes += 1

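# Worked example (comment only): three dedicated nodes with 0, 2 and 5 free
# slots give draining_slots = 7 and draining_nodes = 2, since only nodes with
# at least one free slot count as draining.
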
# check for dedicated nodes with too few jobs

nodes_with_too_few_jobs = list()

for node in mcdedicated:
    logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
    if node.freeCpu > MAXFREEPERNODE:
        nodes_with_too_few_jobs.append(node)

logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \
              " nodes with too few jobs")

nodes_consistently_underpopulated = list()
for n in nodes_with_too_few_jobs:
    if n.name in nodes_too_few_jobs_last_run:
        nodes_consistently_underpopulated.append(n)

undcount = len(nodes_consistently_underpopulated)
if undcount > 0:
    logging.debug("there are " + repr(undcount) + \
                  " nodes with too few jobs that were also marked last run" )

import random

removed_a_node = False

if undcount > 0:
    logging.info("nodes consistently underpopulated:")
    for n in nodes_consistently_underpopulated:
        logging.info("    " + n.name)
    if undcount > 1:
        remcount = undcount / 2   # number to remove
        logging.info("going to remove " + repr(remcount) + " from mc pool")
    else:
        remcount = 1

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated)

    for node_t in node_tuples[:remcount]:
        nname, nnode, running_mc, unused_slots = node_t
        logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                     (unused_slots, running_mc, nname) )
        if not opts.noopt : remove_from_mc_pool(nnode)
        nodes_with_too_few_jobs.remove(nnode)
        nodes_consistently_underpopulated.remove(nnode)
        removed_a_node = True

namelist = list()
if len(nodes_with_too_few_jobs) > 0:
    logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) + \
                  "with too few jobs on list for next run:")
    for n in nodes_with_too_few_jobs:
        logging.debug(n.name)
        namelist.append(n.name)
if not opts.noopt:
    pkl_file = open(UNDERPOP_NODES_FILE,'wb')
    pickle.dump(namelist,pkl_file)
    pkl_file.close()
if len(nodes_with_too_few_jobs) > 0 or removed_a_node :
    sys.exit(0)


# if we survive to this point, all nodes 'dedicated' are being efficiently
# used. are we draining less than the configured max?

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \
              repr(draining_slots) + " unused slots")
if (MAXFREE - draining_slots) >= MAXFREEPERNODE:
    logging.debug("%d unused slots are permitted, so %d more" % (MAXFREE, MAXFREE - draining_slots))
    logging.debug("headroom of %d+ slots means we can try to grab another node" % (MAXFREEPERNODE) )
    if draining_nodes < MAXDRAIN :
        # first check if there are actually any waiting jobs to run; if not, it makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" % \
                      (MAXDRAIN, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        candidate_nodes = list()
        for n in mcnodes:
            if n not in mcdedicated:
                candidate_nodes.append(n)
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        for nn in range(min(NODESPERGRAB,len(candidate_nodes))):
            grabbed_node = random.choice(candidate_nodes)
            logging.info("%s added to mc node pool" % (grabbed_node.name))
            candidate_nodes.remove(grabbed_node)
            if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (MAXDRAIN))
        logging.debug("Doing nothing now")

elif draining_slots > MAXFREE:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated)

    slots_to_recover = draining_slots - MAXFREE
    logging.info("unused slot limit (%d) exceeded by %d " \
                 % (MAXFREE, slots_to_recover) + ": remove node(s) from mc pool")
    slots_recovered = 0
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                         (unused_slots, running_mc, nname) )
            if not opts.noopt : remove_from_mc_pool(nnode)
            slots_recovered += unused_slots
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, MAXFREE))
    logging.debug("difference is %d which is less than %d" % \
                  (MAXFREE - draining_slots, MAXFREEPERNODE) )
    logging.debug("so: doing nothing now.")
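
# Recap of the top-level decision above (comment only), with the current
# config of MAXFREE = 49 and MAXFREEPERNODE = 3: the script tries to grab
# node(s) while draining_slots <= 46 (i.e. 49 - draining_slots >= 3), sheds
# nodes once draining_slots > 49, and otherwise leaves the pool alone.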

Properties

svn:executable = *
