/[pdpsoft]/trunk/nl.nikhef.pdp.lrmsutils/lrms-pbs/pbsServer.py
ViewVC logotype

Annotation of /trunk/nl.nikhef.pdp.lrmsutils/lrms-pbs/pbsServer.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1882 - (hide annotations) (download) (as text)
Thu Aug 19 08:37:24 2010 UTC (12 years, 1 month ago) by templon
File MIME type: application/x-python
File size: 16531 byte(s)
added keyword stuff in header, does it work?

1 templon 78 #--
2     # pbsServer.py -- LRMS classes for PBS-based batch systems.
3 templon 1882 # $Revision$
4     # $HeadURL$
5     # $Date$
6     # $Id$
7    
8 templon 78 #--
9    
10     # Two classes derived from lrms.Server are included; one for
11     # working with a live PBS server, and one for representing historical
12     # states of the server from PBS accounting log files. A third class
13     # (History) is associated with the historical server class.
14    
15     from lrms import * # base Server class, Job class
16     import commands
17     import grp
18     import pwd
19     import re
20     import string
21     import sys
22     import time
23    
class LiveServer(Server):
    """Server whose job list is parsed from ``qstat -f`` style output.

    The constructor fills in ``slotsUp`` / ``slotsFree`` from
    ``torque_utils.pbsnodes()``, records the current time in
    ``__evtime__`` and registers one Job per parsed job stanza through
    ``self.addjob``.
    """

    # qstat ``job_state`` letters and the state name stored on the Job.
    # Any letter not listed here is stored as 'unknown'.
    _STATE_NAMES = {
        'Q': 'queued',
        'W': 'queued',
        'R': 'running',
        'E': 'running',
        'H': 'pending',
        'T': 'pending',
        'C': 'done',
    }

    @staticmethod
    def _hms_to_secs(hms):
        """Convert an 'HH:MM:SS' string to seconds (returned as a float)."""
        t = hms.split(':')
        return int(t[2]) + 60.0 * (int(t[1]) + 60 * int(t[0]))

    def __init__(self, *arg, **kw):
        """Build the server state.

        Keyword arguments:
          file -- if present and not None, path of a file containing
                  ``qstat -f`` output; it is read instead of running
                  ``qstat -f``.  Opening a missing/unreadable file raises
                  IOError (previously the error text printed by /bin/cat
                  was parsed as if it were job data).

        Positional arguments are accepted but not used.
        """
        Server.__init__(self)

        cnow = time.ctime()  # this hack is to get around time zone problems

        qstatfile = kw.get('file')
        if qstatfile is not None:
            # Read the dump directly rather than through a shell command
            # string, so paths containing spaces or shell metacharacters work.
            inf = open(qstatfile, 'r')
            try:
                qstatout = inf.read()
            finally:
                inf.close()
            # commands.getstatusoutput() drops one trailing newline; do the
            # same so both input paths hand identical text to the parser.
            if qstatout[-1:] == '\n':
                qstatout = qstatout[:-1]
        else:
            # NOTE(review): the exit status of qstat is not checked here
            # (it never was); a failing qstat simply yields no jobs.
            (_status, qstatout) = commands.getstatusoutput('qstat -f')

        # Imported here, as in the original code, so the module itself can be
        # imported without torque_utils being available.
        from torque_utils import pbsnodes
        cpucount = 0
        jobcount = 0
        for node in pbsnodes():
            if node.isUp():
                cpucount += node.numCpu
                for cpu in range(node.numCpu):
                    jobcount += len(node.jobs[cpu])
        self.slotsUp = cpucount
        self.slotsFree = cpucount - jobcount

        nowtuple = time.strptime(cnow, "%c")
        self.__evtime__ = int(time.mktime(nowtuple))

        # Job stanzas are separated by blank lines; attributes inside a
        # stanza start on a new line followed by a space.
        for stanza in qstatout.split("\n\n"):
            newj = Job()
            have_jobid = False
            for rawline in stanza.split('\n '):
                thisline = rawline.strip()
                if thisline.startswith('Job Id'):
                    newj.set('jobid', thisline.split()[2])
                    have_jobid = True
                elif thisline.startswith('Job_Owner'):
                    user_and_host = thisline.split()[2]
                    user = user_and_host.split('@')[0]
                    newj.set('user', user)
                    try:
                        gid = pwd.getpwnam(user)[3]
                        groupname = grp.getgrgid(gid)[0]
                    except KeyError:
                        # user or group not known on this machine
                        groupname = 'unknown'
                    newj.set('group', groupname)
                elif thisline.startswith('job_state'):
                    statelett = thisline.split()[2]
                    newj.set('state',
                             self._STATE_NAMES.get(statelett, 'unknown'))
                elif thisline.startswith('queue ='):
                    newj.set('queue', thisline.split()[2])
                elif thisline.startswith('qtime ='):
                    timestring = thisline.split('=')[1][1:]
                    timetuple = time.strptime(timestring, "%c")
                    newj.set('qtime', time.mktime(timetuple))
                elif thisline.startswith('Resource_List.walltime'):
                    newj.set('maxwalltime',
                             self._hms_to_secs(thisline.split()[2]))
                elif thisline.startswith('resources_used.walltime'):
                    secs = self._hms_to_secs(thisline.split()[2])
                    # start time = "now" minus the wall time used so far
                    newj.set('start', self.__evtime__ - secs)
                elif thisline.startswith('Job_Name'):
                    newj.set('name', thisline.split()[2])
                elif thisline.startswith('exec_host'):
                    hlist = thisline.split('=')[1]
                    # one '+'-separated entry per allocated cpu
                    newj.set('cpucount', hlist.count('+') + 1)

            # Empty output, trailing blank stanzas or error text contain no
            # "Job Id" line; do not register a job without an id for them.
            if not have_jobid:
                continue
            self.addjob(newj.get('jobid'), newj)
110    
111     import copy
112    
113     ## following is helper for class Event.
114     ## takes as arg a string of key=val pairs, returns a dict with the same
115     ## structure. example input string:
116     ## user=tdykstra group=niktheorie jobname=Q11_241828.gjob
117    
def keyval2dict(astring):
    """Turn a whitespace-separated string of key=val pairs into a dict.

    Example input: 'user=tdykstra group=niktheorie jobname=Q11_241828.gjob'

    Every field must contain exactly one '='.  For any other field the
    split result is printed and CantHappenException is raised.
    """
    d = {}
    for field in astring.split():
        kv = field.split("=")
        if len(kv) != 2:
            print(kv)
            raise CantHappenException
        d[kv[0]] = kv[1]
    return d
129    
class Event:
    """Simple class to represent events like job queued, job started, etc.

    An Event is built from one line of a PBS accounting log.  If the line
    cannot be parsed, a message is printed and the event keeps its default
    values: time(), type() and jobid() return None and info() returns {}.
    """

    # Search pattern for a log line.  For a successful search the groups are:
    #   1) timestamp
    #   2) event type (Q,S,E,D, etc)
    #   3) local PBS jobID
    #   4) rest of line (key=value pairs), parsed by keyval2dict
    _EV_PROG = re.compile(r"^(.+);([A-Z]);(.+);(.*)")

    # " name=" : the start of an attribute following the previous one
    _ATTR_PROG = re.compile(r' [a-zA-Z_.]+=')

    def __init__(self, evstring, debug=0):
        """Parse evstring; with a true debug value, print the parsed fields."""
        self.__time__ = None  # default values
        self.__type__ = None
        self.__jobid__ = None
        self.__info__ = {}

        evstring = self._escape_account(evstring)

        m = self._EV_PROG.search(evstring)
        if not m:
            print("parse patt failed, offending line is")
            print(evstring)
            return
        if debug:
            print("timestamp %s" % m.group(1))
            print("code %s" % m.group(2))
            print("jobid %s" % m.group(3))
            print("attrs %s" % m.group(4))

        # parse timestamp
        ttup = time.strptime(m.group(1), "%m/%d/%Y %H:%M:%S")

        # last element of time tuple is DST, but PBS log files
        # don't specify time zone.  Setting the last element of
        # the tuple to -1 asks libC to figure it out based on
        # local time zone of machine
        atup = ttup[:8] + (-1,)
        self.__time__ = int(time.mktime(atup))
        self.__type__ = m.group(2)
        self.__jobid__ = m.group(3)
        self.__info__ = keyval2dict(m.group(4))

    def _escape_account(self, evstring):
        """Return evstring with the "account" value made safe for parsing.

        The key=value parsing breaks on the "account" attribute due to
        embedded whitespace and equals characters.  Inside that value,
        whitespace is replaced by '^' and '=' by '#'.  The value is taken to
        run up to the next " name=" attribute or, when account is the last
        attribute, to the end of the line.  As before, the first character
        after "account=" is left untouched.
        """
        accpatt = "account=."
        macc = re.search(accpatt, evstring)
        if not macc:
            return evstring
        accstart = macc.start() + len(accpatt)  # need to get past the = character
        mnextattr = self._ATTR_PROG.search(evstring, macc.start())
        if mnextattr:
            accend = mnextattr.start()
        else:
            accend = len(evstring)
        if accend <= accstart:
            # nothing between the first value character and the next attribute
            return evstring
        escaped = evstring[accstart:accend].replace(' ', '^').replace('=', '#')
        # Splice in place so identical text elsewhere on the line is untouched.
        return evstring[:accstart] + escaped + evstring[accend:]

    def time(self):
        """Return the event time as integer seconds from time.mktime, or None."""
        return self.__time__

    def type(self):
        """Return the one-letter event type, or None."""
        return self.__type__

    def jobid(self):
        """Return the PBS job id the event refers to, or None."""
        return self.__jobid__

    def info(self, key=''):
        """Return the attribute dict, or the value for key (None if absent)."""
        if key == '':
            return self.__info__
        return self.__info__.get(key)

    def __str__(self):
        # %-formatting keeps this usable for events whose line failed to
        # parse (all fields None) instead of raising TypeError.
        return '%s :: event type %s at %s' % (
            self.__jobid__, self.__type__, self.__time__)
212    
class History:
    """Job and event records gathered from PBS accounting log files.

    The idea here is to generate a couple of data structures:

    * the job catalog (``__jobcat__``): a dictionary with one Job object
      per jobid seen in the log files, holding things like queue entry
      time, start time, stop time, owner username/groupname, etc.
    * the event list (``__evlist__``): all events, in the order in which
      they were read from the log files.
    * the job event list (``__jobevs__``): a dictionary (key=jobid) whose
      value is the list of events seen for that job (in sequence).  It can
      help in resolving ambiguous cases (like multiple "D" events seen).
    """

    def __init__(self, logfilelist):
        """Read every file named in logfilelist, in the order given.

        Lines that Event cannot parse are skipped (Event prints a message
        for them); they add neither an event nor a job.
        """
        self.__evlist__ = []
        self.__jobcat__ = {}
        self.__jobevs__ = {}

        for f in logfilelist:
            inf = open(f, 'r')
            try:
                for line in inf:
                    # strip only the newline, so a last line without one
                    # does not lose its final character
                    ev = Event(line.rstrip('\n'))  # parse line, create event
                    if ev.type() is None:
                        continue  # unparsable line
                    self._record(ev)
            finally:
                inf.close()

    def _record(self, ev):
        """Store one parsed event and update the job catalog from it."""
        self.addevt(ev)  # adds event to evlist and jobevt struct
        jobid = ev.jobid()

        if ev.type() == 'Q':  # job enters sys for 1st time
            if self.hasjob(jobid):  # should not happen
                print('Error, Q event seen for pre-existing job %s' % jobid)
                sys.exit(1)
            newentry = Job()  # make new entry, fill info
            newentry.set('qtime', ev.time())
            newentry.set('queue', ev.info('queue'))
            self.addjob(jobid, newentry)
            return

        ## for all other event types
        if not self.hasjob(jobid):
            self.addjob(jobid, Job())  # make new entry
        job = self.getjob(jobid)
        if ev.info('qtime'):
            job.set('qtime', int(ev.info('qtime')))
        if ev.info('queue') and not job.get('queue'):
            job.set('queue', ev.info('queue'))
        if ev.info('user') and not job.get('user'):
            job.set('user', ev.info('user'))
        if ev.info('group') and not job.get('group'):
            job.set('group', ev.info('group'))
        if ev.info('start') and not job.get('start'):
            job.set('start', int(ev.info('start')))
        if ev.info('Resource_List.walltime') and not job.get('maxwalltime'):
            hmsflds = ev.info('Resource_List.walltime').split(":")
            maxwallsecs = int(hmsflds[0]) * 3600 + \
                          int(hmsflds[1]) * 60 + \
                          int(hmsflds[2])
            job.set('maxwalltime', maxwallsecs)
        if ev.info('end') and not job.get('end'):
            job.set('end', int(ev.info('end')))

        ## special handling for troublesome event types

        if ev.type() == 'T':  # job restart after checkpoint
            ## previous job start record may not have been seen; if not, we
            ## don't know if it was running or queued before, so just set
            ## current event time as start since we know it's running now.
            if not job.get('start'):
                job.set('start', ev.time())
            ## in some cases the T record may be the first (or even only)
            ## record we see.  in that case, the only thing we can say is
            ## that the qtime was before the beginning of the log files ...
            ## set qtime like that.  if we see a later record for this job,
            ## that record will set the correct qtime.
            if not job.get('qtime'):
                job.set('qtime', self.getfirst_event_time() - 1)

    ## functions for using the job catalog.

    def addjob(self, jobid, job):
        """Add a new job entry; prints a message and exits if jobid exists."""
        if self.hasjob(jobid):
            print("job already found, exiting")
            sys.exit(1)
        job.set('jobid', jobid)
        self.__jobcat__[jobid] = job

    def hasjob(self, jobid):
        """Return 1 if jobid is already in the catalogue, else 0."""
        if jobid in self.__jobcat__:
            return 1
        return 0

    def setjobinfo(self, jobid, key, val):
        """Set key to val on an existing job.

        Returns 1 on success; prints a message and returns 0 if the job
        is not in the catalogue.
        """
        if not self.hasjob(jobid):
            print("job not found: %s %s %s" % (jobid, key, val))
            return 0
        self.__jobcat__[jobid].set(key, val)
        return 1

    def getjobinfo(self, jobid, key):
        """Return the value of key for job jobid.

        Prints a message and returns 0 if the job is not in the catalogue.
        """
        if not self.hasjob(jobid):
            print("job not found: %s %s" % (jobid, key))
            return 0
        return self.__jobcat__[jobid].get(key)

    def getjoblist(self):
        """Return a new list holding the jobids in the catalogue."""
        return list(self.__jobcat__.keys())

    def getjob(self, jobid):
        """Return the Job stored for jobid (KeyError if there is none)."""
        return self.__jobcat__[jobid]

    ## functions for using event list.

    def getfirst_event_time(self):
        """Return the time of the first event in the event list."""
        return self.__evlist__[0].time()

    def getevent(self, index):
        """Return the event at position index of the event list."""
        return self.__evlist__[index]

    def get_evlist(self):
        """Return the event list itself (not a copy)."""
        return self.__evlist__

    ## functions for using the job event list.

    def getjobevts(self, jobid):
        """Return the list of events seen for jobid.

        Prints a message and exits if no event was recorded for jobid.
        """
        if jobid not in self.__jobevs__:
            print('getjobevs: jobid %s not found in job_event db' % jobid)
            sys.exit(1)
        return self.__jobevs__[jobid]

    def addevt(self, ev):
        """Append ev to the event list and to the per-job event list."""
        ## append to event list
        self.__evlist__.append(ev)
        ## append to job event struct; create entry for job if needed
        self.__jobevs__.setdefault(ev.jobid(), []).append(ev)
361    
class LogFileServer(Server):
    """Server that replays the events stored in a History object.

    getnextevent shows you what the next event will be.  step actually
    changes the server state to reflect what happened in that event.
    You need the two since you want to calculate ETT for an event
    BEFORE you actually submit the job to the queue!
    """

    def __init__(self, history, debug=0):
        """Set up the state as it was just before the first event.

        Every job in the history whose qtime lies before the time of the
        first event is copied into the server, in state 'running' if it
        has a start time before that moment and 'queued' otherwise.

        history -- object providing get_evlist, getjoblist, getjob,
                   getevent and getjobevts (see class History); it must
                   contain at least one event.
        debug   -- if true, print progress information.
        """
        Server.__init__(self)

        self.__history__ = history
        # want to get first event on first getnextevent call, so set initial
        # index to -1 (one previous to 0!)
        self.__evindx__ = -1
        first_event = history.get_evlist()[0]
        if debug:
            print('first event data:')
            print('%s %s %s' % (first_event.jobid(), first_event.type(),
                                first_event.time()))
        starttime = first_event.time()
        # sorted() keeps the processing order deterministic and works
        # whatever kind of sequence getjoblist() returns
        jobidlist = sorted(history.getjoblist())
        if debug:
            print(jobidlist)
        for jid in jobidlist:
            entry = history.getjob(jid)
            qtime = entry.get('qtime')
            # a job without a qtime is treated as not queued later
            queued_later = qtime is not None and qtime >= starttime
            if debug:
                print('%s %s %s' % (jid, qtime, starttime))
                print(queued_later)
            if queued_later:
                # Job ids are strings, so their sort order is not queue
                # order ("1000.x" sorts before "999.x").  Skip this job but
                # keep looking; stopping here could drop earlier jobs.
                continue

            # the qtime of all jobs that make it this far is before the
            # first event, so we need to figure if the job is queued or
            # running, nothing else
            job = copy.deepcopy(entry)
            job.set('jobid', jid)
            if job.get('start') and job.get('start') < starttime:
                job.set('state', 'running')
            else:
                job.set('state', 'queued')

            self.__jobdict__[jid] = job

    def getnextevent(self):
        """Return the next event without changing the server state."""
        return self.__history__.getevent(self.__evindx__ + 1)

    def step(self):
        """Step to the next event, apply it to the job list and return it."""
        self.__evindx__ = self.__evindx__ + 1
        ev = self.__history__.getevent(self.__evindx__)
        self.__evtime__ = ev.time()

        # take care of implications for queue states
        evtype = ev.type()
        jid = ev.jobid()
        if evtype == 'Q':
            jobcopy = copy.deepcopy(self.__history__.getjob(jid))
            jobcopy.set('state', 'queued')
            jobcopy.set('jobid', jid)
            self.addjob(jid, jobcopy)
        elif evtype == 'S':
            self.getjob(jid).set('state', 'running')
        elif evtype == 'T':
            job = self.getjob(jid)
            job.set('state', 'running')
            job.set('start', ev.time())
        elif evtype == 'E':
            self.deletejob(jid)
        elif evtype == 'D':  # if it's the last delete evt seen, do it
            jevtl = self.__history__.getjobevts(jid)
            if jevtl[-1] == ev:
                self.deletejob(jid)
        return ev
434    

Properties

Name Value
svn:keyword ‘Revision
svn:keywords Revision HeadURL Date Id

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28