
Contents of /trunk/nl.nikhef.pdp.lrmsutils/lcg-gip/lcg-info-dynamic-scheduler.cin

Revision 98
Fri Feb 27 15:45:58 2009 UTC by templon
File size: 15336 byte(s)
make error message for config file parsing clearer.

#!/usr/bin/python2
# lcg-info-dynamic-scheduler
# $Id: lcg-info-dynamic-scheduler.cin,v 4.4 2007/08/14 12:05:58 templon Exp $
# J. A. Templon, NIKHEF/PDP 2005

# plugin for gip framework. generates Glue values associated
# with the scheduler, like FreeSlots or *ResponseTime

# first set up logging

import sys, logging
import syslog

# note: this class works around the bug in the python logging
# package fixed in patch #642974.  If the version of python-logging
# is upgraded, this class can be replaced by logging.SysLogHandler

class SLHandler(logging.Handler):
    def __init__(self, ident, logopt=0, facility=syslog.LOG_USER):
        logging.Handler.__init__(self)
        self.ident = ident
        self.logopt = logopt
        self.facility = facility
        self.mappings = {
            logging.DEBUG:    syslog.LOG_DEBUG,
            logging.INFO:     syslog.LOG_INFO,
            logging.WARN:     syslog.LOG_WARNING,
            logging.ERROR:    syslog.LOG_ERR,
            logging.CRITICAL: syslog.LOG_CRIT,
            }

    def encodeLevel(self, level):
        return self.mappings.get(level, syslog.LOG_INFO)

    def emit(self, record):
        syslog.openlog(self.ident, self.logopt, self.facility)
        msg = self.format(record)
        prio = self.encodeLevel(record.levelno)
        syslog.syslog(prio, msg)
        syslog.closelog()

logging.getLogger("").setLevel(logging.INFO)
# syslog handler
shdlr = SLHandler("lcg-info-dynamic-scheduler")
logging.getLogger("").addHandler(shdlr)
# stderr handler
stdrhdlr = logging.StreamHandler()
fmt=logging.Formatter("%(asctime)s lcg-info-dynamic-scheduler:"
                      + " %(message)s","%F %T")
logging.getLogger("").addHandler(stdrhdlr)
stdrhdlr.setFormatter(fmt)

def usage():
    print "Usage: lcg-info-dynamic-scheduler -c <cfg_file>"

import sys

def abort_without_output(msg):
    logging.error(msg)
    logging.error("Exiting without output, GIP will use static values")
    sys.exit(2)

import getopt
import string
try:
    opts, args = getopt.getopt(sys.argv[1:], "c:",
                               ["config="])
except getopt.GetoptError:
    # print help information and exit:
    emsg = "Error parsing command line: " + string.join(sys.argv)
    usage()
    abort_without_output(emsg)

cfgfile = None

for o, a in opts:
    if o in ("-c", "--config"):
        cfgfile = a

if not cfgfile:
    usage()
    abort_without_output("No config file specified.")

try:
    fp = open(cfgfile)
except IOError:
    abort_without_output("Error opening config file " + cfgfile)

import commands
import ConfigParser
config = ConfigParser.ConfigParser()
try:
    config.readfp(fp)
except:
    abort_without_output("While parsing config file: " + \
                         repr(sys.exc_info()[1]) + " " + cfgfile)
# note: vomap is only intended for the cases where voname != groupname
# hence at the bottom, we throw away all entries for which voname = groupname
# this is the "default scenario" in the code, adding such entries to the
# vomap "exception list" causes problems later on.

if config.has_option('Main','vomap'):
    s = config.get('Main','vomap')
    vomap = dict()
    lines = s.split('\n')
    for l in lines:
        if len(l) > 0:
            t = l.split(':')
            group = t[0].strip()
            vo = t[1].strip()
            if vo != group : vomap[group] = vo

else:
    vomap = dict()   # empty dict, later 'if grp in vomap.keys()' => if False

if config.has_option('Scheduler','vo_max_jobs_cmd'):
    cmd = config.get('Scheduler','vo_max_jobs_cmd')
    (stat,out) = commands.getstatusoutput(cmd)
    if stat:
        abort_without_output(
            "VO max jobs backend command returned nonzero exit status")
    try:
        pcaps = eval(out, {"__builtins__" : {}})   # restricted namespace closes eval security hole
    except SyntaxError:
        abort_without_output('vo max slot command output in wrong format')
else:
    pcaps = dict()   # empty dict, will trigger use of defaults later.
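
# The vo_max_jobs_cmd helper is expected to print a single Python dict
# literal mapping group/VO names to their job caps, since its stdout is
# eval'ed directly above.  A sketch of plausible output (names and numbers
# invented):
#
#   {'atlas': 250, 'lhcb': 100, 'dteam': 20}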

# apply group -> vo mappings to process cap dict
# sgroup (source group) is original from input files,
# targvo (target vo) is what we want to replace it with

#DEBUG opcaps = pcaps.copy()

for sgroup in vomap.keys():
    targ = vomap[sgroup]
    if sgroup in pcaps.keys():
        if targ in pcaps.keys():
            pcaps[targ] = pcaps[targ] + pcaps[sgroup]
        else:
            pcaps[targ] = pcaps[sgroup]
        del pcaps[sgroup]

#DEBUGkl = opcaps.keys() + pcaps.keys()
#DEBUGkl.sort()
#DEBUGukl = []
#DEBUGfor k in kl:
#DEBUG    if k not in ukl: ukl.append(k)
#DEBUGfor k in ukl:
#DEBUG    v1 = -1
#DEBUG    v2 = -1
#DEBUG    if k in opcaps.keys():
#DEBUG        v1 = opcaps[k]
#DEBUG    if k in pcaps.keys():
#DEBUG        v2 = pcaps[k]
#DEBUG
#DEBUG    print "%10s %4d %4d" % (k, v1, v2)

if not config.has_option('LRMS','lrms_backend_cmd'):
    abort_without_output("Spec for lrms backend cmd missing in config file")

# read cfg file to find spec for static ldif file

if not config.has_option('Main','static_ldif_file'):
    abort_without_output("Spec for static ldif file missing in config file")

lines = open(config.get('Main','static_ldif_file'),'r').readlines()

# get the dns from the ldif file along with their VOs.
# first do the VOView dns
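
# For reference, the parsing loop below expects GlueCE and GlueVOView
# entries of roughly the following shape in the static LDIF file (host,
# queue and VO names invented, attributes trimmed to the ones actually
# parsed):
#
#   dn: GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong,mds-vo-name=local,o=grid
#   GlueCEName: qlong
#   GlueCEAccessControlBaseRule: VO:atlas
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong,mds-vo-name=local,o=grid
#   GlueCEAccessControlBaseRule: VO:atlas
#
# A blank line terminates each entry and triggers storage of the parsed view.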
voviewlist = [ ]
qname = None
import re
jmpatt = re.compile(r'^GlueCEUniqueID=[-a-zA-Z0-9.:]+/[a-z]+-[a-z]+-([a-z_A-Z0-9]+)$')

class CEView(object):
    def __init__(self, dn):
        self.dn = dn
        # if this is not present, then it's a GlueCE object
        # if it is present, it's a GlueVOView object
        self.localID = None
        self.queuename = None
        self.ACBRs = list()
    def getCEUniqueID(self):
        if self.localID == None:
            return self.dn
        else:
            loc = self.dn.find(",") + 1
            return self.dn[loc:]
    CEUniqueID = property(getCEUniqueID,
                          doc="UniqueID of CE to which this view belongs")

dndict = dict()   # key: dn string (not including leading "dn: ")
thisview = None

for line in lines:

    s = line.strip()
    if s.find('dn:') == 0 and s.find('GlueCEUniqueID') > 0:
        dn = s[3:].strip()
        thisview = CEView(dn)
        if s.find('GlueVOViewLocalID') >= 0:
            loc1 = s.find('=') + 1
            loc2 = s.find(',GlueCEUniqueID')
            id = s[loc1:loc2].strip()
            thisview.localID = id
    elif s.find('GlueCEName') == 0:
        qname = s.split(':')[1].strip()
        thisview.queuename = qname
    elif s.find('GlueCEAccessControlBaseRule:') == 0:
        loc = s.find(':') + 1
        rule = s[loc:].split(":")
        if rule[0].strip() in ['VO', 'VOMS']:
            thisview.ACBRs.append( ( rule[0].strip(), rule[1].strip() ) )

    elif s == '' and thisview :
        dndict[thisview.dn] = thisview
        thisview = None   # reset, about to enter new block.

# now find the queues that go with the VOViews
for d in dndict.keys():
    view = dndict[d]
    if view.localID:   # then it's a VOView
        view.queuename = dndict[view.CEUniqueID].queuename

DEBUG = 0

if DEBUG:
    print "dumping parse tree for static ldif file"
    for d in dndict.keys():
        print "For dn:", d
        print "   LocalID:    ", dndict[d].localID
        print "   CEUniqueID: ", dndict[d].CEUniqueID
        print "   Queue Name: ", dndict[d].queuename
        print "   ACBRs:      ", dndict[d].ACBRs
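
# Note on the constant in tconv() below: 2146060842 is just under 2**31 - 1,
# so the clamp appears intended to keep the printed response times within a
# 32-bit signed integer range for downstream consumers of the Glue values.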
def tconv(timeval):   # convert time to desired units
    timeval = min(timeval, 2146060842)
    return repr( int(timeval) )   # seconds->printable

cmd = config.get('LRMS','lrms_backend_cmd')

(stat,out) = commands.getstatusoutput(cmd)
if stat:
    abort_without_output("LRMS backend command returned nonzero exit status")

lrmlines = out.split('\n')

sys.path.append('@MODDIR@')
import lrms
bq = lrms.Server()

# generate a cache of indices returned from possible lrms queries
# this makes a big speed diff when there are many (thousands) of jobs
# first we need to find all combinations VO/fqan, queue, state
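
# Each cache entry maps the string that lrms.filt2str() produces for a
# filter dict (for example {'group': 'atlas', 'state': 'running'}) to a
# dict of jobid -> lrms.Job for the jobs matching that filter; the exact
# key format is whatever the lrms module's filt2str() returns.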
ql = []
vol = []

for dn in dndict.keys():
    view = dndict[dn]
    if not view.localID : continue
    if view.queuename not in ql : ql.append(view.queuename)
    if view.ACBRs[-1][1] not in vol : vol.append(view.ACBRs[-1][1])

sl = ['running', 'queued']

# now construct an empty cache with all the index keys, ready to fill.

scache = { }

for v in vol:
    for s in sl:
        indstr = lrms.filt2str({'group': v, 'state': s})
        scache[indstr] = {}

for q in ql:
    for s in sl:
        indstr = lrms.filt2str({'queue': q, 'state': s})
        scache[indstr] = {}
        for v in vol:
            indstr = lrms.filt2str({'queue': q, 'state': s, 'group': v})
            scache[indstr] = {}

#DEBUG print scache.keys()

# traverse the job list, filling the index cache as we go.
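
# The backend command's output, parsed below, consists of one Python dict
# literal per job (each on a single line) plus "keyword value" summary
# lines.  A sketch of plausible output (all names and numbers invented):
#
#   nactive 120
#   nfree 14
#   now 1235742358
#   schedCycle 120
#   {'jobid': '1234.ce.example.org', 'group': 'atlas', 'queue': 'qlong', 'state': 'running', 'maxwalltime': 86400}
#   {'jobid': '1235.ce.example.org', 'group': 'atlas', 'queue': 'qlong', 'state': 'queued', 'maxwalltime': 86400}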
for line in lrmlines:
    s = line.strip()
    f = s.split()
    if s[0] == '{' and s[-1] == '}':   # looks like a dict
        nj = lrms.Job(eval(s, {"__builtins__" : {}}))
        grp = nj.get('group')
        if grp in vomap.keys():
            nj.set('group',vomap[grp])
        jid = nj.get('jobid')
        bq.addjob(jid,nj)
        tst = nj.get('state')
        if tst not in sl: continue
        tgr = nj.get('group')
        tq = nj.get('queue')
        if tgr in vol and tq in ql:
            dlis = [
                {'group': tgr, 'state': tst},
                {'queue': tq, 'state': tst},
                {'queue': tq, 'state': tst, 'group': tgr}
                ]
        else:
            if tgr in vol:
                dlis = [
                    {'group': tgr, 'state': tst},
                    ]
            elif tq in ql:
                dlis = [
                    {'queue': tq, 'state': tst},
                    {'queue': tq, 'state': tst, 'group': tgr}
                    ]
            else:
                continue

        for td in dlis :
            indstr = lrms.filt2str(td)
            if indstr not in scache.keys():
                if DEBUG : print "huh??", indstr
                scache[indstr] = {}
            scache[lrms.filt2str(td)][jid] = nj
    elif len(f) == 2:
        if   f[0] == 'nactive'    : bq.slotsUp = int(f[1])
        elif f[0] == 'nfree'      : bq.slotsFree = int(f[1])
        elif f[0] == 'now'        : bq.now = int(f[1])
        elif f[0] == 'schedCycle' : bq.schedCycle = int(f[1])
        else:
            logging.warn("invalid input in result of lrms plugin command " + cmd)
            logging.warn("invalid input: " + s)
    else:
        logging.warn("invalid input in result of lrms plugin command " + cmd)
        logging.warn("invalid input: " + s)

bq.__scache__ = scache

# for k in scache.keys():
#     print k, len(scache[k])

if config.has_option('Scheduler','cycle_time'):
    bq.schedCycle = config.getint('Scheduler','cycle_time')

# sys.exit(0)

from EstTT import *
wq = TTEstimator(WaitTime())

# first we do the VOViews
# sort the keys : needed to make sure output is reproducible for test harness :(

dnk = dndict.keys()
dnk.sort()
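
# Each VOView then yields one dynamic block on stdout, roughly of this
# shape (the attribute names are exactly those printed in the loop below;
# the dn and the numbers here are invented):
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong,mds-vo-name=local,o=grid
#   GlueVOViewLocalID: atlas
#   GlueCEStateRunningJobs: 93
#   GlueCEStateWaitingJobs: 7
#   GlueCEStateTotalJobs: 100
#   GlueCEStateFreeJobSlots: 0
#   GlueCEStateEstimatedResponseTime: 420
#   GlueCEStateWorstResponseTime: 604800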

for dn in dnk:

    view = dndict[dn]
    if not view.localID: continue   # we only want VOViews here
    q = view.queuename
    if len(view.ACBRs) > 1:
        logging.warn("For dn: " + dn)
        logging.warn("detected multiple ACBRs:")
        for vo in view.ACBRs:
            logging.warn(repr(vo))
        logging.warn("only using the last one!")
    vo = view.ACBRs[-1][1]

    if DEBUG: print "'" + vo + "'"

    print "dn: " + dn
    print "GlueVOViewLocalID: " + view.localID
    if DEBUG:
        for acbr in view.ACBRs:
            print "GlueCEAccessControlBaseRule: " + acbr[0] + ":" + acbr[1]

    # this is tricky: reporting job counts in queues cares about queues,
    # but scheduling doesn't as far as I know.  So we need four job
    # counts, running and waiting, both in the current queue and for all
    # queues.

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'group': vo, 'queue': q}
    qwaitd = {'state': 'queued', 'group': vo, 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)
    if nwait > 0:
        qwt = waitingJobs[0].get('maxwalltime')
    else:
        qwt = -1

    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    # now we find the same numbers, but integrated across all queues
    # since the scheduler applies process caps without regard to queues.

    rund = {'state': 'running', 'group': vo}
    waitd = {'state': 'queued', 'group': vo}

    runningJobs = bq.matchingJobs(rund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(waitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        if vo in pcaps.keys():
            headroom = max(pcaps[vo] - nrun,0)
            freeslots = min(bq.slotsFree,headroom)
        else:
            freeslots = bq.slotsFree
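    # Worked example for the branch above (numbers invented): with
    # pcaps[vo] = 100, nrun = 96 and bq.slotsFree = 40, headroom is
    # max(100 - 96, 0) = 4, so freeslots = min(40, 4) = 4 even though
    # the cluster has 40 empty slots.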

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        if qwt < 0 :
            qwt = waitingJobs[0].get('maxwalltime')
        wrt = qwt * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    print

# now do CEs (no VOs) ... this should be done 'right' some day
# and merged with the code above.  Want to get it out quick
# (sigh, i am mortal) so ...

for dn in dnk:

    view = dndict[dn]
    if view.localID: continue   # only want CEs, they have no localID
    q = view.queuename
    vo = ''   # vo-independent

    print "dn: " + dn

    # for VO-independent numbers we only care about what is in the current
    # queue.  so we only need one set of stats

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'queue': q}
    qwaitd = {'state': 'queued', 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateFreeCPUs: " + repr(freeslots)   # backw compat.
    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    wq.strategy.queue = q
    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        wrt = waitingJobs[0].get('maxwalltime') * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    if DEBUG: print 'DEBUG: q', q
    print

### Local Variables: ***
### mode: python ***
### End: ***

Properties

Name            Value
svn:executable  *
