/trunk/nl.nikhef.pdp.lrmsutils/lcg-gip/lcg-info-dynamic-scheduler.cin
Revision 131 - Tue Mar 10 14:16:44 2009 UTC by templon - File size: 15503 byte(s)
ETICS compatibility changes

#!/usr/bin/python2
# lcg-info-dynamic-scheduler
# $Id: lcg-info-dynamic-scheduler.cin,v 4.4 2007/08/14 12:05:58 templon Exp $
# J. A. Templon, NIKHEF/PDP 2005

# plugin for gip framework. generates Glue values associated
# with the scheduler, like FreeSlots or *ResponseTime

# first set up logging

import sys, logging
import syslog

# note: this class works around the bug in the python logging
# package fixed in patch #642974. If the version of python-logging
# is upgraded, this class can be replaced by logging.SysLogHandler

class SLHandler(logging.Handler):
    def __init__(self, ident, logopt=0, facility=syslog.LOG_USER):
        logging.Handler.__init__(self)
        self.ident = ident
        self.logopt = logopt
        self.facility = facility
        self.mappings = {
            logging.DEBUG: syslog.LOG_DEBUG,
            logging.INFO: syslog.LOG_INFO,
            logging.WARN: syslog.LOG_WARNING,
            logging.ERROR: syslog.LOG_ERR,
            logging.CRITICAL: syslog.LOG_CRIT,
            }

    def encodeLevel(self, level):
        return self.mappings.get(level, syslog.LOG_INFO)

    def emit(self, record):
        syslog.openlog(self.ident, self.logopt, self.facility)
        msg = self.format(record)
        prio = self.encodeLevel(record.levelno)
        syslog.syslog(prio, msg)
        syslog.closelog()

logging.getLogger("").setLevel(logging.INFO)
# syslog handler
shdlr = SLHandler("lcg-info-dynamic-scheduler")
logging.getLogger("").addHandler(shdlr)
# stderr handler
stdrhdlr = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s lcg-info-dynamic-scheduler:"
                        + " %(message)s", "%F %T")
logging.getLogger("").addHandler(stdrhdlr)
stdrhdlr.setFormatter(fmt)

def usage():
    print "Usage: lcg-info-dynamic-scheduler -c <cfg_file>"

def abort_without_output(msg):
    logging.error(msg)
    logging.error("Exiting without output, GIP will use static values")
    sys.exit(2)

import getopt
import string

try:
    opts, args = getopt.getopt(sys.argv[1:], "c:",
                               ["config="])
except getopt.GetoptError:
    # print help information and exit:
    emsg = "Error parsing command line: " + string.join(sys.argv)
    usage()
    abort_without_output(emsg)

cfgfile = None

for o, a in opts:
    if o in ("-c", "--config"):
        cfgfile = a

if not cfgfile:
    usage()
    abort_without_output("No config file specified.")

try:
    fp = open(cfgfile)
except IOError:
    abort_without_output("Error opening config file " + cfgfile)

import commands
import ConfigParser
config = ConfigParser.ConfigParser()
try:
    config.readfp(fp)
except:
    abort_without_output("While parsing config file: " +
                         repr(sys.exc_info()[1]) + " " + cfgfile)

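# For reference, here is an illustrative configuration file exercising the
# options read below. The section/option names are the ones this script
# looks up; the paths and helper commands are invented examples, not
# defaults shipped with any particular release:
#
#   [Main]
#   static_ldif_file: /opt/glite/etc/gip/ldif/static-file-CE.ldif
#   vomap:
#       atlsgm:atlas
#       dteamprd:dteam
#
#   [Scheduler]
#   cycle_time: 120
#   vo_max_jobs_cmd: /opt/lcg/libexec/vomaxjobs-maui
#
#   [LRMS]
#   lrms_backend_cmd: /opt/lcg/libexec/lrmsinfo-pbs
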
# note: vomap is only intended for the cases where voname != groupname;
# hence at the bottom, we throw away all entries for which voname == groupname.
# that is the "default scenario" in the code; adding such entries to the
# vomap "exception list" causes problems later on.

if config.has_option('Main','vomap'):
    s = config.get('Main','vomap')
    vomap = dict()
    lines = s.split('\n')
    for l in lines:
        if len(l) > 0:
            t = l.split(':')
            group = t[0].strip()
            vo = t[1].strip()
            if vo != group : vomap[group] = vo

else:
    vomap = dict() # empty dict, later 'if grp in vomap.keys()' => if False

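# e.g. a (hypothetical) entry "atlsgm:atlas" maps LRMS group 'atlsgm' to
# VO 'atlas', while an identity entry like "atlas:atlas" is discarded here
# because voname == groupname is already the default scenario.
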
if config.has_option('Scheduler','vo_max_jobs_cmd'):
    cmd = config.get('Scheduler','vo_max_jobs_cmd')
    (stat,out) = commands.getstatusoutput(cmd)
    if stat:
        abort_without_output(
            "VO max jobs backend command returned nonzero exit status")
    try:
        pcaps = eval(out, {"__builtins__" : {}}) # empty-builtins namespace closes security hole
    except SyntaxError:
        abort_without_output('vo max jobs command output in wrong format')
else:
    pcaps = dict() # empty dict, will trigger use of defaults later.

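# the vo_max_jobs_cmd helper is expected to print a python dict literal
# mapping group/VO names to their maximum job counts, e.g. (invented
# values):
#   {'atlas': 250, 'lhcb': 100, 'dteam': 10}
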
# apply group -> vo mappings to process cap dict
# sgroup (source group) is original from input files,
# targvo (target vo) is what we want to replace it with

#DEBUG opcaps = pcaps.copy()

for sgroup in vomap.keys():
    targ = vomap[sgroup]
    if sgroup in pcaps.keys():
        if targ in pcaps.keys():
            pcaps[targ] = pcaps[targ] + pcaps[sgroup]
        else:
            pcaps[targ] = pcaps[sgroup]
        del pcaps[sgroup]

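# worked example with invented numbers: given vomap = {'atlsgm': 'atlas'}
# and pcaps = {'atlsgm': 5, 'atlas': 250}, the loop above folds the group
# cap into the VO entry, leaving pcaps = {'atlas': 255}.
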
#DEBUGkl = opcaps.keys() + pcaps.keys()
#DEBUGkl.sort()
#DEBUGukl = []
#DEBUGfor k in kl:
#DEBUG    if k not in ukl: ukl.append(k)
#DEBUGfor k in ukl:
#DEBUG    v1 = -1
#DEBUG    v2 = -1
#DEBUG    if k in opcaps.keys():
#DEBUG        v1 = opcaps[k]
#DEBUG    if k in pcaps.keys():
#DEBUG        v2 = pcaps[k]
#DEBUG
#DEBUG    print "%10s %4d %4d" % (k, v1, v2)

if not config.has_option('LRMS','lrms_backend_cmd'):
    abort_without_output("Spec for lrms backend cmd missing in config file")

# read cfg file to find spec for static ldif file

if not config.has_option('Main','static_ldif_file'):
    abort_without_output("Spec for static ldif file missing in config file")

lines = open(config.get('Main','static_ldif_file'),'r').readlines()

# get the dns from the ldif file along with their VOs.
# first do the VOView dns

voviewlist = [ ]
qname = None
import re
jmpatt = re.compile(r'^GlueCEUniqueID=[-a-zA-Z0-9.:]+/[a-z]+-[a-z]+-([a-z_A-Z0-9]+)$')

class CEView(object):
    def __init__(self, dn):
        self.dn = dn
        # if this is not present, then it's a GlueCE object
        # if it is present, it's a GlueVOView object
        self.localID = None
        self.queuename = None
        self.ACBRs = list()
    def getCEUniqueID(self):
        if self.localID == None:
            return self.dn
        else:
            loc = self.dn.find(",") + 1
            return self.dn[loc:]
    CEUniqueID = property(getCEUniqueID,
                          doc="UniqueID of CE to which this view belongs")

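# illustration with made-up dn values: for a GlueCE object such as
#   GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong,mds-vo-name=local,o=grid
# localID stays None and CEUniqueID is the dn itself; for a GlueVOView like
#   GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong,mds-vo-name=local,o=grid
# localID is set and CEUniqueID strips the leading RDN, giving the dn of
# the parent GlueCE object.
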
dndict = dict() # key: dn string (not including leading "dn: ")
thisview = None

for line in lines:

    s = line.strip()
    if s.find('dn:') == 0 and s.find('GlueCEUniqueID') > 0:
        dn = s[3:].strip()
        thisview = CEView(dn)
        if s.find('GlueVOViewLocalID') >= 0:
            loc1 = s.find('=') + 1
            loc2 = s.find(',GlueCEUniqueID')
            id = s[loc1:loc2].strip()
            thisview.localID = id
    elif s.find('GlueCEName') == 0:
        qname = s.split(':')[1].strip()
        thisview.queuename = qname
    elif s.find('GlueCEAccessControlBaseRule:') == 0:
        loc = s.find(':') + 1
        if loc == len(s):
            abort_without_output("In static LDIF file, ACBR is " +
                                 "missing a value: " + s)
        rule = s[loc:].split(":")
        if rule[0].strip() in ['VO', 'VOMS']:
            thisview.ACBRs.append( ( rule[0].strip(), rule[1].strip() ) )

    elif s == '' and thisview :
        dndict[thisview.dn] = thisview
        thisview = None # reset, about to enter new block.

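# for illustration, the kind of static-LDIF input the loop above parses
# (entries invented; blocks are separated by blank lines, which is when
# the accumulated CEView is filed into dndict):
#
#   dn: GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong,mds-vo-name=local,o=grid
#   GlueCEName: qlong
#   GlueCEAccessControlBaseRule: VO:atlas
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong,mds-vo-name=local,o=grid
#   GlueCEAccessControlBaseRule: VOMS:/atlas/Role=production
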
# now find the queues that go with the VOViews
for d in dndict.keys():
    view = dndict[d]
    if view.localID: # then it's a VOView
        view.queuename = dndict[view.CEUniqueID].queuename

DEBUG = 0

if DEBUG:
    print "dumping parse tree for static ldif file"
    for d in dndict.keys():
        print "For dn:", d
        print "    LocalID: ", dndict[d].localID
        print "    CEUniqueID: ", dndict[d].CEUniqueID
        print "    Queue Name:", dndict[d].queuename
        print "    ACBRs: ", dndict[d].ACBRs

def tconv(timeval): # convert time to desired units
    timeval = min(timeval, 2146060842)
    return repr( int(timeval) ) # seconds->printable

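# (2146060842 is just under 2**31 - 1; the clamp presumably keeps very
# large estimates representable as a 32-bit signed integer for downstream
# Glue consumers.)
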
cmd = config.get('LRMS','lrms_backend_cmd')

(stat,out) = commands.getstatusoutput(cmd)
if stat:
    abort_without_output("LRMS backend command returned nonzero exit status")

lrmlines = out.split('\n')

sys.path.append('@TARGPREFIX@/@MODSUFFIX@')
import lrms
bq = lrms.Server()

# generate a cache of indices returned from possible lrms queries
# this makes a big speed diff when there are many (thousands) of jobs
# first we need to find all combinations VO/fqan, queue, state

ql = []
vol = []

for dn in dndict.keys():
    view = dndict[dn]
    if not view.localID : continue
    if view.queuename not in ql : ql.append(view.queuename)
    if view.ACBRs[-1][1] not in vol : vol.append(view.ACBRs[-1][1])

sl = ['running', 'queued']

# now construct an empty cache with all the index keys, ready to fill.

scache = { }

for v in vol:
    for s in sl:
        indstr = lrms.filt2str({'group': v, 'state': s})
        scache[indstr] = {}

for q in ql:
    for s in sl:
        indstr = lrms.filt2str({'queue': q, 'state': s})
        scache[indstr] = {}
        for v in vol:
            indstr = lrms.filt2str({'queue': q, 'state': s, 'group': v})
            scache[indstr] = {}

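# (the key format is whatever lrms.filt2str produces for a filter dict --
# conceptually something like 'group=atlas&state=queued'; illustrative
# only, the real encoding lives in the lrms module.)
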
#DEBUG print scache.keys()

# traverse the job list, filling the index cache as we go.

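# the backend command's output is expected to mix scalar "name value"
# lines describing the whole system with one python dict literal per job,
# for example (all values invented):
#
#   nactive 500
#   nfree 13
#   now 1236699000
#   schedCycle 120
#   {'jobid': '123456.ce.example.org', 'group': 'atlas', 'queue': 'qlong',
#    'state': 'queued', 'maxwalltime': 259200}
#
# the keys used below are 'jobid', 'group', 'queue', 'state' and (for
# waiting jobs, later on) 'maxwalltime'.
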
for line in lrmlines:
    s = line.strip()
    if not s: continue # skip blank lines; s[0] below would raise IndexError
    f = s.split()
    if s[0] == '{' and s[-1] == '}': # looks like a dict
        nj = lrms.Job(eval(s, {"__builtins__" : {}}))
        grp = nj.get('group')
        if grp in vomap.keys():
            nj.set('group',vomap[grp])
        jid = nj.get('jobid')
        bq.addjob(jid,nj)
        tst = nj.get('state')
        if tst not in sl: continue
        tgr = nj.get('group')
        tq = nj.get('queue')
        if tgr in vol and tq in ql:
            dlis = [
                {'group': tgr, 'state': tst},
                {'queue': tq, 'state': tst},
                {'queue': tq, 'state': tst, 'group': tgr}
                ]
        else:
            if tgr in vol:
                dlis = [
                    {'group': tgr, 'state': tst},
                    ]
            elif tq in ql:
                dlis = [
                    {'queue': tq, 'state': tst},
                    {'queue': tq, 'state': tst, 'group': tgr}
                    ]
            else:
                continue

        for td in dlis :
            indstr = lrms.filt2str(td)
            if indstr not in scache.keys():
                if DEBUG : print "huh??", indstr
                scache[indstr] = {}
            scache[lrms.filt2str(td)][jid] = nj
    elif len(f) == 2:
        if f[0] == 'nactive' : bq.slotsUp = int(f[1])
        elif f[0] == 'nfree' : bq.slotsFree = int(f[1])
        elif f[0] == 'now' : bq.now = int(f[1])
        elif f[0] == 'schedCycle' : bq.schedCycle = int(f[1])
        else:
            logging.warn("invalid input in result of lrms plugin command " + cmd)
            logging.warn("invalid input: " + s)
    else:
        logging.warn("invalid input in result of lrms plugin command " + cmd)
        logging.warn("invalid input: " + s)

bq.__scache__ = scache

# for k in scache.keys():
#     print k, len(scache[k])

if config.has_option('Scheduler','cycle_time'):
    bq.schedCycle = config.getint('Scheduler','cycle_time')

# sys.exit(0)

from EstTT import *
wq = TTEstimator(WaitTime())

# first we do the VOViews
# sort the keys : needed to make sure output is reproducible for test harness :(

dnk = dndict.keys()
dnk.sort()

for dn in dnk:

    view = dndict[dn]
    if not view.localID: continue # we only want VOViews here
    q = view.queuename
    if len(view.ACBRs) > 1:
        logging.warn("For dn: " + dn)
        logging.warn("detected multiple ACBRs:")
        for vo in view.ACBRs:
            logging.warn(repr(vo))
        logging.warn("only using the last one!")
    vo = view.ACBRs[-1][1]

    if DEBUG: print "'" + vo + "'"

    print "dn: " + dn
    print "GlueVOViewLocalID: " + view.localID
    if DEBUG:
        for acbr in view.ACBRs:
            print "GlueCEAccessControlBaseRule: " + acbr[0] + ":" + acbr[1]

    # this is tricky: reporting job counts in queues cares about queues,
    # but scheduling doesn't as far as I know. So we need four job
    # counts, running and waiting, both in the current queue and for all
    # queues.

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'group': vo, 'queue': q}
    qwaitd = {'state': 'queued', 'group': vo, 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)
    if nwait > 0:
        qwt = waitingJobs[0].get('maxwalltime')
    else:
        qwt = -1

    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    # now we find the same numbers, but integrated across all queues
    # since the scheduler applies process caps without regard to queues.

    rund = {'state': 'running', 'group': vo}
    waitd = {'state': 'queued', 'group': vo}

    runningJobs = bq.matchingJobs(rund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(waitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        if vo in pcaps.keys():
            headroom = max(pcaps[vo] - nrun,0)
            freeslots = min(bq.slotsFree,headroom)
        else:
            freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        if qwt < 0 :
            qwt = waitingJobs[0].get('maxwalltime')
        wrt = qwt * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    print

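# each VOView thus yields an LDIF fragment on stdout roughly like this
# (numbers invented):
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong,mds-vo-name=local,o=grid
#   GlueVOViewLocalID: atlas
#   GlueCEStateRunningJobs: 42
#   GlueCEStateWaitingJobs: 7
#   GlueCEStateTotalJobs: 49
#   GlueCEStateFreeJobSlots: 0
#   GlueCEStateEstimatedResponseTime: 3600
#   GlueCEStateWorstResponseTime: 1814400
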
# now do CEs (no VOs) ... this should be done 'right' some day
# and merged with the code above. Want to get it out quick
# (sigh, I am mortal) so ...

for dn in dnk:

    view = dndict[dn]
    if view.localID: continue # only want CEs, they have no localID
    q = view.queuename
    vo = '' # vo-independent

    print "dn: " + dn

    # for VO-independent numbers we only care about what is in the current
    # queue. so we only need one set of stats

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'queue': q}
    qwaitd = {'state': 'queued', 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateFreeCPUs: " + repr(freeslots) # backw compat.
    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    wq.strategy.queue = q
    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        wrt = waitingJobs[0].get('maxwalltime') * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    if DEBUG: print 'DEBUG: q', q
    print

### Local Variables: ***
### mode: python ***
### End: ***
