/[pdpsoft]/trunk/nl.nikhef.pdp.lrmsutils/lrms-pbs/pbsServer.py
ViewVC logotype

Contents of /trunk/nl.nikhef.pdp.lrmsutils/lrms-pbs/pbsServer.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1886 - (show annotations) (download) (as text)
Thu Aug 19 13:04:30 2010 UTC (11 years, 8 months ago) by templon
File MIME type: application/x-python
File size: 16584 byte(s)
working version with code to properly parse new accounting log files.

1 #--
2 # pbsServer.py -- LRMS classes for PBS-based batch systems.
3 # $Revision$
4 # $HeadURL$
5 # $Date$
6 # $Id$
7
8 #--
9
10 # Two classes derived from lrms.Server are included; one for
11 # working with a live PBS server, and one for representing historical
12 # states of the server from PBS accounting log files. A third class
13 # (History) is associated with the historical server class.
14
15 from lrms import * # base Server class, Job class
16 import commands
17 import grp
18 import pwd
19 import re
20 import string
21 import sys
22 import time
23
class LiveServer(Server):
    """State of a live PBS server, built from ``qstat -f`` and ``pbsnodes``.

    Construction does three things:

    * counts the CPUs of all nodes reported as up by ``pbsnodes()`` and
      stores them in ``slotsUp``; ``slotsFree`` is that count minus the
      number of jobs listed on those CPUs;
    * records the current local time (seconds since the epoch) as the
      event time of the server;
    * parses the verbose qstat output and registers one ``Job`` per job
      record via ``addjob``.
    """

    # PBS single-letter job states mapped onto the state names stored on
    # each Job; any other letter is stored as 'unknown'.
    _STATE_NAMES = {
        'Q': 'queued', 'W': 'queued',
        'R': 'running', 'E': 'running',
        'H': 'pending', 'T': 'pending',
        'C': 'done',
    }

    @staticmethod
    def _hms_to_secs(hms):
        """Convert an 'HH:MM:SS' string to seconds (returned as a float)."""
        t = hms.split(':')
        # the 60.0 factor makes the result a float, as it always has been
        return int(t[2]) + 60.0*(int(t[1]) + 60*int(t[0]))

    def __init__(self,*arg,**kw):
        """Build the server state.

        Keyword arguments:
        file -- optional path of a file holding saved ``qstat -f`` output.
                When given (and not None) the file is read instead of
                running ``qstat -f``.  The file is opened directly rather
                than through a ``/bin/cat`` shell command, so paths with
                spaces or shell metacharacters work; a missing file raises
                IOError instead of having the error text parsed as a job.
        """

        Server.__init__(self)

        cnow = time.ctime() # this hack is to get around time zone problems

        qstat_file = kw.get('file')
        if qstat_file is not None:
            inf = open(qstat_file, 'r')
            try:
                qstatout = inf.read()
            finally:
                inf.close()
            # commands.getstatusoutput() strips one trailing newline;
            # do the same so both input paths yield identical text
            if qstatout.endswith('\n'):
                qstatout = qstatout[:-1]
        else:
            qstatout = commands.getstatusoutput('qstat -f')[1]

        from torque_utils import pbsnodes
        cpucount = 0
        jobcount = 0
        for node in pbsnodes():
            if node.isUp():
                cpucount += node.numCpu
                for cpu in range(node.numCpu):
                    jobcount += len(node.jobs[cpu])
        self.slotsUp = cpucount
        self.slotsFree = cpucount - jobcount

        nowtuple = time.strptime(cnow,"%c")
        self.__evtime__ = int(time.mktime(nowtuple))

        # job records are separated by blank lines
        for record in qstatout.split("\n\n"):
            newj = Job()
            seen_jobid = False
            for rawline in record.split('\n '):
                thisline = rawline.strip()
                if thisline.startswith('Job Id'):
                    newj.set('jobid',thisline.split()[2])
                    seen_jobid = True
                elif thisline.startswith('Job_Owner'):
                    user_and_host = thisline.split()[2]
                    user = user_and_host.split('@')[0]
                    newj.set('user',user)
                    # best effort: the owner may not be known locally
                    try:
                        gid = pwd.getpwnam(user)[3]
                        groupname = grp.getgrgid(gid)[0]
                    except (KeyError, OSError):
                        groupname = 'unknown'
                    newj.set('group',groupname)
                elif thisline.startswith('job_state'):
                    statelett = thisline.split()[2]
                    newj.set('state',
                             self._STATE_NAMES.get(statelett, 'unknown'))
                elif thisline.startswith('queue ='):
                    newj.set('queue',thisline.split()[2])
                elif thisline.startswith('qtime ='):
                    # drop the single blank following the '='
                    timestring = thisline.split('=')[1][1:]
                    timetuple = time.strptime(timestring,"%c")
                    newj.set('qtime',time.mktime(timetuple))
                elif thisline.startswith('Resource_List.walltime'):
                    newj.set('maxwalltime',
                             self._hms_to_secs(thisline.split()[2]))
                elif thisline.startswith('resources_used.walltime'):
                    # start time is inferred: now minus walltime used so far
                    used = self._hms_to_secs(thisline.split()[2])
                    newj.set('start',self.__evtime__ - used)
                elif thisline.startswith('Job_Name'):
                    newj.set('name',thisline.split()[2])
                elif thisline.startswith('exec_host'):
                    # hosts are joined by '+', one entry per cpu
                    hlist = thisline.split('=')[1]
                    newj.set('cpucount',hlist.count('+') + 1)

            # empty output, trailing blank chunks or error text contain no
            # 'Job Id' line; do not register a job without an id for them
            if not seen_jobid:
                continue
            self.addjob(newj.get('jobid'),newj)
110
111 import copy
112
113 ## following is helper for class Event. superseded by newer keyvallist2dict function.
114 ## takes as arg a string of key=val pairs, returns a dict with the same
115 ## structure. example input string:
116 ## user=tdykstra group=niktheorie jobname=Q11_241828.gjob
117
def keyval2dict(astring):
    """Turn a whitespace-separated string of key=val pairs into a dict.

    Each field is split on its first '=' only, so a value may itself
    contain '='.  Values containing blanks are not supported: the string
    is split on whitespace before the pairs are examined.

    A field without any '=' is printed, together with the failed split
    result, and CantHappenException is raised.
    """
    d = {}
    for f in astring.split():
        kv = f.split("=",1)
        if len(kv) != 2:
            print(f)
            print(kv)
            raise CantHappenException
        d[kv[0]] = kv[1]
    return d
130
131 ## following is helper for class Event.
132 ## takes as arg a list of key=val pairs, returns a dict with the same
133 ## structure. example input list:
134 ## ['user=tdykstra', 'group=niktheorie', 'jobname=Q11_241828.gjob']
135
def keyvallist2dict(kvlist):
    """Build a dict from a list of 'key=val' strings.

    Every item is split on its first '=' only, so the value part may
    contain further '=' characters.  An item with no '=' at all is
    reported on stdout and CantHappenException is raised.
    """
    result = {}
    for item in kvlist:
        parts = item.split("=", 1)
        if len(parts) != 2:
            print("tried to split: %s , result was: %s" % (item, parts))
            raise CantHappenException
        key, value = parts
        result[key] = value
    return result
146
class Event:
    """One record of a PBS accounting log (job queued, started, ended ...).

    A record line has four ';'-separated fields: timestamp, one-letter
    event type, PBS job id and a blank-separated list of key=value
    attributes.  If the line cannot be parsed a message is printed and
    the event keeps its defaults: time(), type() and jobid() return None
    and info() returns an empty dict.
    """

    def __init__(self,evstring,debug=0):
        """Parse evstring, one accounting log line without its newline.

        debug -- when true, print the parsed fields to stdout.
        """

        self.__time__ = None # default values
        self.__type__ = None
        self.__jobid__ = None
        self.__info__ = { }

        # search pattern for parsing string using "re" module
        # for successful search, fields are:
        # 1) timestamp
        # 2) event type (Q,S,E,D, etc)
        # 3) local PBS jobID
        # 4) rest of line (key=value) to be parsed otherwise
        # The first and third fields exclude ';' so that a ';' inside the
        # attribute text cannot be absorbed into the timestamp or job id.

        evpatt = "^([^;]+);([A-Z]);([^;]+);(.*)"
        m = re.search(evpatt,evstring)
        if not m:
            print("parse patt failed, offending line is")
            print(evstring)
            return
        if debug:
            print("timestamp %s" % (m.group(1),))
            print("code %s" % (m.group(2),))
            print("jobid %s" % (m.group(3),))
            print("attrs %s" % (m.group(4),))

        # tpatt matches strings of form key=val
        # lookahead assertion is necessary to work around presence of ' ' and '=' in some
        # 'val' strings (like account, or neednodes with multiple processors)
        # NOTE(review): inside the value character class ' -@' is a range
        # (blank through '@'), which is what lets values contain '.', '+',
        # ',' and '-'; confirm before "tidying" it into a literal dash.

        tpatt = r'[a-z._A-Z]+=[a-z0-9A-Z=/: -@_]+?(?=$| [a-z._A-Z]+=)'
        tprog = re.compile(tpatt)
        tmatch = tprog.findall(m.group(4))
        if debug:
            print("result of key=val match pattern: %s" % (tmatch,))

        # parse timestamp

        ttup=time.strptime(m.group(1),"%m/%d/%Y %H:%M:%S")

        # last element of time tuple is DST, but PBS log files
        # don't specify time zone. Setting the last element of
        # the tuple to -1 asks libC to figure it out based on
        # local time zone of machine

        atup = ttup[:8] + (-1,)
        self.__time__ = int(time.mktime(atup))
        self.__type__ = m.group(2)
        self.__jobid__ = m.group(3)
        self.__info__ = keyvallist2dict(tmatch)

    def time(self):
        """Return the event time in seconds since the epoch, or None."""
        return self.__time__

    def type(self):
        """Return the one-letter event type, or None."""
        return self.__type__

    def jobid(self):
        """Return the PBS job id string, or None."""
        return self.__jobid__

    def info(self,key=''):
        """Return the attribute dict, or the value stored under key.

        With the default key ('') the whole dict is returned; for a key
        that is not present None is returned.
        """
        if key == '':
            return self.__info__
        return self.__info__.get(key)

    def __str__(self):
        # %-formatting instead of '+' so that an event whose line failed
        # to parse (all fields None) can still be printed
        return '%s :: event type %s at %s' % (
            self.__jobid__, self.__type__, self.__time__)
220
class History:
    """Job catalog and event lists built from PBS accounting log files.

    Three structures are filled while the log files are read:

    * the job catalog, a dict with one Job object per PBS jobid seen,
      holding queue entry time, start time, end time, owner, etc.;
    * the event list, all parsed events in the order they were read;
    * the job event dict, which maps each jobid to the list of its
      events (in sequence); it helps to resolve ambiguous cases such as
      multiple "D" events for one job.

    Together they hold what is needed to reconstruct the state of the
    queue at an arbitrary time.
    """

    def __init__(self,logfilelist):
        """Read every file named in logfilelist, in the order given.

        Lines that cannot be parsed (Event prints a message for them and
        leaves its type None) are skipped, so they create neither an
        event nor a job entry without a jobid.
        """

        self.__evlist__ = [ ]
        self.__jobcat__ = { }
        self.__jobevs__ = { }

        for f in logfilelist:
            inf = open(f,'r')
            try:
                for line in inf:
                    # strip only the newline, so a final line without
                    # one does not lose its last character
                    ev = Event(line.rstrip('\n'))
                    if ev.type() is None:
                        continue
                    self.addevt(ev) # adds event to evlist and jobevt struct
                    self._update_catalog(ev)
            finally:
                inf.close()

    def _update_catalog(self, ev):
        """Create or update the job catalog entry for the job of event ev."""
        jobid = ev.jobid()

        if ev.type() == 'Q': # job enters sys for 1st time
            if self.hasjob(jobid): # should not happen
                print('Error, Q event seen for pre-existing job %s'
                      % (jobid,))
                sys.exit(1)
            newentry = Job() # make new entry, fill info
            newentry.set('qtime',ev.time())
            newentry.set('queue',ev.info('queue'))
            self.addjob(jobid,newentry)
            return

        ## for all other event types
        if not self.hasjob(jobid):
            self.addjob(jobid,Job())
        job = self.getjob(jobid)

        # qtime is overwritten whenever a record carries one ...
        if ev.info('qtime'):
            job.set('qtime',int(ev.info('qtime')))
        # ... all other fields keep the first value seen
        for key in ('queue', 'user', 'group'):
            if ev.info(key) and not job.get(key):
                job.set(key,ev.info(key))
        if ev.info('start') and not job.get('start'):
            job.set('start',int(ev.info('start')))
        if ev.info('Resource_List.walltime') and not \
               job.get('maxwalltime'):
            (h,m,s) = ev.info('Resource_List.walltime').split(":")
            job.set('maxwalltime',int(h) * 3600 + int(m) * 60 + int(s))
        if ev.info('end') and not job.get('end'):
            job.set('end',int(ev.info('end')))

        ## special handling for troublesome event types

        if ev.type() == 'T': # job restart after checkpoint
            ## previous job start record may not have been seen; if not, we don't
            ## know if it was running or queued before, so just set current event time
            ## as start since we know it's running now.
            if not job.get('start'):
                job.set('start',ev.time())
            ## in some cases the T record may be the first (or even only) record
            ## we see. in that case, the only thing we can say is that the qtime
            ## was before the beginning of the log files ... set qtime like that.
            ## if we see a later record for this job, that record will set the correct
            ## qtime.
            if not job.get('qtime'):
                job.set('qtime',self.getfirst_event_time()-1)

    ## functions for using the job catalog. This beast is a dictionary
    ## that has one entry (an object of class Job) per job seen in the
    ## log files.

    def addjob(self, jobid, job) : # add new job entry
        """Store job under jobid; exits the program if jobid is known."""
        if self.hasjob(jobid):
            print("job already found, exiting")
            sys.exit(1)
        job.set('jobid',jobid)
        self.__jobcat__[jobid] = job

    def hasjob(self,jobid) : # test if job is already in catalogue
        """Return 1 if jobid is in the catalog, else 0."""
        # direct dict membership; no need to build the key list first
        if jobid in self.__jobcat__:
            return 1
        return 0

    def setjobinfo(self,jobid,key,val) : # add or set info to/for existing job
        """Set key to val on job jobid; return 1, or 0 if job is unknown."""
        if not self.hasjob(jobid):
            print("job not found: %s %s %s" % (jobid, key, val))
            return 0
        self.__jobcat__[jobid].set(key,val)
        return 1

    def getjobinfo(self,jobid,key) : # get info for key from job jobid
        """Return the value of key for job jobid, or 0 if job is unknown."""
        if not self.hasjob(jobid):
            print("job not found: %s %s" % (jobid, key))
            return 0
        return self.__jobcat__[jobid].get(key)

    def getjoblist(self):
        """Return a new list with all jobids in the catalog."""
        return list(self.__jobcat__)

    def getjob(self,jobid):
        """Return the Job stored under jobid (KeyError if unknown)."""
        return self.__jobcat__[jobid]

    ## functions for using event list. This beast is just a list (in sequence)
    ## of all events seen while parsing the log files

    def getfirst_event_time(self):
        """Return the time of the first event in the event list."""
        return self.__evlist__[0].time()

    def getevent(self,index):
        """Return the event at position index of the event list."""
        return self.__evlist__[index]

    def get_evlist(self):
        """Return the event list itself (not a copy)."""
        return self.__evlist__

    ## functions for using the job event list (dict keyed by jobid, see
    ## the class docstring)

    def getjobevts(self,jobid):
        """Return the list of events of job jobid; exits if unknown."""
        if jobid not in self.__jobevs__:
            print('getjobevs: jobid %s not found in job_event db'
                  % (jobid,))
            sys.exit(1)
        return self.__jobevs__[jobid]

    def addevt(self,ev):
        """Append ev to the event list and to the list of its job."""
        ## append to event list
        self.__evlist__.append(ev)
        ## append to job event struct; create entry for job if needed
        self.__jobevs__.setdefault(ev.jobid(), []).append(ev)
367
class LogFileServer(Server):
    """Server whose state is replayed from a History of accounting events.

    The initial state holds the jobs that were already queued or running
    before the first event of the history; each call to step() then
    applies one further event.
    """

    def __init__(self,history,debug=0):
        """Set up the state as it was just before the first event.

        history -- object providing get_evlist(), getjoblist(), getjob(),
                   getevent() and getjobevts(), as class History does
        debug   -- when true, print details of the initialisation

        With an empty history the server simply starts without jobs.
        """

        Server.__init__(self)

        self.__history__ = history
        # want to get first event on first getnextevent call, so set initial index
        # to -1 (one previous to 0!)
        self.__evindx__ = -1

        evlist = history.get_evlist()
        if not evlist:
            return # no events at all: nothing was queued or running
        first_event = evlist[0]
        if debug:
            print('first event data:')
            print("%s %s %s" % (first_event.jobid(), first_event.type(),
                                first_event.time()))
        starttime = first_event.time()
        jobidlist = sorted(history.getjoblist())
        if debug:
            print(jobidlist)
        for jid in jobidlist:
            entry = history.getjob(jid)
            qtime = entry.get('qtime')
            # a job without qtime counts as "before the first event"
            is_later = qtime is not None and qtime >= starttime
            if debug:
                print("%s %s %s" % (jid, qtime, starttime))
                print(is_later)
            # Skip, rather than stop at, jobs queued at or after the first
            # event: jobids are strings and sort lexicographically
            # ('1000.x' before '999.x'), so older jobs may follow.
            if is_later:
                continue

            # we are guaranteed that the qtime of all jobs that make
            # it this far are before the first event, so we need to
            # figure if the job is queued or running, nothing else

            job = copy.deepcopy(entry)
            job.set('jobid',jid)
            if job.get('start') and job.get('start') < starttime :
                job.set('state','running')
            else:
                job.set('state','queued')

            self.__jobdict__[jid] = job

    ## getnextevent shows you what the next event will be. step actually
    ## changes the server state to reflect what happened in that event.
    ## you need the two since you want to calculate ETT for an event
    ## BEFORE you actually submit the job to the queue!

    def getnextevent(self): # return the next event
        """Return the event that the next step() call will apply."""
        return self.__history__.getevent(self.__evindx__ + 1)

    def step(self): # step to next event
        """Advance to the next event, apply it and return it."""
        self.__evindx__ = self.__evindx__ + 1
        ev = self.__history__.getevent(self.__evindx__)
        self.__evtime__ = ev.time()

        # take care of implications for queue states

        evtype = ev.type()
        jobid = ev.jobid()
        if evtype == 'Q' :
            # new job: work on a copy so the history entry is not modified
            jobcopy = copy.deepcopy(self.__history__.getjob(jobid))
            jobcopy.set('state','queued')
            jobcopy.set('jobid',jobid)
            self.addjob(jobid,jobcopy)
        elif evtype == 'S' :
            self.getjob(jobid).set('state','running')
        elif evtype == 'T' :
            job = self.getjob(jobid)
            job.set('state','running')
            job.set('start',ev.time())
        elif evtype == 'E' :
            self.deletejob(jobid)
        elif evtype == 'D' : # if it's the last delete evt seen, do it
            jevtl = self.__history__.getjobevts(jobid)
            if jevtl[-1] == ev:
                self.deletejob(jobid)
        return ev
440

Properties

Name Value
svn:keyword ‘Revision
svn:keywords Revision HeadURL Date Id

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28