/[pdpsoft]/nl.nikhef.pdp.dynsched-pbs-plugin/branches/RB-2.2.0/pbsServer.py
ViewVC logotype

Contents of /nl.nikhef.pdp.dynsched-pbs-plugin/branches/RB-2.2.0/pbsServer.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2133 - (show annotations) (download) (as text)
Fri Jan 14 12:40:11 2011 UTC (11 years, 5 months ago) by templon
File MIME type: application/x-python
File size: 18663 byte(s)
fixed bug involving qstat output when no jobs are in the system.

1 #--
2 # pbsServer.py -- LRMS classes for PBS-based batch systems.
3 # $Revision: 1939 $
4 # $URL$
5 # $Date: 2010-09-23 16:07:57 +0200 (Thu, 23 Sep 2010) $
6 # $Id$
7
8 #--
9
# Two classes derived from lrms.Server are included: LiveServer, for
# working with a live PBS server, and LogFileServer, for representing
# historical states of the server from PBS accounting log files. Two
# helper classes (Event and History) support the historical server class.
14
15 from lrms import * # base Server class, Job class
16 import commands
17 import grp
18 import pwd
19 import re
20 import string
21 import sys
22 import time
23
class LiveServer(Server):
    """Snapshot of a live PBS/Torque server, built from ``qstat -f`` output.

    Keyword arguments:
      file -- optional path to a file holding saved ``qstat -f`` output;
              when given (and not None) it is read with /bin/cat instead
              of running qstat.

    The constructor does the following:
      - records the usable and free job slots (counted from pbsnodes) in
        slotsUp / slotsFree;
      - sets __evtime__ to the snapshot time;
      - registers one Job per job in the qstat output via addjob().
    Unrecoverable parse problems terminate the process with sys.exit().
    """

    # Matches one "key = value" attribute line of qstat -f output.  Leading
    # whitespace is matched loosely so parsing does not depend on the exact
    # indentation width of the attribute lines.
    _KEYVAL_PAT = re.compile(r'\s*(\S+) = (.*)')

    # qstat job_state letter -> state name stored on the Job
    _STATE_NAMES = {
        'Q': 'queued', 'W': 'queued',
        'R': 'running', 'E': 'running',
        'H': 'pending', 'T': 'pending',
        'C': 'done',
    }

    @staticmethod
    def _hms_to_secs(hms):
        """Convert an 'HH:MM:SS' string to seconds (returned as a float)."""
        t = hms.split(':')
        return int(t[2]) + 60.0 * (int(t[1]) + 60 * int(t[0]))

    def __init__(self, *arg, **kw):
        Server.__init__(self)

        cnow = time.ctime()  # this hack is to get around time zone problems
        if kw.get('file') is not None:
            cmdstr = '/bin/cat ' + kw['file']
        else:
            cmdstr = 'qstat -f'
        (stat, qstatout) = commands.getstatusoutput(cmdstr)
        if stat:
            print('problem getting qstat output; cmd used was %s' % (cmdstr,))
            print('returned status  %s , text:  %s' % (stat, qstatout))

        from torque_utils import pbsnodes
        cpucount = 0
        jobcount = 0
        for node in pbsnodes():
            if node.isUp():
                cpucount += node.numCpu
                for cpu in range(node.numCpu):
                    jobcount += len(node.jobs[cpu])
        self.slotsUp = cpucount
        self.slotsFree = cpucount - jobcount

        nowtuple = time.strptime(cnow, "%c")
        self.__evtime__ = int(time.mktime(nowtuple))

        # guard against empty or nonconforming qstat output
        if not qstatout.strip():
            # assume valid output for a system with no jobs at all
            verbose_job_info = []
        elif not qstatout.startswith('Job Id'):
            print('fatal error, qstat output starts with something other than a job id')
            print('first few characters are: %s' % (qstatout[:32],))
            sys.exit(4)
        else:
            # qstat -f separates consecutive jobs with a blank line
            verbose_job_info = qstatout.split("\n\n")

        for chunk in verbose_job_info:
            if not chunk.strip():
                continue  # tolerate stray blank separators
            newj = self._parse_job(chunk)
            self.addjob(newj.get('jobid'), newj)

    def _parse_job(self, text):
        """Build and return a Job from one job's block of ``qstat -f`` output."""
        newj = Job()
        qstatDict = {}
        for ll in text.split('\n '):
            if ll.startswith('Job Id'):
                newj.set('jobid', ll.split()[2])
                continue
            # continuation lines start with a tab; glue them onto their key
            l = ll.replace('\n\t', '')
            mk = self._KEYVAL_PAT.match(l)
            if not mk:
                print("fatal error, should always be able to find a match")
                print("unmatchable string was: %s" % (l,))
                sys.exit(3)
            qstatDict[mk.group(1)] = mk.group(2)

        # owner and group: prefer euser/egroup; otherwise fall back to parsing
        # "user@host" from Job_Owner and looking up the user's primary group.
        if 'euser' in qstatDict and 'egroup' in qstatDict:
            newj.set('user', qstatDict['euser'])
            newj.set('group', qstatDict['egroup'])
        elif 'Job_Owner' in qstatDict:
            user = qstatDict['Job_Owner'].split('@')[0]
            newj.set('user', user)
            try:
                groupname = grp.getgrgid(pwd.getpwnam(user)[3])[0]
            except KeyError:
                # user or its primary group is unknown on this host
                groupname = 'unknown'
            newj.set('group', groupname)
        else:
            print("Can't find user and group of job %s" % (newj.get('jobid'),))
            sys.exit(4)

        newj.set('state', self._STATE_NAMES.get(qstatDict.get('job_state'), 'unknown'))
        newj.set('queue', qstatDict['queue'])

        if 'qtime' in qstatDict:
            timetuple = time.strptime(qstatDict['qtime'], "%c")
            newj.set('qtime', time.mktime(timetuple))
        if 'Resource_List.walltime' in qstatDict:
            newj.set('maxwalltime', self._hms_to_secs(qstatDict['Resource_List.walltime']))
        used = None
        if 'resources_used.walltime' in qstatDict:
            used = self._hms_to_secs(qstatDict['resources_used.walltime'])
            newj.set('walltime', used)

        # start time: use start_time when present; otherwise derive it from
        # the elapsed walltime.  Jobs with neither (e.g. still queued) get no
        # start at all rather than one computed from an unrelated value.
        if 'start_time' in qstatDict:
            timetuple = time.strptime(qstatDict['start_time'], "%c")
            newj.set('start', time.mktime(timetuple))
            newj.set('startAnchor', 'start_time')
        elif used is not None:
            newj.set('start', self.__evtime__ - used)
            newj.set('startAnchor', 'resources_used.walltime')

        if 'Job_Name' in qstatDict:
            newj.set('name', qstatDict['Job_Name'])
        if 'exec_host' in qstatDict:
            # exec_host is a '+'-separated list with one entry per slot
            newj.set('cpucount', qstatDict['exec_host'].count('+') + 1)
        return newj
162
163 import copy
164
165 ## following is helper for class Event. superseded by newer keyvallist2dict function.
166 ## takes as arg a string of key=val pairs, returns a dict with the same
167 ## structure. example input string:
168 ## user=tdykstra group=niktheorie jobname=Q11_241828.gjob
169
def keyval2dict(astring):
    """Parse a whitespace-separated string of key=val tokens into a dict.

    Superseded by keyvallist2dict; kept for existing callers.  Each token is
    split on its first '=', so values may themselves contain '='.  Example:

        'user=tdykstra group=niktheorie' ->
            {'user': 'tdykstra', 'group': 'niktheorie'}

    A token without '=' is printed (with its split result) and then
    CantHappenException is raised.
    """
    d = {}
    for f in astring.split():
        kv = f.split("=", 1)
        if len(kv) != 2:
            print(f)
            print(kv)
            raise CantHappenException
        d[kv[0]] = kv[1]
    return d
182
183 ## following is helper for class Event.
## takes as arg a list of 'key=val' strings and returns a dict mapping
## each key to its value. example input list:
186 ## ['user=tdykstra', 'group=niktheorie', 'jobname=Q11_241828.gjob']
187
def keyvallist2dict(kvlist):
    """Turn a sequence of 'key=val' strings into a {key: val} dict.

    Only the first '=' separates key from value, so 'a=b=c' gives
    {'a': 'b=c'}; a later duplicate key overwrites an earlier one.
    An item containing no '=' is reported and CantHappenException raised.
    """
    result = {}
    for item in kvlist:
        fields = item.split("=", 1)
        if len(fields) != 2:
            print("tried to split: %s , result was: %s" % (item, fields))
            raise CantHappenException
        key, value = fields
        result[key] = value
    return result
198
class Event:
    """One parsed record from a PBS accounting log file.

    A record line looks like ``MM/DD/YYYY HH:MM:SS;T;jobid;key=val key=val``,
    where T is a single upper-case event code (Q, S, E, D, T, ...).  If a
    line cannot be parsed, a message is printed and all fields keep their
    defaults: time/type/jobid are None and the info dict is empty.
    """

    # Groups: 1) timestamp  2) event type letter  3) local PBS job id
    # 4) rest of the line (key=val pairs).  The timestamp and job id are
    # matched with [^;]+ so that a ';' inside the attribute text cannot be
    # absorbed into them, as the earlier greedy (.+) groups allowed.
    # NOTE(review): assumes PBS job ids never contain ';'.
    _EVPROG = re.compile(r"^([^;]+);([A-Z]);([^;]+);(.*)")

    # Matches strings of the form key=val.  The lookahead assertion works
    # around ' ' and '=' appearing inside some values (like account, or
    # neednodes with multiple processors).
    # NOTE(review): ' -@' in the character class is the range 0x20-0x40, so
    # values may also contain characters such as + , . ; ( ) and #.
    _TPROG = re.compile(r'[a-z._A-Z]+=[a-z0-9A-Z=/: -@_]+?(?=$| [a-z._A-Z]+=)')

    def __init__(self, evstring, debug=0):
        # defaults, kept when the line cannot be parsed
        self.__time__ = None
        self.__type__ = None
        self.__jobid__ = None
        self.__info__ = {}

        m = self._EVPROG.search(evstring)
        if not m:
            print("parse patt failed, offending line is")
            print(evstring)
            return
        if debug:
            print("timestamp %s" % (m.group(1),))
            print("code %s" % (m.group(2),))
            print("jobid %s" % (m.group(3),))
            print("attrs %s" % (m.group(4),))

        tmatch = self._TPROG.findall(m.group(4))
        if debug:
            print("result of key=val match pattern: %s" % (tmatch,))

        ttup = time.strptime(m.group(1), "%m/%d/%Y %H:%M:%S")
        # PBS log files don't specify a time zone.  Setting the DST flag
        # (last tuple element) to -1 asks mktime to work it out from the
        # local time zone of the machine.
        atup = tuple(ttup[:8]) + (-1,)
        self.__time__ = int(time.mktime(atup))
        self.__type__ = m.group(2)
        self.__jobid__ = m.group(3)
        self.__info__ = keyvallist2dict(tmatch)

    def time(self):
        """Return the event time in integer epoch seconds (None if unparsed)."""
        return self.__time__

    def type(self):
        """Return the one-letter event code (None if unparsed)."""
        return self.__type__

    def jobid(self):
        """Return the PBS job id of the record (None if unparsed)."""
        return self.__jobid__

    def info(self, key=''):
        """Return the whole key=val dict if key is '', else that key's value or None."""
        if key == '':
            return self.__info__
        return self.__info__.get(key)

    def __str__(self):
        # %s formatting keeps this working for unparsed events (None fields)
        return '%s :: event type %s at %s' % (self.__jobid__, self.__type__,
                                              self.__time__)
272
class History:
    """Job catalogue and event lists built from PBS accounting log files.

    Three structures are kept:
      - the job catalogue: one Job per PBS job id.  It holds queue entry
        time, start/end times, owner user/group, queue, and requested
        walltime, as far as the logs reveal them;
      - the event list: every parsed Event, in the order read;
      - the job event dict: job id -> list of that job's Events in order,
        which helps resolve ambiguous cases (like multiple "D" events).
    The aim is to keep just enough information to rebuild the queue state
    at an arbitrary time.
    """

    def __init__(self, logfilelist):
        """Parse each accounting log file named in logfilelist, in order."""
        self.__evlist__ = []
        self.__jobcat__ = {}
        self.__jobevs__ = {}

        for f in logfilelist:
            inf = open(f, 'r')
            try:
                for line in inf:
                    # strip only the newline; a last line without one is kept intact
                    self._process_line(line.rstrip('\n'))
            finally:
                inf.close()

    def _process_line(self, line):
        """Parse one log line and fold it into the catalogue and event lists."""
        ev = Event(line)
        if ev.type() is None:
            return  # Event has already reported the unparseable line
        self.addevt(ev)
        jid = ev.jobid()

        if ev.type() == 'Q':  # job enters the system for the first time
            if self.hasjob(jid):  # should not happen
                print('Error, Q event seen for pre-existing job %s' % (jid,))
                sys.exit(1)
            newentry = Job()
            newentry.set('qtime', ev.time())
            newentry.set('queue', ev.info('queue'))
            self.addjob(jid, newentry)
            return

        # all other event types
        if not self.hasjob(jid):
            self.addjob(jid, Job())
        job = self.getjob(jid)
        if ev.info('qtime'):
            job.set('qtime', int(ev.info('qtime')))
        # the remaining fields are only filled in if not already known
        for key in ('queue', 'user', 'group'):
            if ev.info(key) and not job.get(key):
                job.set(key, ev.info(key))
        if ev.info('start') and not job.get('start'):
            job.set('start', int(ev.info('start')))
        hms = ev.info('Resource_List.walltime')
        if hms and not job.get('maxwalltime'):
            (h, m, s) = hms.split(":")
            job.set('maxwalltime', int(h) * 3600 + int(m) * 60 + int(s))
        if ev.info('end') and not job.get('end'):
            job.set('end', int(ev.info('end')))

        if ev.type() == 'T':  # job restart after checkpoint
            # The previous start record may not have been seen, so whether the
            # job was running or queued before is unknown.  It is running now,
            # so use this event's time as the start.
            if not job.get('start'):
                job.set('start', ev.time())
            # The T record may be the first (or only) record seen for the job.
            # Then all we know is that it was queued before the logs begin.  A
            # later record for the job will set the correct qtime.
            if not job.get('qtime'):
                job.set('qtime', self.getfirst_event_time() - 1)

    ## job catalogue: dict of job id -> Job, one entry per job in the logs

    def addjob(self, jobid, job):
        """Add a new catalogue entry; exits if jobid is already present."""
        if self.hasjob(jobid):
            print("job already found, exiting")
            sys.exit(1)
        job.set('jobid', jobid)
        self.__jobcat__[jobid] = job

    def hasjob(self, jobid):
        """Return 1 if jobid is in the catalogue, else 0."""
        return int(jobid in self.__jobcat__)

    def setjobinfo(self, jobid, key, val):
        """Set key=val on an existing job; return 1, or 0 if the job is unknown."""
        if not self.hasjob(jobid):
            print("job not found: %s %s %s" % (jobid, key, val))
            return 0
        self.__jobcat__[jobid].set(key, val)
        return 1

    def getjobinfo(self, jobid, key):
        """Return the job's value for key, or 0 if the job is unknown."""
        if not self.hasjob(jobid):
            print("job not found: %s %s" % (jobid, key))
            return 0
        return self.__jobcat__[jobid].get(key)

    def getjoblist(self):
        """Return a new list of all catalogued job ids (callers may sort it)."""
        return list(self.__jobcat__.keys())

    def getjob(self, jobid):
        return self.__jobcat__[jobid]

    ## event list: all events, in the order they were read

    def getfirst_event_time(self):
        return self.__evlist__[0].time()

    def getevent(self, index):
        return self.__evlist__[index]

    def get_evlist(self):
        return self.__evlist__

    ## job event dict: job id -> list of that job's events, in order

    def getjobevts(self, jobid):
        """Return the events seen for jobid; exits if there are none."""
        if jobid not in self.__jobevs__:
            print('getjobevs: jobid %s not found in job_event db' % (jobid,))
            sys.exit(1)
        return self.__jobevs__[jobid]

    def addevt(self, ev):
        """Append ev to the event list and to its job's event list."""
        self.__evlist__.append(ev)
        self.__jobevs__.setdefault(ev.jobid(), []).append(ev)
419
class LogFileServer(Server):
    """Server whose state is replayed from a History of accounting events.

    The constructor preloads every job queued before the first logged event.
    Such a job is marked 'running' if it had started by then, and 'queued'
    otherwise.  step() then applies the logged events one at a time.
    """

    def __init__(self, history, debug=0):
        Server.__init__(self)

        self.__history__ = history
        # getnextevent()/step() look at index + 1, so the first call must
        # see event 0: start one position before it.
        self.__evindx__ = -1

        evlist = history.get_evlist()
        if not evlist:
            return  # nothing was logged, so there are no jobs to preload
        first_event = evlist[0]
        if debug:
            print('first event data:')
            print('%s %s %s' % (first_event.jobid(), first_event.type(),
                                first_event.time()))
        starttime = first_event.time()
        jobidlist = sorted(history.getjoblist())
        if debug:
            print(jobidlist)

        for jid in jobidlist:
            entry = history.getjob(jid)
            qtime = entry.get('qtime')
            # A job without a qtime is treated as queued before the logs begin.
            queued_later = qtime is not None and qtime >= starttime
            if debug:
                print('%s %s %s' % (jid, qtime, starttime))
                print(queued_later)
            if queued_later:
                # Added by step() when its event comes up.  Job ids are sorted
                # as strings, which is not queue-time order ('10.x' < '9.x'),
                # so keep scanning instead of stopping at the first such job.
                continue

            # This job was queued before the first event; only decide whether
            # it was already running or still queued at that moment.
            job = copy.deepcopy(entry)
            job.set('jobid', jid)
            start = job.get('start')
            if start and start < starttime:
                job.set('state', 'running')
            else:
                job.set('state', 'queued')
            self.__jobdict__[jid] = job

    ## getnextevent shows what the next event will be; step actually changes
    ## the server state to reflect it.  Both are needed because ETT for an
    ## event must be computed BEFORE the job is submitted to the queue.

    def getnextevent(self):
        """Return the event the next step() will apply, without applying it."""
        return self.__history__.getevent(self.__evindx__ + 1)

    def step(self):
        """Apply the next logged event to the server state and return it."""
        self.__evindx__ += 1
        ev = self.__history__.getevent(self.__evindx__)
        self.__evtime__ = ev.time()
        evtype = ev.type()
        jid = ev.jobid()

        if evtype == 'Q':
            jobcopy = copy.deepcopy(self.__history__.getjob(jid))
            jobcopy.set('state', 'queued')
            jobcopy.set('jobid', jid)
            self.addjob(jid, jobcopy)
        elif evtype in ('S', 'T'):
            job = self.getjob(jid)
            job.set('state', 'running')
            if evtype == 'T':  # restart after checkpoint: new start time
                job.set('start', ev.time())
        elif evtype == 'E':
            self.deletejob(jid)
        elif evtype == 'D':
            # a job may have several D records; only the last one removes it
            if self.__history__.getjobevts(jid)[-1] is ev:
                self.deletejob(jid)
        return ev
492

Properties

Name Value
svn:eol-style native
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28