/[pdpsoft]/nl.nikhef.ndpf.mcfloat/trunk/mcfloat
Revision 2708
Thu May 1 08:21:16 2014 UTC by templon
File size: 12821 byte(s)
First commit

#!/usr/bin/env python
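#
# mcfloat: monitor the state of the multicore (mc) pool and adjust its size.
# Candidate nodes are moved into or out of the pool by toggling their
# 'mc' / 'el6' Torque node properties via qmgr, depending on how many
# multicore jobs are waiting in MCQUEUE and how many job slots sit unused.
#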
from torque_utils import pbsnodes

# settings below are appropriate for knal nodes (32 cores)
# we want to keep the number of draining nodes at MAXFREE / 7, since
# giving back a node could be expensive: it might already be running
# several mc jobs

MAXDRAIN = 7    # max num of nodes allowed to drain
MAXFREE = 49    # max num of free slots to tolerate
CANDIDATE_NODES = [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1,19) ]
MCQUEUE = 'atlasmc'
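# (with these settings MAXFREE / 7 = 49 / 7 = 7, presumably the source of
#  MAXDRAIN = 7 above)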

# settings below are more appropriate for smrt nodes (8 cores)
# here giving a node back doesn't really matter; it wasn't
# running any mc jobs yet anyway. better to grab lots
# of nodes and give back the ones that drain the slowest if
# too many slots are free.

# MAXDRAIN = 16   # max num of nodes allowed to drain
# MAXFREE = 49    # max num of free slots to tolerate
# CANDIDATE_NODES = [ 'wn-smrt-0%02d.farm.nikhef.nl' % (n) for n in range(1,73) ]

UNDERPOP_NODES_FILE = ".nodes_with_too_few_jobs"
TORQUE = "stro.nikhef.nl"

import os
import pickle
import sys
import subprocess

def getmcjobinfo(nodes):

    # input is the list of nodes for which to get info
    # function returns tuple (jobs_waiting, node_info)
    # jobs_waiting boolean value : are there MC jobs waiting to be run (state Q)? (independent of nodes!)
    # node_info: list of tuples (wn_name, wn, num_running_mc, num_free_slots)
    # where wn is the node object for the machine with name wn_name, num_running_mc is the number of
    # mc jobs running on that node, and num_free_slots is the number of unused job slots on that node

    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host' ]

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp")

    import time
    os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)
    now = time.mktime(time.localtime())

    jlist = torqueJobs.qs_parsefile(qsfname)

    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

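    # mapatts: reduce a raw job dictionary from qstat to the keys in inkeys,
    # converting times to hours, memory to MiB, mapping Torque job states to
    # 'queued'/'running', and trimming exec_host to the bare worker-node name
    # (the conversion helpers come from torqueAttMappers)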
    def mapatts(indict,inkeys):
        sdict = tam.sub_dict(indict,inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs/3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                statelett = sdict[k]
                if statelett in ['Q','W']:
                    sdict[k] = 'queued'
                elif statelett in ['R','E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict

    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == MCQUEUE:
            newj = mapatts(j,usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has

    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] == 'running':
            wn = j['exec_host']
            if wn in running_mc.keys():
                running_mc[wn] += 1
    rawlist = list()
    for n in nodes:
        rawlist.append( (n.name, n, running_mc[n.name], n.freeCpu) )
    from operator import itemgetter
    slist = sorted(rawlist, key=lambda x: (x[2], x[3]))

    return waitingJobs, slist

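# take a node out of the mc pool: give it back its 'el6' property (presumably
# so ordinary jobs can match it again) and strip the 'mc' property, via qmgr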
def remove_from_mc_pool(node):
    # print "adding el6 tag back to node", node.name
    if "el6" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += el6\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out
    if "mc" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]
        # print out

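# dedicate a node to the mc pool: strip its 'el6' property and add 'mc',
# again via qmgr (the mirror image of remove_from_mc_pool above)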
def add_to_mc_pool(node):
    # print "node props b4:", grabbed_node.properties
    if "el6" in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties -= el6\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]

    if "mc" not in node.properties :
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,)
        proc.stdin.write( 'set node ' + node.name + \
                          ' properties += mc\n' )
        proc.stdin.write( 'print node ' + node.name + \
                          ' properties\n' )
        out = proc.communicate()[0]

import optparse

usage = "usage: %prog [-n] [-l logfile] [-L loglevel]"

p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-n", action="store_true", dest="noopt", default=False,
             help="don't do anything, just print what would have been done")
p.add_option("-l", action="store", dest="logfile", default=None,
             help="log actions and information to LOGFILE (default stdout)")
p.add_option("-L", action="store", dest="loglevel", default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ...; default INFO)")

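# example invocations (illustrative only):
#   mcfloat                       # normal run, INFO logging to stdout
#   mcfloat -n -L DEBUG           # dry run: log what would be done, change nothing
#   mcfloat -l /path/to/logfile   # write log output to a file instead of stdout
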
import logging

opts, args = p.parse_args()

# loglevel is bound to the string value obtained from the command line
# argument. Convert to upper case to allow the user to specify
# -L DEBUG or -L debug

numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level, filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)

if os.path.isfile(UNDERPOP_NODES_FILE):
    pkl_file = open(UNDERPOP_NODES_FILE,'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " + \
              repr(nodes_too_few_jobs_last_run) )

wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

for node in wnodes:
    if node.name in CANDIDATE_NODES and node.state.count('offline') == 0:
        mcnodes.append(node)

draining_slots = 0
mcdedicated = list()
draining_nodes = 0

for node in mcnodes:
    if 'el6' not in node.properties and 'mc' in node.properties :
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0 : draining_nodes += 1

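# at this point mcdedicated holds the nodes already tagged 'mc',
# draining_slots is the total number of free slots on them, and
# draining_nodes counts how many of them still have free slots.
# the rest of the script (1) hands back nodes that stay underpopulated
# over two consecutive runs, (2) otherwise grabs an extra node when mc
# jobs are waiting and there is headroom, or (3) sheds nodes when too
# many slots sit idle.
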
# check for dedicated nodes with too few jobs

nodes_with_too_few_jobs = list()

for node in mcdedicated:
    logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
    if node.freeCpu > 7:
        nodes_with_too_few_jobs.append(node)

logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \
              " nodes with too few jobs")

nodes_consistently_underpopulated = list()
for n in nodes_with_too_few_jobs:
    if n.name in nodes_too_few_jobs_last_run:
        nodes_consistently_underpopulated.append(n)

undcount = len(nodes_consistently_underpopulated)
if undcount > 0:
    logging.debug("there are " + repr(undcount) + \
                  " nodes with too few jobs that were also marked last run" )

import random

removed_a_node = False

if undcount > 0:
    logging.info("nodes consistently underpopulated:")
    for n in nodes_consistently_underpopulated:
        logging.info("   " + n.name)
    if undcount > 1:
        remcount = undcount / 2   # number to remove
        logging.info("going to remove " + repr(remcount) + " from mc pool")
    else:
        remcount = 1

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated)

    for node_t in node_tuples[:remcount]:
        nname, nnode, running_mc, unused_slots = node_t
        logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                     (unused_slots, running_mc, nname) )
        if not opts.noopt : remove_from_mc_pool(nnode)
        nodes_with_too_few_jobs.remove(nnode)
        nodes_consistently_underpopulated.remove(nnode)
        removed_a_node = True

namelist = list()
if len(nodes_with_too_few_jobs) > 0:
    logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) + \
                  "with too few jobs on list for next run:")

for n in nodes_with_too_few_jobs:
    logging.debug(n.name)
    namelist.append(n.name)
if not opts.noopt:
    pkl_file = open(UNDERPOP_NODES_FILE,'wb')
    pickle.dump(namelist,pkl_file)
    pkl_file.close()
if len(nodes_with_too_few_jobs) > 0 or removed_a_node :
    sys.exit(0)

# if we survive to this point, all nodes 'dedicated' are being efficiently
# used. are we draining less than the configured max?

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \
              repr(draining_slots) + " unused slots")
if (MAXFREE - draining_slots) >= 7:
    logging.debug("%d unused slots are permitted, so %d more are allowed" % (MAXFREE, MAXFREE - draining_slots))
    logging.debug("headroom of 7+ slots means we can try to grab another node")
    if draining_nodes < MAXDRAIN :
        # first check if there are actually any waiting jobs to run; if not, it makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" % \
                      (MAXDRAIN, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        candidate_nodes = list()
        for n in mcnodes:
            if n not in mcdedicated:
                candidate_nodes.append(n)
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        grabbed_node = random.choice(candidate_nodes)
        logging.info("%s added to mc node pool" % (grabbed_node.name))
        if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (MAXDRAIN))
        logging.debug("Doing nothing now")

elif draining_slots > MAXFREE:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated)

    slots_to_recover = draining_slots - MAXFREE
    logging.info("unused slot limit (%d) exceeded by %d " \
                 % (MAXFREE, slots_to_recover) + ": remove node(s) from mc pool")
    slots_recovered = 0
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" % \
                         (unused_slots, running_mc, nname) )
            if not opts.noopt : remove_from_mc_pool(nnode)
            slots_recovered += unused_slots
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, MAXFREE))
    logging.debug("difference is %d which is less than 7" % \
                  (MAXFREE - draining_slots) )
    logging.debug("so: doing nothing now.")

Properties

Name Value
svn:executable *
