/[pdpsoft]/nl.nikhef.pdp.dynsched-pbs-plugin/trunk/pbsServer.py
ViewVC logotype

Contents of /nl.nikhef.pdp.dynsched-pbs-plugin/trunk/pbsServer.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2146 - (show annotations) (download) (as text)
Fri Jan 14 16:11:45 2011 UTC (11 years, 8 months ago) by templon
File MIME type: application/x-python
File size: 18919 byte(s)
Merge release 2.2.0 changes with trunk
1 #--
2 # pbsServer.py -- LRMS classes for PBS-based batch systems.
3 # $Revision: 1939 $
4 # $URL$
5 # $Id$
6
7 #--
8
9 # Two classes derived from lrms.Server are included; one for
10 # working with a live PBS server, and one for representing historical
11 # states of the server from PBS accounting log files. A third class
12 # (History) is associated with the historical server class.
13
14 from lrms import * # base Server class, Job class
15 import commands
16 import grp
17 import pwd
18 import re
19 import string
20 import sys
21 import time
22
class LiveServer(Server):
    """Server state populated from a live PBS/Torque batch system.

    Job information is parsed from the text printed by ``qstat -f`` (or
    from a file holding such text); slot counts are derived from
    ``torque_utils.pbsnodes()``.
    """

    # qstat one-letter job states mapped onto the state names stored on a Job;
    # any other letter (or a missing job_state) becomes 'unknown'.
    _STATE_NAMES = {
        'Q': 'queued', 'W': 'queued',
        'R': 'running', 'E': 'running',
        'H': 'pending', 'T': 'pending',
        'C': 'done',
    }

    @staticmethod
    def _hms_to_secs(hms):
        """Convert an 'HH:MM:SS' string to a number of seconds (a float)."""
        t = hms.split(':')
        return int(t[2]) + 60.0 * (int(t[1]) + 60 * int(t[0]))

    def __init__(self, *arg, **kw):
        """Query the batch system and register one Job per qstat record.

        Keyword arguments:
          file -- optional path of a file containing ``qstat -f`` output;
                  when given (and not None) the file is read instead of
                  running ``qstat -f``.

        Sets ``slotsUp``, ``slotsFree`` and ``__evtime__`` on the instance.
        Calls sys.exit(4) when the qstat text does not start with a job id
        or when a job has neither euser/egroup nor Job_Owner, and
        sys.exit(3) when an attribute line cannot be parsed.
        """
        Server.__init__(self)

        cnow = time.ctime()  # this hack is to get around time zone problems

        # NOTE(review): the path is pasted unquoted into a shell command, so
        # a name with spaces or shell metacharacters will not work.
        if kw.get('file') is not None:
            cmdstr = '/bin/cat ' + kw['file']
        else:
            cmdstr = 'qstat -f'
        (stat, qstatout) = commands.getstatusoutput(cmdstr)
        if stat:
            # not fatal here: the output checks below decide whether to go on
            print('problem getting qstat output; cmd used was %s' % (cmdstr,))
            print('returned status  %s , text:  %s' % (stat, qstatout))

        # slot accounting: cpus on nodes that are up, minus the jobs on them
        from torque_utils import pbsnodes
        cpucount = 0
        jobcount = 0
        for node in pbsnodes():
            if node.isUp():
                cpucount += node.numCpu
                for cpu in range(node.numCpu):
                    jobcount += len(node.jobs[cpu])
        self.slotsUp = cpucount
        self.slotsFree = cpucount - jobcount

        nowtuple = time.strptime(cnow, "%c")
        self.__evtime__ = int(time.mktime(nowtuple))

        # An attribute starts on a new line indented with spaces; wrapped
        # values continue on lines starting with a tab.  Splitting on
        # "newline + any number of spaces" keeps the parser independent of
        # the exact indent width qstat uses.
        attrsep = re.compile(r'\n +')
        keyvalpat = re.compile(r'(\S+) = (.*)')

        # guard against empty or nonconforming qstat output
        if len(qstatout) == 0:
            verbose_job_info = []  # assume valid output for system with no jobs running!
        elif not qstatout.startswith('Job Id'):
            print('fatal error, qstat output starts with something other than a job id')
            print('first few characters are: %s' % (qstatout[:32],))
            sys.exit(4)
        else:
            verbose_job_info = qstatout.split("\n\n")

        for record in verbose_job_info:
            if not record.strip():
                continue  # stray blank block between or after job records

            newj = Job()
            qstatDict = {}
            for piece in attrsep.split(record):
                if piece.startswith('Job Id'):
                    # "Job Id: <id>" -- the id is the third word
                    newj.set('jobid', piece.split()[2])
                    continue
                text = piece.replace('\n\t', '')  # rejoin wrapped values
                mk = keyvalpat.match(text)
                if not mk:
                    print("fatal error, should always be able to find a match")
                    print("unmatchable string was: %s" % (text,))
                    sys.exit(3)
                qstatDict[mk.group(1)] = mk.group(2)

            # do owner and group. try euser and egroup first; if not found,
            # back off to the old method, which was "user@host" parsing plus
            # a getgrgid lookup on the user's primary group.
            if 'euser' in qstatDict and 'egroup' in qstatDict:
                newj.set('user', qstatDict['euser'])
                newj.set('group', qstatDict['egroup'])
            elif 'Job_Owner' in qstatDict:
                user = qstatDict['Job_Owner'].split('@')[0]
                newj.set('user', user)
                try:
                    groupname = grp.getgrgid(pwd.getpwnam(user)[3])[0]
                except KeyError:
                    # user or group unknown to the local account database
                    groupname = 'unknown'
                newj.set('group', groupname)
            else:
                print("Can't find user and group of job %s" % (newj.get('jobid'),))
                sys.exit(4)

            # do job state
            newj.set('state',
                     self._STATE_NAMES.get(qstatDict.get('job_state'), 'unknown'))

            newj.set('queue', qstatDict['queue'])

            if 'qtime' in qstatDict:
                timetuple = time.strptime(qstatDict['qtime'], "%c")
                newj.set('qtime', time.mktime(timetuple))
            if 'Resource_List.walltime' in qstatDict:
                newj.set('maxwalltime',
                         self._hms_to_secs(qstatDict['Resource_List.walltime']))

            # do start and wall times. default is to use qstat "start_time"
            # for start times and qstat "resources_used.walltime" for
            # walltime. Each one is derived from the other when missing.
            have_start = 'start_time' in qstatDict
            have_wall = 'resources_used.walltime' in qstatDict

            if have_start:
                timetuple = time.strptime(qstatDict['start_time'], "%c")
                newj.set('start', time.mktime(timetuple))
                newj.set('startAnchor', 'start_time')
                if not have_wall:  # just after job starts, walltime is not printed
                    newj.set('walltime', self.__evtime__ - newj.get('start'))

            if have_wall:
                secs = self._hms_to_secs(qstatDict['resources_used.walltime'])
                newj.set('walltime', secs)
                if not have_start:  # older torque version
                    newj.set('start', self.__evtime__ - secs)
                    newj.set('startAnchor', 'resources_used.walltime')

            if 'Job_Name' in qstatDict:
                newj.set('name', qstatDict['Job_Name'])
            if 'exec_host' in qstatDict:
                # entries in exec_host are joined with '+'
                newj.set('cpucount', qstatDict['exec_host'].count('+') + 1)

            self.addjob(newj.get('jobid'), newj)
165
166 import copy
167
168 ## following is helper for class Event. superseded by newer keyvallist2dict function.
169 ## takes as arg a string of key=val pairs, returns a dict with the same
170 ## structure. example input string:
171 ## user=tdykstra group=niktheorie jobname=Q11_241828.gjob
172
def keyval2dict(astring):
    """Build a dict from a whitespace-separated string of key=val tokens.

    Each token is split at its first '='; the text before it becomes the
    key and everything after it the value.  A token without any '=' is
    printed (followed by the failed split result) and
    CantHappenException is raised.
    """
    result = {}
    for token in astring.split():
        pieces = token.split("=", 1)
        if len(pieces) != 2:
            print(token)
            print(pieces)
            raise CantHappenException
        result[pieces[0]] = pieces[1]
    return result
185
186 ## following is helper for class Event.
187 ## takes as arg a list of key=val pairs, returns a dict with the same
188 ## structure. example input list:
189 ## ['user=tdykstra', 'group=niktheorie', 'jobname=Q11_241828.gjob']
190
def keyvallist2dict(kvlist):
    """Build a dict from a list of 'key=val' strings.

    Every element is split at its first '='.  An element that contains
    no '=' is reported on stdout and CantHappenException is raised.
    """
    mapping = {}
    for item in kvlist:
        parts = item.split("=", 1)
        if len(parts) != 2:
            print(" ".join(["tried to split:", str(item),
                            ", result was:", str(parts)]))
            raise CantHappenException
        mapping[parts[0]] = parts[1]
    return mapping
201
class Event:
    """One record of a PBS accounting log (job queued, started, ended, ...).

    A record that cannot be parsed yields an Event whose time(), type()
    and jobid() are all None and whose info() is an empty dict.
    """

    # Whole-record pattern; for a successful search the groups are:
    #   1) timestamp
    #   2) event type (Q,S,E,D, etc)
    #   3) local PBS jobID
    #   4) rest of line (key=value pairs), parsed separately with _KEYVAL
    _RECORD = re.compile("^(.+);([A-Z]);(.+);(.*)")

    # Matches strings of form key=val.  The lookahead assertion is necessary
    # to work around the presence of ' ' and '=' in some 'val' strings (like
    # account, or neednodes with multiple processors).
    # NOTE(review): ' -@' inside the character class is a range (space
    # through '@'), so it also admits '.', '+', ',' and similar characters.
    _KEYVAL = re.compile(r'[a-z._A-Z]+=[a-z0-9A-Z=/: -@_]+?(?=$| [a-z._A-Z]+=)')

    def __init__(self, evstring, debug=0):
        """Parse one accounting-log line (without its trailing newline).

        evstring -- the log line
        debug    -- when true, print the fields as they are extracted

        A line that does not match the record pattern is reported on
        stdout and leaves the event with its default (None / empty) values.
        """
        self.__time__ = None  # default values
        self.__type__ = None
        self.__jobid__ = None
        self.__info__ = {}

        m = self._RECORD.search(evstring)
        if not m:
            print("parse patt failed, offending line is")
            print(evstring)
            return
        if debug:
            print("timestamp %s" % (m.group(1),))
            print("code %s" % (m.group(2),))
            print("jobid %s" % (m.group(3),))
            print("attrs %s" % (m.group(4),))

        tmatch = self._KEYVAL.findall(m.group(4))
        if debug:
            print("result of key=val match pattern: %s" % (tmatch,))

        # parse timestamp
        ttup = time.strptime(m.group(1), "%m/%d/%Y %H:%M:%S")

        # last element of time tuple is DST, but PBS log files
        # don't specify time zone. Setting the last element of
        # the tuple to -1 asks libC to figure it out based on
        # local time zone of machine
        atup = tuple(ttup[:8]) + (-1,)

        self.__time__ = int(time.mktime(atup))
        self.__type__ = m.group(2)
        self.__jobid__ = m.group(3)
        self.__info__ = keyvallist2dict(tmatch)

    def time(self):
        """Return the event time in seconds since the epoch (or None)."""
        return self.__time__

    def type(self):
        """Return the one-letter event type (or None)."""
        return self.__type__

    def jobid(self):
        """Return the PBS job id the event refers to (or None)."""
        return self.__jobid__

    def info(self, key=''):
        """Return the value recorded for key, or None if it is absent.

        Called without a key (or with ''), return the whole key/value dict.
        """
        if key == '':
            return self.__info__
        return self.__info__.get(key)

    def __str__(self):
        # %s formatting (rather than '+') so that an event whose line failed
        # to parse can still be printed instead of raising TypeError.
        return '%s :: event type %s at %s' % (
            self.__jobid__, self.__type__, self.__time__)
275
class History:
    """Job catalog and event list reconstructed from PBS accounting logs.

    the idea here is to generate a couple of data structures:
    first the job catalog, which associates things like
    queue entry time, start time, stop time, owner username/groupname,
    etc. with each PBS jobid.
    second the event list, which records when jobs enter the various
    queues, start to execute, and terminate.  Together these are meant
    to allow the state of the queue at an arbitrary time to be rebuilt.
    """

    ## functions for using the job catalog. This beast is a dictionary
    ## that has one entry (an object of class Job) per job seen in the
    ## log files.

    def addjob(self, jobid, job):
        """Add a new job entry; exits the program if jobid already exists."""
        if self.hasjob(jobid):
            print("job already found, exiting")
            sys.exit(1)
        job.set('jobid', jobid)
        self.__jobcat__[jobid] = job

    def hasjob(self, jobid):
        """Return 1 if jobid is already in the catalogue, else 0."""
        if jobid in self.__jobcat__:
            return 1
        return 0

    def setjobinfo(self, jobid, key, val):
        """Set key=val on an existing job; return 1, or 0 if job unknown."""
        if not self.hasjob(jobid):
            print("job not found: %s %s %s" % (jobid, key, val))
            return 0
        self.__jobcat__[jobid].set(key, val)
        return 1

    def getjobinfo(self, jobid, key):
        """Return the value of key for job jobid, or 0 if job unknown."""
        if not self.hasjob(jobid):
            print("job not found: %s %s" % (jobid, key))
            return 0
        return self.__jobcat__[jobid].get(key)

    def getjoblist(self):
        """Return a list of all job ids in the catalogue."""
        return list(self.__jobcat__.keys())

    def getjob(self, jobid):
        """Return the Job stored for jobid (KeyError if unknown)."""
        return self.__jobcat__[jobid]

    ## functions for using event list. This beast is just a list (in sequence)
    ## of all events seen while parsing the log files

    def getfirst_event_time(self):
        """Return the time of the first event recorded."""
        return self.__evlist__[0].time()

    def getevent(self, index):
        """Return the event at position index of the event list."""
        return self.__evlist__[index]

    ## functions for using the job event list. This beast is just a dictionary
    ## (key=jobid), so only one entry in dict for each job; the value for each
    ## jobid is a list of events seen for this job (in sequence). Rationale
    ## for this object is it can help in resolving ambiguous cases (like
    ## multiple "D" events seen).

    def getjobevts(self, jobid):
        """Return the events seen for jobid; exits if jobid is unknown."""
        if jobid not in self.__jobevs__:
            print('getjobevs: jobid %s not found in job_event db' % (jobid,))
            sys.exit(1)
        return self.__jobevs__[jobid]

    def addevt(self, ev):
        """Append ev to the global event list and to its job's event list."""
        self.__evlist__.append(ev)
        # create the per-job list on first sight of this job id
        self.__jobevs__.setdefault(ev.jobid(), []).append(ev)

    def __init__(self, logfilelist):
        """Parse every file named in logfilelist, in the order given.

        Lines that Event cannot parse are skipped (Event itself reports
        them on stdout) so they never reach the event list or job catalog.
        """
        self.__evlist__ = []
        self.__jobcat__ = {}
        self.__jobevs__ = {}

        for f in logfilelist:
            inf = open(f, 'r')
            try:
                for line in inf:
                    # drop the newline only if there is one, so that a final
                    # unterminated line does not lose its last character
                    if line.endswith('\n'):
                        line = line[:-1]
                    ev = Event(line)  # parse line, create event
                    if ev.type() is None:
                        continue      # unparseable line
                    self.addevt(ev)   # adds event to evlist and jobevt struct
                    self._absorb(ev)
            finally:
                inf.close()

    def _absorb(self, ev):
        """Update the job catalog with the information carried by ev."""
        jobid = ev.jobid()

        if ev.type() == 'Q':  # job enters sys for 1st time
            if self.hasjob(jobid):  # should not happen
                print('Error, Q event seen for pre-existing job %s' % (jobid,))
                sys.exit(1)
            newentry = Job()  # make new entry, fill info
            newentry.set('qtime', ev.time())
            newentry.set('queue', ev.info('queue'))
            self.addjob(jobid, newentry)
            return

        ## for all other event types
        if not self.hasjob(jobid):
            self.addjob(jobid, Job())
        job = self.getjob(jobid)

        # qtime is always overwritten when present in the record
        if ev.info('qtime'):
            job.set('qtime', int(ev.info('qtime')))

        # the remaining fields are only filled in while still unset
        for key in ('queue', 'user', 'group'):
            if ev.info(key) and not job.get(key):
                job.set(key, ev.info(key))
        if ev.info('start') and not job.get('start'):
            job.set('start', int(ev.info('start')))
        if ev.info('Resource_List.walltime') and not job.get('maxwalltime'):
            (h, m, s) = ev.info('Resource_List.walltime').split(":")
            job.set('maxwalltime', int(h) * 3600 + int(m) * 60 + int(s))
        if ev.info('end') and not job.get('end'):
            job.set('end', int(ev.info('end')))

        ## special handling for troublesome event types

        if ev.type() == 'T':  # job restart after checkpoint
            ## previous job start record may not have been seen; if not, we
            ## don't know if it was running or queued before, so just set
            ## current event time as start since we know it's running now.
            if not job.get('start'):
                job.set('start', ev.time())
            ## in some cases the T record may be the first (or even only)
            ## record we see. in that case, the only thing we can say is that
            ## the qtime was before the beginning of the log files ... set
            ## qtime like that. if we see a later record for this job, that
            ## record will set the correct qtime.
            if not job.get('qtime'):
                job.set('qtime', self.getfirst_event_time() - 1)

    def get_evlist(self):
        """Return the list of all events, in the order they were read."""
        return self.__evlist__
422
class LogFileServer(Server):
    """Server whose state is replayed event by event from a History."""

    def __init__(self, history, debug=0):
        """Preload the jobs that were already in the system at log start.

        history -- a History object; its event list must not be empty
                   (an empty list raises IndexError here)
        debug   -- when true, print the data used for preloading
        """
        Server.__init__(self)

        self.__history__ = history
        # want to get first event on first getnextevent call, so set initial
        # index to -1 (one previous to 0!)
        self.__evindx__ = -1

        first_event = history.get_evlist()[0]
        if debug:
            print('first event data:')
            print('%s %s %s' % (first_event.jobid(), first_event.type(),
                                first_event.time()))
        starttime = first_event.time()

        jobidlist = sorted(history.getjoblist())
        if debug:
            print(jobidlist)

        for jid in jobidlist:
            entry = history.getjob(jid)
            if debug:
                print('%s %s %s' % (jid, entry.get('qtime'), starttime))
                print(entry.get('qtime') >= starttime)

            # Job ids sort as strings ("100.x" before "99.x"), so id order is
            # not queue-time order: every job has to be examined rather than
            # stopping at the first one queued after the first event.
            if entry.get('qtime') >= starttime:
                continue

            # the qtime of all jobs that make it this far is before the
            # first event, so we need to figure out if the job is queued
            # or running, nothing else
            job = copy.deepcopy(entry)
            job.set('jobid', jid)
            if job.get('start') and job.get('start') < starttime:
                job.set('state', 'running')
            else:
                job.set('state', 'queued')

            self.__jobdict__[jid] = job

    ## getnextevent shows you what the next event will be. step actually
    ## changes the server state to reflect what happened in that event.
    ## you need the two since you want to calculate ETT for an event
    ## BEFORE you actually submit the job to the queue!

    def getnextevent(self):
        """Return the next event without applying it."""
        return self.__history__.getevent(self.__evindx__ + 1)

    def step(self):
        """Advance to the next event, apply it to the job set, return it."""
        self.__evindx__ = self.__evindx__ + 1
        ev = self.__history__.getevent(self.__evindx__)
        self.__evtime__ = ev.time()

        # take care of implications for queue states
        evtype = ev.type()
        jobid = ev.jobid()

        if evtype == 'Q':
            # work on a copy so the history's own record stays untouched
            jobcopy = copy.deepcopy(self.__history__.getjob(jobid))
            jobcopy.set('state', 'queued')
            jobcopy.set('jobid', jobid)
            self.addjob(jobid, jobcopy)
        elif evtype == 'S':
            self.getjob(jobid).set('state', 'running')
        elif evtype == 'T':
            job = self.getjob(jobid)
            job.set('state', 'running')
            job.set('start', ev.time())
        elif evtype == 'E':
            self.deletejob(jobid)
        elif evtype == 'D':  # if it's the last delete evt seen, do it
            jevtl = self.__history__.getjobevts(jobid)
            if jevtl[-1] == ev:
                self.deletejob(jobid)
        return ev
495

Properties

Name Value
svn:eol-style native
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28