/[pdpsoft]/trunk/nl.nikhef.pdp.lrmsutils/lcg-gip/lcg-info-dynamic-scheduler.cin


Revision 86
Fri Feb 27 14:28:24 2009 UTC by templon
File size: 15197 byte(s)
bug fix: FreeCPUs value was computed incorrectly for queue views. (cut and paste error).

#!/usr/bin/python2
# lcg-info-dynamic-scheduler
# $Id: lcg-info-dynamic-scheduler.cin,v 4.4 2007/08/14 12:05:58 templon Exp $
# J. A. Templon, NIKHEF/PDP 2005

# plugin for gip framework. generates Glue values associated
# with the scheduler, like FreeSlots or *ResponseTime

# first set up logging

import sys, logging
import syslog

# note: this class works around the bug in the python logging
# package fixed in patch #642974. If the version of python-logging
# is upgraded, this class can be replaced by logging.SysLogHandler

class SLHandler(logging.Handler):
    def __init__(self, ident, logopt=0, facility=syslog.LOG_USER):
        logging.Handler.__init__(self)
        self.ident = ident
        self.logopt = logopt
        self.facility = facility
        self.mappings = {
            logging.DEBUG: syslog.LOG_DEBUG,
            logging.INFO: syslog.LOG_INFO,
            logging.WARN: syslog.LOG_WARNING,
            logging.ERROR: syslog.LOG_ERR,
            logging.CRITICAL: syslog.LOG_CRIT,
            }

    def encodeLevel(self, level):
        return self.mappings.get(level, syslog.LOG_INFO)

    def emit(self, record):
        syslog.openlog(self.ident, self.logopt, self.facility)
        msg = self.format(record)
        prio = self.encodeLevel(record.levelno)
        syslog.syslog(prio, msg)
        syslog.closelog()

logging.getLogger("").setLevel(logging.INFO)
# syslog handler
shdlr = SLHandler("lcg-info-dynamic-scheduler")
logging.getLogger("").addHandler(shdlr)
# stderr handler
stdrhdlr = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s lcg-info-dynamic-scheduler:"
                        + " %(message)s", "%F %T")
logging.getLogger("").addHandler(stdrhdlr)
stdrhdlr.setFormatter(fmt)

def usage():
    print "Usage: lcg-info-dynamic-scheduler -c <cfg_file>"

import sys

def abort_without_output(msg):
    logging.error(msg)
    logging.error("Exiting without output, GIP will use static values")
    sys.exit(2)

import getopt
import string

try:
    opts, args = getopt.getopt(sys.argv[1:], "c:",
                               ["config="])
except getopt.GetoptError:
    # print help information and exit:
    emsg = "Error parsing command line: " + string.join(sys.argv)
    usage()
    abort_without_output(emsg)

cfgfile = None

for o, a in opts:
    if o in ("-c", "--config"):
        cfgfile = a

if not cfgfile:
    usage()
    abort_without_output("No config file specified.")

try:
    fp = open(cfgfile)
except IOError:
    abort_without_output("Error opening config file " + cfgfile)

import commands
import ConfigParser
config = ConfigParser.ConfigParser()
config.readfp(fp)

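# The options read via config.get()/has_option() below imply a config file
# shaped roughly like the following (section and option names are the real
# ones used in this script; the values are purely illustrative and
# site-specific):
#
#   [Main]
#   static_ldif_file: /opt/glite/var/gip/ldif/static-file-CE.ldif
#   vomap:
#      atlsgm:atlas
#      dteamsgm:dteam
#
#   [Scheduler]
#   vo_max_jobs_cmd: /opt/glite/libexec/vomaxjobs-maui
#   cycle_time: 0
#
#   [LRMS]
#   lrms_backend_cmd: /opt/glite/libexec/lrmsinfo-pbs
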
# note: vomap is only intended for the cases where voname != groupname
# hence at the bottom, we throw away all entries for which voname = groupname
# this is the "default scenario" in the code, adding such entries to the
# vomap "exception list" causes problems later on.

if config.has_option('Main','vomap'):
    s = config.get('Main','vomap')
    vomap = dict()
    lines = s.split('\n')
    for l in lines:
        if len(l) > 0:
            t = l.split(':')
            group = t[0].strip()
            vo = t[1].strip()
            if vo != group : vomap[group] = vo

else:
    vomap = dict()  # empty dict, later 'if grp in vomap.keys()' => if False

if config.has_option('Scheduler','vo_max_jobs_cmd'):
    cmd = config.get('Scheduler','vo_max_jobs_cmd')
    (stat,out) = commands.getstatusoutput(cmd)
    if stat:
        abort_without_output(
            "VO max jobs backend command returned nonzero exit status")
    try:
        pcaps = eval(out, {"__builtins__" : {}})  # restricted namespace closes security hole
    except SyntaxError:
        abort_without_output('vo max slot command output in wrong format')
else:
    pcaps = dict()  # empty dict, will trigger use of defaults later.

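# Illustrative note: the vo_max_jobs_cmd output is eval()'d above with
# restricted builtins, so it is expected to be a single Python dict literal
# mapping a group/VO name to its maximum job slot count, for example
#
#   {'atlas': 200, 'lhcb': 50, 'dteam': 4}
#
# The names and numbers are invented; real values come from the site's
# scheduler configuration.
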
# apply group -> vo mappings to process cap dict
# sgroup (source group) is original from input files,
# targvo (target vo) is what we want to replace it with

#DEBUG opcaps = pcaps.copy()

for sgroup in vomap.keys():
    targ = vomap[sgroup]
    if sgroup in pcaps.keys():
        if targ in pcaps.keys():
            pcaps[targ] = pcaps[targ] + pcaps[sgroup]
        else:
            pcaps[targ] = pcaps[sgroup]
        del pcaps[sgroup]

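# Worked example with invented numbers: given vomap = {'atlsgm': 'atlas'}
# and pcaps = {'atlsgm': 2, 'atlas': 100}, the loop above folds the group
# cap into the VO cap, leaving pcaps = {'atlas': 102}.
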
#DEBUGkl = opcaps.keys() + pcaps.keys()
#DEBUGkl.sort()
#DEBUGukl = []
#DEBUGfor k in kl:
#DEBUG    if k not in ukl: ukl.append(k)
#DEBUGfor k in ukl:
#DEBUG    v1 = -1
#DEBUG    v2 = -1
#DEBUG    if k in opcaps.keys():
#DEBUG        v1 = opcaps[k]
#DEBUG    if k in pcaps.keys():
#DEBUG        v2 = pcaps[k]
#DEBUG
#DEBUG    print "%10s %4d %4d" % (k, v1, v2)

if not config.has_option('LRMS','lrms_backend_cmd'):
    abort_without_output("Spec for lrms backend cmd missing in config file")

# read cfg file to find spec for static ldif file

if not config.has_option('Main','static_ldif_file'):
    abort_without_output("Spec for static ldif file missing in config file")

lines = open(config.get('Main','static_ldif_file'),'r').readlines()

# get the dns from the ldif file along with their VOs.
# first do the VOView dns

voviewlist = [ ]
qname = None
import re
jmpatt = re.compile(r'^GlueCEUniqueID=[-a-zA-Z0-9.:]+/[a-z]+-[a-z]+-([a-z_A-Z0-9]+)$')

class CEView(object):
    def __init__(self, dn):
        self.dn = dn
        # if this is not present, then it's a GlueCE object
        # if it is present, it's a GlueVOView object
        self.localID = None
        self.queuename = None
        self.ACBRs = list()
    def getCEUniqueID(self):
        if self.localID == None:
            return self.dn
        else:
            loc = self.dn.find(",") + 1
            return self.dn[loc:]
    CEUniqueID = property(getCEUniqueID,
                          doc="UniqueID of CE to which this view belongs")

dndict = dict()  # key: dn string (not including leading "dn: ")
thisview = None

for line in lines:

    s = line.strip()
    if s.find('dn:') == 0 and s.find('GlueCEUniqueID') > 0:
        dn = s[3:].strip()
        thisview = CEView(dn)
        if s.find('GlueVOViewLocalID') >= 0:
            loc1 = s.find('=') + 1
            loc2 = s.find(',GlueCEUniqueID')
            id = s[loc1:loc2].strip()
            thisview.localID = id
    elif s.find('GlueCEName') == 0:
        qname = s.split(':')[1].strip()
        thisview.queuename = qname
    elif s.find('GlueCEAccessControlBaseRule:') == 0:
        loc = s.find(':') + 1
        rule = s[loc:].split(":")
        if rule[0].strip() in ['VO', 'VOMS']:
            thisview.ACBRs.append( ( rule[0].strip(), rule[1].strip() ) )

    elif s == '' and thisview :
        dndict[thisview.dn] = thisview
        thisview = None  # reset, about to enter new block.

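# For orientation, an invented fragment of the static LDIF that this loop
# parses; a VOView block looks roughly like
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong,mds-vo-name=resource,o=grid
#   GlueCEAccessControlBaseRule: VO:atlas
#
# while the parent GlueCE block (same GlueCEUniqueID, no LocalID) carries
# GlueCEName, i.e. the queue name. The blank line terminating each block
# is what triggers the dndict insertion above.
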
# now find the queues that go with the VOViews
for d in dndict.keys():
    view = dndict[d]
    if view.localID:  # then it's a VOView
        view.queuename = dndict[view.CEUniqueID].queuename

DEBUG = 0

if DEBUG:
    print "dumping parse tree for static ldif file"
    for d in dndict.keys():
        print "For dn:", d
        print "    LocalID:    ", dndict[d].localID
        print "    CEUniqueID: ", dndict[d].CEUniqueID
        print "    Queue Name: ", dndict[d].queuename
        print "    ACBRs:      ", dndict[d].ACBRs

def tconv(timeval):  # convert time to desired units
    timeval = min(timeval, 2146060842)
    return repr( int(timeval) )  # seconds -> printable

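# Added note: 2146060842 is just below 2**31 - 1, so tconv clamps the
# reported times to values that still fit in a signed 32-bit integer,
# presumably for the benefit of consumers of the published Glue values.
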
cmd = config.get('LRMS','lrms_backend_cmd')

(stat,out) = commands.getstatusoutput(cmd)
if stat:
    abort_without_output("LRMS backend command returned nonzero exit status")

lrmlines = out.split('\n')

sys.path.append('@MODDIR@')
import lrms
bq = lrms.Server()

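# The lrms_backend_cmd plugin output is parsed line by line below: each job
# is one line holding a Python dict literal, plus a few 'keyword value'
# lines. A purely illustrative fragment (values invented):
#
#   nactive 500
#   nfree 20
#   now 1235743200
#   schedCycle 26
#   {'jobid': '123456.ce.example.org', 'group': 'atlas', 'queue': 'qlong', 'state': 'running', 'maxwalltime': 259200}
#
# The keys shown are the ones this script reads back (jobid, group, queue,
# state, maxwalltime); the lrms module may carry others.
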
# generate a cache of indices returned from possible lrms queries
# this makes a big speed diff when there are many (thousands) of jobs
# first we need to find all combinations VO/fqan, queue, state

ql = []
vol = []

for dn in dndict.keys():
    view = dndict[dn]
    if not view.localID : continue
    if view.queuename not in ql : ql.append(view.queuename)
    if view.ACBRs[-1][1] not in vol : vol.append(view.ACBRs[-1][1])

sl = ['running', 'queued']

# now construct an empty cache with all the index keys, ready to fill.

scache = { }

for v in vol:
    for s in sl:
        indstr = lrms.filt2str({'group':v, 'state': s})
        scache[indstr] = {}

for q in ql:
    for s in sl:
        indstr = lrms.filt2str({'queue': q, 'state': s})
        scache[indstr] = {}
        for v in vol:
            indstr = lrms.filt2str({'queue': q, 'state': s, 'group': v})
            scache[indstr] = {}

#DEBUG print scache.keys()

# traverse the job list, filling the index cache as we go.

for line in lrmlines:
    s = line.strip()
    f = s.split()
    if s[0] == '{' and s[-1] == '}':  # looks like a dict
        nj = lrms.Job(eval(s, {"__builtins__" : {}}))
        grp = nj.get('group')
        if grp in vomap.keys():
            nj.set('group',vomap[grp])
        jid = nj.get('jobid')
        bq.addjob(jid,nj)
        tst = nj.get('state')
        if tst not in sl: continue
        tgr = nj.get('group')
        tq = nj.get('queue')
        if tgr in vol and tq in ql:
            dlis = [
                {'group': tgr, 'state': tst},
                {'queue': tq, 'state': tst},
                {'queue': tq, 'state': tst, 'group': tgr}
                ]
        else:
            if tgr in vol:
                dlis = [
                    {'group': tgr, 'state': tst},
                    ]
            elif tq in ql:
                dlis = [
                    {'queue': tq, 'state': tst},
                    {'queue': tq, 'state': tst, 'group': tgr}
                    ]
            else:
                continue

        for td in dlis :
            indstr = lrms.filt2str(td)
            if indstr not in scache.keys():
                if DEBUG : print "huh??", indstr
                scache[indstr] = {}
            scache[lrms.filt2str(td)][jid] = nj
    elif len(f) == 2:
        if   f[0] == 'nactive'    : bq.slotsUp    = int(f[1])
        elif f[0] == 'nfree'      : bq.slotsFree  = int(f[1])
        elif f[0] == 'now'        : bq.now        = int(f[1])
        elif f[0] == 'schedCycle' : bq.schedCycle = int(f[1])
        else:
            logging.warn("invalid input in result of lrms plugin command " + cmd)
            logging.warn("invalid input: " + s)
    else:
        logging.warn("invalid input in result of lrms plugin command " + cmd)
        logging.warn("invalid input: " + s)

bq.__scache__ = scache

# for k in scache.keys():
#     print k, len(scache[k])

if config.has_option('Scheduler','cycle_time'):
    bq.schedCycle = config.getint('Scheduler','cycle_time')

# sys.exit(0)

from EstTT import *
wq = TTEstimator(WaitTime())

# first we do the VOViews
# sort the keys : needed to make sure output is reproducible for test harness :(

dnk = dndict.keys()
dnk.sort()

for dn in dnk:

    view = dndict[dn]
    if not view.localID: continue  # we only want VOViews here
    q = view.queuename
    if len(view.ACBRs) > 1:
        logging.warn("For dn: " + dn)
        logging.warn("detected multiple ACBRs:")
        for vo in view.ACBRs:
            logging.warn(repr(vo))
        logging.warn("only using the last one!")
    vo = view.ACBRs[-1][1]

    if DEBUG: print "'" + vo + "'"

    print "dn: " + dn
    print "GlueVOViewLocalID: " + view.localID
    if DEBUG:
        for acbr in view.ACBRs:
            print "GlueCEAccessControlBaseRule: " + acbr[0] + ":" + acbr[1]

    # this is tricky: reporting job counts in queues cares about queues,
    # but scheduling doesn't as far as I know.  So we need four job
    # counts, running and waiting, both in the current queue and for all
    # queues.

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'group': vo, 'queue': q}
    qwaitd = {'state': 'queued', 'group': vo, 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)
    if nwait > 0:
        qwt = waitingJobs[0].get('maxwalltime')
    else:
        qwt = -1

    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    # now we find the same numbers, but integrated across all queues
    # since the scheduler applies process caps without regard to queues.

    rund = {'state': 'running', 'group': vo}
    waitd = {'state': 'queued', 'group': vo}

    runningJobs = bq.matchingJobs(rund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(waitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        if vo in pcaps.keys():
            headroom = max(pcaps[vo] - nrun,0)
            freeslots = min(bq.slotsFree,headroom)
        else:
            freeslots = bq.slotsFree

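    # Cap logic above, with invented numbers: if pcaps['atlas'] = 100,
    # nrun = 96 and bq.slotsFree = 20, then headroom = 4 and freeslots = 4,
    # so only 4 slots are advertised even though 20 are empty (and none at
    # all while jobs of this VO are waiting).
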
    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        if qwt < 0 :
            qwt = waitingJobs[0].get('maxwalltime')
        wrt = qwt * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    print

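# Sketch of the LDIF block the loop above emits for one VOView, with
# invented values:
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong,mds-vo-name=resource,o=grid
#   GlueVOViewLocalID: atlas
#   GlueCEStateRunningJobs: 12
#   GlueCEStateWaitingJobs: 0
#   GlueCEStateTotalJobs: 12
#   GlueCEStateFreeJobSlots: 4
#   GlueCEStateEstimatedResponseTime: 120
#   GlueCEStateWorstResponseTime: 240
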
# now do CEs (no VOs) ... this should be done 'right' some day
# and merged with the code above.  Want to get it out quick
# (sigh, i am mortal) so ...

for dn in dnk:

    view = dndict[dn]
    if view.localID: continue  # only want CEs, they have no localID
    q = view.queuename
    vo = ''  # vo-independent

    print "dn: " + dn

    # for VO-independent numbers we only care about what is in the current
    # queue. so we only need one set of stats

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund = {'state': 'running', 'queue': q}
    qwaitd = {'state': 'queued', 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateFreeCPUs: " + repr(nrun + nwait)  # backw compat.
    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    wq.strategy.queue = q
    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        wrt = waitingJobs[0].get('maxwalltime') * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    if DEBUG: print 'DEBUG: q', q
    print

### Local Variables: ***
### mode: python ***
### End: ***

Properties

Name            Value
svn:executable  *
