#!/usr/bin/python2
# lcg-info-dynamic-scheduler
# $Id$
# J. A. Templon, NIKHEF/PDP 2005

# plugin for the gip framework.  generates the Glue values associated
# with the scheduler, like FreeSlots or *ResponseTime

# first set up logging

import sys, logging
import syslog

# note: this class works around the bug in the python logging
# package fixed in patch #642974.  If the version of python-logging
# is upgraded, this class can be replaced by logging.handlers.SysLogHandler

class SLHandler(logging.Handler):
    def __init__(self, ident, logopt=0, facility=syslog.LOG_USER):
        logging.Handler.__init__(self)
        self.ident = ident
        self.logopt = logopt
        self.facility = facility
        self.mappings = {
            logging.DEBUG: syslog.LOG_DEBUG,
            logging.INFO: syslog.LOG_INFO,
            logging.WARN: syslog.LOG_WARNING,
            logging.ERROR: syslog.LOG_ERR,
            logging.CRITICAL: syslog.LOG_CRIT,
            }

    def encodeLevel(self, level):
        return self.mappings.get(level, syslog.LOG_INFO)

    def emit(self, record):
        syslog.openlog(self.ident, self.logopt, self.facility)
        msg = self.format(record)
        prio = self.encodeLevel(record.levelno)
        syslog.syslog(prio, msg)
        syslog.closelog()

logging.getLogger("").setLevel(logging.INFO)
# syslog handler
shdlr = SLHandler("lcg-info-dynamic-scheduler")
logging.getLogger("").addHandler(shdlr)
# stderr handler
stdrhdlr = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s lcg-info-dynamic-scheduler:"
                        + " %(message)s", "%F %T")
logging.getLogger("").addHandler(stdrhdlr)
stdrhdlr.setFormatter(fmt)

def usage():
    print "Usage: lcg-info-dynamic-scheduler -c <cfg_file>"

def abort_without_output(msg):
    logging.error(msg)
    logging.error("Exiting without output, GIP will use static values")
    sys.exit(2)

import getopt
import string

try:
    opts, args = getopt.getopt(sys.argv[1:], "c:",
                               ["config="])
except getopt.GetoptError:
    # print help information and exit:
    emsg = "Error parsing command line: " + string.join(sys.argv)
    usage()
    abort_without_output(emsg)

cfgfile = None

for o, a in opts:
    if o in ("-c", "--config"):
        cfgfile = a

if not cfgfile:
    usage()
    abort_without_output("No config file specified.")

try:
    fp = open(cfgfile)
except IOError:
    abort_without_output("Error opening config file " + cfgfile)

import commands
import ConfigParser

config = ConfigParser.ConfigParser()
try:
    config.readfp(fp)
except:
    abort_without_output("While parsing config file " + cfgfile + ": " +
                         repr(sys.exc_info()[1]))
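
# Minimal sketch of a configuration file this script can read.  The section
# and option names below are the ones referenced in this script; the paths
# and VO names are only illustrative, not definitive site values.
#
#   [Main]
#   static_ldif_file: /opt/lcg/var/gip/ldif/static-file-CE.ldif
#   vomap:
#     atlassgm:atlas
#     dzeroprd:dzero
#
#   [Scheduler]
#   vo_max_jobs_cmd: /opt/lcg/libexec/vomaxjobs-maui
#   cycle_time: 26
#
#   [LRMS]
#   lrms_backend_cmd: /opt/lcg/libexec/lrmsinfo-pbs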
# note: vomap is only intended for the cases where voname != groupname;
# hence at the bottom we throw away all entries for which voname = groupname.
# this is the "default scenario" in the code; adding such entries to the
# vomap "exception list" causes problems later on.

if config.has_option('Main','vomap'):
    s = config.get('Main','vomap')
    vomap = dict()
    lines = s.split('\n')
    for l in lines:
        if len(l) > 0:
            t = l.split(':')
            group = t[0].strip()
            vo = t[1].strip()
            if vo != group: vomap[group] = vo
else:
    vomap = dict()  # empty dict, so later 'if grp in vomap.keys()' is always False

if config.has_option('Scheduler','vo_max_jobs_cmd'):
    cmd = config.get('Scheduler','vo_max_jobs_cmd')
    (stat, out) = commands.getstatusoutput(cmd)
    if stat:
        abort_without_output(
            "VO max jobs backend command returned nonzero exit status")
    try:
        pcaps = eval(out, {"__builtins__": {}})  # restricted namespace closes the eval() security hole
    except SyntaxError:
        abort_without_output('vo max slot command output in wrong format')
else:
    pcaps = dict()  # empty dict, will trigger use of defaults later.
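
# The command configured as vo_max_jobs_cmd is expected (judging from the
# parsing above) to print a single Python dict literal mapping a group or VO
# name to its maximum job slot count, for example (illustrative values only):
#
#   {'atlas': 100, 'dzero': 40, 'atlassgm': 2}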
# apply group -> vo mappings to the process cap dict
# sgroup (source group) is the original name from the input files,
# targ (target vo) is what we want to replace it with

#DEBUG opcaps = pcaps.copy()

for sgroup in vomap.keys():
    targ = vomap[sgroup]
    if sgroup in pcaps.keys():
        if targ in pcaps.keys():
            pcaps[targ] = pcaps[targ] + pcaps[sgroup]
        else:
            pcaps[targ] = pcaps[sgroup]
        del pcaps[sgroup]
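
# Illustrative example of the remapping above (assumed values): with
# vomap = {'atlassgm': 'atlas'} and pcaps = {'atlas': 100, 'atlassgm': 2},
# the loop folds the group cap into the VO cap, leaving pcaps = {'atlas': 102}.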
#DEBUGkl = opcaps.keys() + pcaps.keys()
#DEBUGkl.sort()
#DEBUGukl = []
#DEBUGfor k in kl:
#DEBUG    if k not in ukl: ukl.append(k)
#DEBUGfor k in ukl:
#DEBUG    v1 = -1
#DEBUG    v2 = -1
#DEBUG    if k in opcaps.keys():
#DEBUG        v1 = opcaps[k]
#DEBUG    if k in pcaps.keys():
#DEBUG        v2 = pcaps[k]
#DEBUG
#DEBUG    print "%10s %4d %4d" % (k, v1, v2)

if not config.has_option('LRMS','lrms_backend_cmd'):
    abort_without_output("Spec for lrms backend cmd missing in config file")

# read cfg file to find spec for static ldif file

if not config.has_option('Main','static_ldif_file'):
    abort_without_output("Spec for static ldif file missing in config file")

lines = open(config.get('Main','static_ldif_file'),'r').readlines()

# get the DNs from the static ldif file along with their VOs.
# first do the VOView DNs.

voviewlist = [ ]
qname = None
import re
jmpatt = re.compile(r'^GlueCEUniqueID=[-a-zA-Z0-9.:]+/[a-z]+-[a-z]+-([a-z_A-Z0-9]+)$')

class CEView(object):
    def __init__(self, dn):
        self.dn = dn
        # if localID stays unset, this is a GlueCE object;
        # if it gets set, this is a GlueVOView object.
        self.localID = None
        self.queuename = None
        self.ACBRs = list()
    def getCEUniqueID(self):
        if self.localID is None:
            return self.dn
        else:
            loc = self.dn.find(",") + 1
            return self.dn[loc:]
    CEUniqueID = property(getCEUniqueID,
                          doc="UniqueID of the CE to which this view belongs")
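
# Sketch of the kind of static LDIF entries the loop below parses.  The
# host, queue, and VO names are made up; only the attribute names and the
# dn layout matter here:
#
#   dn: GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-short,mds-vo-name=local,o=grid
#   GlueCEName: short
#   GlueCEAccessControlBaseRule: VO:atlas
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-short,mds-vo-name=local,o=grid
#   GlueCEAccessControlBaseRule: VO:atlas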
dndict = dict()   # key: dn string (not including leading "dn: ")
thisview = None

for line in lines:

    s = line.strip()
    if s.find('dn:') == 0 and s.find('GlueCEUniqueID') > 0:
        dn = s[3:].strip()
        thisview = CEView(dn)
        if s.find('GlueVOViewLocalID') >= 0:
            loc1 = s.find('=') + 1
            loc2 = s.find(',GlueCEUniqueID')
            id = s[loc1:loc2].strip()
            thisview.localID = id
    elif s.find('GlueCEName') == 0:
        qname = s.split(':')[1].strip()
        thisview.queuename = qname
    elif s.find('GlueCEAccessControlBaseRule:') == 0:
        loc = s.find(':') + 1
        if loc == len(s):
            abort_without_output("In static LDIF file, ACBR is " +
                                 "missing a value: " + s)
        rule = s[loc:].split(":")
        if rule[0].strip() in ['VO', 'VOMS']:
            thisview.ACBRs.append( ( rule[0].strip(), rule[1].strip() ) )

    elif s == '' and thisview:
        dndict[thisview.dn] = thisview
        thisview = None  # reset, about to enter new block.

# now find the queues that go with the VOViews
for d in dndict.keys():
    view = dndict[d]
    if view.localID:  # then it's a VOView
        view.queuename = dndict[view.CEUniqueID].queuename

DEBUG = 0

if DEBUG:
    print "dumping parse tree for static ldif file"
    for d in dndict.keys():
        print "For dn:", d
        print "   LocalID:    ", dndict[d].localID
        print "   CEUniqueID: ", dndict[d].CEUniqueID
        print "   Queue Name: ", dndict[d].queuename
        print "   ACBRs:      ", dndict[d].ACBRs

def tconv(timeval):  # convert time to desired units
    timeval = min(timeval, 2146060842)
    return repr( int(timeval) )  # seconds->printable

cmd = config.get('LRMS','lrms_backend_cmd')

(stat, out) = commands.getstatusoutput(cmd)
if stat:
    abort_without_output("LRMS backend command returned nonzero exit status")

lrmlines = out.split('\n')

sys.path.append('@TARGPREFIX@/@MODSUFFIX@')
import lrms
bq = lrms.Server()
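
# Sketch of the lrms_backend_cmd output this script expects, based on the
# parsing loop below.  Values are illustrative, and the real plugin may emit
# additional keys per job:
#
#   nactive    100
#   nfree       20
#   now  1133711040
#   schedCycle  26
#   {'jobid': '1234.ce.example.org', 'group': 'atlas', 'queue': 'short', 'state': 'running', 'maxwalltime': 3600}
#   {'jobid': '1235.ce.example.org', 'group': 'dzero', 'queue': 'long', 'state': 'queued', 'maxwalltime': 7200}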
# generate a cache of indices returned from possible lrms queries;
# this makes a big speed diff when there are many (thousands) of jobs.
# first we need to find all combinations of VO/fqan, queue, state.

ql = []
vol = []

for dn in dndict.keys():
    view = dndict[dn]
    if not view.localID: continue
    if view.queuename not in ql: ql.append(view.queuename)
    if view.ACBRs[-1][1] not in vol: vol.append(view.ACBRs[-1][1])

sl = ['running', 'queued']

# now construct an empty cache with all the index keys, ready to fill.

scache = { }

for v in vol:
    for s in sl:
        indstr = lrms.filt2str({'group': v, 'state': s})
        scache[indstr] = {}

for q in ql:
    for s in sl:
        indstr = lrms.filt2str({'queue': q, 'state': s})
        scache[indstr] = {}
        for v in vol:
            indstr = lrms.filt2str({'queue': q, 'state': s, 'group': v})
            scache[indstr] = {}

#DEBUG print scache.keys()
# traverse the job list, filling the index cache as we go.

for line in lrmlines:
    s = line.strip()
    if not s: continue  # skip blank lines; s[0] below would raise IndexError
    f = s.split()
    if s[0] == '{' and s[-1] == '}':  # looks like a dict
        nj = lrms.Job(eval(s, {"__builtins__": {}}))
        grp = nj.get('group')
        if grp in vomap.keys():
            nj.set('group', vomap[grp])
        jid = nj.get('jobid')
        bq.addjob(jid, nj)
        tst = nj.get('state')
        if tst not in sl: continue
        tgr = nj.get('group')
        tq = nj.get('queue')
        if tgr in vol and tq in ql:
            dlis = [
                {'group': tgr, 'state': tst},
                {'queue': tq, 'state': tst},
                {'queue': tq, 'state': tst, 'group': tgr}
                ]
        else:
            if tgr in vol:
                dlis = [
                    {'group': tgr, 'state': tst},
                    ]
            elif tq in ql:
                dlis = [
                    {'queue': tq, 'state': tst},
                    {'queue': tq, 'state': tst, 'group': tgr}
                    ]
            else:
                continue

        for td in dlis:
            indstr = lrms.filt2str(td)
            if indstr not in scache.keys():
                if DEBUG: print "huh??", indstr
                scache[indstr] = {}
            scache[indstr][jid] = nj
    elif len(f) == 2:
        if   f[0] == 'nactive'    : bq.slotsUp    = int(f[1])
        elif f[0] == 'nfree'      : bq.slotsFree  = int(f[1])
        elif f[0] == 'now'        : bq.now        = int(f[1])
        elif f[0] == 'schedCycle' : bq.schedCycle = int(f[1])
        else:
            logging.warn("invalid input in result of lrms plugin command " + cmd)
            logging.warn("invalid input: " + s)
    else:
        logging.warn("invalid input in result of lrms plugin command " + cmd)
        logging.warn("invalid input: " + s)

bq.__scache__ = scache
# for k in scache.keys():
#     print k, len(scache[k])

if config.has_option('Scheduler','cycle_time'):
    bq.schedCycle = config.getint('Scheduler','cycle_time')

# sys.exit(0)

from EstTT import *
wq = TTEstimator(WaitTime())

# first we do the VOViews.
# sort the keys: needed to make sure output is reproducible for the test harness :(

dnk = dndict.keys()
dnk.sort()
for dn in dnk:

    view = dndict[dn]
    if not view.localID: continue  # we only want VOViews here
    q = view.queuename
    if len(view.ACBRs) > 1:
        logging.warn("For dn: " + dn)
        logging.warn("detected multiple ACBRs:")
        for vo in view.ACBRs:
            logging.warn(repr(vo))
        logging.warn("only using the last one!")
    vo = view.ACBRs[-1][1]

    if DEBUG: print "'" + vo + "'"

    print "dn: " + dn
    print "GlueVOViewLocalID: " + view.localID
    if DEBUG:
        for acbr in view.ACBRs:
            print "GlueCEAccessControlBaseRule: " + acbr[0] + ":" + acbr[1]

    # this is tricky: reporting job counts in queues cares about queues,
    # but scheduling doesn't as far as I know.  So we need four job
    # counts, running and waiting, both in the current queue and for all
    # queues.

    # first find what is happening in this queue (q) to report the
    # WaitingJobs, RunningJobs and TotalJobs

    qrund  = {'state': 'running', 'group': vo, 'queue': q}
    qwaitd = {'state': 'queued',  'group': vo, 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun  = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)
    if nwait > 0:
        qwt = waitingJobs[0].get('maxwalltime')
    else:
        qwt = -1

    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    # now we find the same numbers, but integrated across all queues,
    # since the scheduler applies process caps without regard to queues.

    rund  = {'state': 'running', 'group': vo}
    waitd = {'state': 'queued',  'group': vo}

    runningJobs = bq.matchingJobs(rund) ; nrun  = len(runningJobs)
    waitingJobs = bq.matchingJobs(waitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        if vo in pcaps.keys():
            headroom = max(pcaps[vo] - nrun, 0)
            freeslots = min(bq.slotsFree, headroom)
        else:
            freeslots = bq.slotsFree

    ert = wq.estimate(bq, vo, debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        if qwt < 0:
            qwt = waitingJobs[0].get('maxwalltime')
        wrt = qwt * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv(max(wrt, bq.schedCycle))
    print
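
# For reference, the loop above emits one GLUE fragment per VOView, roughly
# like the following (values are illustrative only):
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-short,mds-vo-name=local,o=grid
#   GlueVOViewLocalID: atlas
#   GlueCEStateRunningJobs: 12
#   GlueCEStateWaitingJobs: 0
#   GlueCEStateTotalJobs: 12
#   GlueCEStateFreeJobSlots: 8
#   GlueCEStateEstimatedResponseTime: 122
#   GlueCEStateWorstResponseTime: 244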
# now do CEs (no VOs) ... this should be done 'right' some day
# and merged with the code above.  Want to get it out quick
# (sigh, i am mortal) so ...

for dn in dnk:

    view = dndict[dn]
    if view.localID: continue  # only want CEs; they have no localID
    q = view.queuename
    vo = ''  # vo-independent

    print "dn: " + dn

    # for VO-independent numbers we only care about what is in the current
    # queue, so we only need one set of stats

    # first find what is happening in this queue (q) to report the
    # WaitingJobs, RunningJobs and TotalJobs

    qrund  = {'state': 'running', 'queue': q}
    qwaitd = {'state': 'queued',  'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun  = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        freeslots = bq.slotsFree

    ert = wq.estimate(bq, vo, debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateFreeCPUs: " + repr(freeslots)  # backw compat.
    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    wq.strategy.queue = q
    ert = wq.estimate(bq, vo, debug=0)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        wrt = waitingJobs[0].get('maxwalltime') * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv(max(wrt, bq.schedCycle))
    if DEBUG: print 'DEBUG: q', q
    print

### Local Variables: ***
### mode: python ***
### End: ***