/[pdpsoft]/trunk/nl.nikhef.pdp.lrmsutils/lcg-gip/lcg-info-dynamic-scheduler.cin
Revision 131 - Tue Mar 10 14:16:44 2009 UTC by templon
ETICS compatibility changes

#!/usr/bin/python2
# lcg-info-dynamic-scheduler
# $Id: lcg-info-dynamic-scheduler.cin,v 4.4 2007/08/14 12:05:58 templon Exp $
# J. A. Templon, NIKHEF/PDP 2005

# plugin for gip framework. generates Glue values associated
# with the scheduler, like FreeSlots or *ResponseTime

# first set up logging

import sys, logging
import syslog

# note: this class works around the bug in the python logging
# package fixed in patch #642974. If the version of python-logging
# is upgraded, this class can be replaced by logging.SysLogHandler

class SLHandler(logging.Handler):
    def __init__(self, ident, logopt=0, facility=syslog.LOG_USER):
        logging.Handler.__init__(self)
        self.ident = ident
        self.logopt = logopt
        self.facility = facility
        self.mappings = {
            logging.DEBUG:    syslog.LOG_DEBUG,
            logging.INFO:     syslog.LOG_INFO,
            logging.WARN:     syslog.LOG_WARNING,
            logging.ERROR:    syslog.LOG_ERR,
            logging.CRITICAL: syslog.LOG_CRIT,
            }

    def encodeLevel(self, level):
        return self.mappings.get(level, syslog.LOG_INFO)

    def emit(self, record):
        syslog.openlog(self.ident, self.logopt, self.facility)
        msg = self.format(record)
        prio = self.encodeLevel(record.levelno)
        syslog.syslog(prio, msg)
        syslog.closelog()

logging.getLogger("").setLevel(logging.INFO)
# syslog handler
shdlr = SLHandler("lcg-info-dynamic-scheduler")
logging.getLogger("").addHandler(shdlr)
# stderr handler
stdrhdlr = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s lcg-info-dynamic-scheduler:"
                        + " %(message)s", "%F %T")
logging.getLogger("").addHandler(stdrhdlr)
stdrhdlr.setFormatter(fmt)

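# A sketch of what the stderr handler emits given the format above
# ("%F %T" expands to ISO date and time); the message text here is
# hypothetical:
#
#   2009-03-10 14:16:44 lcg-info-dynamic-scheduler: No config file specified.
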
def usage():
    print "Usage: lcg-info-dynamic-scheduler -c <cfg_file>"

def abort_without_output(msg):
    logging.error(msg)
    logging.error("Exiting without output, GIP will use static values")
    sys.exit(2)

import getopt
import string

try:
    opts, args = getopt.getopt(sys.argv[1:], "c:",
                               ["config="])
except getopt.GetoptError:
    # print help information and exit:
    emsg = "Error parsing command line: " + string.join(sys.argv)
    usage()
    abort_without_output(emsg)

cfgfile = None

for o, a in opts:
    if o in ("-c", "--config"):
        cfgfile = a

if not cfgfile:
    usage()
    abort_without_output("No config file specified.")

try:
    fp = open(cfgfile)
except IOError:
    abort_without_output("Error opening config file " + cfgfile)

import commands
import ConfigParser
config = ConfigParser.ConfigParser()
try:
    config.readfp(fp)
except:
    abort_without_output("While parsing config file: " + \
                         repr(sys.exc_info()[1]) + " " + cfgfile)

# note: vomap is only intended for the cases where voname != groupname
# hence at the bottom, we throw away all entries for which voname = groupname
# this is the "default scenario" in the code, adding such entries to the
# vomap "exception list" causes problems later on.

if config.has_option('Main','vomap'):
    s = config.get('Main','vomap')
    vomap = dict()
    lines = s.split('\n')
    for l in lines:
        if len(l) > 0:
            t = l.split(':')
            group = t[0].strip()
            vo = t[1].strip()
            if vo != group : vomap[group] = vo

else:
    vomap = dict()  # empty dict, later 'if grp in vomap.keys()' => if False

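# For example (hypothetical names): a vomap entry "atlsgm: atlas" yields
# vomap == {'atlsgm': 'atlas'}, while a line "atlas: atlas" is dropped
# above because vo == group.
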
if config.has_option('Scheduler','vo_max_jobs_cmd'):
    cmd = config.get('Scheduler','vo_max_jobs_cmd')
    (stat,out) = commands.getstatusoutput(cmd)
    if stat:
        abort_without_output(
            "VO max jobs backend command returned nonzero exit status")
    try:
        pcaps = eval(out, {"__builtins__" : {}})  # restricted namespace arg closes security hole
    except SyntaxError:
        abort_without_output('vo max slot command output in wrong format')
else:
    pcaps = dict()  # empty dict, will trigger use of defaults later.
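
# A sketch of the expected vo_max_jobs_cmd output: a single python dict
# literal mapping group/VO name to its maximum job count (names and
# numbers hypothetical):
#
#   {'atlas': 250, 'dteam': 50, 'atlsgm': 2}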

# apply group -> vo mappings to process cap dict
# sgroup (source group) is original from input files,
# targvo (target vo) is what we want to replace it with

#DEBUG opcaps = pcaps.copy()

for sgroup in vomap.keys():
    targ = vomap[sgroup]
    if sgroup in pcaps.keys():
        if targ in pcaps.keys():
            pcaps[targ] = pcaps[targ] + pcaps[sgroup]
        else:
            pcaps[targ] = pcaps[sgroup]
        del pcaps[sgroup]

#DEBUGkl = opcaps.keys() + pcaps.keys()
#DEBUGkl.sort()
#DEBUGukl = []
#DEBUGfor k in kl:
#DEBUG    if k not in ukl: ukl.append(k)
#DEBUGfor k in ukl:
#DEBUG    v1 = -1
#DEBUG    v2 = -1
#DEBUG    if k in opcaps.keys():
#DEBUG        v1 = opcaps[k]
#DEBUG    if k in pcaps.keys():
#DEBUG        v2 = pcaps[k]
#DEBUG
#DEBUG    print "%10s %4d %4d" % (k, v1, v2)

if not config.has_option('LRMS','lrms_backend_cmd'):
    abort_without_output("Spec for lrms backend cmd missing in config file")

# read cfg file to find spec for static ldif file

if not config.has_option('Main','static_ldif_file'):
    abort_without_output("Spec for static ldif file missing in config file")

lines = open(config.get('Main','static_ldif_file'),'r').readlines()

# get the dns from the ldif file along with their VOs.
# first do the VOView dns

voviewlist = [ ]
qname = None
import re
jmpatt = re.compile(r'^GlueCEUniqueID=[-a-zA-Z0-9.:]+/[a-z]+-[a-z]+-([a-z_A-Z0-9]+)$')

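# For example (hypothetical CE id), jmpatt matches
# 'GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong'
# and captures the queue name 'qlong' in group 1.
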
class CEView(object):
    def __init__(self, dn):
        self.dn = dn
        # if this is not present, then it's a GlueCE object
        # if it is present, it's a GlueVOView object
        self.localID = None
        self.queuename = None
        self.ACBRs = list()
    def getCEUniqueID(self):
        if self.localID == None:
            return self.dn
        else:
            loc = self.dn.find(",") + 1
            return self.dn[loc:]
    CEUniqueID = property(getCEUniqueID,
                          doc="UniqueID of CE to which this view belongs")

dndict = dict()  # key: dn string (not including leading "dn: ")
thisview = None

for line in lines:

    s = line.strip()
    if s.find('dn:') == 0 and s.find('GlueCEUniqueID') > 0:
        dn = s[3:].strip()
        thisview = CEView(dn)
        if s.find('GlueVOViewLocalID') >= 0:
            loc1 = s.find('=') + 1
            loc2 = s.find(',GlueCEUniqueID')
            id = s[loc1:loc2].strip()
            thisview.localID = id
    elif s.find('GlueCEName') == 0:
        qname = s.split(':')[1].strip()
        thisview.queuename = qname
    elif s.find('GlueCEAccessControlBaseRule:') == 0:
        loc = s.find(':') + 1
        if loc == len(s):
            abort_without_output("In static LDIF file, ACBR is " +
                                 "missing a value: " + s)
        rule = s[loc:].split(":")
        if rule[0].strip() in ['VO', 'VOMS']:
            thisview.ACBRs.append( ( rule[0].strip(), rule[1].strip() ) )

    elif s == '' and thisview :
        dndict[thisview.dn] = thisview
        thisview = None  # reset, about to enter new block.

# now find the queues that go with the VOViews
for d in dndict.keys():
    view = dndict[d]
    if view.localID:  # then it's a VOView
        view.queuename = dndict[view.CEUniqueID].queuename

DEBUG = 0

if DEBUG:
    print "dumping parse tree for static ldif file"
    for d in dndict.keys():
        print "For dn:", d
        print "    LocalID:    ", dndict[d].localID
        print "    CEUniqueID: ", dndict[d].CEUniqueID
        print "    Queue Name: ", dndict[d].queuename
        print "    ACBRs:      ", dndict[d].ACBRs

def tconv(timeval):  # convert time to desired units
    timeval = min(timeval, 2146060842)
    return repr( int(timeval) )  # seconds->printable
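
# note: the min() cap above keeps the printed value below 2**31 - 1
# (2147483647), presumably so 32-bit consumers of these Glue attributes
# don't overflow.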

cmd = config.get('LRMS','lrms_backend_cmd')

(stat,out) = commands.getstatusoutput(cmd)
if stat:
    abort_without_output("LRMS backend command returned nonzero exit status")

lrmlines = out.split('\n')

sys.path.append('@TARGPREFIX@/@MODSUFFIX@')
import lrms
bq = lrms.Server()

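# A sketch of the backend command's expected stdout, inferred from the
# parser below: "keyword value" pairs plus one python dict literal per
# job (all values hypothetical):
#
#   nactive 200
#   nfree 14
#   now 1236693404
#   schedCycle 26
#   {'jobid': '1234.ce.example.org', 'group': 'atlas', 'queue': 'qlong', 'state': 'running', 'maxwalltime': 72000}
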
# generate a cache of indices returned from possible lrms queries
# this makes a big speed diff when there are many (thousands) of jobs
# first we need to find all combinations VO/fqan, queue, state

ql = []
vol = []

for dn in dndict.keys():
    view = dndict[dn]
    if not view.localID : continue
    if view.queuename not in ql : ql.append(view.queuename)
    if view.ACBRs[-1][1] not in vol : vol.append(view.ACBRs[-1][1])

sl = ['running', 'queued']

# now construct an empty cache with all the index keys, ready to fill.

scache = { }

for v in vol:
    for s in sl:
        indstr = lrms.filt2str({'group': v, 'state': s})
        scache[indstr] = {}

for q in ql:
    for s in sl:
        indstr = lrms.filt2str({'queue': q, 'state': s})
        scache[indstr] = {}
        for v in vol:
            indstr = lrms.filt2str({'queue': q, 'state': s, 'group': v})
            scache[indstr] = {}

#DEBUG print scache.keys()

# traverse the job list, filling the index cache as we go.

for line in lrmlines:
    s = line.strip()
    if not s: continue  # skip blank lines; s[0] below would raise IndexError
    f = s.split()
    if s[0] == '{' and s[-1] == '}':  # looks like a dict
        nj = lrms.Job(eval(s, {"__builtins__" : {}}))
        grp = nj.get('group')
        if grp in vomap.keys():
            nj.set('group',vomap[grp])
        jid = nj.get('jobid')
        bq.addjob(jid,nj)
        tst = nj.get('state')
        if tst not in sl: continue
        tgr = nj.get('group')
        tq = nj.get('queue')
        if tgr in vol and tq in ql:
            dlis = [
                {'group': tgr, 'state': tst},
                {'queue': tq, 'state': tst},
                {'queue': tq, 'state': tst, 'group': tgr}
                ]
        else:
            if tgr in vol:
                dlis = [
                    {'group': tgr, 'state': tst},
                    ]
            elif tq in ql:
                dlis = [
                    {'queue': tq, 'state': tst},
                    {'queue': tq, 'state': tst, 'group': tgr}
                    ]
            else:
                continue

        for td in dlis :
            indstr = lrms.filt2str(td)
            if indstr not in scache.keys():
                if DEBUG : print "huh??", indstr
                scache[indstr] = {}
            scache[lrms.filt2str(td)][jid] = nj
    elif len(f) == 2:
        if   f[0] == 'nactive'    : bq.slotsUp = int(f[1])
        elif f[0] == 'nfree'      : bq.slotsFree = int(f[1])
        elif f[0] == 'now'        : bq.now = int(f[1])
        elif f[0] == 'schedCycle' : bq.schedCycle = int(f[1])
        else:
            logging.warn("invalid input in result of lrms plugin command " + cmd)
            logging.warn("invalid input: " + s)
    else:
        logging.warn("invalid input in result of lrms plugin command " + cmd)
        logging.warn("invalid input: " + s)

bq.__scache__ = scache

# for k in scache.keys():
#     print k, len(scache[k])

if config.has_option('Scheduler','cycle_time'):
    bq.schedCycle = config.getint('Scheduler','cycle_time')

# sys.exit(0)

from EstTT import *
wq = TTEstimator(WaitTime())

# first we do the VOViews
# sort the keys : needed to make sure output is reproducible for test harness :(

dnk = dndict.keys()
dnk.sort()

for dn in dnk:

    view = dndict[dn]
    if not view.localID: continue  # we only want VOViews here
    q = view.queuename
    if len(view.ACBRs) > 1:
        logging.warn("For dn: " + dn)
        logging.warn("detected multiple ACBRs:")
        for vo in view.ACBRs:
            logging.warn(repr(vo))
        logging.warn("only using the last one!")
    vo = view.ACBRs[-1][1]

    if DEBUG: print "'" + vo + "'"

    print "dn: " + dn
    print "GlueVOViewLocalID: " + view.localID
    if DEBUG:
        for acbr in view.ACBRs:
            print "GlueCEAccessControlBaseRule: " + acbr[0] + ":" + acbr[1]

    # this is tricky: reporting job counts in queues cares about queues,
    # but scheduling doesn't as far as I know. So we need four job
    # counts, running and waiting, both in the current queue and for all
    # queues.

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund  = {'state': 'running', 'group': vo, 'queue': q}
    qwaitd = {'state': 'queued',  'group': vo, 'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)
    if nwait > 0:
        qwt = waitingJobs[0].get('maxwalltime')
    else:
        qwt = -1

    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    # now we find the same numbers, but integrated across all queues
    # since the scheduler applies process caps without regard to queues.

    rund  = {'state': 'running', 'group': vo}
    waitd = {'state': 'queued',  'group': vo}

    runningJobs = bq.matchingJobs(rund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(waitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        if vo in pcaps.keys():
            headroom = max(pcaps[vo] - nrun, 0)
            freeslots = min(bq.slotsFree, headroom)
        else:
            freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        if qwt < 0 :
            qwt = waitingJobs[0].get('maxwalltime')
        wrt = qwt * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    print

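# A sketch of one resulting VOView block on stdout (dn and numbers
# hypothetical):
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-qlong,mds-vo-name=local,o=grid
#   GlueVOViewLocalID: atlas
#   GlueCEStateRunningJobs: 52
#   GlueCEStateWaitingJobs: 3
#   GlueCEStateTotalJobs: 55
#   GlueCEStateFreeJobSlots: 0
#   GlueCEStateEstimatedResponseTime: 14400
#   GlueCEStateWorstResponseTime: 216000
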
# now do CEs (no VOs) ... this should be done 'right' some day
# and merged with the code above. Want to get it out quick
# (sigh, i am mortal) so ...

for dn in dnk:

    view = dndict[dn]
    if view.localID: continue  # only want CEs, they have no localID
    q = view.queuename
    vo = ''  # vo-independent

    print "dn: " + dn

    # for VO-independent numbers we only care about what is in the current
    # queue. so we only need one set of stats

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund  = {'state': 'running', 'queue': q}
    qwaitd = {'state': 'queued',  'queue': q}

    runningJobs = bq.matchingJobs(qrund) ; nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        freeslots = bq.slotsFree

    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateFreeCPUs: " + repr(freeslots)  # backw compat.
    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    wq.strategy.queue = q
    ert = wq.estimate(bq,vo,debug=0)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        wrt = waitingJobs[0].get('maxwalltime') * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt,bq.schedCycle)))
    if DEBUG: print 'DEBUG: q', q
    print

### Local Variables: ***
### mode: python ***
### End: ***
