/[pdpsoft]/nl.nikhef.pdp.dynsched-pbs-plugin/trunk/pbsServer.py
ViewVC logotype

Annotation of /nl.nikhef.pdp.dynsched-pbs-plugin/trunk/pbsServer.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2012 - (hide annotations) (download) (as text)
Fri Oct 8 13:11:04 2010 UTC (11 years, 11 months ago) by templon
File MIME type: application/x-python
File size: 18078 byte(s)
first checkin, copied from old tree

1 templon 2012 #--
2     # pbsServer.py -- LRMS classes for PBS-based batch systems.
3     # $Revision: 1939 $
4     # $HeadURL$
5     # $Date: 2010-09-23 16:07:57 +0200 (Thu, 23 Sep 2010) $
6     # $Id$
7    
8     #--
9    
10     # Two classes derived from lrms.Server are included; one for
11     # working with a live PBS server, and one for representing historical
12     # states of the server from PBS accounting log files. A third class
13     # (History) is associated with the historical server class.
14    
15     from lrms import * # base Server class, Job class
16     import commands
17     import grp
18     import pwd
19     import re
20     import string
21     import sys
22     import time
23    
def _hms_to_secs(hms):
    """Convert an 'H:M:S' string into a number of seconds (a float)."""
    t = hms.split(':')
    return int(t[2]) + 60.0 * (int(t[1]) + 60 * int(t[0]))


class LiveServer(Server):
    """Server state built from the output of ``qstat -f`` and ``pbsnodes``.

    One Job is created per job record found in the qstat output and is
    registered with ``self.addjob``. ``slotsUp`` / ``slotsFree`` are computed
    from the nodes returned by ``torque_utils.pbsnodes``.
    """

    # PBS single-letter job states mapped to the state names stored on a Job;
    # any letter not listed here is reported as 'unknown'.
    _STATE_BY_LETTER = {
        'Q': 'queued', 'W': 'queued',
        'R': 'running', 'E': 'running',
        'H': 'pending', 'T': 'pending',
        'C': 'done',
    }

    def __init__(self, *arg, **kw):
        """Take a snapshot of the batch system.

        Keyword arguments:
          file -- if given and not None, the qstat text is read from this
                  path (via /bin/cat) instead of running ``qstat -f``.

        Exits the process with status 3 when a line of a job record cannot
        be parsed as ``key = value``, and with status 4 when neither
        euser/egroup nor Job_Owner is present for a job.
        """

        Server.__init__(self)

        cnow = time.ctime()  # this hack is to get around time zone problems
        if kw.get('file') is not None:
            # NOTE(review): the path is pasted into a shell command line;
            # only pass trusted file names here.
            cmdstr = '/bin/cat ' + kw['file']
        else:
            cmdstr = 'qstat -f'
        # the exit status is not examined; a failed command shows up as
        # unparseable output below
        (stat, qstatout) = commands.getstatusoutput(cmdstr)

        from torque_utils import pbsnodes
        cpucount = 0
        jobcount = 0
        for node in pbsnodes():
            if node.isUp():
                cpucount += node.numCpu
                for cpu in range(node.numCpu):
                    jobcount += len(node.jobs[cpu])
        self.slotsUp = cpucount
        self.slotsFree = cpucount - jobcount

        nowtuple = time.strptime(cnow, "%c")
        self.__evtime__ = int(time.mktime(nowtuple))

        # 'key = value' with any amount of leading indentation, so that the
        # parser does not depend on the exact indent width of the input.
        keyvalpat = re.compile(r'\s*(\S+) = (.*)')

        # job records are separated by blank lines
        for chunk in qstatout.split("\n\n"):
            if not chunk.strip():
                continue  # no jobs at all, or trailing blank output

            newj = Job()
            qstatDict = {}
            # a newline followed by a tab continues the previous line;
            # fold those before splitting the record into lines
            for ll in chunk.replace('\n\t', '').split('\n'):
                if not ll.strip():
                    continue
                if ll.startswith('Job Id'):
                    newj.set('jobid', ll.split()[2])
                    continue
                mk = keyvalpat.match(ll)
                if not mk:
                    print("fatal error, should always be able to find a match")
                    print("unmatchable string was: %s" % (ll,))
                    sys.exit(3)
                qstatDict[mk.group(1)] = mk.group(2)

            # do owner and group. try euser and egroup first, if not found,
            # back off to old method, which was "user@host" parsing plus a
            # group lookup on user.
            if 'euser' in qstatDict and 'egroup' in qstatDict:
                newj.set('user', qstatDict['euser'])
                newj.set('group', qstatDict['egroup'])
            elif 'Job_Owner' in qstatDict:
                user = qstatDict['Job_Owner'].split('@')[0]
                newj.set('user', user)
                try:
                    groupname = grp.getgrgid(pwd.getpwnam(user)[3])[0]
                except KeyError:
                    # user or gid not known on this machine
                    groupname = 'unknown'
                newj.set('group', groupname)
            else:
                print("Can't find user and group of job %s"
                      % (newj.get('jobid'),))
                sys.exit(4)

            # do job state
            if 'job_state' in qstatDict:
                newj.set('state', self._STATE_BY_LETTER.get(
                    qstatDict['job_state'], 'unknown'))
            else:
                newj.set('state', 'unknown')

            newj.set('queue', qstatDict['queue'])

            if 'qtime' in qstatDict:
                timetuple = time.strptime(qstatDict['qtime'], "%c")
                newj.set('qtime', time.mktime(timetuple))
            if 'Resource_List.walltime' in qstatDict:
                newj.set('maxwalltime',
                         _hms_to_secs(qstatDict['Resource_List.walltime']))

            used_secs = None  # elapsed walltime of THIS job, if reported
            if 'resources_used.walltime' in qstatDict:
                used_secs = _hms_to_secs(qstatDict['resources_used.walltime'])
                newj.set('walltime', used_secs)

            # do start time. if start_time is present use that, otherwise
            # back off to subtracting the elapsed walltime from the snapshot
            # time. The fallback is only taken when this job actually
            # reports a used walltime, so a requested walltime or a value
            # left over from a previous job can never be used instead.
            if 'start_time' in qstatDict:
                timetuple = time.strptime(qstatDict['start_time'], "%c")
                newj.set('start', time.mktime(timetuple))
                newj.set('startAnchor', 'start_time')
            elif used_secs is not None:
                newj.set('start', self.__evtime__ - used_secs)
                newj.set('startAnchor', 'resources_used.walltime')

            if 'Job_Name' in qstatDict:
                newj.set('name', qstatDict['Job_Name'])
            if 'exec_host' in qstatDict:
                # entries in exec_host are joined with '+'
                newj.set('cpucount', qstatDict['exec_host'].count('+') + 1)

            self.addjob(newj.get('jobid'), newj)
150    
151     import copy
152    
153     ## following is helper for class Event. superseded by newer keyvallist2dict function.
154     ## takes as arg a string of key=val pairs, returns a dict with the same
155     ## structure. example input string:
156     ## user=tdykstra group=niktheorie jobname=Q11_241828.gjob
157    
def keyval2dict(astring):
    """Turn a whitespace-separated string of key=val pairs into a dict.

    Example input: 'user=tdykstra group=niktheorie jobname=Q11_241828.gjob'

    Each field is split on its first '=' only, so a value may itself
    contain '='. A field without any '=' is printed (together with the
    failed split result) and CantHappenException is raised.
    """
    d = {}
    for field in astring.split():
        kv = field.split("=", 1)
        if len(kv) != 2:
            print(field)
            print(kv)
            raise CantHappenException
        d[kv[0]] = kv[1]
    return d
170    
171     ## following is helper for class Event.
172     ## takes as arg a list of key=val pairs, returns a dict with the same
173     ## structure. example input string:
174     ## ['user=tdykstra', 'group=niktheorie', 'jobname=Q11_241828.gjob']
175    
def keyvallist2dict(kvlist):
    """Build a dict from a list of 'key=val' strings.

    Example input:
    ['user=tdykstra', 'group=niktheorie', 'jobname=Q11_241828.gjob']

    Only the first '=' of an item separates key from value. An item with
    no '=' is reported on stdout and CantHappenException is raised.
    """
    result = {}
    for item in kvlist:
        pieces = item.split("=", 1)
        if len(pieces) != 2:
            print("tried to split: %s , result was: %s" % (item, pieces))
            raise CantHappenException
        key, val = pieces
        result[key] = val
    return result
186    
class Event:
    """One record of a PBS accounting log (job queued, started, ended...).

    A record that cannot be parsed leaves time, type and jobid as None and
    info as an empty dict; callers can detect this with ``jobid() is None``.
    """

    def __init__(self, evstring, debug=0):
        """Parse one log line.

        evstring -- the log line, without its trailing newline
        debug    -- when true, print the pieces found while parsing
        """

        self.__time__ = None  # default values
        self.__type__ = None
        self.__jobid__ = None
        self.__info__ = {}

        # search pattern for parsing string using "re" module
        # for successful search, fields are:
        # 1) timestamp
        # 2) event type (Q,S,E,D, etc)
        # 3) local PBS jobID
        # 4) rest of line (key=value) to be parsed otherwise
        # Fields 1 and 3 are matched non-greedily so that a ';' inside the
        # rest of the line (field 4) cannot be swallowed into the jobID.

        evpatt = "^(.+?);([A-Z]);(.+?);(.*)"
        m = re.search(evpatt, evstring)
        if not m:
            print("parse patt failed, offending line is")
            print(evstring)
            return
        if debug:
            print("timestamp %s" % (m.group(1),))
            print("code %s" % (m.group(2),))
            print("jobid %s" % (m.group(3),))
            print("attrs %s" % (m.group(4),))

        # tpatt matches strings of form key=val
        # lookahead assertion is necessary to work around presence of ' '
        # and '=' in some 'val' strings (like account, or neednodes with
        # multiple processors).
        # NOTE: ' -@' inside the character class is a RANGE (space through
        # '@'); it is what lets values contain characters such as '.', '+'
        # and ',' (e.g. jobname=Q11_241828.gjob). Do not turn it into a
        # literal '-'.

        tpatt = r'[a-z._A-Z]+=[a-z0-9A-Z=/: -@_]+?(?=$| [a-z._A-Z]+=)'
        tprog = re.compile(tpatt)
        tmatch = tprog.findall(m.group(4))
        if debug:
            print("result of key=val match pattern: %s" % (tmatch,))

        # parse timestamp

        ttup = time.strptime(m.group(1), "%m/%d/%Y %H:%M:%S")

        # last element of time tuple is DST, but PBS log files
        # don't specify time zone. Setting the last element of
        # the tuple to -1 asks libC to figure it out based on
        # local time zone of machine

        atup = ttup[:8] + (-1,)
        self.__time__ = int(time.mktime(atup))
        self.__type__ = m.group(2)
        self.__jobid__ = m.group(3)
        self.__info__ = keyvallist2dict(tmatch)

    def time(self):
        """Return the event time as integer seconds since the epoch."""
        return self.__time__

    def type(self):
        """Return the single-letter record type."""
        return self.__type__

    def jobid(self):
        """Return the PBS job id the record refers to."""
        return self.__jobid__

    def info(self, key=''):
        """Return the whole key=val dict, or the value for one key.

        With the default key ('') the dict itself is returned; otherwise
        the value stored under key, or None when the key is absent.
        """
        if key == '':
            return self.__info__
        return self.__info__.get(key)

    def __str__(self):
        # %-formatting rather than '+' so that an event whose line failed
        # to parse (all fields None) can still be printed.
        return '%s :: event type %s at %s' % (
            self.__jobid__, self.__type__, self.__time__)
260    
class History:
    """Job catalog and event list built from PBS accounting log files.

    the idea here is to generate a couple of data structures:
    first the job catalog, which associates things like
    queue entry time, start time, stop time, owner username/groupname,
    etc. with each PBS jobid.
    second the event list, which records when jobs enter the various
    queues, start to execute, and terminate.
    A third structure, the job event dict, maps each jobid to the list of
    events seen for that job (in sequence); it helps in resolving
    ambiguous cases (like multiple "D" events seen).
    """

    def __init__(self, logfilelist):
        """Read every file named in logfilelist, in order, line by line.

        Lines that Event cannot parse are skipped (Event already reports
        them on stdout); they do not enter the event list or the catalog.
        Exits the process with status 1 when a 'Q' record is seen for a
        job that is already in the catalog.
        """

        self.__evlist__ = []
        self.__jobcat__ = {}
        self.__jobevs__ = {}

        for f in logfilelist:
            inf = open(f, 'r')
            try:
                for line in inf:
                    # strip only the newline, so a final line without one
                    # does not lose its last character
                    self._record(Event(line.rstrip('\n')))
            finally:
                inf.close()

    def _record(self, ev):
        # Fold one parsed event into the event list and the job catalog.
        if ev.jobid() is None:  # line did not parse; nothing to record
            return

        self.addevt(ev)  # adds event to evlist and jobevt struct

        if ev.type() == 'Q':  # job enters sys for 1st time
            if self.hasjob(ev.jobid()):  # should not happen
                print('Error, Q event seen for pre-existing job %s'
                      % (ev.jobid(),))
                sys.exit(1)
            newentry = Job()  # make new entry, fill info
            newentry.set('qtime', ev.time())
            newentry.set('queue', ev.info('queue'))
            self.addjob(ev.jobid(), newentry)
            return

        ## for all other event types
        if not self.hasjob(ev.jobid()):
            self.addjob(ev.jobid(), Job())  # make new entry
        job = self.getjob(ev.jobid())

        # qtime from a record always replaces what is stored; the other
        # fields are only filled in when still missing.
        if ev.info('qtime'):
            job.set('qtime', int(ev.info('qtime')))
        for key in ('queue', 'user', 'group'):
            if ev.info(key) and not job.get(key):
                job.set(key, ev.info(key))
        if ev.info('start') and not job.get('start'):
            job.set('start', int(ev.info('start')))
        if ev.info('Resource_List.walltime') and not job.get('maxwalltime'):
            (h, m, s) = ev.info('Resource_List.walltime').split(":")
            job.set('maxwalltime', int(h) * 3600 + int(m) * 60 + int(s))
        if ev.info('end') and not job.get('end'):
            job.set('end', int(ev.info('end')))

        ## special handling for troublesome event types

        if ev.type() == 'T':  # job restart after checkpoint
            ## previous job start record may not have been seen; if not, we
            ## don't know if it was running or queued before, so just set
            ## current event time as start since we know it's running now.
            if not job.get('start'):
                job.set('start', ev.time())
            ## in some cases the T record may be the first (or even only)
            ## record we see. in that case, the only thing we can say is
            ## that the qtime was before the beginning of the log files ...
            ## set qtime like that. if we see a later record for this job,
            ## that record will set the correct qtime.
            if not job.get('qtime'):
                job.set('qtime', self.getfirst_event_time() - 1)

    ## functions for using the job catalog. This beast is a dictionary
    ## that has one entry (an object of class Job) per job seen in the
    ## log files.

    def addjob(self, jobid, job):
        """Add a new job entry; exits with status 1 if jobid is known."""
        if self.hasjob(jobid):
            print("job already found, exiting")
            sys.exit(1)
        job.set('jobid', jobid)
        self.__jobcat__[jobid] = job

    def hasjob(self, jobid):
        """Return 1 if jobid is in the catalog, else 0."""
        if jobid in self.__jobcat__:
            return 1
        return 0

    def setjobinfo(self, jobid, key, val):
        """Set key=val on an existing job; return 1, or 0 if unknown."""
        if not self.hasjob(jobid):
            print("job not found: %s %s %s" % (jobid, key, val))
            return 0
        self.__jobcat__[jobid].set(key, val)
        return 1

    def getjobinfo(self, jobid, key):
        """Return the job's value for key, or 0 if the job is unknown."""
        if not self.hasjob(jobid):
            print("job not found: %s %s" % (jobid, key))
            return 0
        return self.__jobcat__[jobid].get(key)

    def getjoblist(self):
        """Return a new list holding every jobid in the catalog."""
        return list(self.__jobcat__)

    def getjob(self, jobid):
        """Return the catalog entry for jobid (KeyError if unknown)."""
        return self.__jobcat__[jobid]

    ## functions for using event list. This beast is just a list (in
    ## sequence) of all events seen while parsing the log files

    def getfirst_event_time(self):
        """Return the time of the first recorded event."""
        return self.__evlist__[0].time()

    def getevent(self, index):
        """Return the event at position index of the event list."""
        return self.__evlist__[index]

    def get_evlist(self):
        """Return the event list itself (not a copy)."""
        return self.__evlist__

    ## functions for using the job event dict (key=jobid, value=list of
    ## events seen for that job, in sequence).

    def getjobevts(self, jobid):
        """Return the events seen for jobid; exits with 1 if unknown."""
        if jobid not in self.__jobevs__:
            print('getjobevs: jobid %s not found in job_event db' % (jobid,))
            sys.exit(1)
        return self.__jobevs__[jobid]

    def addevt(self, ev):
        """Append ev to the event list and to its job's event list."""
        self.__evlist__.append(ev)
        ## create entry for job if needed
        self.__jobevs__.setdefault(ev.jobid(), []).append(ev)
407    
class LogFileServer(Server):
    """Server whose state is replayed, event by event, from a History.

    On construction the server is loaded with the jobs whose qtime lies
    before the first event of the history; ``step`` then applies one
    event at a time.
    """

    def __init__(self, history, debug=0):
        """Set up the state as it was just before the first event.

        history -- a History object
        debug   -- when true, print what is found while loading jobs
        """

        Server.__init__(self)

        self.__history__ = history
        # want to get first event on first getnextevent call, so set
        # initial index to -1 (one previous to 0!)
        self.__evindx__ = -1

        evlist = history.get_evlist()
        if not evlist:
            return  # empty history: nothing was in the system

        first_event = evlist[0]
        if debug:
            print('first event data:')
            print("%s %s %s" % (first_event.jobid(), first_event.type(),
                                first_event.time()))
        starttime = first_event.time()
        jobidlist = sorted(history.getjoblist())
        if debug:
            print(jobidlist)

        for jid in jobidlist:
            entry = history.getjob(jid)
            qtime = entry.get('qtime')
            # a job without a qtime counts as "before the first event"
            queued_later = qtime is not None and qtime >= starttime
            if debug:
                print("%s %s %s" % (jid, qtime, starttime))
                print(queued_later)
            # Job ids sort as strings, which need not follow queueing
            # order, so every job is examined rather than stopping at the
            # first one queued at or after the first event.
            if queued_later:
                continue

            # the qtime of all jobs that make it this far is before the
            # first event, so we need to figure if the job is queued or
            # running, nothing else

            job = copy.deepcopy(entry)
            job.set('jobid', jid)
            if job.get('start') and job.get('start') < starttime:
                job.set('state', 'running')
            else:
                job.set('state', 'queued')

            self.__jobdict__[jid] = job

    ## getnextevent shows you what the next event will be. step actually
    ## changes the server state to reflect what happened in that event.
    ## you need the two since you want to calculate ETT for an event
    ## BEFORE you actually submit the job to the queue!

    def getnextevent(self):
        """Return the next event without applying it."""
        return self.__history__.getevent(self.__evindx__ + 1)

    def step(self):
        """Advance to the next event, apply it, and return it."""
        self.__evindx__ = self.__evindx__ + 1
        ev = self.__history__.getevent(self.__evindx__)
        self.__evtime__ = ev.time()

        # take care of implications for queue states

        kind = ev.type()
        jid = ev.jobid()
        if kind == 'Q':
            jobcopy = copy.deepcopy(self.__history__.getjob(jid))
            jobcopy.set('state', 'queued')
            jobcopy.set('jobid', jid)
            self.addjob(jid, jobcopy)
        elif kind == 'S':
            self.getjob(jid).set('state', 'running')
        elif kind == 'T':
            job = self.getjob(jid)
            job.set('state', 'running')
            job.set('start', ev.time())
        elif kind == 'E':
            self.deletejob(jid)
        elif kind == 'D':  # if it's the last delete evt seen, do it
            jevtl = self.__history__.getjobevts(jid)
            if jevtl[-1] == ev:
                self.deletejob(jid)
        return ev
480    

Properties

Name Value
svn:eol-style native
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28