/[pdpsoft]/trunk/nl.nikhef.pdp.lrmsutils/lcg-gip/lcg-info-dynamic-scheduler.cin


Revision 78
Fri Feb 27 10:22:17 2009 UTC by templon
File size: 15188 byte(s)
changing repo structure

#!/usr/bin/python2
# lcg-info-dynamic-scheduler
# $Id: lcg-info-dynamic-scheduler.cin,v 4.4 2007/08/14 12:05:58 templon Exp $
# J. A. Templon, NIKHEF/PDP 2005

# plugin for gip framework. generates Glue values associated
# with the scheduler, like FreeSlots or *ResponseTime

# first set up logging

import sys, logging
import syslog

# note: this class works around the bug in the python logging
# package fixed in patch #642974. If the version of python-logging
# is upgraded, this class can be replaced by logging.SysLogHandler

class SLHandler(logging.Handler):
    def __init__(self, ident, logopt=0, facility=syslog.LOG_USER):
        logging.Handler.__init__(self)
        self.ident = ident
        self.logopt = logopt
        self.facility = facility
        self.mappings = {
            logging.DEBUG: syslog.LOG_DEBUG,
            logging.INFO: syslog.LOG_INFO,
            logging.WARN: syslog.LOG_WARNING,
            logging.ERROR: syslog.LOG_ERR,
            logging.CRITICAL: syslog.LOG_CRIT,
            }

    def encodeLevel(self, level):
        return self.mappings.get(level, syslog.LOG_INFO)

    def emit(self, record):
        syslog.openlog(self.ident, self.logopt, self.facility)
        msg = self.format(record)
        prio = self.encodeLevel(record.levelno)
        syslog.syslog(prio, msg)
        syslog.closelog()

logging.getLogger("").setLevel(logging.INFO)
# syslog handler
shdlr = SLHandler("lcg-info-dynamic-scheduler")
logging.getLogger("").addHandler(shdlr)
# stderr handler
stdrhdlr = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s lcg-info-dynamic-scheduler:"
                        + " %(message)s", "%F %T")
logging.getLogger("").addHandler(stdrhdlr)
stdrhdlr.setFormatter(fmt)

def usage():
    print "Usage: lcg-info-dynamic-scheduler -c <cfg_file>"

import sys

def abort_without_output(msg):
    logging.error(msg)
    logging.error("Exiting without output, GIP will use static values")
    sys.exit(2)

import getopt
import string

try:
    opts, args = getopt.getopt(sys.argv[1:], "c:",
                               ["config="])
except getopt.GetoptError:
    # print help information and exit:
    emsg = "Error parsing command line: " + string.join(sys.argv)
    usage()
    abort_without_output(emsg)

cfgfile = None

for o, a in opts:
    if o in ("-c", "--config"):
        cfgfile = a

if not cfgfile:
    usage()
    abort_without_output("No config file specified.")

try:
    fp = open(cfgfile)
except IOError:
    abort_without_output("Error opening config file " + cfgfile)

import commands
import ConfigParser
config = ConfigParser.ConfigParser()
config.readfp(fp)

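# Illustrative example of a config file (an assumption added for
# documentation, not shipped with this script); the option names are the
# ones read further down, the paths and values are made up:
#
#   [Main]
#   static_ldif_file: /opt/glite/etc/gip/ldif/static-file-CE.ldif
#   vomap:
#       atlassgm: atlas
#       lhcbprd: lhcb
#
#   [LRMS]
#   lrms_backend_cmd: /usr/libexec/lrmsinfo-pbs
#
#   [Scheduler]
#   vo_max_jobs_cmd: /usr/libexec/vomaxjobs-maui
#   cycle_time: 120
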
# note: vomap is only intended for the cases where voname != groupname
# hence at the bottom, we throw away all entries for which voname = groupname
# this is the "default scenario" in the code, adding such entries to the
# vomap "exception list" causes problems later on.

if config.has_option('Main','vomap'):
    s = config.get('Main','vomap')
    vomap = dict()
    lines = s.split('\n')
    for l in lines:
        if len(l) > 0:
            t = l.split(':')
            group = t[0].strip()
            vo = t[1].strip()
            if vo != group : vomap[group] = vo

else:
    vomap = dict() # empty dict, later 'if grp in vomap.keys()' => if False

if config.has_option('Scheduler','vo_max_jobs_cmd'):
    cmd = config.get('Scheduler','vo_max_jobs_cmd')
    (stat,out) = commands.getstatusoutput(cmd)
    if stat:
        abort_without_output(
            "VO max jobs backend command returned nonzero exit status")
    try:
        pcaps = eval(out, {"__builtins__" : {}}) # restricted namespace closes the eval security hole
    except SyntaxError:
        abort_without_output('vo max slot command output in wrong format')
else:
    pcaps = dict() # empty dict, will trigger use of defaults later.

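# Illustrative note (an assumption, not taken from the original source):
# vo_max_jobs_cmd is expected to print a Python dict literal on stdout,
# mapping group/VO names to their maximum job slot caps, e.g.
#
#   {'atlas': 250, 'lhcb': 100, 'dteam': 10}
#
# which the eval() above turns into the pcaps dict.
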
# apply group -> vo mappings to process cap dict
# sgroup (source group) is original from input files,
# targvo (target vo) is what we want to replace it with

#DEBUG opcaps = pcaps.copy()

for sgroup in vomap.keys():
    targ = vomap[sgroup]
    if sgroup in pcaps.keys():
        if targ in pcaps.keys():
            pcaps[targ] = pcaps[targ] + pcaps[sgroup]
        else:
            pcaps[targ] = pcaps[sgroup]
        del pcaps[sgroup]

#DEBUGkl = opcaps.keys() + pcaps.keys()
#DEBUGkl.sort()
#DEBUGukl = []
#DEBUGfor k in kl:
#DEBUG    if k not in ukl: ukl.append(k)
#DEBUGfor k in ukl:
#DEBUG    v1 = -1
#DEBUG    v2 = -1
#DEBUG    if k in opcaps.keys():
#DEBUG        v1 = opcaps[k]
#DEBUG    if k in pcaps.keys():
#DEBUG        v2 = pcaps[k]
#DEBUG
#DEBUG    print "%10s %4d %4d" % (k, v1, v2)

if not config.has_option('LRMS','lrms_backend_cmd'):
    abort_without_output("Spec for lrms backend cmd missing in config file")

# read cfg file to find spec for static ldif file

if not config.has_option('Main','static_ldif_file'):
    abort_without_output("Spec for static ldif file missing in config file")

lines = open(config.get('Main','static_ldif_file'),'r').readlines()

# get the dns from the ldif file along with their VOs.
# first do the VOView dns

voviewlist = [ ]
qname = None
import re
jmpatt = re.compile(r'^GlueCEUniqueID=[-a-zA-Z0-9.:]+/[a-z]+-[a-z]+-([a-z_A-Z0-9]+)$')

class CEView(object):
    def __init__(self, dn):
        self.dn = dn
        # if this is not present, then it's a GlueCE object
        # if it is present, it's a GlueVOView object
        self.localID = None
        self.queuename = None
        self.ACBRs = list()

    def getCEUniqueID(self):
        if self.localID == None:
            return self.dn
        else:
            loc = self.dn.find(",") + 1
            return self.dn[loc:]

    CEUniqueID = property(getCEUniqueID,
                          doc="UniqueID of CE to which this view belongs")

dndict = dict() # key: dn string (not including leading "dn: ")
thisview = None

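# Illustrative fragment of the kind of static LDIF the loop below expects
# (an assumption for orientation; hostnames and queue names are made up,
# and real files carry many more attributes):
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-atlas,mds-vo-name=resource,o=grid
#   GlueVOViewLocalID: atlas
#   GlueCEAccessControlBaseRule: VO:atlas
#
#   dn: GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-atlas,mds-vo-name=resource,o=grid
#   GlueCEName: atlas
#   GlueCEAccessControlBaseRule: VOMS:/atlas/Role=production
#
# a blank line closes each block; a GlueVOView entry is tied to its parent
# GlueCE via the trailing GlueCEUniqueID part of its dn.
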
for line in lines:

    s = line.strip()
    if s.find('dn:') == 0 and s.find('GlueCEUniqueID') > 0:
        dn = s[3:].strip()
        thisview = CEView(dn)
        if s.find('GlueVOViewLocalID') >= 0:
            loc1 = s.find('=') + 1
            loc2 = s.find(',GlueCEUniqueID')
            id = s[loc1:loc2].strip()
            thisview.localID = id
    elif s.find('GlueCEName') == 0:
        qname = s.split(':')[1].strip()
        thisview.queuename = qname
    elif s.find('GlueCEAccessControlBaseRule:') == 0:
        loc = s.find(':') + 1
        rule = s[loc:].split(":")
        if rule[0].strip() in ['VO', 'VOMS']:
            thisview.ACBRs.append( ( rule[0].strip(), rule[1].strip() ) )

    elif s == '' and thisview :
        dndict[thisview.dn] = thisview
        thisview = None # reset, about to enter new block.

# now find the queues that go with the VOViews
for d in dndict.keys():
    view = dndict[d]
    if view.localID: # then it's a VOView
        view.queuename = dndict[view.CEUniqueID].queuename

DEBUG = 0

if DEBUG:
    print "dumping parse tree for static ldif file"
    for d in dndict.keys():
        print "For dn:", d
        print " LocalID: ", dndict[d].localID
        print " CEUniqueID: ", dndict[d].CEUniqueID
        print " Queue Name:", dndict[d].queuename
        print " ACBRs: ", dndict[d].ACBRs

def tconv(timeval): # convert time to desired units
    timeval = min(timeval, 2146060842)
    return repr( int(timeval) ) # seconds->printable

cmd = config.get('LRMS','lrms_backend_cmd')

(stat,out) = commands.getstatusoutput(cmd)
if stat:
    abort_without_output("LRMS backend command returned nonzero exit status")

lrmlines = out.split('\n')

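# Illustrative sketch (an assumption, not output from a real backend) of
# what the lrms_backend_cmd is expected to print, given how the parsing
# loop further down consumes it: a few "keyword value" header lines, then
# one Python dict literal per job with at least the keys used below
# (jobid, group, queue, state, maxwalltime), e.g.
#
#   nactive 500
#   nfree 123
#   now 1235730137
#   schedCycle 120
#   {'jobid': '1234.ce.example.org', 'group': 'atlas', 'queue': 'atlas', 'state': 'running', 'maxwalltime': 86400}
#   {'jobid': '1235.ce.example.org', 'group': 'lhcb', 'queue': 'lhcb', 'state': 'queued', 'maxwalltime': 43200}
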
sys.path.append('@MODDIR@')
import lrms
bq = lrms.Server()

# generate a cache of indices returned from possible lrms queries
# this makes a big speed diff when there are many (thousands) of jobs
# first we need to find all combinations VO/fqan, queue, state

ql = []
vol = []

for dn in dndict.keys():
    view = dndict[dn]
    if not view.localID : continue
    if view.queuename not in ql : ql.append(view.queuename)
    if view.ACBRs[-1][1] not in vol : vol.append(view.ACBRs[-1][1])

sl = ['running', 'queued']

# now construct an empty cache with all the index keys, ready to fill.

scache = { }

for v in vol:
    for s in sl:
        indstr = lrms.filt2str({'group':v, 'state': s})
        scache[indstr] = {}

for q in ql:
    for s in sl:
        indstr = lrms.filt2str({'queue': q, 'state': s})
        scache[indstr] = {}
        for v in vol:
            indstr = lrms.filt2str({'queue': q, 'state': s, 'group': v})
            scache[indstr] = {}

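# Each scache entry is keyed by the string form of a filter dict (as
# produced by lrms.filt2str) and maps jobid -> Job object for the jobs
# matching that filter; the exact key format is internal to the lrms
# module.
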
#DEBUG print scache.keys()

# traverse the job list, filling the index cache as we go.

for line in lrmlines:
    s = line.strip()
    if len(s) == 0: continue # skip blank lines; s[0] below would raise IndexError
    f = s.split()
    if s[0] == '{' and s[-1] == '}': # looks like a dict
        nj = lrms.Job(eval(s, {"__builtins__" : {}}))
        grp = nj.get('group')
        if grp in vomap.keys():
            nj.set('group',vomap[grp])
        jid = nj.get('jobid')
        bq.addjob(jid,nj)
        tst = nj.get('state')
        if tst not in sl: continue
        tgr = nj.get('group')
        tq = nj.get('queue')
        if tgr in vol and tq in ql:
            dlis = [
                {'group': tgr, 'state': tst},
                {'queue': tq, 'state': tst},
                {'queue': tq, 'state': tst, 'group': tgr}
                ]
        else:
            if tgr in vol:
                dlis = [
                    {'group': tgr, 'state': tst},
                    ]
            elif tq in ql:
                dlis = [
                    {'queue': tq, 'state': tst},
                    {'queue': tq, 'state': tst, 'group': tgr}
                    ]
            else:
                continue

        for td in dlis :
            indstr = lrms.filt2str(td)
            if indstr not in scache.keys():
                if DEBUG : print "huh??", indstr
                scache[indstr] = {}
            scache[lrms.filt2str(td)][jid] = nj
    elif len(f) == 2:
        if f[0] == 'nactive' : bq.slotsUp = int(f[1])
        elif f[0] == 'nfree' : bq.slotsFree = int(f[1])
        elif f[0] == 'now' : bq.now = int(f[1])
        elif f[0] == 'schedCycle' : bq.schedCycle = int(f[1])
        else:
            logging.warn("invalid input in result of lrms plugin command " + cmd)
            logging.warn("invalid input: " + s)
    else:
        logging.warn("invalid input in result of lrms plugin command " + cmd)
        logging.warn("invalid input: " + s)

bq.__scache__ = scache

# for k in scache.keys():
#     print k, len(scache[k])

if config.has_option('Scheduler','cycle_time'):
    bq.schedCycle = config.getint('Scheduler','cycle_time')

# sys.exit(0)

from EstTT import *
wq = TTEstimator(WaitTime())

# first we do the VOViews
# sort the keys : needed to make sure output is reproducible for test harness :(

dnk = dndict.keys()
dnk.sort()

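# For orientation, an illustrative example (made-up numbers, assumed dn)
# of the LDIF fragment the loop below prints for one VOView:
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-atlas,mds-vo-name=resource,o=grid
#   GlueVOViewLocalID: atlas
#   GlueCEStateRunningJobs: 42
#   GlueCEStateWaitingJobs: 7
#   GlueCEStateTotalJobs: 49
#   GlueCEStateFreeJobSlots: 0
#   GlueCEStateEstimatedResponseTime: 3600
#   GlueCEStateWorstResponseTime: 302400
#
# followed by a blank line; the GIP framework merges these dynamic values
# with the static LDIF.
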
for dn in dnk:

    view = dndict[dn]
    if not view.localID: continue # we only want VOViews here
    q = view.queuename
    if len(view.ACBRs) > 1:
        logging.warn("For dn: " + dn)
        logging.warn("detected multiple ACBRs:")
        for vo in view.ACBRs:
            logging.warn(repr(vo))
        logging.warn("only using the last one!")
    vo = view.ACBRs[-1][1]

    if DEBUG: print "'" + vo + "'"

    print "dn: " + dn
    print "GlueVOViewLocalID: " + view.localID
    if DEBUG:
        for acbr in view.ACBRs:
            print "GlueCEAccessControlBaseRule: " + acbr[0] + ":" + acbr[1]

    # this is tricky: reporting job counts in queues cares about queues,
    # but scheduling doesn't as far as I know. So we need four job
    # counts, running and waiting, both in the current queue and for all
    # queues.

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'group': vo, 'queue': q}
    qwaitd = {'state': 'queued', 'group': vo, 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)
    if nwait > 0:
        qwt = waitingJobs[0].get('maxwalltime')
    else:
        qwt = -1

    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    # now we find the same numbers, but integrated across all queues
    # since the scheduler applies process caps without regard to queues.

    rund = {'state': 'running', 'group': vo}
    waitd = {'state': 'queued', 'group': vo}

    runningJobs = bq.matchingJobs(rund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(waitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        if vo in pcaps.keys():
            headroom = max(pcaps[vo] - nrun,0)
            freeslots = min(bq.slotsFree,headroom)
        else:
            freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        if qwt < 0 :
            qwt = waitingJobs[0].get('maxwalltime')
        wrt = qwt * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    print

# now do CEs (no VOs) ... this should be done 'right' some day
# and merged with the code above. Want to get it out quick
# (sigh, i am mortal) so ...

for dn in dnk:

    view = dndict[dn]
    if view.localID: continue # only want CEs, they have no localID
    q = view.queuename
    vo = '' # vo-independent

    print "dn: " + dn

    # for VO-independent numbers we only care about what is in the current
    # queue. so we only need one set of stats

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'queue': q}
    qwaitd = {'state': 'queued', 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)
    print "GlueCEStateFreeCPUs: " + repr(nrun + nwait) # backw compat.

    wq.strategy.queue = q
    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        wrt = waitingJobs[0].get('maxwalltime') * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    if DEBUG: print 'DEBUG: q', q
    print

### Local Variables: ***
### mode: python ***
### End: ***

Properties

svn:executable = *
