#!/usr/bin/python2
# lcg-info-dynamic-scheduler
# $Id$
# Source: $URL$
# J. A. Templon, NIKHEF/PDP 2005

# plugin for gip framework. generates Glue values associated
# with the scheduler, like FreeSlots or *ResponseTime

# first set up logging

import sys, logging
import syslog

# note: this class works around the bug in the python logging
# package fixed in patch #642974. If the version of python-logging
# is upgraded, this class can be replaced by logging.SysLogHandler

class SLHandler(logging.Handler):
    def __init__(self, ident, logopt=0, facility=syslog.LOG_USER):
        logging.Handler.__init__(self)
        self.ident = ident
        self.logopt = logopt
        self.facility = facility
        self.mappings = {
            logging.DEBUG: syslog.LOG_DEBUG,
            logging.INFO: syslog.LOG_INFO,
            logging.WARN: syslog.LOG_WARNING,
            logging.ERROR: syslog.LOG_ERR,
            logging.CRITICAL: syslog.LOG_CRIT,
            }

    def encodeLevel(self, level):
        return self.mappings.get(level, syslog.LOG_INFO)

    def emit(self, record):
        syslog.openlog(self.ident, self.logopt, self.facility)
        msg = self.format(record)
        prio = self.encodeLevel(record.levelno)
        syslog.syslog(prio, msg)
        syslog.closelog()

logging.getLogger("").setLevel(logging.INFO)
# syslog handler
shdlr = SLHandler("lcg-info-dynamic-scheduler")
logging.getLogger("").addHandler(shdlr)
# stderr handler
stdrhdlr = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s lcg-info-dynamic-scheduler:"
                        + " %(message)s", "%F %T")
logging.getLogger("").addHandler(stdrhdlr)
stdrhdlr.setFormatter(fmt)
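# With the handlers above in place, a call such as logging.error("some message")
# should (assuming the root-logger wiring works as intended) end up both in
# syslog with priority LOG_ERR under the ident "lcg-info-dynamic-scheduler" and
# on stderr with a timestamped prefix.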

def usage():
    print "Usage: lcg-info-dynamic-scheduler -c <cfg_file>"

def abort_without_output(msg):
    logging.error(msg)
    logging.error("Exiting without output, GIP will use static values")
    sys.exit(2)

import getopt
import string

try:
    opts, args = getopt.getopt(sys.argv[1:], "c:",
                               ["config="])
except getopt.GetoptError:
    # print help information and exit:
    emsg = "Error parsing command line: " + string.join(sys.argv)
    usage()
    abort_without_output(emsg)

cfgfile = None

for o, a in opts:
    if o in ("-c", "--config"):
        cfgfile = a

if not cfgfile:
    usage()
    abort_without_output("No config file specified.")

try:
    fp = open(cfgfile)
except IOError:
    abort_without_output("Error opening config file " + cfgfile)

import commands
import ConfigParser
config = ConfigParser.ConfigParser()
try:
    config.readfp(fp)
except:
    abort_without_output("While parsing config file: " + \
                         repr(sys.exc_info()[1]) + " " + cfgfile)

# note: vomap is only intended for the cases where voname != groupname;
# hence below we throw away all entries for which voname == groupname.
# That is the "default scenario" in the code; adding such entries to the
# vomap "exception list" causes problems later on.
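# Illustrative example (group and VO names are hypothetical): in the [Main]
# section of the config file, vomap is a multi-line "group: vo" list, e.g.
#
#   vomap:
#     atlsgm: atlas
#     dteamprd: dteam
#
# Each non-empty line is split on ':'; identity mappings are discarded.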

if config.has_option('Main','vomap'):
    s = config.get('Main','vomap')
    vomap = dict()
    lines = s.split('\n')
    for l in lines:
        if len(l) > 0:
            t = l.split(':')
            group = t[0].strip()
            vo = t[1].strip()
            if vo != group : vomap[group] = vo

else:
    vomap = dict() # empty dict: membership tests like 'grp in vomap.keys()' are then always False

if config.has_option('Scheduler','vo_max_jobs_cmd'):
    cmd = config.get('Scheduler','vo_max_jobs_cmd')
    (stat,out) = commands.getstatusoutput(cmd)
    if stat:
        abort_without_output(
            "VO max jobs backend command returned nonzero exit status")
    try:
        pcaps = eval(out, {"__builtins__" : {}}) # restricted namespace arg closes a security hole
    except SyntaxError:
        abort_without_output('vo max slot command output in wrong format')
else:
    pcaps = dict() # empty dict, will trigger use of defaults later.
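# The vo_max_jobs_cmd plugin is expected to print a Python dict literal mapping
# group/VO names to their maximum job slot counts, e.g. (hypothetical values):
#
#   {'atlas': 200, 'lhcb': 100, 'dteam': 20}
#
# which the eval() above turns into the pcaps dict used for the FreeSlots caps below.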

# apply group -> vo mappings to process cap dict
# sgroup (source group) is original from input files,
# targvo (target vo) is what we want to replace it with

#DEBUG opcaps = pcaps.copy()

for sgroup in vomap.keys():
    targ = vomap[sgroup]
    if sgroup in pcaps.keys():
        if targ in pcaps.keys():
            pcaps[targ] = pcaps[targ] + pcaps[sgroup]
        else:
            pcaps[targ] = pcaps[sgroup]
        del pcaps[sgroup]

#DEBUGkl = opcaps.keys() + pcaps.keys()
#DEBUGkl.sort()
#DEBUGukl = []
#DEBUGfor k in kl:
#DEBUG    if k not in ukl: ukl.append(k)
#DEBUGfor k in ukl:
#DEBUG    v1 = -1
#DEBUG    v2 = -1
#DEBUG    if k in opcaps.keys():
#DEBUG        v1 = opcaps[k]
#DEBUG    if k in pcaps.keys():
#DEBUG        v2 = pcaps[k]
#DEBUG
#DEBUG    print "%10s %4d %4d" % (k, v1, v2)

if not config.has_option('LRMS','lrms_backend_cmd'):
    abort_without_output("Spec for lrms backend cmd missing in config file")

# read cfg file to find spec for static ldif file

if not config.has_option('Main','static_ldif_file'):
    abort_without_output("Spec for static ldif file missing in config file")

try:
    lines = open(config.get('Main','static_ldif_file'),'r').readlines()
except IOError:
    abort_without_output("Error opening static ldif file " +
                         config.get('Main','static_ldif_file'))

# get the dns from the ldif file along with their VOs.
# first do the VOView dns
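# The parser below only looks at a handful of attributes. An illustrative
# (hypothetical) pair of blocks from the static LDIF file:
#
#   dn: GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-short,mds-vo-name=resource,o=grid
#   GlueCEName: short
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-short,mds-vo-name=resource,o=grid
#   GlueCEAccessControlBaseRule: VO:atlas
#
# The GlueCE block supplies the queue name; the GlueVOView block supplies the
# local ID and the VO/VOMS access control rules.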

voviewlist = [ ]
qname = None
import re
jmpatt = re.compile(r'^GlueCEUniqueID=[-a-zA-Z0-9.:]+/[a-z]+-[a-z]+-([a-z_A-Z0-9]+)$')

class CEView(object):
    def __init__(self, dn):
        self.dn = dn
        # if this is not present, then it's a GlueCE object
        # if it is present, it's a GlueVOView object
        self.localID = None
        self.queuename = None
        self.ACBRs = list()
    def getCEUniqueID(self):
        if self.localID is None:
            return self.dn
        else:
            loc = self.dn.find(",") + 1
            return self.dn[loc:]
    CEUniqueID = property(getCEUniqueID,
                          doc="UniqueID of CE to which this view belongs")

dndict = dict() # key: dn string (not including leading "dn: ")
thisview = None

for line in lines:

    s = line.strip()
    if s.find('dn:') == 0 and s.find('GlueCEUniqueID') > 0:
        dn = s[3:].strip()
        thisview = CEView(dn)
        if s.find('GlueVOViewLocalID') >= 0:
            loc1 = s.find('=') + 1
            loc2 = s.find(',GlueCEUniqueID')
            localid = s[loc1:loc2].strip()
            thisview.localID = localid
    elif s.find('GlueCEName') == 0:
        qname = s.split(':')[1].strip()
        thisview.queuename = qname
    elif s.find('GlueCEAccessControlBaseRule:') == 0:
        loc = s.find(':') + 1
        if loc == len(s):
            abort_without_output("In static LDIF file, ACBR is " +
                                 "missing a value: " + s)
        rule = s[loc:].split(":")
        if rule[0].strip() in ['VO', 'VOMS']:
            thisview.ACBRs.append( ( rule[0].strip(), rule[1].strip() ) )

    elif s == '' and thisview:
        dndict[thisview.dn] = thisview
        thisview = None # reset, about to enter new block.

# now find the queues that go with the VOViews
for d in dndict.keys():
    view = dndict[d]
    if view.localID: # then it's a VOView
        view.queuename = dndict[view.CEUniqueID].queuename

DEBUG = 0

if DEBUG:
    print "dumping parse tree for static ldif file"
    for d in dndict.keys():
        print "For dn:", d
        print "  LocalID: ", dndict[d].localID
        print "  CEUniqueID: ", dndict[d].CEUniqueID
        print "  Queue Name:", dndict[d].queuename
        print "  ACBRs: ", dndict[d].ACBRs

def tconv(timeval): # convert time to desired units
    timeval = min(timeval, 2146060842) # cap at a large sentinel that still fits a 32-bit signed int
    return repr( int(timeval) ) # seconds->printable

cmd = config.get('LRMS','lrms_backend_cmd')

(stat,out) = commands.getstatusoutput(cmd)
if stat:
    abort_without_output("LRMS backend command returned nonzero exit status")

lrmlines = out.split('\n')

sys.path.append('@TARGPREFIX@/@MODSUFFIX@')
import lrms
bq = lrms.Server()

# generate a cache of indices returned from possible lrms queries
# this makes a big speed diff when there are many (thousands) of jobs
# first we need to find all combinations VO/fqan, queue, state
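# Sketch of the cache layout (the exact key format is whatever lrms.filt2str
# produces for a filter dict; that is defined in the lrms module, not here):
#
#   scache = { filt2str({'group': vo, 'state': s}):             {jobid: Job, ...},
#              filt2str({'queue': q, 'state': s}):              {jobid: Job, ...},
#              filt2str({'queue': q, 'state': s, 'group': vo}): {jobid: Job, ...} }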

ql = []
vol = []

for dn in dndict.keys():
    view = dndict[dn]
    if not view.localID : continue
    if view.queuename not in ql : ql.append(view.queuename)
    if view.ACBRs[-1][1] not in vol : vol.append(view.ACBRs[-1][1])

sl = ['running', 'queued']

# now construct an empty cache with all the index keys, ready to fill.

scache = { }

for v in vol:
    for s in sl:
        indstr = lrms.filt2str({'group':v, 'state': s})
        scache[indstr] = {}

for q in ql:
    for s in sl:
        indstr = lrms.filt2str({'queue': q, 'state': s})
        scache[indstr] = {}
        for v in vol:
            indstr = lrms.filt2str({'queue': q, 'state': s, 'group': v})
            scache[indstr] = {}

#DEBUG print scache.keys()

# traverse the job list, filling the index cache as we go.
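# The loop below accepts two kinds of lines from the LRMS backend command:
# per-job Python dict literals and simple "name value" pairs. Illustrative
# (hypothetical) input:
#
#   {'jobid': '1234.ce.example.org', 'queue': 'short', 'group': 'atlas', 'state': 'queued', 'maxwalltime': 3600}
#   nactive 100
#   nfree 20
#   now 1147000000
#   schedCycle 120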

for line in lrmlines:
    s = line.strip()
    if not s: continue # skip blank lines rather than hitting an index error below
    f = s.split()
    if s[0] == '{' and s[-1] == '}': # looks like a dict
        nj = lrms.Job(eval(s, {"__builtins__" : {}}))
        grp = nj.get('group')
        if grp in vomap.keys():
            nj.set('group',vomap[grp])
        jid = nj.get('jobid')
        bq.addjob(jid,nj)
        tst = nj.get('state')
        if tst not in sl: continue
        tgr = nj.get('group')
        tq = nj.get('queue')
        if tgr in vol and tq in ql:
            dlis = [
                {'group': tgr, 'state': tst},
                {'queue': tq, 'state': tst},
                {'queue': tq, 'state': tst, 'group': tgr}
                ]
        else:
            if tgr in vol:
                dlis = [
                    {'group': tgr, 'state': tst},
                    ]
            elif tq in ql:
                dlis = [
                    {'queue': tq, 'state': tst},
                    {'queue': tq, 'state': tst, 'group': tgr}
                    ]
            else:
                continue

        for td in dlis :
            indstr = lrms.filt2str(td)
            if indstr not in scache.keys():
                if DEBUG : print "huh??", indstr
                scache[indstr] = {}
            scache[lrms.filt2str(td)][jid] = nj
    elif len(f) == 2:
        if f[0] == 'nactive' : bq.slotsUp = int(f[1])
        elif f[0] == 'nfree' : bq.slotsFree = int(f[1])
        elif f[0] == 'now' : bq.now = int(f[1])
        elif f[0] == 'schedCycle' : bq.schedCycle = int(f[1])
        else:
            logging.warn("invalid input in result of lrms plugin command " + cmd)
            logging.warn("invalid input: " + s)
    else:
        logging.warn("invalid input in result of lrms plugin command " + cmd)
        logging.warn("invalid input: " + s)

bq.__scache__ = scache

# for k in scache.keys():
#     print k, len(scache[k])

if config.has_option('Scheduler','cycle_time'):
    bq.schedCycle = config.getint('Scheduler','cycle_time')

# sys.exit(0)

from EstTT import *
wq = TTEstimator(WaitTime())

# first we do the VOViews
# sort the keys : needed to make sure output is reproducible for test harness :(

dnk = dndict.keys()
dnk.sort()

for dn in dnk:

    view = dndict[dn]
    if not view.localID: continue # we only want VOViews here
    q = view.queuename
    if len(view.ACBRs) > 1:
        logging.warn("For dn: " + dn)
        logging.warn("detected multiple ACBRs:")
        for vo in view.ACBRs:
            logging.warn(repr(vo))
        logging.warn("only using the last one!")
    vo = view.ACBRs[-1][1]

    if DEBUG: print "'" + vo + "'"

    print "dn: " + dn
    print "GlueVOViewLocalID: " + view.localID
    if DEBUG:
        for acbr in view.ACBRs:
            print "GlueCEAccessControlBaseRule: " + acbr[0] + ":" + acbr[1]

    # this is tricky: reporting job counts in queues cares about queues,
    # but scheduling doesn't as far as I know. So we need four job
    # counts, running and waiting, both in the current queue and for all
    # queues.

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'group': vo, 'queue': q}
    qwaitd = {'state': 'queued', 'group': vo, 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)
    if nwait > 0:
        qwt = waitingJobs[0].get('maxwalltime')
    else:
        qwt = -1

    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    # now we find the same numbers, but integrated across all queues
    # since the scheduler applies process caps without regard to queues.

    rund = {'state': 'running', 'group': vo}
    waitd = {'state': 'queued', 'group': vo}

    runningJobs = bq.matchingJobs(rund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(waitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        if vo in pcaps.keys():
            headroom = max(pcaps[vo] - nrun,0)
            freeslots = min(bq.slotsFree,headroom)
        else:
            freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        if qwt < 0 :
            qwt = waitingJobs[0].get('maxwalltime')
        wrt = qwt * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv(max(wrt,bq.schedCycle))
    print
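# Taken together, the prints above emit one GlueVOView block per dn, e.g.
# (all values hypothetical):
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-short,...
#   GlueVOViewLocalID: atlas
#   GlueCEStateRunningJobs: 12
#   GlueCEStateWaitingJobs: 3
#   GlueCEStateTotalJobs: 15
#   GlueCEStateFreeJobSlots: 0
#   GlueCEStateEstimatedResponseTime: 600
#   GlueCEStateWorstResponseTime: 10800
#
# followed by a blank line to terminate the LDIF block.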

# now do CEs (no VOs) ... this should be done 'right' some day
# and merged with the code above. Want to get it out quick
# (sigh, i am mortal) so ...

for dn in dnk:

    view = dndict[dn]
    if view.localID: continue # only want CEs, they have no localID
    q = view.queuename
    vo = '' # vo-independent

    print "dn: " + dn

    # for VO-independent numbers we only care about what is in the current
    # queue. so we only need one set of stats

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'queue': q}
    qwaitd = {'state': 'queued', 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateFreeCPUs: " + repr(freeslots) # backw compat.
    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    wq.strategy.queue = q
    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        wrt = waitingJobs[0].get('maxwalltime') * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv(max(wrt,bq.schedCycle))
    if DEBUG: print 'DEBUG: q', q
    print

### Local Variables: ***
### mode: python ***
### End: ***