/[pdpsoft]/nl.nikhef.pdp.dynsched-pbs-plugin/trunk/pbsServer.py
ViewVC logotype

Contents of /nl.nikhef.pdp.dynsched-pbs-plugin/trunk/pbsServer.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2322 - (show annotations) (download) (as text)
Tue Jul 5 12:41:21 2011 UTC (11 years, 2 months ago) by templon
File MIME type: application/x-python
File size: 19070 byte(s)
Remove the SVN revision comment, as it was no longer being updated;
the revision number is now part of the Id string.

1 #--
2 # pbsServer.py -- LRMS classes for PBS-based batch systems.
3 # $URL$
4 # $Id$
5
6 #--
7
8 # Two classes derived from lrms.Server are included; one for
9 # working with a live PBS server, and one for representing historical
10 # states of the server from PBS accounting log files. A third class
11 # (History) is associated with the historical server class.
12
13 from lrms import * # base Server class, Job class
14 import commands
15 import grp
16 import pwd
17 import re
18 import string
19 import sys
20 import time
21
class LiveServer(Server):
    """Snapshot of a running PBS/Torque server.

    Jobs are parsed from ``qstat -f`` output (or from a file holding a saved
    copy of that output); slot counts come from ``torque_utils.pbsnodes()``.
    """

    # qstat job_state letter -> lrms job state.  Letters not listed here, and
    # a missing job_state attribute, map to 'unknown'.
    _STATE_BY_LETTER = {
        'Q': 'queued', 'W': 'queued',
        'R': 'running', 'E': 'running',
        'H': 'pending', 'T': 'pending',
        'C': 'done',
    }

    # Attribute lines look like "key = value" once a record has been split
    # on '\n '.  Any amount of leading whitespace is accepted, so the match
    # does not depend on how much indentation is left over after that split
    # (the old pattern required exactly one leading space).
    _KEYVAL_PAT = re.compile(r'\s*(\S+) = (.*)')

    def __init__(self, *arg, **kw):
        """Populate the server from qstat output and pbsnodes.

        Keyword arguments:
          file -- path of a file containing saved ``qstat -f`` output; when
                  absent or None, ``qstat -f`` itself is run.

        Sets self.slotsUp, self.slotsFree and self.__evtime__ (current time in
        seconds since the epoch) and adds one Job per qstat record.  Exits the
        process with status 4 if the output does not start with a job record
        or a job's owner cannot be determined, and with status 3 on an
        attribute line that cannot be parsed.
        """
        Server.__init__(self)

        cnow = time.ctime()  # this hack is to get around time zone problems
        if kw.get('file') is not None:
            # NOTE(review): the path is handed to the shell unquoted
            cmdstr = '/bin/cat ' + kw['file']
        else:
            cmdstr = 'qstat -f'
        (stat, qstatout) = commands.getstatusoutput(cmdstr)
        if stat:
            # reported but not fatal here: the output is still checked below
            print('problem getting qstat output; cmd used was %s' % (cmdstr,))
            print('returned status  %s , text:  %s' % (stat, qstatout))

        from torque_utils import pbsnodes
        cpucount = 0
        jobcount = 0
        for node in pbsnodes():
            if node.isUp():
                cpucount += node.numCpu
                for cpu in range(node.numCpu):
                    jobcount += len(node.jobs[cpu])
        self.slotsUp = cpucount
        self.slotsFree = cpucount - jobcount

        nowtuple = time.strptime(cnow, "%c")
        self.__evtime__ = int(time.mktime(nowtuple))

        # guard against empty or nonconforming qstat output
        if not qstatout:
            records = []  # assume valid output for system with no jobs running!
        elif not qstatout.startswith('Job Id'):
            print('fatal error, qstat output starts with something other than a job id')
            print('first few characters are: %s' % (qstatout[:32],))
            sys.exit(4)
        else:
            # Records are separated by "\n\nJob Id:"; strip the 7-character
            # "Job Id:" prefix from the first record so it looks like the rest.
            records = qstatout[7:].split("\n\nJob Id:")

        for record in records:
            newj = self._job_from_record(record)
            self.addjob(newj.get('jobid'), newj)

    @staticmethod
    def _hms2secs(hms):
        """Convert an "HH:MM:SS" string to seconds (a float, as before)."""
        t = hms.split(':')
        return int(t[2]) + 60.0 * (int(t[1]) + 60 * int(t[0]))

    def _job_from_record(self, record):
        """Build a Job from one ``qstat -f`` record (the text after "Job Id:")."""
        newj = Job()
        lines = record.split('\n ')

        # first line is jobid
        newj.set('jobid', lines[0].strip())

        attrs = {}
        for rawline in lines[1:]:
            line = rawline.replace('\n\t', '')  # rejoin tab-indented continuation lines
            mk = self._KEYVAL_PAT.match(line)
            if not mk:
                print("fatal error, should always be able to find a match")
                print("unmatchable string was: %s" % (line,))
                sys.exit(3)
            attrs[mk.group(1)] = mk.group(2)

        # Owner and group: prefer euser/egroup; otherwise fall back to the
        # user part of "user@host" in Job_Owner plus that user's primary group.
        if 'euser' in attrs and 'egroup' in attrs:
            newj.set('user', attrs['euser'])
            newj.set('group', attrs['egroup'])
        elif 'Job_Owner' in attrs:
            user = attrs['Job_Owner'].split('@')[0]
            newj.set('user', user)
            try:
                gid = pwd.getpwnam(user)[3]
                groupname = grp.getgrgid(gid)[0]
            except KeyError:
                # user or its primary group is unknown on this host
                groupname = 'unknown'
            newj.set('group', groupname)
        else:
            print("Can't find user and group of job %s" % (newj.get('jobid'),))
            sys.exit(4)

        newj.set('state',
                 self._STATE_BY_LETTER.get(attrs.get('job_state'), 'unknown'))

        newj.set('queue', attrs['queue'])

        if 'qtime' in attrs:
            newj.set('qtime', time.mktime(time.strptime(attrs['qtime'], "%c")))
        if 'Resource_List.walltime' in attrs:
            newj.set('maxwalltime', self._hms2secs(attrs['Resource_List.walltime']))

        # Start and wall times: by default "start_time" gives the start and
        # "resources_used.walltime" the walltime; when one of them is missing
        # it is derived from the other and the current time.
        if 'start_time' in attrs:
            timetuple = time.strptime(attrs['start_time'], "%c")
            newj.set('start', time.mktime(timetuple))
            newj.set('startAnchor', 'start_time')
            if 'resources_used.walltime' not in attrs:
                # just after job starts, walltime is not printed
                newj.set('walltime', self.__evtime__ - newj.get('start'))

        if 'resources_used.walltime' in attrs:
            secs = self._hms2secs(attrs['resources_used.walltime'])
            newj.set('walltime', secs)
            if 'start_time' not in attrs:  # older torque version
                newj.set('start', self.__evtime__ - secs)
                newj.set('startAnchor', 'resources_used.walltime')

        if 'Job_Name' in attrs:
            newj.set('name', attrs['Job_Name'])
        if 'exec_host' in attrs:
            # count of '+'-separated entries in exec_host is used as cpucount
            newj.set('cpucount', attrs['exec_host'].count('+') + 1)

        return newj
168
169 import copy
170
## Helper for class Event, superseded by the newer keyvallist2dict function.
## Takes a string of whitespace-separated key=val pairs and returns a dict
## with the same structure.  Example input string:
## user=tdykstra group=niktheorie jobname=Q11_241828.gjob

def keyval2dict(astring):
    """Parse a whitespace-separated string of key=val pairs into a dict.

    Each token is split at its first '='; any later '=' stays in the value,
    and a repeated key keeps its last value.  A token without '=' is printed
    (twice: as-is and as a one-element list) and CantHappenException is
    raised.
    """
    result = {}
    for token in astring.split():
        eq = token.find('=')
        if eq < 0:
            print(token)
            print([token])
            raise CantHappenException
        result[token[:eq]] = token[eq + 1:]
    return result
188
## Helper for class Event.
## Takes a list of key=val strings and returns a dict with the same
## structure.  Example input list:
## ['user=tdykstra', 'group=niktheorie', 'jobname=Q11_241828.gjob']

def keyvallist2dict(kvlist):
    """Turn a list of "key=val" strings into a dict.

    Each item is split at its first '='; the rest of the item (including
    further '=' characters and spaces) becomes the value, and a repeated key
    keeps its last value.  An item without '=' is reported and
    CantHappenException is raised.
    """
    result = {}
    for item in kvlist:
        eq = item.find('=')
        if eq == -1:
            print("tried to split: %s , result was: %s" % (item, [item]))
            raise CantHappenException
        result[item[:eq]] = item[eq + 1:]
    return result
204
class Event:
    """One record of a PBS accounting log: job queued, started, ended, etc.

    A record looks like
        MM/DD/YYYY HH:MM:SS;<type letter>;<local PBS jobid>;<key=val ...>
    If a line does not have that shape, a diagnostic is printed and every
    field keeps its default (None, or {} for the info dict).
    """

    # Groups: 1) timestamp, 2) event type letter (Q, S, E, D, ...),
    # 3) local PBS jobid, 4) rest of the line (key=value pairs).
    # Groups 1 and 3 use [^;]+ instead of a greedy (.+).  With (.+) the jobid
    # group ran up to the LAST ';' on the line, so any ';' inside the
    # key=value part (a job name, say) corrupted both jobid and attributes.
    _EVPROG = re.compile(r'^([^;]+);([A-Z]);([^;]+);(.*)')

    # Matches strings of the form key=val.  The lookahead assertion is needed
    # because some values (like account, or neednodes with multiple
    # processors) contain ' ' and '='.
    # NOTE: ' -@' inside the value class is a character RANGE (0x20-0x40:
    # space, punctuation such as . , + ; and the digits), not a literal '-'.
    # Values containing '.' (e.g. jobname=Q11_241828.gjob) rely on that range,
    # so the pattern is kept exactly as it was.
    _KVPROG = re.compile(r'[a-z._A-Z]+=[a-z0-9A-Z=/: -@_]+?(?=$| [a-z._A-Z]+=)')

    def __init__(self, evstring, debug=0):
        """Parse one accounting-log line (without its trailing newline)."""
        self.__time__ = None  # default values, kept if parsing fails
        self.__type__ = None
        self.__jobid__ = None
        self.__info__ = {}

        m = self._EVPROG.match(evstring)
        if not m:
            print("parse patt failed, offending line is")
            print(evstring)
            return
        stamp, evtype, jobid, rest = m.groups()
        if debug:
            print("timestamp %s" % (stamp,))
            print("code %s" % (evtype,))
            print("jobid %s" % (jobid,))
            print("attrs %s" % (rest,))

        tmatch = self._KVPROG.findall(rest)
        if debug:
            print("result of key=val match pattern: %s" % (tmatch,))

        # PBS log files don't specify a time zone.  Setting the DST flag (the
        # last element of the time tuple) to -1 asks libc to work it out from
        # the machine's local time zone.
        ttup = time.strptime(stamp, "%m/%d/%Y %H:%M:%S")
        atup = ttup[:8] + (-1,)
        self.__time__ = int(time.mktime(atup))
        self.__type__ = evtype
        self.__jobid__ = jobid
        self.__info__ = keyvallist2dict(tmatch)

    def time(self):
        """Event time in seconds since the epoch, or None if unparsed."""
        return self.__time__

    def type(self):
        """Single-letter event type, or None if unparsed."""
        return self.__type__

    def jobid(self):
        """Local PBS jobid string, or None if unparsed."""
        return self.__jobid__

    def info(self, key=''):
        """Return the whole key=val dict, or the value for key (None if absent)."""
        if key == '':
            return self.__info__
        return self.__info__.get(key)

    def __str__(self):
        # %s formatting also copes with an event whose line failed to parse
        # (all fields None); string concatenation raised TypeError there.
        return '%s :: event type %s at %s' % (self.__jobid__, self.__type__,
                                              self.__time__)
278
class History:
    """Job catalogue and event lists built from PBS accounting log files.

    Three structures are kept:
      __jobcat__ -- dict jobid -> Job holding things like queue entry time,
                    start/end time, owner user/group name and queue;
      __evlist__ -- every parsed Event, in the order read;
      __jobevs__ -- dict jobid -> list of that job's Events, in order.
    The aim is to keep just enough information to reconstruct the state of
    the queue at an arbitrary time.
    """

    ## functions for using the job catalog: one Job object per job seen in
    ## the log files.

    def addjob(self, jobid, job):
        """Add a new catalogue entry; exits the process if jobid is known."""
        if self.hasjob(jobid):
            print("job already found, exiting")
            sys.exit(1)
        job.set('jobid', jobid)
        self.__jobcat__[jobid] = job

    def hasjob(self, jobid):
        """Return 1 if jobid is in the catalogue, else 0."""
        if jobid in self.__jobcat__:
            return 1
        return 0

    def setjobinfo(self, jobid, key, val):
        """Set key to val for an existing job; return 1, or 0 if unknown."""
        if not self.hasjob(jobid):
            print("job not found: %s %s %s" % (jobid, key, val))
            return 0
        self.__jobcat__[jobid].set(key, val)
        return 1

    def getjobinfo(self, jobid, key):
        """Return the value of key for job jobid (0 if the job is unknown)."""
        if not self.hasjob(jobid):
            print("job not found: %s %s" % (jobid, key))
            return 0
        return self.__jobcat__[jobid].get(key)

    def getjoblist(self):
        """Return a new list of all catalogued jobids."""
        return list(self.__jobcat__.keys())

    def getjob(self, jobid):
        return self.__jobcat__[jobid]

    ## functions for using the event list: every event seen while parsing
    ## the log files, in sequence.

    def getfirst_event_time(self):
        return self.__evlist__[0].time()

    def getevent(self, index):
        return self.__evlist__[index]

    ## functions for using the job event dict (key=jobid, value=list of that
    ## job's events in sequence).  It helps resolve ambiguous cases such as
    ## multiple "D" events seen for one job.

    def getjobevts(self, jobid):
        """Return the list of events for jobid; exits the process if unknown."""
        if jobid not in self.__jobevs__:
            print('getjobevs: jobid %s not found in job_event db' % (jobid,))
            sys.exit(1)
        return self.__jobevs__[jobid]

    def addevt(self, ev):
        """Append ev to the event list and to its job's event list."""
        self.__evlist__.append(ev)
        self.__jobevs__.setdefault(ev.jobid(), []).append(ev)

    def __init__(self, logfilelist):
        """Parse every accounting log file in logfilelist, in order.

        Lines that Event cannot parse are skipped (Event prints a diagnostic
        for them) rather than being filed as events of jobid None.
        """
        self.__evlist__ = []
        self.__jobcat__ = {}
        self.__jobevs__ = {}

        for f in logfilelist:
            inf = open(f, 'r')
            try:
                for line in inf:
                    # strip only the newline: chopping the last character
                    # would eat real data on a final line without a newline
                    ev = Event(line.rstrip('\n'))
                    if ev.type() is None:
                        continue
                    self.addevt(ev)
                    self._record_event(ev)
            finally:
                inf.close()

    def _record_event(self, ev):
        """Fold the information carried by one parsed event into the catalogue."""
        jobid = ev.jobid()
        if ev.type() == 'Q':  # job enters sys for 1st time
            if self.hasjob(jobid):  # should not happen
                print('Error, Q event seen for pre-existing job %s' % (jobid,))
                sys.exit(1)
            newentry = Job()  # make new entry, fill info
            newentry.set('qtime', ev.time())
            newentry.set('queue', ev.info('queue'))
            self.addjob(jobid, newentry)
            return

        ## for all other event types
        if not self.hasjob(jobid):
            self.addjob(jobid, Job())
        job = self.getjob(jobid)
        if ev.info('qtime'):
            job.set('qtime', int(ev.info('qtime')))
        if ev.info('queue') and not job.get('queue'):
            job.set('queue', ev.info('queue'))
        if ev.info('user') and not job.get('user'):
            job.set('user', ev.info('user'))
        if ev.info('group') and not job.get('group'):
            job.set('group', ev.info('group'))
        if ev.info('start') and not job.get('start'):
            job.set('start', int(ev.info('start')))
        if ev.info('Resource_List.walltime') and not job.get('maxwalltime'):
            (h, m, s) = ev.info('Resource_List.walltime').split(":")
            job.set('maxwalltime', int(h) * 3600 + int(m) * 60 + int(s))
        if ev.info('end') and not job.get('end'):
            job.set('end', int(ev.info('end')))

        ## special handling for troublesome event types
        if ev.type() == 'T':  # job restart after checkpoint
            ## The previous start record may not have been seen.  If not, we
            ## don't know whether the job was running or queued before, so the
            ## current event time is used as start since it is running now.
            if not job.get('start'):
                job.set('start', ev.time())
            ## The T record may be the first (or only) record seen for the
            ## job; then all we know is that qtime was before the beginning of
            ## the log files.  A later record for the job sets the real qtime.
            if not job.get('qtime'):
                job.set('qtime', self.getfirst_event_time() - 1)

    def get_evlist(self):
        return self.__evlist__
425
class LogFileServer(Server):
    """Server whose state is replayed from a History of PBS accounting logs.

    The constructor sets up the jobs already queued or running at the time of
    the first logged event; step() then applies logged events one at a time.
    """

    def __init__(self, history, debug=0):
        Server.__init__(self)

        self.__history__ = history
        # want to get first event on first getnextevent call, so set initial index
        # to -1 (one previous to 0!)
        self.__evindx__ = -1
        first_event = history.get_evlist()[0]
        if debug:
            print('first event data:')
            print('%s %s %s' % (first_event.jobid(), first_event.type(),
                                first_event.time()))
        starttime = first_event.time()
        # jobids are strings and sort lexically ("10.srv" < "9.srv"), which is
        # not qtime order.  Each job is therefore tested on its own: stopping
        # at the first late job (the old 'break') dropped earlier-queued jobs
        # that happened to sort after it.
        jobidlist = sorted(history.getjoblist())
        if debug:
            print(jobidlist)
        for jid in jobidlist:
            entry = history.getjob(jid)
            if debug:
                print('%s %s %s' % (jid, entry.get('qtime'), starttime))
                print(entry.get('qtime') >= starttime)
            if entry.get('qtime') >= starttime:
                continue  # queued later; expected to be added by its Q event in step()

            # this job's qtime is before the first event, so at starttime it
            # is either queued or running, nothing else
            job = copy.deepcopy(entry)
            job.set('jobid', jid)
            if job.get('start') and job.get('start') < starttime:
                job.set('state', 'running')
            else:
                job.set('state', 'queued')

            # NOTE(review): writes straight into the base Server's job dict
            # instead of calling addjob(); assumes Server.__init__ creates it.
            self.__jobdict__[jid] = job

    ## getnextevent shows you what the next event will be; step actually
    ## changes the server state to reflect what happened in that event.
    ## Both are needed because ETT must be calculated for an event BEFORE
    ## the job is actually submitted to the queue.

    def getnextevent(self):
        """Return the next event without applying it."""
        return self.__history__.getevent(self.__evindx__ + 1)

    def step(self):
        """Advance to the next event, apply it to the job states, return it."""
        self.__evindx__ += 1
        ev = self.__history__.getevent(self.__evindx__)
        self.__evtime__ = ev.time()
        evtype = ev.type()

        if evtype == 'Q':
            jobcopy = copy.deepcopy(self.__history__.getjob(ev.jobid()))
            jobcopy.set('state', 'queued')
            jobcopy.set('jobid', ev.jobid())
            self.addjob(ev.jobid(), jobcopy)
        elif evtype == 'S':
            self.getjob(ev.jobid()).set('state', 'running')
        elif evtype == 'T':
            job = self.getjob(ev.jobid())
            job.set('state', 'running')
            job.set('start', ev.time())
        elif evtype == 'E':
            self.deletejob(ev.jobid())
        elif evtype == 'D':
            # delete only when this D record is the job's LAST recorded event
            # (of any type); an earlier D followed by more records is ignored
            if self.__history__.getjobevts(ev.jobid())[-1] is ev:
                self.deletejob(ev.jobid())
        return ev
498

Properties

Name Value
svn:eol-style native
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28