#!/usr/bin/env python
# $Id$
# Source: $URL$
# J. A. Templon, NIKHEF/PDP 2014

# Class to hold parameters:

class mcfloat_cfg:
    def __init__(self):  # default values; adjust for your site
        self.maxdrain = 32
        self.maxfree = 49
        self.maxfreepernode = 3
        self.nodespergrab = 3
        self.capacityfrac = 0.48
        self.mcqueue = 'atlasmc7'
        self.underpop_nodes_file = "/var/db/nodes_with_too_few_jobs"
        self.torque = "korf.nikhef.nl"

    def __repr__(self):
        return ("Capacity Fraction %4.2f\nMax Drain %d\nMax Free %d\n" +
                "Max Free per Node %d") % (self.capacityfrac,
                                           self.maxdrain,
                                           self.maxfree,
                                           self.maxfreepernode)

MCFLOAT_CONFIG_FILE = "/var/db/mcfloat_config"
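
# The config object round-trips through MCFLOAT_CONFIG_FILE via pickle: the
# option handlers below load it, mutate one field, and dump it back.  For
# illustration (hypothetical interactive session, not part of the script):
#     >>> cfg = mcfloat_cfg()
#     >>> print cfg
#     Capacity Fraction 0.48
#     Max Drain 32
#     Max Free 49
#     Max Free per Node 3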

# the above config allows multicore to run up to the full ATLAS allocation (07.04.2015)
# full allocation right now is 47% of "non-retired" capacity = knal + mars + car
# so just configure 47% of each class.

# override when lsgrid is running (legacy comments, might be useful)

# MAXFREEPERNODE = 29   # allow lsgrid 30-core jobs
# MAXFREE = 149         # max num of free slots to tolerate

# override when bbmri is running

# MAXFREE = 149         # max num of free slots to tolerate
# MAXFREEPERNODE = 9    # allow bbmri 10-core jobs

import os
import pickle
import sys
import subprocess

def getmcjobinfo(nodes, cfg):

    # Input: the list of nodes for which to get info.
    # Returns a tuple (jobs_waiting, node_info):
    #   jobs_waiting: boolean; are there MC jobs waiting to be run (state Q)?
    #                 (independent of the nodes argument!)
    #   node_info:    list of tuples (wn_name, wn, num_running_mc, num_free_slots),
    #                 where wn is the node object for the machine named wn_name,
    #                 num_running_mc is the number of mc jobs running on that node,
    #                 and num_free_slots is the number of unused job slots on it.

    usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host']

    import torqueJobs
    import torqueAttMappers as tam

    import tempfile
    qsfname = tempfile.mktemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")

    import time
    os.system('/usr/bin/qstat -f @' + cfg.torque + ' > ' + qsfname)
    now = time.mktime(time.localtime())

    jlist = torqueJobs.qs_parsefile(qsfname)

    os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

    def mapatts(indict, inkeys):
        sdict = tam.sub_dict(indict, inkeys)
        odict = dict()
        for k in sdict.keys():
            if k in tam.tfl and sdict[k]:
                secs = tam.hms(sdict[k])
                sdict[k] = secs / 3600.
            if k in tam.mfl and sdict[k]:
                mebi = tam.memconvert(sdict[k])
                sdict[k] = mebi
            if k in tam.tfl2 and sdict[k]:
                secs = tam.tconv(sdict[k])
                sdict[k] = secs
            elif k == 'job_state':
                statelett = sdict[k]
                if statelett in ['Q', 'W']:
                    sdict[k] = 'queued'
                elif statelett in ['R', 'E']:
                    sdict[k] = 'running'
            elif k == 'exec_host' and sdict[k]:
                termpos = sdict[k].find('/')
                wnstring = sdict[k][:termpos]
                sdict[k] = wnstring
            if sdict[k]:
                odict[k] = sdict[k]
        return odict
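
    # For illustration (hypothetical record, assuming the torqueAttMappers
    # conventions used above): a raw qstat entry such as
    #     {'queue': 'atlasmc7', 'job_state': 'R',
    #      'exec_host': 'wn-sate-001.farm.nikhef.nl/3', ...}
    # comes back from mapatts as
    #     {'queue': 'atlasmc7', 'job_state': 'running',
    #      'exec_host': 'wn-sate-001.farm.nikhef.nl', ...}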

    mcjoblist = list()
    waitingJobs = False
    for j in jlist:
        if j['queue'] == cfg.mcqueue:
            newj = mapatts(j, usedkeys)
            mcjoblist.append(newj)
            if newj['job_state'] == 'queued':
                waitingJobs = True

    # find out how many running mc jobs each node has

    running_mc = dict()
    for n in nodes:
        running_mc[n.name] = 0
    for j in mcjoblist:
        if j['job_state'] == 'running':
            wn = j['exec_host']
            if wn in running_mc.keys():
                running_mc[wn] += 1
    rawlist = list()
    for n in nodes:
        rawlist.append((n.name, n, running_mc[n.name], n.freeCpu))
    def srtrunningfree(item):
        # Sort optimally for dropping nodes: for nodes with zero running mc jobs, we
        # want to drop the least-drained nodes (fewer free slots first in the list);
        # for nodes with running mc jobs, we want to shed as few nodes as possible,
        # so pick the more-drained nodes first.  This probably also works for
        # shedding nodes when the queue dries up.
        mcrun = item[2]
        emptyslots = item[3]
        if mcrun == 0:
            rank = emptyslots
        else:
            # the original "optimal" order was rank = 32*mcrun - emptyslots
            # (32 is just a number bigger than the expected emptyslots values);
            # the constants 7168 and 224 only serve to enable a multi-level sort.
            rank = 7168 - 224 * float(emptyslots) / mcrun + mcrun
        return rank
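
    # Worked example (hypothetical numbers): with mcrun = 4 and emptyslots = 8,
    # rank = 7168 - 224*8/4 + 4 = 6724; with mcrun = 4 and emptyslots = 2,
    # rank = 7168 - 224*2/4 + 4 = 7060.  The more-drained node gets the lower
    # rank and so sorts first, while any node with mcrun == 0 (rank equal to
    # its emptyslots, at most a few dozen) always sorts ahead of both.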
    slist = sorted(rawlist, key=srtrunningfree)

    return waitingJobs, slist

def remove_from_mc_pool(node):
    # print "adding el7 tag back to node", node.name
    if "el7" not in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        proc.stdin.write('set node ' + node.name +
                         ' properties += el7\n')
        proc.stdin.write('print node ' + node.name +
                         ' properties\n')
        out = proc.communicate()[0]
        # print out
    if "mc" in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        proc.stdin.write('set node ' + node.name +
                         ' properties -= mc\n')
        proc.stdin.write('print node ' + node.name +
                         ' properties\n')
        out = proc.communicate()[0]
        # print out

def add_to_mc_pool(node):
    # print "node props b4:", node.properties
    if "el7" in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        proc.stdin.write('set node ' + node.name +
                         ' properties -= el7\n')
        proc.stdin.write('print node ' + node.name +
                         ' properties\n')
        out = proc.communicate()[0]

    if "mc" not in node.properties:
        proc = subprocess.Popen(['/usr/bin/qmgr'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        proc.stdin.write('set node ' + node.name +
                         ' properties += mc\n')
        proc.stdin.write('print node ' + node.name +
                         ' properties\n')
        out = proc.communicate()[0]
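
# For illustration: pool membership is just a pair of Torque node properties
# rewritten via qmgr.  For a hypothetical node wn-sate-001,
# remove_from_mc_pool issues
#     set node wn-sate-001.farm.nikhef.nl properties += el7
#     set node wn-sate-001.farm.nikhef.nl properties -= mc
# and add_to_mc_pool issues
#     set node wn-sate-001.farm.nikhef.nl properties -= el7
#     set node wn-sate-001.farm.nikhef.nl properties += mc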

import optparse

usage = "usage: %prog [options]"

p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed",
                          usage=usage)

p.add_option("-A", action="store_true", dest="addall", default=False,
             help="add all eligible nodes to multicore pool")

p.add_option("-D", action="store_true", dest="delall", default=False,
             help="delete all nodes from multicore pool")

p.add_option("-E", action="store_true", dest="add_everything", default=False,
             help="add all nodes (even non-candidates) to the multicore pool")

p.add_option("-R", action="store_true", dest="remove_strays_from_pool", default=False,
             help="remove mc pool nodes that are not one of the configured candidate nodes")

p.add_option("-l", action="store", dest="logfile", default=None,
             help="log actions and information to LOGFILE (default stdout)")

p.add_option("-L", action="store", dest="loglevel", default="INFO",
             help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ...; default INFO)")

p.add_option("-n", action="store_true", dest="noopt", default=False,
             help="don't do anything, just print what would have been done")

p.add_option("-i", action="store_true", dest="info", default=False,
             help="print info on all nodes in multicore pool")

p.add_option("-d", action="store", dest="new_max_drain", default=None,
             help="maximum number of nodes allowed in draining state")

p.add_option("-m", action="store", dest="new_max_free", default=None,
             help="maximum number of unused slots to tolerate in the multicore pool")

p.add_option("-p", action="store", dest="new_max_free_per_node", default=None,
             help="maximum number of unused slots per node to tolerate in the multicore pool")

p.add_option("-f", action="store", dest="newfrac", default=None,
             help="fraction (out of 1.0) of in-service nodes to commit to multicore pool")

p.add_option("-r", action="store_true", dest="reset_config", default=False,
             help="reset mcfloat config to the default")

p.add_option("-q", action="store_true", dest="querycfg", default=False,
             help="print current config and exit")
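
# For illustration, hypothetical invocations (assuming the script is installed
# as mcfloat): a dry run that only logs what it would do,
#     mcfloat -n -L DEBUG
# and a query of the persisted configuration,
#     mcfloat -q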

import logging

opts, args = p.parse_args()

# loglevel is bound to the string value obtained from the command line
# argument. Convert to upper case to allow the user to specify
# -L DEBUG or -L debug

numeric_level = getattr(logging, opts.loglevel.upper(), None)
if not isinstance(numeric_level, int):
    raise ValueError('Invalid log level: %s' % opts.loglevel)
if opts.logfile:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level, filename=opts.logfile)
else:
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=numeric_level)

if os.path.isfile(MCFLOAT_CONFIG_FILE):
    pkl_file = open(MCFLOAT_CONFIG_FILE, 'rb')
    cfg = pickle.load(pkl_file)
    pkl_file.close()
else:
    cfg = mcfloat_cfg()  # default values

if opts.querycfg:
    print "Current mcfloat configuration:"
    print cfg
    sys.exit(0)

config_changed = False

# bundle all config changers here

if opts.newfrac:
    cfg.capacityfrac = float(opts.newfrac)
    if cfg.capacityfrac < 0 or cfg.capacityfrac > 1:
        raise ValueError('Invalid capacity fraction: %s' % opts.newfrac)
    logging.info("Changing capacity fraction to %s" % opts.newfrac)
    config_changed = True

if opts.new_max_free:
    cfg.maxfree = int(opts.new_max_free)
    if cfg.maxfree < 0:
        raise ValueError('Invalid max free slots: %s' % opts.new_max_free)
    logging.info("Changing max free slots to %s" % opts.new_max_free)
    config_changed = True

if opts.new_max_free_per_node:
    cfg.maxfreepernode = int(opts.new_max_free_per_node)
    if cfg.maxfreepernode < 0:
        raise ValueError('Invalid max free slots per node: %s' % opts.new_max_free_per_node)
    logging.info("Changing max free slots per node to %s" % opts.new_max_free_per_node)
    config_changed = True

if opts.new_max_drain:
    cfg.maxdrain = int(opts.new_max_drain)
    if cfg.maxdrain < 0:
        raise ValueError('Invalid max num draining nodes: %s' % opts.new_max_drain)
    logging.info("Changing max number draining nodes to %s" % opts.new_max_drain)
    config_changed = True

if opts.reset_config:
    cfg = mcfloat_cfg()  # default values
    logging.info("Reset mcfloat config to default values")
    config_changed = True

## below after all config changers

if config_changed:
    logging.info("writing out changed mcfloat config to config file")
    if not opts.noopt:
        pkl_file = open(MCFLOAT_CONFIG_FILE, 'wb')
        pickle.dump(cfg, pkl_file)
        pkl_file.close()
    sys.exit(0)

if opts.delall:
    cfrac = 1
else:
    cfrac = cfg.capacityfrac

# NUM_CAR  = int(cfrac * 96)
# NUM_KNAL = int(cfrac * 18)
# NUM_MARS = int(cfrac * 58)

NUM_CHOC = int(cfrac * 58)
NUM_PEP  = int(cfrac * 13)
NUM_SATE = int(cfrac * 54)
NUM_TAAI = int(cfrac * 12)
NUM_LOT  = int(cfrac * 48)  # lots start at 010 and go to 057
NUM_SNEL = int(cfrac * 32)

# there is no check whether START + NUM < number of nodes,
# so be careful when changing START from 1

# START_CAR  = 1
# START_KNAL = 1
# START_MARS = 1

START_CHOC = 1
START_PEP  = 1
START_SATE = 1
START_TAAI = 1
START_LOT  = 10
START_SNEL = 1

# RANGE_CAR  = range(START_CAR, START_CAR + NUM_CAR + 1)
# RANGE_KNAL = range(START_KNAL, START_KNAL + NUM_KNAL)
# RANGE_MARS = range(START_MARS, START_MARS + NUM_MARS)

RANGE_CHOC = range(START_CHOC, START_CHOC + NUM_CHOC)
RANGE_PEP  = range(START_PEP,  START_PEP  + NUM_PEP)
RANGE_SATE = range(START_SATE, START_SATE + NUM_SATE)
RANGE_TAAI = range(START_TAAI, START_TAAI + NUM_TAAI)
RANGE_LOT  = range(START_LOT,  START_LOT  + NUM_LOT)
RANGE_SNEL = range(START_SNEL, START_SNEL + NUM_SNEL)

CANDIDATE_NODES = [ 'wn-choc-0%02d.farm.nikhef.nl' % n for n in RANGE_CHOC ] + \
                  [ 'wn-pep-0%02d.farm.nikhef.nl'  % n for n in RANGE_PEP  ] + \
                  [ 'wn-sate-0%02d.farm.nikhef.nl' % n for n in RANGE_SATE ] + \
                  [ 'wn-taai-0%02d.farm.nikhef.nl' % n for n in RANGE_TAAI ] + \
                  [ 'wn-lot-0%02d.farm.nikhef.nl'  % n for n in RANGE_LOT  ] + \
                  [ 'wn-snel-0%02d.farm.nikhef.nl' % n for n in RANGE_SNEL ]
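
# For illustration, with cfrac = 0.48 this expands to names like
# 'wn-choc-001.farm.nikhef.nl' ... 'wn-choc-027.farm.nikhef.nl' and
# 'wn-lot-010.farm.nikhef.nl' ... 'wn-lot-032.farm.nikhef.nl'
# (LOT numbering starts at 10, per START_LOT above).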

from torque_utils import pbsnodes

wnodes = pbsnodes()
mcnodes = list()
waiting = None
node_tuples = None

if opts.add_everything:
    for gn in [n for n in wnodes
               if 'offline' not in n.state
               and 'mc' not in n.properties and 'el7' in n.properties]:
        logging.info("%s added to mc node pool" % (gn.name))
        # candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(gn)
    sys.exit(0)

if opts.remove_strays_from_pool:
    for node in wnodes:
        if node.name not in CANDIDATE_NODES and 'mc' in node.properties:
            if not opts.noopt: remove_from_mc_pool(node)
            logging.info("%s deleted from mc node pool" % (node.name))
    sys.exit(0)

for node in wnodes:
    if node.name in CANDIDATE_NODES and (node.state.count('offline') == 0 and
                                         ('mc' in node.properties or
                                          'el7' in node.properties)):
        mcnodes.append(node)

if os.path.isfile(cfg.underpop_nodes_file):
    pkl_file = open(cfg.underpop_nodes_file, 'rb')
    nodes_too_few_jobs_last_run = pickle.load(pkl_file)
    pkl_file.close()
else:
    nodes_too_few_jobs_last_run = list()

logging.debug("nodes from pickle file, marked from last run: " +
              repr(nodes_too_few_jobs_last_run))

draining_slots = 0
mcdedicated = list()
draining_nodes = 0
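
# A node is dedicated to the mc pool when its 'mc' property is set and its
# 'el7' property has been removed (see add_to_mc_pool above); its free slots
# count as draining capacity.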
for node in wnodes:
    if 'el7' not in node.properties and 'mc' in node.properties:
        mcdedicated.append(node)
        draining_slots += node.freeCpu
        if node.freeCpu > 0: draining_nodes += 1

if opts.info:
    waiting, node_tuples = getmcjobinfo(mcdedicated, cfg)
    for t in node_tuples:
        if t[2] == 0:
            print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])
        else:
            print "%28s has %2d running mc jobs, %2d empty slots, ratio %4.1f" % (t[0], t[2], t[3], float(t[3]) / t[2])
    sys.exit(0)

candidate_nodes = list()
for n in mcnodes:
    if n not in mcdedicated:
        candidate_nodes.append(n)

if opts.delall:
    for grabbed_node in wnodes:
        if 'mc' in grabbed_node.properties:
            logging.info("%s deleted from mc node pool" % (grabbed_node.name))
            if not opts.noopt: remove_from_mc_pool(grabbed_node)
    sys.exit(0)

if opts.addall:
    for grabbed_node in candidate_nodes:
        logging.info("%s added to mc node pool" % (grabbed_node.name))
        # candidate_nodes.remove(grabbed_node)
        if not opts.noopt: add_to_mc_pool(grabbed_node)
    sys.exit(0)

# check for dedicated nodes with too few jobs

nodes_with_too_few_jobs = list()

for node in mcdedicated:
    logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
    if node.freeCpu > cfg.maxfreepernode:
        nodes_with_too_few_jobs.append(node)

logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) +
              " nodes with too few jobs")

nodes_consistently_underpopulated = list()
for n in nodes_with_too_few_jobs:
    if n.name in nodes_too_few_jobs_last_run:
        nodes_consistently_underpopulated.append(n)

undcount = len(nodes_consistently_underpopulated)
if undcount > 0:
    logging.debug("there are " + repr(undcount) +
                  " nodes with too few jobs that were also marked last run")

import random

removed_a_node = False

if undcount > 0:
    logging.info("nodes consistently underpopulated:")
    for n in nodes_consistently_underpopulated:
        logging.info("  " + n.name)
    if undcount > 1:
        remcount = undcount / 2  # number to remove
        logging.info("going to remove " + repr(remcount) + " from mc pool")
    else:
        remcount = 1
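
    # Note: undcount / 2 above is Python 2 integer division; e.g.
    # (hypothetical count) undcount = 5 gives remcount = 2.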

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated, cfg)

    for node_t in node_tuples[:remcount]:
        nname, nnode, running_mc, unused_slots = node_t
        logging.info("dumped %d empty slots, %d mc jobs on %s" %
                     (unused_slots, running_mc, nname))
        if not opts.noopt: remove_from_mc_pool(nnode)
        nodes_with_too_few_jobs.remove(nnode)
        nodes_consistently_underpopulated.remove(nnode)
        removed_a_node = True

namelist = list()
if len(nodes_with_too_few_jobs) > 0:
    logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) +
                  "with too few jobs on list for next run:")

    for n in nodes_with_too_few_jobs:
        logging.debug(n.name)
        namelist.append(n.name)
    if not opts.noopt:
        pkl_file = open(cfg.underpop_nodes_file, 'wb')
        pickle.dump(namelist, pkl_file)
        pkl_file.close()
if len(nodes_with_too_few_jobs) > 0 or removed_a_node:
    sys.exit(0)

# if we survive to this point, all 'dedicated' nodes are being used
# efficiently.  are we draining less than the configured max?

logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " +
              repr(draining_slots) + " unused slots")
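
# Worked example with the default config (hypothetical counts): maxfree = 49
# and maxfreepernode = 3.  With draining_slots = 40 the headroom is
# 49 - 40 = 9 >= 3, so we may try to grab another node (if fewer than maxdrain
# nodes are draining); with draining_slots = 52 > 49 the elif branch below
# sheds drained nodes until at least 52 - 49 = 3 slots are recovered.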
if (cfg.maxfree - draining_slots) >= cfg.maxfreepernode:
    logging.debug("%d unused slots are permitted, so %d more" % (cfg.maxfree, cfg.maxfree - draining_slots))
    logging.debug("headroom of %d+ slots means we can try to grab another node" % (cfg.maxfreepernode))
    if draining_nodes < cfg.maxdrain:
        # first check whether there are actually any waiting jobs to run;
        # if not, it makes no sense to grab a node.
        waiting, node_tuples = getmcjobinfo(mcdedicated, cfg)
        if not waiting:
            logging.debug("No waiting jobs found, nothing to do")
            sys.exit(0)
        logging.debug("max %d node(s) with unused slots permitted, %d seen" %
                      (cfg.maxdrain, draining_nodes))
        logging.debug("there are also waiting jobs: try to grab another node")
        # build a list of candidates to grab
        logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
        if len(candidate_nodes) < 1:
            logging.debug("no more nodes, bailing out, nothing more I can do")
            sys.exit(0)
        for nn in range(min(cfg.nodespergrab, len(candidate_nodes))):
            grabbed_node = random.choice(candidate_nodes)
            logging.info("%s added to mc node pool" % (grabbed_node.name))
            candidate_nodes.remove(grabbed_node)
            if not opts.noopt: add_to_mc_pool(grabbed_node)
    else:
        logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
        logging.debug("This equals or exceeds the configured max of %d" % (cfg.maxdrain))
        logging.debug("Doing nothing now")

elif draining_slots > cfg.maxfree:

    # find out how many running mc jobs each node has

    waiting, node_tuples = getmcjobinfo(mcdedicated, cfg)

    slots_to_recover = draining_slots - cfg.maxfree
    logging.info("unused slot limit (%d) exceeded by %d: remove node(s) from mc pool" %
                 (cfg.maxfree, slots_to_recover))
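
    # E.g. (hypothetical numbers) with slots_to_recover = 3: popping a node
    # tuple with 4 unused slots removes that node and ends the loop below with
    # slots_recovered = 4.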
    slots_recovered = 0
    while slots_recovered < slots_to_recover:
        node_t = node_tuples.pop(0)
        nname, nnode, running_mc, unused_slots = node_t
        if unused_slots > 0:
            logging.info("dumped %d empty slots, %d mc jobs on %s" %
                         (unused_slots, running_mc, nname))
            if not opts.noopt: remove_from_mc_pool(nnode)
            slots_recovered += unused_slots
else:
    logging.debug("%d unused slots of allowed %d" % (draining_slots, cfg.maxfree))
    logging.debug("difference is %d which is less than %d" %
                  (cfg.maxfree - draining_slots, cfg.maxfreepernode))
    logging.debug("so: doing nothing now.")