#!/usr/bin/python2
# lcg-info-dynamic-scheduler
# $Id: lcg-info-dynamic-scheduler.cin,v 4.4 2007/08/14 12:05:58 templon Exp $
# J. A. Templon, NIKHEF/PDP 2005

# plugin for gip framework. generates Glue values associated
# with the scheduler, like FreeSlots or *ResponseTime

# first set up logging

import sys, logging
import syslog

# note: this class works around the bug in the python logging
# package fixed in patch #642974. If the version of python-logging
# is upgraded, this class can be replaced by logging.SysLogHandler
class SLHandler(logging.Handler):
    def __init__(self, ident, logopt=0, facility=syslog.LOG_USER):
        logging.Handler.__init__(self)
        self.ident = ident
        self.logopt = logopt
        self.facility = facility
        self.mappings = {
            logging.DEBUG: syslog.LOG_DEBUG,
            logging.INFO: syslog.LOG_INFO,
            logging.WARN: syslog.LOG_WARNING,
            logging.ERROR: syslog.LOG_ERR,
            logging.CRITICAL: syslog.LOG_CRIT,
            }

    def encodeLevel(self, level):
        return self.mappings.get(level, syslog.LOG_INFO)

    def emit(self, record):
        syslog.openlog(self.ident, self.logopt, self.facility)
        msg = self.format(record)
        prio = self.encodeLevel(record.levelno)
        syslog.syslog(prio, msg)
        syslog.closelog()
logging.getLogger("").setLevel(logging.INFO)
# syslog handler
shdlr = SLHandler("lcg-info-dynamic-scheduler")
logging.getLogger("").addHandler(shdlr)
# stderr handler
stdrhdlr = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s lcg-info-dynamic-scheduler:"
                        + " %(message)s", "%F %T")
logging.getLogger("").addHandler(stdrhdlr)
stdrhdlr.setFormatter(fmt)
def usage():
    print "Usage: lcg-info-dynamic-scheduler -c <cfg_file>"

def abort_without_output(msg):
    logging.error(msg)
    logging.error("Exiting without output, GIP will use static values")
    sys.exit(2)

import getopt
import string
try:
    opts, args = getopt.getopt(sys.argv[1:], "c:",
                               ["config="])
except getopt.GetoptError:
    # print help information and exit:
    emsg = "Error parsing command line: " + string.join(sys.argv)
    usage()
    abort_without_output(emsg)

cfgfile = None

for o, a in opts:
    if o in ("-c", "--config"):
        cfgfile = a

if not cfgfile:
    usage()
    abort_without_output("No config file specified.")

try:
    fp = open(cfgfile)
except IOError:
    abort_without_output("Error opening config file " + cfgfile)

import commands
import ConfigParser
config = ConfigParser.ConfigParser()
config.readfp(fp)
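# For reference, a config file for this plugin might look roughly like the
# sketch below.  The section and option names are the ones actually read in
# this script; the commands, paths and mappings are illustrative examples
# only, not defaults shipped with the plugin:
#
#   [Main]
#   static_ldif_file: /opt/glite/etc/gip/ldif/static-file-CE.ldif
#   vomap:
#     atlassgm: atlas
#     dteamprd: dteam
#
#   [Scheduler]
#   cycle_time: 120
#   vo_max_jobs_cmd: /path/to/vomaxjobs-backend
#
#   [LRMS]
#   lrms_backend_cmd: /path/to/lrmsinfo-backend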
# note: vomap is only intended for the cases where voname != groupname
# hence at the bottom, we throw away all entries for which voname = groupname
# this is the "default scenario" in the code, adding such entries to the
# vomap "exception list" causes problems later on.

if config.has_option('Main','vomap'):
    s = config.get('Main','vomap')
    vomap = dict()
    lines = s.split('\n')
    for l in lines:
        if len(l) > 0:
            t = l.split(':')
            group = t[0].strip()
            vo = t[1].strip()
            if vo != group : vomap[group] = vo

else:
    vomap = dict()   # empty dict, later 'if grp in vomap.keys()' => if False
if config.has_option('Scheduler','vo_max_jobs_cmd'):
    cmd = config.get('Scheduler','vo_max_jobs_cmd')
    (stat,out) = commands.getstatusoutput(cmd)
    if stat:
        abort_without_output(
            "VO max jobs backend command returned nonzero exit status")
    try:
        pcaps = eval(out, {"__builtins__" : {}})  # empty __builtins__ namespace closes eval security hole
    except SyntaxError:
        abort_without_output('vo max slot command output in wrong format')
else:
    pcaps = dict()   # empty dict, will trigger use of defaults later.
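# The vo_max_jobs_cmd backend is expected to print a Python dict literal
# mapping group/VO names to their maximum job slot counts; it is eval'd into
# pcaps above and consulted when computing FreeJobSlots below.  An
# illustrative (not prescriptive) example of such output:
#
#   {'atlas': 250, 'lhcb': 100, 'dteam': 10}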
# apply group -> vo mappings to process cap dict
# sgroup (source group) is the original group name from the input files,
# targ (target vo) is the VO name we want to replace it with

#DEBUG opcaps = pcaps.copy()

for sgroup in vomap.keys():
    targ = vomap[sgroup]
    if sgroup in pcaps.keys():
        if targ in pcaps.keys():
            pcaps[targ] = pcaps[targ] + pcaps[sgroup]
        else:
            pcaps[targ] = pcaps[sgroup]
        del pcaps[sgroup]

#DEBUGkl = opcaps.keys() + pcaps.keys()
#DEBUGkl.sort()
#DEBUGukl = []
#DEBUGfor k in kl:
#DEBUG    if k not in ukl: ukl.append(k)
#DEBUGfor k in ukl:
#DEBUG    v1 = -1
#DEBUG    v2 = -1
#DEBUG    if k in opcaps.keys():
#DEBUG        v1 = opcaps[k]
#DEBUG    if k in pcaps.keys():
#DEBUG        v2 = pcaps[k]
#DEBUG
#DEBUG    print "%10s %4d %4d" % (k, v1, v2)
if not config.has_option('LRMS','lrms_backend_cmd'):
    abort_without_output("Spec for lrms backend cmd missing in config file")

# read cfg file to find spec for static ldif file

if not config.has_option('Main','static_ldif_file'):
    abort_without_output("Spec for static ldif file missing in config file")

lines = open(config.get('Main','static_ldif_file'),'r').readlines()

# get the dns from the ldif file along with their VOs.
# first do the VOView dns

voviewlist = [ ]
qname = None
import re
jmpatt = re.compile(r'^GlueCEUniqueID=[-a-zA-Z0-9.:]+/[a-z]+-[a-z]+-([a-z_A-Z0-9]+)$')
class CEView(object):
    def __init__(self, dn):
        self.dn = dn
        # if this is not present, then it's a GlueCE object
        # if it is present, it's a GlueVOView object
        self.localID = None
        self.queuename = None
        self.ACBRs = list()
    def getCEUniqueID(self):
        if self.localID is None:
            return self.dn
        else:
            loc = self.dn.find(",") + 1
            return self.dn[loc:]
    CEUniqueID = property(getCEUniqueID,
                          doc="UniqueID of CE to which this view belongs")
dndict = dict()   # key: dn string (not including leading "dn: ")
thisview = None

for line in lines:

    s = line.strip()
    if s.find('dn:') == 0 and s.find('GlueCEUniqueID') > 0:
        dn = s[3:].strip()
        thisview = CEView(dn)
        if s.find('GlueVOViewLocalID') >= 0:
            loc1 = s.find('=') + 1
            loc2 = s.find(',GlueCEUniqueID')
            id = s[loc1:loc2].strip()
            thisview.localID = id
    elif s.find('GlueCEName') == 0:
        qname = s.split(':')[1].strip()
        thisview.queuename = qname
    elif s.find('GlueCEAccessControlBaseRule:') == 0:
        loc = s.find(':') + 1
        rule = s[loc:].split(":")
        if rule[0].strip() in ['VO', 'VOMS']:
            thisview.ACBRs.append( ( rule[0].strip(), rule[1].strip() ) )

    elif s == '' and thisview :
        dndict[thisview.dn] = thisview
        thisview = None   # reset, about to enter new block.
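# For orientation, the static LDIF blocks parsed above look roughly like the
# following (hostname, VO and queue names are purely illustrative):
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qatlas,mds-vo-name=resource,o=grid
#   GlueCEAccessControlBaseRule: VO:atlas
#
#   dn: GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qatlas,mds-vo-name=resource,o=grid
#   GlueCEName: qatlas
#   GlueCEAccessControlBaseRule: VO:atlas
#
# A GlueVOView dn carries a leading GlueVOViewLocalID component and a plain
# GlueCE dn does not; that is how the two object types are told apart above,
# and stripping the first RDN from a VOView dn yields the dn of its CE.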
# now find the queues that go with the VOViews
for d in dndict.keys():
    view = dndict[d]
    if view.localID:   # then it's a VOView
        view.queuename = dndict[view.CEUniqueID].queuename

DEBUG = 0

if DEBUG:
    print "dumping parse tree for static ldif file"
    for d in dndict.keys():
        print "For dn:", d
        print "   LocalID: ", dndict[d].localID
        print "   CEUniqueID: ", dndict[d].CEUniqueID
        print "   Queue Name:", dndict[d].queuename
        print "   ACBRs: ", dndict[d].ACBRs
def tconv(timeval):   # convert time to desired units
    # very large estimates are capped; the bound is just below 2**31, which
    # keeps the printed value within 32-bit integer range
    timeval = min(timeval, 2146060842)
    return repr( int(timeval) )   # seconds->printable
cmd = config.get('LRMS','lrms_backend_cmd')

(stat,out) = commands.getstatusoutput(cmd)
if stat:
    abort_without_output("LRMS backend command returned nonzero exit status")

lrmlines = out.split('\n')

sys.path.append('@MODDIR@')
import lrms
bq = lrms.Server()
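# Note: '@MODDIR@' is a placeholder, presumably substituted with the module
# install directory when this .cin file is built.  From its use below, the
# lrms module is assumed to provide at least: Server (with slotsUp,
# slotsFree, now and schedCycle attributes plus addjob() and matchingJobs()),
# Job (with get() and set()), and filt2str() for turning a filter dict into
# an index-cache key.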
# generate a cache of indices returned from possible lrms queries
# this makes a big speed diff when there are many (thousands) of jobs
# first we need to find all combinations VO/fqan, queue, state

ql = []
vol = []

for dn in dndict.keys():
    view = dndict[dn]
    if not view.localID : continue
    if view.queuename not in ql : ql.append(view.queuename)
    if view.ACBRs[-1][1] not in vol : vol.append(view.ACBRs[-1][1])

sl = ['running', 'queued']

# now construct an empty cache with all the index keys, ready to fill.

scache = { }

for v in vol:
    for s in sl:
        indstr = lrms.filt2str({'group': v, 'state': s})
        scache[indstr] = {}

for q in ql:
    for s in sl:
        indstr = lrms.filt2str({'queue': q, 'state': s})
        scache[indstr] = {}
        for v in vol:
            indstr = lrms.filt2str({'queue': q, 'state': s, 'group': v})
            scache[indstr] = {}

#DEBUG print scache.keys()

# traverse the job list, filling the index cache as we go.
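# Each line of the backend output is either a Python dict literal describing
# one job, or a "key value" pair describing the batch system as a whole.
# A sketch of what the parser below accepts (all values illustrative; a
# backend may emit extra per-job fields, only the ones read here matter):
#
#   nactive    100
#   nfree       12
#   now  1187104171
#   schedCycle  26
#   {'jobid': '123456.ce.example.org', 'group': 'atlas', 'queue': 'qatlas',
#    'state': 'queued', 'maxwalltime': 129600}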
for line in lrmlines:
    s = line.strip()
    f = s.split()
    if len(s) == 0:   # skip blank lines rather than tripping on s[0] below
        continue
    if s[0] == '{' and s[-1] == '}':   # looks like a dict
        nj = lrms.Job(eval(s, {"__builtins__" : {}}))
        grp = nj.get('group')
        if grp in vomap.keys():
            nj.set('group',vomap[grp])
        jid = nj.get('jobid')
        bq.addjob(jid,nj)
        tst = nj.get('state')
        if tst not in sl: continue
        tgr = nj.get('group')
        tq = nj.get('queue')
        if tgr in vol and tq in ql:
            dlis = [
                {'group': tgr, 'state': tst},
                {'queue': tq, 'state': tst},
                {'queue': tq, 'state': tst, 'group': tgr}
                ]
        else:
            if tgr in vol:
                dlis = [
                    {'group': tgr, 'state': tst},
                    ]
            elif tq in ql:
                dlis = [
                    {'queue': tq, 'state': tst},
                    {'queue': tq, 'state': tst, 'group': tgr}
                    ]
            else:
                continue

        for td in dlis :
            indstr = lrms.filt2str(td)
            if indstr not in scache.keys():
                if DEBUG : print "huh??", indstr
                scache[indstr] = {}
            scache[lrms.filt2str(td)][jid] = nj
    elif len(f) == 2:
        if f[0] == 'nactive' : bq.slotsUp = int(f[1])
        elif f[0] == 'nfree' : bq.slotsFree = int(f[1])
        elif f[0] == 'now' : bq.now = int(f[1])
        elif f[0] == 'schedCycle' : bq.schedCycle = int(f[1])
        else:
            logging.warn("invalid input in result of lrms plugin command " + cmd)
            logging.warn("invalid input: " + s)
    else:
        logging.warn("invalid input in result of lrms plugin command " + cmd)
        logging.warn("invalid input: " + s)
bq.__scache__ = scache

# for k in scache.keys():
#     print k, len(scache[k])

if config.has_option('Scheduler','cycle_time'):
    bq.schedCycle = config.getint('Scheduler','cycle_time')

# sys.exit(0)

from EstTT import *
wq = TTEstimator(WaitTime())

# first we do the VOViews
# sort the keys : needed to make sure output is reproducible for test harness :(

dnk = dndict.keys()
dnk.sort()
for dn in dnk:

    view = dndict[dn]
    if not view.localID: continue   # we only want VOViews here
    q = view.queuename
    if len(view.ACBRs) > 1:
        logging.warn("For dn: " + dn)
        logging.warn("detected multiple ACBRs:")
        for vo in view.ACBRs:
            logging.warn(repr(vo))
        logging.warn("only using the last one!")
    vo = view.ACBRs[-1][1]

    if DEBUG: print "'" + vo + "'"

    print "dn: " + dn
    print "GlueVOViewLocalID: " + view.localID
    if DEBUG:
        for acbr in view.ACBRs:
            print "GlueCEAccessControlBaseRule: " + acbr[0] + ":" + acbr[1]

    # this is tricky: reporting job counts in queues cares about queues,
    # but scheduling doesn't as far as I know.  So we need four job
    # counts, running and waiting, both in the current queue and for all
    # queues.

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'group': vo, 'queue': q}
    qwaitd = {'state': 'queued', 'group': vo, 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)
    if nwait > 0:
        qwt = waitingJobs[0].get('maxwalltime')
    else:
        qwt = -1

    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    # now we find the same numbers, but integrated across all queues
    # since the scheduler applies process caps without regard to queues.

    rund = {'state': 'running', 'group': vo}
    waitd = {'state': 'queued', 'group': vo}

    runningJobs = bq.matchingJobs(rund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(waitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        if vo in pcaps.keys():
            headroom = max(pcaps[vo] - nrun, 0)
            freeslots = min(bq.slotsFree, headroom)
        else:
            freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        if qwt < 0 :
            qwt = waitingJobs[0].get('maxwalltime')
        wrt = qwt * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    print
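# Each VOView record emitted above ends up on stdout looking roughly like
# this (dn and numbers illustrative, taken from the prints in the loop):
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qatlas,mds-vo-name=resource,o=grid
#   GlueVOViewLocalID: atlas
#   GlueCEStateRunningJobs: 42
#   GlueCEStateWaitingJobs: 7
#   GlueCEStateTotalJobs: 49
#   GlueCEStateFreeJobSlots: 0
#   GlueCEStateEstimatedResponseTime: 1800
#   GlueCEStateWorstResponseTime: 907200
#
# followed by a blank line that separates it from the next record.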
# now do CEs (no VOs) ... this should be done 'right' some day
# and merged with the code above.  Want to get it out quick
# (sigh, i am mortal) so ...

for dn in dnk:

    view = dndict[dn]
    if view.localID: continue   # only want CEs, they have no localID
    q = view.queuename
    vo = ''   # vo-independent

    print "dn: " + dn

    # for VO-independent numbers we only care about what is in the current
    # queue. so we only need one set of stats

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'queue': q}
    qwaitd = {'state': 'queued', 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateFreeCPUs: " + repr(nrun + nwait)   # backw compat.
    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    wq.strategy.queue = q
    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        wrt = waitingJobs[0].get('maxwalltime') * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    if DEBUG: print 'DEBUG: q', q
    print

### Local Variables: ***
### mode: python ***
### End: ***