/[pdpsoft]/nl.nikhef.pdp.dynsched/trunk/lcg-info-dynamic-scheduler.cin

Contents of /nl.nikhef.pdp.dynsched/trunk/lcg-info-dynamic-scheduler.cin



Revision 2006 - Fri Oct 8 12:47:06 2010 UTC by templon
File size: 15448 byte(s)
works now

#!/usr/bin/python2
# lcg-info-dynamic-scheduler
# $Id$
# Source: $URL$
# J. A. Templon, NIKHEF/PDP 2005

# plugin for gip framework. generates Glue values associated
# with the scheduler, like FreeSlots or *ResponseTime

# first set up logging

import sys, logging
import syslog

# note: this class works around the bug in the python logging
# package fixed in patch #642974.  If the version of python-logging
# is upgraded, this class can be replaced by logging.handlers.SysLogHandler

class SLHandler(logging.Handler):
    def __init__(self, ident, logopt=0, facility=syslog.LOG_USER):
        logging.Handler.__init__(self)
        self.ident = ident
        self.logopt = logopt
        self.facility = facility
        self.mappings = {
            logging.DEBUG: syslog.LOG_DEBUG,
            logging.INFO: syslog.LOG_INFO,
            logging.WARN: syslog.LOG_WARNING,
            logging.ERROR: syslog.LOG_ERR,
            logging.CRITICAL: syslog.LOG_CRIT,
            }

    def encodeLevel(self, level):
        return self.mappings.get(level, syslog.LOG_INFO)

    def emit(self, record):
        syslog.openlog(self.ident, self.logopt, self.facility)
        msg = self.format(record)
        prio = self.encodeLevel(record.levelno)
        syslog.syslog(prio, msg)
        syslog.closelog()

logging.getLogger("").setLevel(logging.INFO)
# syslog handler
shdlr = SLHandler("lcg-info-dynamic-scheduler")
logging.getLogger("").addHandler(shdlr)
# stderr handler
stdrhdlr = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s lcg-info-dynamic-scheduler:"
                        + " %(message)s", "%F %T")
logging.getLogger("").addHandler(stdrhdlr)
stdrhdlr.setFormatter(fmt)

def usage():
    print "Usage: lcg-info-dynamic-scheduler -c <cfg_file>"

import sys

def abort_without_output(msg):
    logging.error(msg)
    logging.error("Exiting without output, GIP will use static values")
    sys.exit(2)

import getopt
import string

try:
    opts, args = getopt.getopt(sys.argv[1:], "c:",
                               ["config="])
except getopt.GetoptError:
    # print help information and exit:
    emsg = "Error parsing command line: " + string.join(sys.argv)
    usage()
    abort_without_output(emsg)

cfgfile = None

for o, a in opts:
    if o in ("-c", "--config"):
        cfgfile = a

if not cfgfile:
    usage()
    abort_without_output("No config file specified.")

try:
    fp = open(cfgfile)
except IOError:
    abort_without_output("Error opening config file " + cfgfile)

import commands
import ConfigParser
config = ConfigParser.ConfigParser()
try:
    config.readfp(fp)
except:
    abort_without_output("While parsing config file: " + \
                         repr(sys.exc_info()[1]) + " " + cfgfile)

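# For reference, a skeleton of the config file this plugin reads might look
# like the following; the section and option names are the ones used below,
# the values shown are purely illustrative.
#
#   [Main]
#   static_ldif_file: /opt/glite/etc/gip/ldif/static-file-CE.ldif
#   vomap:
#     atlbprod: atlas
#     lhcbpilot: lhcb
#
#   [Scheduler]
#   vo_max_jobs_cmd: /opt/glite/libexec/vomaxjobs-maui
#   cycle_time: 120
#
#   [LRMS]
#   lrms_backend_cmd: /opt/glite/libexec/lrmsinfo-pbs
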
# note: vomap is only intended for the cases where voname != groupname
# hence at the bottom, we throw away all entries for which voname = groupname
# this is the "default scenario" in the code, adding such entries to the
# vomap "exception list" causes problems later on.

if config.has_option('Main', 'vomap'):
    s = config.get('Main', 'vomap')
    vomap = dict()
    lines = s.split('\n')
    for l in lines:
        if len(l) > 0:
            t = l.split(':')
            group = t[0].strip()
            vo = t[1].strip()
            if vo != group: vomap[group] = vo

else:
    vomap = dict()   # empty dict, later 'if grp in vomap.keys()' => if False

if config.has_option('Scheduler', 'vo_max_jobs_cmd'):
    cmd = config.get('Scheduler', 'vo_max_jobs_cmd')
    (stat, out) = commands.getstatusoutput(cmd)
    if stat:
        abort_without_output(
            "VO max jobs backend command returned nonzero exit status")
    try:
        pcaps = eval(out, {"__builtins__": {}})   # restricted namespace arg closes security hole
    except SyntaxError:
        abort_without_output('vo max slot command output in wrong format')
else:
    pcaps = dict()   # empty dict, will trigger use of defaults later.

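# The vo_max_jobs_cmd helper is expected to print a single Python dictionary
# literal mapping group/VO names to their maximum job counts, for example
# (illustrative values):
#
#   {'atlas': 400, 'lhcb': 250, 'dteam': 10}
#
# The restricted eval() above turns that text into the pcaps dict; groups
# missing from it simply get no cap applied further down.
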
# apply group -> vo mappings to process cap dict
# sgroup (source group) is original from input files,
# targvo (target vo) is what we want to replace it with

#DEBUG opcaps = pcaps.copy()

for sgroup in vomap.keys():
    targ = vomap[sgroup]
    if sgroup in pcaps.keys():
        if targ in pcaps.keys():
            pcaps[targ] = pcaps[targ] + pcaps[sgroup]
        else:
            pcaps[targ] = pcaps[sgroup]
        del pcaps[sgroup]

#DEBUG kl = opcaps.keys() + pcaps.keys()
#DEBUG kl.sort()
#DEBUG ukl = []
#DEBUG for k in kl:
#DEBUG     if k not in ukl: ukl.append(k)
#DEBUG for k in ukl:
#DEBUG     v1 = -1
#DEBUG     v2 = -1
#DEBUG     if k in opcaps.keys():
#DEBUG         v1 = opcaps[k]
#DEBUG     if k in pcaps.keys():
#DEBUG         v2 = pcaps[k]
#DEBUG
#DEBUG     print "%10s %4d %4d" % (k, v1, v2)

if not config.has_option('LRMS', 'lrms_backend_cmd'):
    abort_without_output("Spec for lrms backend cmd missing in config file")

# read cfg file to find spec for static ldif file

if not config.has_option('Main', 'static_ldif_file'):
    abort_without_output("Spec for static ldif file missing in config file")

lines = open(config.get('Main', 'static_ldif_file'), 'r').readlines()

# get the dns from the ldif file along with their VOs.
# first do the VOView dns

voviewlist = [ ]
qname = None
import re
jmpatt = re.compile(r'^GlueCEUniqueID=[-a-zA-Z0-9.:]+/[a-z]+-[a-z]+-([a-z_A-Z0-9]+)$')

class CEView(object):
    def __init__(self, dn):
        self.dn = dn
        # if this is not present, then it's a GlueCE object
        # if it is present, it's a GlueVOView object
        self.localID = None
        self.queuename = None
        self.ACBRs = list()

    def getCEUniqueID(self):
        if self.localID == None:
            return self.dn
        else:
            loc = self.dn.find(",") + 1
            return self.dn[loc:]

    CEUniqueID = property(getCEUniqueID,
                          doc="UniqueID of CE to which this view belongs")

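# Illustration of the CEUniqueID property (dn values are made up): for a
# VOView dn such as
#   GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-atlas,mds-vo-name=resource,o=grid
# it strips the leading LocalID component and returns
#   GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-atlas,mds-vo-name=resource,o=grid
# while for a plain GlueCE dn (localID is None) it returns the dn unchanged.
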
dndict = dict()   # key: dn string (not including leading "dn: ")
thisview = None

for line in lines:

    s = line.strip()
    if s.find('dn:') == 0 and s.find('GlueCEUniqueID') > 0:
        dn = s[3:].strip()
        thisview = CEView(dn)
        if s.find('GlueVOViewLocalID') >= 0:
            loc1 = s.find('=') + 1
            loc2 = s.find(',GlueCEUniqueID')
            id = s[loc1:loc2].strip()
            thisview.localID = id
    elif s.find('GlueCEName') == 0:
        qname = s.split(':')[1].strip()
        thisview.queuename = qname
    elif s.find('GlueCEAccessControlBaseRule:') == 0:
        loc = s.find(':') + 1
        if loc == len(s):
            abort_without_output("In static LDIF file, ACBR is " +
                                 "missing a value: " + s)
        rule = s[loc:].split(":")
        if rule[0].strip() in ['VO', 'VOMS']:
            thisview.ACBRs.append( ( rule[0].strip(), rule[1].strip() ) )

    elif s == '' and thisview:
        dndict[thisview.dn] = thisview
        thisview = None   # reset, about to enter new block.

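# The loop above consumes blank-line-separated LDIF blocks; a minimal
# hypothetical input would be
#
#   dn: GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-atlas,mds-vo-name=resource,o=grid
#   GlueCEName: atlas
#   GlueCEAccessControlBaseRule: VO:atlas
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-atlas,mds-vo-name=resource,o=grid
#   GlueCEAccessControlBaseRule: VO:atlas
#
# The first block becomes a CEView with localID None (a plain CE); the second
# becomes a VOView, whose queue name is filled in from its parent CE below.
# Note that a block is only stored once its terminating blank line is seen.
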
# now find the queues that go with the VOViews
for d in dndict.keys():
    view = dndict[d]
    if view.localID:   # then it's a VOView
        view.queuename = dndict[view.CEUniqueID].queuename

DEBUG = 0

if DEBUG:
    print "dumping parse tree for static ldif file"
    for d in dndict.keys():
        print "For dn:", d
        print "    LocalID:    ", dndict[d].localID
        print "    CEUniqueID: ", dndict[d].CEUniqueID
        print "    Queue Name: ", dndict[d].queuename
        print "    ACBRs:      ", dndict[d].ACBRs

def tconv(timeval):   # convert time to desired units
    # clamp to a large sentinel that still fits in a signed 32-bit integer
    timeval = min(timeval, 2146060842)
    return repr( int(timeval) )   # seconds -> printable

cmd = config.get('LRMS', 'lrms_backend_cmd')

(stat, out) = commands.getstatusoutput(cmd)
if stat:
    abort_without_output("LRMS backend command returned nonzero exit status")

lrmlines = out.split('\n')

sys.path.append('@TARGPREFIX@/@MODSUFFIX@')
import lrms
bq = lrms.Server()

# generate a cache of indices returned from possible lrms queries
# this makes a big speed diff when there are many (thousands) of jobs
# first we need to find all combinations VO/fqan, queue, state

ql = []
vol = []

for dn in dndict.keys():
    view = dndict[dn]
    if not view.localID: continue
    if view.queuename not in ql: ql.append(view.queuename)
    if view.ACBRs[-1][1] not in vol: vol.append(view.ACBRs[-1][1])

sl = ['running', 'queued']

# now construct an empty cache with all the index keys, ready to fill.

scache = { }

for v in vol:
    for s in sl:
        indstr = lrms.filt2str({'group': v, 'state': s})
        scache[indstr] = {}

for q in ql:
    for s in sl:
        indstr = lrms.filt2str({'queue': q, 'state': s})
        scache[indstr] = {}
        for v in vol:
            indstr = lrms.filt2str({'queue': q, 'state': s, 'group': v})
            scache[indstr] = {}

#DEBUG print scache.keys()

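# Note: filt2str (from the lrms module) is used here only to turn a filter
# dictionary into a string that can serve as a cache key; the exact string
# format is an lrms implementation detail, what matters is that identical
# filters always map to identical keys.
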
# traverse the job list, filling the index cache as we go.

for line in lrmlines:
    s = line.strip()
    f = s.split()
    if s[0] == '{' and s[-1] == '}':   # looks like a dict
        nj = lrms.Job(eval(s, {"__builtins__": {}}))
        grp = nj.get('group')
        if grp in vomap.keys():
            nj.set('group', vomap[grp])
        jid = nj.get('jobid')
        bq.addjob(jid, nj)
        tst = nj.get('state')
        if tst not in sl: continue
        tgr = nj.get('group')
        tq = nj.get('queue')
        if tgr in vol and tq in ql:
            dlis = [
                {'group': tgr, 'state': tst},
                {'queue': tq, 'state': tst},
                {'queue': tq, 'state': tst, 'group': tgr}
                ]
        else:
            if tgr in vol:
                dlis = [
                    {'group': tgr, 'state': tst},
                    ]
            elif tq in ql:
                dlis = [
                    {'queue': tq, 'state': tst},
                    {'queue': tq, 'state': tst, 'group': tgr}
                    ]
            else:
                continue

        for td in dlis:
            indstr = lrms.filt2str(td)
            if indstr not in scache.keys():
                if DEBUG: print "huh??", indstr
                scache[indstr] = {}
            scache[lrms.filt2str(td)][jid] = nj
    elif len(f) == 2:
        if f[0] == 'nactive': bq.slotsUp = int(f[1])
        elif f[0] == 'nfree': bq.slotsFree = int(f[1])
        elif f[0] == 'now': bq.now = int(f[1])
        elif f[0] == 'schedCycle': bq.schedCycle = int(f[1])
        else:
            logging.warn("invalid input in result of lrms plugin command " + cmd)
            logging.warn("invalid input: " + s)
    else:
        logging.warn("invalid input in result of lrms plugin command " + cmd)
        logging.warn("invalid input: " + s)

bq.__scache__ = scache

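# For reference, the lrms_backend_cmd output parsed above consists of a few
# "keyword value" summary lines plus one Python dict literal per job, along
# these lines (illustrative values):
#
#   nactive 100
#   nfree 12
#   now 1286541216
#   schedCycle 120
#   {'jobid': '1234.ce.example.org', 'group': 'atlas', 'queue': 'atlas', 'state': 'running', 'maxwalltime': 72000}
#   {'jobid': '1235.ce.example.org', 'group': 'lhcb', 'queue': 'lhcb', 'state': 'queued', 'maxwalltime': 72000}
#
# Only the keys read by this script (jobid, group, queue, state, maxwalltime)
# are required; any extra keys in the dicts are ignored by this script.
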
# for k in scache.keys():
#     print k, len(scache[k])

if config.has_option('Scheduler', 'cycle_time'):
    bq.schedCycle = config.getint('Scheduler', 'cycle_time')

# sys.exit(0)

from EstTT import *
wq = TTEstimator(WaitTime())

# first we do the VOViews
# sort the keys : needed to make sure output is reproducible for test harness :(

dnk = dndict.keys()
dnk.sort()

for dn in dnk:

    view = dndict[dn]
    if not view.localID: continue   # we only want VOViews here
    q = view.queuename
    if len(view.ACBRs) > 1:
        logging.warn("For dn: " + dn)
        logging.warn("detected multiple ACBRs:")
        for vo in view.ACBRs:
            logging.warn(repr(vo))
        logging.warn("only using the last one!")
    vo = view.ACBRs[-1][1]

    if DEBUG: print "'" + vo + "'"

    print "dn: " + dn
    print "GlueVOViewLocalID: " + view.localID
    if DEBUG:
        for acbr in view.ACBRs:
            print "GlueCEAccessControlBaseRule: " + acbr[0] + ":" + acbr[1]

    # this is tricky: reporting job counts in queues cares about queues,
    # but scheduling doesn't as far as I know.  So we need four job
    # counts, running and waiting, both in the current queue and for all
    # queues.

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund  = {'state': 'running', 'group': vo, 'queue': q}
    qwaitd = {'state': 'queued',  'group': vo, 'queue': q}

    runningJobs = bq.matchingJobs(qrund);  nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)
    if nwait > 0:
        qwt = waitingJobs[0].get('maxwalltime')
    else:
        qwt = -1

    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    # now we find the same numbers, but integrated across all queues
    # since the scheduler applies process caps without regard to queues.

    rund  = {'state': 'running', 'group': vo}
    waitd = {'state': 'queued',  'group': vo}

    runningJobs = bq.matchingJobs(rund);  nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(waitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        if vo in pcaps.keys():
            headroom = max(pcaps[vo] - nrun, 0)
            freeslots = min(bq.slotsFree, headroom)
        else:
            freeslots = bq.slotsFree

    ert = wq.estimate(bq, vo, debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        if qwt < 0:
            qwt = waitingJobs[0].get('maxwalltime')
        wrt = qwt * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt, bq.schedCycle)))
    print

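# Each pass through the loop above emits one GLUE block on stdout, for
# example (numbers are illustrative):
#
#   dn: GlueVOViewLocalID=atlas,GlueCEUniqueID=ce.example.org:2119/jobmanager-pbs-atlas,mds-vo-name=resource,o=grid
#   GlueVOViewLocalID: atlas
#   GlueCEStateRunningJobs: 213
#   GlueCEStateWaitingJobs: 0
#   GlueCEStateTotalJobs: 213
#   GlueCEStateFreeJobSlots: 12
#   GlueCEStateEstimatedResponseTime: 519
#   GlueCEStateWorstResponseTime: 1038
#
# followed by a blank line separating it from the next LDIF record.
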
# now do CEs (no VOs) ... this should be done 'right' some day
# and merged with the code above.  Want to get it out quick
# (sigh, i am mortal) so ...

for dn in dnk:

    view = dndict[dn]
    if view.localID: continue   # only want CEs, they have no localID
    q = view.queuename
    vo = ''   # vo-independent

    print "dn: " + dn

    # for VO-independent numbers we only care about what is in the current
    # queue. so we only need one set of stats

    # first find what is happening in this queue (q) to report the
    # WaitingJobs RunningJobs and TotalJobs

    qrund  = {'state': 'running', 'queue': q}
    qwaitd = {'state': 'queued',  'queue': q}

    runningJobs = bq.matchingJobs(qrund);  nrun = len(runningJobs)
    waitingJobs = bq.matchingJobs(qwaitd); nwait = len(waitingJobs)

    if nwait > 0:
        freeslots = 0
    else:
        freeslots = bq.slotsFree

    ert = wq.estimate(bq, vo, debug=0)
    print "GlueCEStateFreeJobSlots: " + repr(freeslots)
    print "GlueCEStateFreeCPUs: " + repr(freeslots)   # backw compat.
    print "GlueCEStateRunningJobs: " + repr(nrun)
    print "GlueCEStateWaitingJobs: " + repr(nwait)
    print "GlueCEStateTotalJobs: " + repr(nrun + nwait)

    wq.strategy.queue = q
    ert = wq.estimate(bq, vo, debug=0)
    print "GlueCEStateEstimatedResponseTime: " + tconv(ert)
    if nwait == 0:
        wrt = 2 * ert
    else:
        wrt = waitingJobs[0].get('maxwalltime') * nwait

    print "GlueCEStateWorstResponseTime: " + \
          tconv((max(wrt, bq.schedCycle)))
    if DEBUG: print 'DEBUG: q', q
    print

### Local Variables: ***
### mode: python ***
### End: ***

Properties

Name            Value
svn:eol-style   native
svn:executable  *
svn:keywords    Id URL
