/[pdpsoft]/nl.nikhef.pdp.dynsched-pbs-plugin/trunk/pbsServer.py
ViewVC logotype

Annotation of /nl.nikhef.pdp.dynsched-pbs-plugin/trunk/pbsServer.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2322 - (hide annotations) (download) (as text)
Tue Jul 5 12:41:21 2011 UTC (11 years, 2 months ago) by templon
File MIME type: application/x-python
File size: 19070 byte(s)
remove svn revision comment as this was no longer being updated.
revision number is part of the Id string now.

1 templon 2012 #--
2     # pbsServer.py -- LRMS classes for PBS-based batch systems.
3 templon 2013 # $URL$
4 templon 2012 # $Id$
5    
6     #--
7    
8     # Two classes derived from lrms.Server are included; one for
9     # working with a live PBS server, and one for representing historical
10     # states of the server from PBS accounting log files. A third class
11     # (History) is associated with the historical server class.
12    
13     from lrms import * # base Server class, Job class
14     import commands
15     import grp
16     import pwd
17     import re
18     import string
19     import sys
20     import time
21    
# Single-letter PBS job states mapped onto the state names stored on Job
# objects; any letter not listed here is reported as 'unknown'.
_PBS_STATE_NAMES = {
    'Q': 'queued',
    'W': 'queued',
    'R': 'running',
    'E': 'running',
    'H': 'pending',
    'T': 'pending',
    'C': 'done',
}


def _hms_to_secs(hms):
    """Convert an 'HH:MM:SS' string to seconds.

    The result is a float because of the 60.0 factor, which is the
    arithmetic this module has always used for walltimes.
    """
    fields = hms.split(':')
    return int(fields[2]) + 60.0 * (int(fields[1]) + 60 * int(fields[0]))


class LiveServer(Server):
    """Server whose jobs are loaded from `qstat -f` style text."""

    def __init__(self, *arg, **kw):
        """Collect slot counts and job records from the batch system.

        Keyword arguments:
        file -- optional path; when present and not None, the text to
                parse is obtained with '/bin/cat <file>' instead of
                'qstat -f'. Slot counts still come from pbsnodes().

        Calls sys.exit(4) when the text does not start with 'Job Id' or
        when a job has neither euser/egroup nor Job_Owner, and
        sys.exit(3) when an attribute line cannot be parsed.
        """

        Server.__init__(self)

        cnow = time.ctime()  # this hack is to get around time zone problems
        if kw.get('file') is not None:
            cmdstr = '/bin/cat ' + kw['file']
        else:
            cmdstr = 'qstat -f'
        (stat, qstatout) = commands.getstatusoutput(cmdstr)
        if stat:
            # only reported here; the format checks below decide whether
            # the text that came back is usable
            print('problem getting qstat output; cmd used was %s' % (cmdstr,))
            print('returned status  %s , text:  %s' % (stat, qstatout))

        from torque_utils import pbsnodes
        cpucount = 0
        jobcount = 0
        for node in pbsnodes():
            if node.isUp():
                cpucount += node.numCpu
                for cpu in range(node.numCpu):
                    jobcount += len(node.jobs[cpu])
        self.slotsUp = cpucount
        self.slotsFree = cpucount - jobcount

        nowtuple = time.strptime(cnow, "%c")
        self.__evtime__ = int(time.mktime(nowtuple))

        # NOTE(review): the leading blank in this pattern and the
        # separator used to split a job record into lines (below) must
        # together match the indentation of attribute lines in the
        # qstat text -- confirm the exact whitespace against real output.
        keyvalpat = re.compile(r' (\S+) = (.*)')

        # guard against empty or nonconforming qstat output
        if len(qstatout) == 0:
            # assume valid output for system with no jobs running!
            job_records = []
        elif not qstatout.startswith('Job Id'):
            print('fatal error, qstat output starts with something other than a job id')
            print('first few characters are: %s' % (qstatout[:32],))
            sys.exit(4)
        else:
            # strip off the first 7 chars : Job Id:
            # since we will use this string as part of the field that
            # splits into job records, it will need to be removed in the
            # first job as well
            job_records = qstatout[7:].split("\n\nJob Id:")

        for record in job_records:
            newj = Job()
            lines = record.split('\n ')
            qstatDict = {}

            # first line is jobid
            newj.set('jobid', lines[0].strip())

            for raw in lines[1:]:
                # drop embedded newline+tab sequences (presumably wrapped
                # continuation lines) so the attribute is one line
                text = raw.replace('\n\t', '')
                mk = keyvalpat.match(text)
                if not mk:
                    print("fatal error, should always be able to find a match")
                    print("unmatchable string was: %s" % (text,))
                    sys.exit(3)
                qstatDict[mk.group(1)] = mk.group(2)

            # do owner and group. try euser and group first, if not found,
            # back off to old method, which was "user@host" parsing plus
            # getgrid on user.
            if 'euser' in qstatDict and 'egroup' in qstatDict:
                newj.set('user', qstatDict['euser'])
                newj.set('group', qstatDict['egroup'])
            elif 'Job_Owner' in qstatDict:
                user = qstatDict['Job_Owner'].split('@')[0]
                newj.set('user', user)
                try:
                    gid = pwd.getpwnam(user)[3]
                    groupname = grp.getgrgid(gid)[0]
                except KeyError:
                    # no passwd or group entry for this user
                    groupname = 'unknown'
                newj.set('group', groupname)
            else:
                print("Can't find user and group of job %s" % (newj.get('jobid'),))
                sys.exit(4)

            # do job state; a missing job_state maps to 'unknown' as well
            newj.set('state',
                     _PBS_STATE_NAMES.get(qstatDict.get('job_state'), 'unknown'))

            newj.set('queue', qstatDict['queue'])

            if 'qtime' in qstatDict:
                timetuple = time.strptime(qstatDict['qtime'], "%c")
                newj.set('qtime', time.mktime(timetuple))
            if 'Resource_List.walltime' in qstatDict:
                newj.set('maxwalltime',
                         _hms_to_secs(qstatDict['Resource_List.walltime']))

            # do start and wall times. default is to use qstat "start_time"
            # for start times and qstat "resources_used.walltime" for
            # walltime. Code can use walltime for start and start for
            # walltime in situations where one is missing.
            if 'start_time' in qstatDict:
                timetuple = time.strptime(qstatDict['start_time'], "%c")
                newj.set('start', time.mktime(timetuple))
                newj.set('startAnchor', 'start_time')
                if 'resources_used.walltime' not in qstatDict:
                    # just after job starts, walltime is not printed
                    newj.set('walltime', self.__evtime__ - newj.get('start'))

            if 'resources_used.walltime' in qstatDict:
                secs = _hms_to_secs(qstatDict['resources_used.walltime'])
                newj.set('walltime', secs)
                if 'start_time' not in qstatDict:  # older torque version
                    newj.set('start', self.__evtime__ - secs)
                    newj.set('startAnchor', 'resources_used.walltime')

            if 'Job_Name' in qstatDict:
                newj.set('name', qstatDict['Job_Name'])
            if 'exec_host' in qstatDict:
                # one '+'-separated entry per cpu in use
                newj.set('cpucount', qstatDict['exec_host'].count('+') + 1)

            self.addjob(newj.get('jobid'), newj)
168    
169     import copy
170    
171     ## following is helper for class Event. superseded by newer keyvallist2dict function.
172     ## takes as arg a string of key=val pairs, returns a dict with the same
173     ## structure. example input string:
174     ## user=tdykstra group=niktheorie jobname=Q11_241828.gjob
175    
def keyval2dict(astring):
    """Turn a whitespace-separated string of key=val pairs into a dict.

    Example input: 'user=tdykstra group=niktheorie jobname=Q11_241828.gjob'

    Each field is split on its first '=' only, so a value may itself
    contain '='. A field without any '=' is printed and then
    CantHappenException is raised.
    """
    d = {}
    for f in astring.split():
        kv = f.split("=", 1)
        if len(kv) != 2:
            print(f)
            print(kv)
            raise CantHappenException
        d[kv[0]] = kv[1]
    return d
188    
189     ## following is helper for class Event.
190     ## takes as arg a list of key=val pairs, returns a dict with the same
191     ## structure. example input string:
192     ## ['user=tdykstra', 'group=niktheorie', 'jobname=Q11_241828.gjob']
193    
def keyvallist2dict(kvlist):
    """Turn a list of 'key=val' strings into a dict.

    Example input: ['user=tdykstra', 'group=niktheorie', 'jobname=Q11_241828.gjob']

    Each item is split on its first '=' only, so a value may itself
    contain '='. An item without any '=' is reported on stdout and then
    CantHappenException is raised.
    """
    d = {}
    for f in kvlist:
        kv = f.split("=", 1)
        if len(kv) != 2:
            print("tried to split: %s , result was: %s" % (f, kv))
            raise CantHappenException
        d[kv[0]] = kv[1]
    return d
204    
class Event:
    """Simple class to represent events like job queued, job started, etc.

    An event is built from one line of text of the form
    'timestamp;type;jobid;key=val key=val ...'. When the line does not
    have that shape a message is printed and the event keeps its default
    values: time, type and jobid are None and the info dict is empty.
    """

    # search pattern for parsing the line; for a successful search the
    # fields are:
    #   1) timestamp
    #   2) event type (Q,S,E,D, etc)
    #   3) local PBS jobID
    #   4) rest of line (key=value) to be parsed otherwise
    _evprog = re.compile("^(.+);([A-Z]);(.+);(.*)")

    # matches strings of form key=val. the lookahead assertion is
    # necessary to work around presence of ' ' and '=' in some 'val'
    # strings (like account, or neednodes with multiple processors).
    # note that ' -@' inside the character class is a range (space
    # through '@'), so digits and most punctuation are accepted too.
    # NOTE(review): confirm that range is intended before tightening it.
    _tprog = re.compile(r'[a-z._A-Z]+=[a-z0-9A-Z=/: -@_]+?(?=$| [a-z._A-Z]+=)')

    def __init__(self, evstring, debug=0):
        """Parse evstring; debug (truthy) prints the intermediate fields."""

        self.__time__ = None  # default values
        self.__type__ = None
        self.__jobid__ = None
        self.__info__ = {}

        m = self._evprog.search(evstring)
        if not m:
            print("parse patt failed, offending line is")
            print(evstring)
            return
        if debug:
            print("timestamp %s" % (m.group(1),))
            print("code %s" % (m.group(2),))
            print("jobid %s" % (m.group(3),))
            print("attrs %s" % (m.group(4),))

        tmatch = self._tprog.findall(m.group(4))
        if debug:
            print("result of key=val match pattern: %s" % (tmatch,))

        # parse timestamp
        ttup = time.strptime(m.group(1), "%m/%d/%Y %H:%M:%S")

        # last element of time tuple is DST, but PBS log files
        # don't specify time zone. Setting the last element of
        # the tuple to -1 asks libC to figure it out based on
        # local time zone of machine
        atup = ttup[:8] + (-1,)
        self.__time__ = int(time.mktime(atup))
        self.__type__ = m.group(2)
        self.__jobid__ = m.group(3)
        self.__info__ = keyvallist2dict(tmatch)

    def time(self):
        """Return the event timestamp as integer seconds, or None."""
        return self.__time__

    def type(self):
        """Return the one-letter event type, or None."""
        return self.__type__

    def jobid(self):
        """Return the job id field of the record, or None."""
        return self.__jobid__

    def info(self, key=''):
        """Return the whole key=val dict, or the value for one key.

        With the default key ('') the dict itself is returned; for any
        other key the stored value is returned, or None when absent.
        """
        if key == '':
            return self.__info__
        return self.__info__.get(key)

    def __str__(self):
        # %-formatting rather than '+' so that an event whose parse
        # failed (all fields None) can still be printed
        return '%s :: event type %s at %s' % (
            self.__jobid__, self.__type__, self.__time__)
278    
class History:
    """Job catalogue and event list built from PBS accounting log files.

    the idea here is to generate a couple of data structures:
    first the job catalog, which associates things like
    queue entry time, start time, stop time, owner username/groupname,
    etc. with each PBS jobid.
    second the event list, which records when jobs enter the various
    queues, start to execute, and terminate. This seems to be the
    minimal amount of information we'll need to be able to generate
    the state of the queue at any given arbitrary time.
    """

    ## functions for using the job catalog. This beast is a dictionary
    ## that has one entry (an object of class Job) per job seen in the
    ## log files.

    def addjob(self, jobid, job):
        """Add a new job entry; exits the process if jobid already exists."""
        if self.hasjob(jobid):
            print("job already found, exiting")
            sys.exit(1)
        job.set('jobid', jobid)
        self.__jobcat__[jobid] = job

    def hasjob(self, jobid):
        """Return 1 if jobid is already in the catalogue, else 0."""
        # test the dict itself; going through keys() builds and scans a
        # list for every lookup on Python 2
        if jobid in self.__jobcat__:
            return 1
        return 0

    def setjobinfo(self, jobid, key, val):
        """Set key=val on an existing job; return 1, or 0 if job unknown."""
        if not self.hasjob(jobid):
            print("job not found: %s %s %s" % (jobid, key, val))
            return 0
        self.__jobcat__[jobid].set(key, val)
        return 1

    def getjobinfo(self, jobid, key):
        """Return the value stored under key for jobid, or 0 if job unknown."""
        if not self.hasjob(jobid):
            print("job not found: %s %s" % (jobid, key))
            return 0
        return self.__jobcat__[jobid].get(key)

    def getjoblist(self):
        """Return a new list holding every jobid in the catalogue."""
        return list(self.__jobcat__.keys())

    def getjob(self, jobid):
        """Return the catalogue entry for jobid (KeyError if unknown)."""
        return self.__jobcat__[jobid]

    ## functions for using event list. This beast is just a list (in
    ## sequence) of all events seen while parsing the log files

    def getfirst_event_time(self):
        """Return the time of the first event in the event list."""
        return self.__evlist__[0].time()

    def getevent(self, index):
        """Return the event at position index of the event list."""
        return self.__evlist__[index]

    ## functions for using the job event list. This beast is just a
    ## dictionary (key=jobid), so only entry in dict for each job; the
    ## value for each jobid is a list of events seen for this job (in
    ## sequence). Rationale for this object is it can help in resolving
    ## ambiguous cases (like multiple "D" events seen).

    def getjobevts(self, jobid):
        """Return the events seen for jobid; exits the process if none."""
        if jobid not in self.__jobevs__:
            print('getjobevs: jobid %s not found in job_event db' % (jobid,))
            sys.exit(1)
        return self.__jobevs__[jobid]

    def addevt(self, ev):
        """Append ev to the event list and to its job's own event list."""
        self.__evlist__.append(ev)
        ## create the per-job entry if needed, then append
        self.__jobevs__.setdefault(ev.jobid(), []).append(ev)

    def __init__(self, logfilelist):
        """Read every file named in logfilelist, in the order given.

        Each line is parsed as an Event. Lines the Event parser rejects
        (it has already printed them) are skipped rather than recorded.
        Exits the process when a 'Q' event names a job already known.
        """

        self.__evlist__ = []
        self.__jobcat__ = {}
        self.__jobevs__ = {}

        for f in logfilelist:
            inf = open(f, 'r')
            try:
                for line in inf:
                    # chop off the trailing newline only when there is
                    # one; the last line of a file may lack it
                    if line.endswith('\n'):
                        line = line[:-1]
                    ev = Event(line)  # parse line, create event
                    if ev.type() is None:
                        # unparsable line: nothing to record
                        continue
                    self.addevt(ev)  # adds event to evlist and jobevt struct
                    self._absorb_event(ev)
            finally:
                inf.close()

    def _absorb_event(self, ev):
        """Create or update the catalogue entry for the job named by ev."""
        if ev.type() == 'Q':  # job enters sys for 1st time
            if self.hasjob(ev.jobid()):  # should not happen
                print('Error, Q event seen for pre-existing job %s'
                      % (ev.jobid(),))
                sys.exit(1)
            newentry = Job()  # make new entry, fill info
            newentry.set('qtime', ev.time())
            newentry.set('queue', ev.info('queue'))
            self.addjob(ev.jobid(), newentry)
            return

        ## for all other event types
        if not self.hasjob(ev.jobid()):
            self.addjob(ev.jobid(), Job())  # make new entry
        job = self.getjob(ev.jobid())

        # qtime is overwritten whenever present; the other fields are
        # only filled in while still unset
        if ev.info('qtime'):
            job.set('qtime', int(ev.info('qtime')))
        if ev.info('queue') and not job.get('queue'):
            job.set('queue', ev.info('queue'))
        if ev.info('user') and not job.get('user'):
            job.set('user', ev.info('user'))
        if ev.info('group') and not job.get('group'):
            job.set('group', ev.info('group'))
        if ev.info('start') and not job.get('start'):
            job.set('start', int(ev.info('start')))
        if ev.info('Resource_List.walltime') and not job.get('maxwalltime'):
            (h, m, s) = ev.info('Resource_List.walltime').split(":")
            job.set('maxwalltime', int(h) * 3600 + int(m) * 60 + int(s))
        if ev.info('end') and not job.get('end'):
            job.set('end', int(ev.info('end')))

        ## special handling for troublesome event types

        if ev.type() == 'T':  # job restart after checkpoint
            ## previous job start record may not have been seen; if not,
            ## we don't know if it was running or queued before, so just
            ## set current event time as start since we know it's
            ## running now.
            if not job.get('start'):
                job.set('start', ev.time())
            ## in some cases the T record may be the first (or even only)
            ## record we see. in that case, the only thing we can say is
            ## that the qtime was before the beginning of the log files
            ## ... set qtime like that. if we see a later record for this
            ## job, that record will set the correct qtime.
            if not job.get('qtime'):
                job.set('qtime', self.getfirst_event_time() - 1)

    def get_evlist(self):
        """Return the list of all events, in the order they were read."""
        return self.__evlist__
425    
class LogFileServer(Server):
    """Server whose state is stepped through the events of a History."""

    def __init__(self, history, debug=0):
        """Load the jobs that were queued before the first event.

        history -- object providing get_evlist(), getjoblist(), getjob(),
                   getevent() and getjobevts(), as the History class does.
        debug   -- when truthy, print the values examined while loading.

        Raises IndexError when history holds no events.
        """

        Server.__init__(self)

        self.__history__ = history
        # want to get first event on first getnextevent call, so set
        # initial index to -1 (one previous to 0!)
        self.__evindx__ = -1
        first_event = history.get_evlist()[0]
        if debug:
            print('first event data:')
            print('%s %s %s' % (first_event.jobid(), first_event.type(),
                                first_event.time()))
        starttime = first_event.time()
        # sort a copy instead of sorting the returned object in place
        jobidlist = sorted(history.getjoblist())
        if debug:
            print(jobidlist)
        for jid in jobidlist:
            entry = history.getjob(jid)
            if debug:
                print('%s %s %s' % (jid, entry.get('qtime'), starttime))
                print(entry.get('qtime') >= starttime)
            if entry.get('qtime') >= starttime:
                # done. NOTE(review): this stops at the first such job in
                # sorted-jobid order, so it presumably relies on that
                # order following qtime order -- confirm.
                break

            # we are guaranteed that the qtime of all jobs that make
            # it this far are before the first event, so we need to
            # figure if the job is queued or running, nothing else

            job = copy.deepcopy(entry)
            job.set('jobid', jid)
            if job.get('start') and job.get('start') < starttime:
                job.set('state', 'running')
            else:
                job.set('state', 'queued')

            self.__jobdict__[jid] = job

    ## getnextevent shows you what the next event will be. step actually
    ## changes the server state to reflect what happened in that event.
    ## you need the two since you want to calculate ETT for an event
    ## BEFORE you actually submit the job to the queue!

    def getnextevent(self):
        """Return the next event without changing the server state."""
        return self.__history__.getevent(self.__evindx__ + 1)

    def step(self):
        """Advance to the next event, apply it, and return that event."""
        self.__evindx__ = self.__evindx__ + 1
        ev = self.__history__.getevent(self.__evindx__)
        self.__evtime__ = ev.time()

        # take care of implications for queue states
        evtype = ev.type()
        if evtype == 'Q':
            # work on a copy so the history's own entry is not modified
            jobcopy = copy.deepcopy(self.__history__.getjob(ev.jobid()))
            jobcopy.set('state', 'queued')
            jobcopy.set('jobid', ev.jobid())
            self.addjob(ev.jobid(), jobcopy)
        elif evtype == 'S':
            self.getjob(ev.jobid()).set('state', 'running')
        elif evtype == 'T':
            job = self.getjob(ev.jobid())
            job.set('state', 'running')
            job.set('start', ev.time())
        elif evtype == 'E':
            self.deletejob(ev.jobid())
        elif evtype == 'D':
            # if it's the last delete evt seen, do it
            jevtl = self.__history__.getjobevts(ev.jobid())
            if jevtl[-1] == ev:
                self.deletejob(ev.jobid())
        return ev
498    

Properties

Name Value
svn:eol-style native
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28